PasteSpider是什麼? 一款使用.net編寫的開源的Linux容器部署助手,支持一鍵發佈,平滑升級,自動伸縮, Key-Value配置,項目網關,環境隔離,運行報表,差量升級,私有倉庫,集群部署,版本管理等! 30分鐘上手,讓開發也可以很容易的學會在linux上部署你得項目! [從需求角度介 ...
PasteSpider是什麼?
一款使用.net編寫的開源的Linux容器部署助手,支持一鍵發佈,平滑升級,自動伸縮,
Key-Value配置,項目網關,環境隔離,運行報表,差量升級,私有倉庫,集群部署,版本管理等!
30分鐘上手,讓開發也可以很容易的學會在linux上部署你得項目!
[從需求角度介紹PasteSpider(K8S平替部署工具適合於任何開發語言)]
(https://blog.csdn.net/Apeart/article/details/138819197?spm=1001.2014.3001.5501)
PasteCluster又是什麼?
一套使用.net編寫的中間件,如果你的API項目是.NET的,那麼通過引入這個組件可以讓你的項目快速支持集群模式!
(部署是部署工具和寫法的問題,模式是模式哈,有主從的那個是模式)
接入現有項目的案例
如上傳入必要的配置,關於ClusterConfig的內容有啥,需要查看源碼PasteCloveConfig.cs這個文件,大概有十多個配置項,也可以使用預設值哈!
這個配置在appsettings.json是這樣的
PasteClusterHandlr這個邏輯他會產生一個Channel的非同步隊列,業務代碼上處理這個業務隊列即可,比方說我們啟動一個HostedService來處理這個隊列
上面核心代碼其實只有2行
//如果已知當前節點的Host(這裡示例預設為80埠)信息,可以用下方的函數寫入,也可以在啟動的時候寫入配置中
// -e ClusterConfig:CurrentHost="http://192.168.1.100"
//如果使用PasteSpider部署,則是-e ClusterConfig:CurrentHost="http://{{App.ProAddress}}"
//_cluster_handler.Register("http://192.168.1.100",0,0);
//啟動集群中的當前節點(在這之前必須要確認當前節點的host是多少)
_cluster_handler.StartCluster();
和
//讀取集群產生的數據,比如其他節點發送的數據等,這裡一般是業務相關的消息
ReadClusterChannel();
其他部分概括下就是寫入集群節點列表,寫入當前節點的HOST信息的!
你只要關註你的業務代碼如下:
就是這個default的分支,嚴格來說應該是msg_type=0或者大於10的(1-10組件用於通知特定消息給業務,至於業務用不用要基於自己的業務需求,比如我上面這個案例就是節點掉線後把他從集群節點列表種刪除(記錄在redis種))
在最後將會貼出源碼
集群需求過程
如果說K8S是對運維人員的部署工具,那麼PasteSpider應該就是針對開發人員的部署助手了!
K8S不是有那個主從集群模式麽,一開始那會思考也整一個集群模式,這樣一個PasteSpider宕機了,還有其他的來承擔任務!
去中心化集群
一開始的思路是去中心化的集群模式,說白點就是資料庫都是一個節點一個的,當然這裡的去中心化和市面上的那個鏈不是一回事哈,至少沒有鏈銜接和鏈校驗的問題!
假設一個節點就是一套完整的PasteSpider,包括資料庫,對應的api和web管理端等
如何保證集群安全性?
集群內的所有節點通訊使用一個統一的密鑰,這樣可以防止其他集群亂入!
數據防重和數據唯一?
首先一個,資料庫的表我分幾種:
1.基礎表,比如會員信息表,這種基礎表肯定每一個節點都要同步的
2.統計表,這個明顯的代表就是報表,比如日報表,這樣要報表的時候不需要從日誌表中讀取統計
3.關係表,一般的是基礎表和基礎表的關係的描述的表,也是要同步的
4.日誌表,記錄一些操作的,視情況而定,不一定要同步完全
每個表添加3個欄位,節點,創建時間戳,更新時間戳,這樣在同步數據的時候可以基於這個時間戳獲取數據差,然後根據ID規則同步數據
由於對管理端來說,只要一個PasteSpider在工作,也就是Nginx中配置主備用模式,這樣也就是說主要的數據都是通過一個節點新建的,這樣可以排除數據重覆的問題
比如說,整個系統的項目的代碼需要唯一!
輔助組件如何做到集群
比如項目使用到了nginx,使用到了redis等,如果要完整的集群,甚至是多台伺服器,其實嚴格的集群我個人認為是不可能的!
如果可能,那麼每年就不會有那麼多這個服務宕機多久,那個宕機多久了!
市面上沒有100%的災備方案,因為在追求100%的路上,你會發覺一直在套娃... .. .
普通的集群的現狀
稍微查詢下目前比較流程的一些中間件,用到的集群模式大概有以下要點:
1.集群選舉過程中,不對外提供服務,可以理解成集群還沒有協商一致好!那麼引申的問題就是集群選舉越快越好!
2.幾乎的集群選舉採用的是單數,過半模式!看過各種解釋,說白點就是要少數服從多數,我個人覺得這個問題很扯的!
集群的數量發生變動了,這個是不可控的吧!
居然不可控,你咋保證宕機的數量一定是單數?那肯定單數複數都有可能,而且概率還很大,那這半數了個寂寞!
3.很多集群模式引入了多角色,比如蜂王,工蜂,監控蜂... .. . 啥意思?難不成只有工蜂和封王會宕機???如果你說不同角色宕機的概率不一樣,那我還是信的,畢竟負載不一樣!
如果都會宕機,那角色越多是不是意味著集群恢復的邏輯越複雜?角色切換來切換去好玩?
4.還有一種半通的集群情況,比如A,B,C,D,E,其中A只和B,C能通,其他的都能互通!
這什麼神奇組合,這種的我思考來思考去,還不如你搞2個集群,然後弄一個中轉,或者是業務需求變更下,因為為了實現這個半通,付出的代價太大了!
PasteCluster的方式
1.採用N個節點模式,N大於等於1即可,1的時候表示自己是節點,自己擔任任何角色(其實也就2個角色,master和cluster角色)
這裡2個角色的區別主要是在業務上,比方說你集群中的某些任務,需要有一個人專門管理,那麼這個任務就交給master來處理
2.不採用選舉模式,採用競選模式,當發生2個競選衝突的時候,基於vote_time時間戳和id來確定誰是master!
假設有A,B,C,D,E節點,如果當前A為master!
停止A,則會發生
問
B,C,D,E在不同時間點會發覺A不在了,這裡的發現可能併發也可能是按照一定的順序,取決於他們的輪詢線
假設B先發現A出問題了,立馬給自己的vote_time賦值時間戳ms,然後問(除了自己外的其他節點)你有master麽?
假設被問的是C,他這個時候還沒有發現A掛了,
他立馬健康檢查A,
如果通了,則返回A的信息給B,告訴他當前的master是A
競
如果不通,他則給自己的vote_time賦值,然後告訴B,C就是master!B收到返回後,就把C設為master
C這個時候會進入選舉階段,會把自己的master(C)信息群發給其他節點
其他節點收到後,會和自己的master做健康檢查,如果接通的話,再做vote_time對比,結合節點Id選舉出最小的返回給C,
C這個時候收到的結果,如果返回的master(E)(這個是對比vote_time的結果)不是C他這個節點,則C放棄自己作為節點,接受返回的master(E)為master!自己做cluster的角色
上述居然有返回E,那麼表示E也啟動了選舉,只是在和C的對比中E勝出了,C由於在遍歷節點的時候失敗了,所以中途退出了,也就是E在遍歷的過程中完成了一整個遍歷,沒有被中斷!
立
E在完成遍歷後,群發一條消息給所有節點,告知我是新的Master(E)
以上整個選舉過程就算完成,從時間上來說理論上是一個集群遍歷的是時間,不需要其他集群模式的反覆選舉!
一般的是誰先發現,然後下一個節點選舉勝出!主要是這個問的環節,如果沒有問,直接進入競,複雜度還會提升!
集群問答
1.集群節點保活問題
所有的集群節點都有心跳模式,因為你沒辦法知道對方下線沒有,所以就有一個輪詢,一定時間問對方線上否,PasteCluster採用雙心跳,心跳復用模式!
雙心跳是指master會定期向其他節點輪詢是否線上
節點也會問輪詢問master是否線上
復用模式在於,每次檢查,只有他們之間的交互時間大於設定的時間後才會發送這個心跳檢查,比如設定10秒鐘!如果近期10秒鐘這2個節點有通訊,則這一次心跳不在問詢的任務中!
2.如何防止小集群出現(腦裂?)
之前第一次寫的時候,後面測試發現會出現有多個master的情況,其實把整個選舉過程拆開,每一個步驟延長時間,就很好理解,出現多個master是可能的!
在節點的交互過程中,比如A節點發送信息給B節點,A的請求會附帶至少2個信息,A的master是誰,A的所在集群節點的總數,發送給B後,B會對這2個信息進行校驗,如何和A的不一致,則B會拋棄自己的master信息,轉而進入“問”階段,再根據這個問的結果,加入集群!
3.啥時候發生加入集群這個動作
加入集群有2種情況,
1.是剛剛啟動的時候,肯定得找個集群去加入,雖然有可能整個系統就你一個節點,過程嘛總要走的,說不定有其他節點(節點能通)呢!
2.在數據交互過程中,發現信息不對稱,拋棄自己得信息加入到現有得一個主節點
4.啥時候移除節點
1.節點離線的時候主動觸發,主動向master告知我這個節點離線了,master重新整理節點信息後,群發給所有其他線上的節點,從而更新整個集群!
2.在節點交互過程中,通訊錯誤次數超過設定的次數,這個交互包括數據通訊或者定時的健康檢查,最終在master健康檢查的時候移除這個節點,然後廣播
5.給集群分配節點列表信息
1.通過配置文件的方式在啟動的時候導入
2.通過AddNodeToList在啟動後添加,比如你通過讀取Redis獲得節點列表,然後導入
6.找到當前節點的IP等信息
節點之間是通過HOSTIP等通訊的,所以最基本的就是需要知道當前節點是哪個IP,有2種模式
1.已知節點列表有10個IP,現在是不知道哪個節點是哪個,把節點列表寫入到當前節點信息後,然後調用FindMe(),系統會去遍歷列表,從而找到哪個節點信息是當前節點,你可以使用
/api/cluster/status查看自己當前節點是多少來確定是否查找完成!
2.在服務啟動的時候給導入IP信息,比如如果你使用PasteSpider部署的項目,而且項目配置了項目網關,則可以在對應服務的配置中添加如下代碼即可:
如果是非80埠要自己修改下!看配置其實就是在啟動的額時候修改配置項ClusterConfig:CurrentHost的值!
7.手動加入一個現有集群
加入一個集群有2個步驟,一個是給當前自己打標簽,也就是為自己設定host等信息,然後是ScanMaster();也就是當前節點知道自己是誰後,然後去查找當前的集群信息,然後加入這個集群!
8.主動退出一個集群
其實你也可以主動調用Leave函數實現離開集群!
PasteCluste組件的源碼
你可以從Gitee拉取最新源碼 PasteCluster組件源碼和使用案例
拉取後打開項目,只需要關註如下代碼
其他的子項目是因為這個使用案例是使用PasteTemplate生成的,懶得精簡了,看下案例應該就知道如何使用到自己項目中了!
源碼解說
基於上圖,主要文件是2個
1.PasteClusterController.cs
看文件命名就知道,他是一個Controller,用於節點對外通訊的,每一個介面都有添加header校驗,用於校驗請求是否屬於這個集群,在最後代碼處:
要安全點的話,你可以加入那個time_temptoken,加密校驗,類似OAUTH那個協議模式,過期了丟棄這個token
上圖的fromnode用於修改最後交互時間,減少心跳中不必要的檢查,
masternode則用於校驗是否腦裂?也就是是否有多個master的情況,發現後及時處理!
在配置路由轉發的時候(nginx的配置),需要為這個路徑做代理
源碼如下:
using Microsoft.AspNetCore.Mvc;
using Microsoft.Extensions.Options;
using Newtonsoft.Json.Linq;
namespace PasteCluster
{
/// <summary>
///
/// </summary>
[ApiController]
[Route("/api/cluster/[action]")]
public class PasteClusterController : Controller
{
private PasteClusterHandler _handler;
private PasteSloveConfig _config;
/// <summary>
///
/// </summary>
/// <param name="handler"></param>
/// <param name="config"></param>
public PasteClusterController(PasteClusterHandler handler, IOptions<PasteSloveConfig> config)
{
_config = config.Value;
_handler = handler;
}
/// <summary>
/// 健康檢查
/// </summary>
/// <returns></returns>
[HttpGet]
public PasteApiResponse health()
{
CheckHeader();
return _handler.Health();
}
/// <summary>
/// 讀取當前狀態
/// </summary>
/// <returns></returns>
[HttpGet]
public string status()
{
return _handler.Status();
}
/// <summary>
/// 問詢master信息
/// </summary>
/// <returns></returns>
[HttpGet]
public PasteApiResponse ask()
{
CheckHeader();
return _handler.Ask();
}
/// <summary>
/// 查找自己
/// </summary>
/// <param name="input"></param>
/// <returns></returns>
[HttpPost]
public async Task<PasteApiResponse> find(PasteNodeModel input)
{
CheckHeader();
return await _handler.Find(input);
}
/// <summary>
/// 有節點加入
/// </summary>
/// <param name="input"></param>
/// <returns></returns>
[HttpPost]
public PasteApiResponse join(PasteNodeModel input)
{
CheckHeader();
_handler.Join(input);
return _handler.State();
}
/// <summary>
/// 轉發消息
/// </summary>
/// <param name="input"></param>
/// <returns></returns>
[HttpPost]
public async Task<PasteApiResponse> msg(PasteSloveMessage input)
{
CheckHeader();
input.from_api = true;
return await _handler.Msg(input);
}
/// <summary>
/// 往集群中發送消息
/// </summary>
/// <param name="input"></param>
/// <returns></returns>
[HttpPost]
public PasteApiResponse push(PasteSloveMessage input)
{
CheckHeader();
_handler.PushMsgToNode(input.body);
return _handler.State();
}
/// <summary>
/// 這個一般是PasteSpider推送過來的信息
/// </summary>
/// <param name="input"></param>
/// <returns></returns>
[HttpPost]
public PasteApiResponse run()
{
JObject info = null;
if (base.Request.Body != null)
{
if (base.Request.Body != null)
{
using var stream = new StreamReader(base.Request.Body);
var bodystr = stream.ReadToEndAsync().GetAwaiter().GetResult();
//Console.WriteLine(bodystr);
info = Newtonsoft.Json.JsonConvert.DeserializeObject<JObject>(bodystr);
}
}
if (info != null)
{
var _host = string.Empty;
var _port = 80;
JObject input = info;
//if(info is JObject)
//{
// input = input as JObject;
//}
//else
//{
// Console.WriteLine("input is not JContainer");
// return _handler.State();
//}
if (input.ContainsKey("proAddress"))
{
_host = input["proAddress"].ToString();
}
if (input.ContainsKey("extendService"))
{
var _ser = input["extendService"];
if (_ser.Type == JTokenType.Object)
{
var _serobj = _ser as JObject;
if (_serobj.ContainsKey("listenPorts"))
{
var ports = _serobj["listenPorts"].ToString();
if (!String.IsNullOrEmpty(ports))
{
int.TryParse(ports.Split(',')[0], out var _pp);
if (_pp > 0)
{
_port = _pp;
}
}
}
}
}
if (!String.IsNullOrEmpty(_host))
{
if (_port != 80)
{
_handler.Register($"http://{_host}:{_port}", 0, 0);
}
else
{
_handler.Register($"http://{_host}", 0, 0);
}
}
else
{
Console.WriteLine(Newtonsoft.Json.JsonConvert.SerializeObject(input));
}
}
return _handler.State();
}
/// <summary>
/// 手動註冊自己
/// </summary>
/// <param name="input"></param>
/// <returns></returns>
[HttpGet]
public string joinin([FromQuery] PasteNodeModel input, string token)
{
if (token == _config.SloveToken)
{
_handler.Register(input.host, input.id, input.group);
}
else
{
return "token參數錯誤!";
}
return "手動加入!";
}
/// <summary>
/// 加入某一個節點,當前需要為master
/// </summary>
/// <param name="input"></param>
/// <returns></returns>
[HttpGet]
public async Task<string> addone([FromQuery] PasteNodeModel input, string token)
{
if (token == _config.SloveToken)
{
if (_handler.CurrentIsMaster || _handler.IsSingle)
{
var node = new PasteNodeModel
{
host = input.host,
id = input.id,
group = input.group,
};
await _handler.AddNodeToList(node);
}
else
{
return "當前節點不是Master,無法執行這個命令";
}
}
else
{
return "token參數錯誤!";
}
return "手動加入!";
}
/// <summary>
/// 節點集合
/// </summary>
/// <returns></returns>
[HttpGet]
public PasteNodeModel[] nodes()
{
return _handler.Nodes();
}
/// <summary>
/// 執行選舉
/// </summary>
/// <param name="input"></param>
/// <returns></returns>
[HttpPost]
public async Task<PasteApiResponse> vote(PasteNodeModel input)
{
CheckHeader();
return await _handler.Vote(input);
}
/// <summary>
/// 被通知,有成為master
/// </summary>
/// <param name="input"></param>
/// <returns></returns>
[HttpPost]
public async Task<PasteApiResponse> master(PasteMasterResponse input)
{
CheckHeader();
//Console.WriteLine(Newtonsoft.Json.JsonConvert.SerializeObject(input));
return await _handler.Master(input);
}
/// <summary>
/// 有節點離開
/// </summary>
/// <param name="input"></param>
/// <returns></returns>
[HttpPost]
public async Task<PasteApiResponse> leave(PasteNodeModel input)
{
CheckHeader();
return await _handler.Leave(input);
}
/// <summary>
/// 移除某節點
/// </summary>
/// <param name="input"></param>
/// <returns></returns>
[HttpPost]
public async Task<PasteApiResponse> remove(PasteNodeModel input)
{
CheckHeader();
return await _handler.Remove(input);
}
/// <summary>
/// 檢查信息是否合法
/// </summary>
/// <exception cref="Exception"></exception>
private void CheckHeader()
{
if (base.Request.Headers.ContainsKey("slovetoken"))
{
if (base.Request.Headers["slovetoken"] != _config.SloveToken)
{
throw new Exception("slovetoken錯誤,請重新輸入!");
}
else
{
if (base.Request.Headers.ContainsKey("fromnode"))
{
_handler.TargetMasterLast(base.Request.Headers["fromnode"].ToString());
}
if (base.Request.Headers.ContainsKey("masternode"))
{
_handler.ConfirmMaster(base.Request.Headers["masternode"].ToString());
}
}
}
else
{
throw new Exception("slovetoken不能為空,請重新輸入!");
}
}
}
}
2.PasteClusterHandler.cs
這個文件就是集群選舉,維護等的主要邏輯,裡面使用了2個Channel非同步隊列,一個是這個集群組件內部使用的,有寫入數據和讀取數據,另外一個就是給業務用的了,在上面的案例中的HostedService中有體現!
整個選舉過程和修複等,可以查看這個文件的源碼,有疑問的麻煩在評論中回覆!!!
源碼內容:
using System.Net;
using System.Text;
using System.Threading.Channels;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
namespace PasteCluster
{
/// <summary>
/// 註意用IOC單例註入系統,多處調用
/// </summary>
public class PasteClusterHandler : IDisposable
{
/// <summary>
///
/// </summary>
private readonly IHttpClientFactory _httpClientFactory;
/// <summary>
/// 用於內部消息中轉等 節點中的任務
/// </summary>
private Channel<PasteEventModel> _channel_slove;
/// <summary>
/// 業務隊列 表示隊列中的消息要業務邏輯處理,當前可能為slove也可能為master
/// 業務監聽隊列
/// 表示業務要處理的消息
/// 當前節點可能是slove節點
/// 也可能是master節點
/// </summary>
public Channel<PasteSloveMessage> ChannelCluster;
/// <summary>
/// 當前節點信息
/// </summary>
private PasteNodeModel _current_node = null;
/// <summary>
/// 當前的Master是哪個
/// </summary>
private PasteNodeModel _current_master = null;
/// <summary>
///
/// </summary>
private ILogger<PasteClusterHandler> _logger = null;
/// <summary>
/// 當前是否是主節點
/// </summary>
public bool CurrentIsMaster
{
get
{
if (_current_master != null && _current_node != null)
{
return _current_master.host == _current_node.host;
}
return false;
}
}
/// <summary>
/// 是否是單例模式
/// 當前節點為null
/// 或者沒有節點列表
/// </summary>
public bool IsSingle
{
get
{
if (_current_master == null || _current_node == null)
{
return true;
}
if (node_list == null || node_list.Count <= 1)
{
return true;
}
return false;
}
}
/// <summary>
/// 當前節點的名稱 如果為空表示沒有或者本身配置就是空
/// </summary>
public string CurrentName
{
get
{
if (_current_node != null)
{
return _current_node.name;
}
return "";
}
}
/// <summary>
/// 獲取或設置當前集群的密鑰 如果要修改這個需要在所有的調用之前配置
/// </summary>
public string SloveToken
{
get
{
return _config.SloveToken;
}
set
{
_config.SloveToken = value;
}
}
/// <summary>
/// 當前節點的名稱 如果為空表示沒有或者本身配置就是空
/// </summary>
public string CurrentHost
{
get
{
if (_current_node != null)
{
return _current_node.host;
}
return "";
}
}
/// <summary>
/// 當前節點
/// </summary>
public PasteNodeModel CurrentNode
{
get
{
return _current_node;
}
}
/// <summary>
/// 當前節點的名稱 如果為空表示沒有或者本身配置就是空
/// </summary>
public int CurrentId
{
get
{
if (_current_node != null)
{
return _current_node.id;
}
return 0;
}
}
/// <summary>
/// 當前配置信息
/// </summary>
private PasteSloveConfig _config;
/// <summary>
///
/// </summary>
private List<PasteNodeModel> node_list = null;
/// <summary>
/// 是否在選舉中
/// </summary>
private bool voting = false;
/// <summary>
///
/// </summary>
private System.Timers.Timer _timer;
/// <summary>
/// 當前節點的代碼,初始值為空,可以被賦值,被賦值後不能再次被賦值
/// </summary>
private string node_code = String.Empty;
/// <summary>
/// 暫存發往節點的消息,這個是指消息,業務上的消息
/// </summary>
private List<PasteEventModel> ziplist = null;
/// <summary>
///
/// </summary>
private System.Threading.SemaphoreSlim _semaphoreSlim;
/// <summary>
///
/// </summary>
private long msg_count = 0;
/// <summary>
///
/// </summary>
public PasteClusterHandler(ILogger<PasteClusterHandler> logger, IOptions<PasteSloveConfig> config, IHttpClientFactory httpClientFactory)
{
_logger = logger;
_config = config.Value;
_httpClientFactory = httpClientFactory;
_semaphoreSlim = new SemaphoreSlim(1);
ziplist = new List<PasteEventModel>();
_channel_slove = Channel.CreateBounded<PasteEventModel>(_config.SloveChannelMsgCapacity);
ChannelCluster = Channel.CreateBounded<PasteSloveMessage>(_config.MasterChannelMsgCapacity);
_timer = new System.Timers.Timer(1000);
_timer.Elapsed += _timer_Elapsed;
_timer.AutoReset = true;
_timer.Start();
ActionSloveEvent();
}
/// <summary>
/// 當前計數的次數
/// </summary>
private int _tick_index = 0;
/// <summary>
/// 給當前節點賦值code
/// </summary>
/// <param name="code"></param>
/// <returns>是否賦值成功</returns>
public bool SetNodeCode(string code)
{
if (String.IsNullOrEmpty(node_code))
{
node_code = code;
}
else
{
return false;
}
return true;
}
/// <summary>
/// 本地用於讀取節點列表
/// </summary>
/// <returns></returns>
public PasteNodeModel[] Nodes()
{
if (node_list != null)
{
return node_list.ToArray();
}
return null;
}
/// <summary>
/// 啟動集群
/// 讀取ClusterHost作為集群列表
/// 讀取CurrentHost作為當前節點得Host
/// 如果有NodeList和NodeCode(CurrentCode)觸發查找自己
/// 否則觸發查找Master
/// </summary>
public async void StartCluster()
{
if (!string.IsNullOrEmpty(_config.CurrentHost))
{
Register(_config.CurrentHost, 0, 0);
}
if (!String.IsNullOrEmpty(_config.ClusterHost))
{
var _list = new List<PasteNodeModel>();
var strs = _config.ClusterHost.Split(";");
foreach (var str in strs)
{
if (!string.IsNullOrEmpty(str))
{
var its = str.Split(',');
var _one = new PasteNodeModel { };
_one.host = its[0];
if (its.Length > 1)
{
int.TryParse(its[1], out var _id);
_one.id = _id;
}
if (its.Length > 2)
{
int.TryParse(its[2], out var _group);
_one.group = _group;
}
if (!_list.Contains(_one))
{
_list.Add(_one);
}
}
}
if (_list.Count > 0)
{
foreach (var _one in _list)
{
await AddNodeToList(_one);
}
}
}
if (!String.IsNullOrEmpty(_config.CurrentCode))
{
SetNodeCode(_config.CurrentCode);
}
if (!String.IsNullOrEmpty(node_code))
{
if (node_list != null && node_list.Count > 0)
{
await _channel_slove.Writer.WriteAsync(new PasteEventModel { action = PasteSloveEvent.findnode });
}
}
else
{
Console.WriteLine("begin to join cluster:" + node_list.Select(x => x.host).ToString());
await _channel_slove.Writer.WriteAsync(new PasteEventModel { action = PasteSloveEvent.scanmaster, message = "", time = DateTime.Now.ToUnixTimeMilliseconds() });
}
}
/// <summary>
/// 需要確保先調用SetNodeCode 表示給自己打一個標記
/// 然後會啟動一個任務去遍歷集合,查找這個NodeCode,找到後就知道自己的HOST信息了
/// </summary>
public async void FindMe()
{
if (!String.IsNullOrEmpty(node_code))
{
if (node_list != null && node_list.Count > 0)
{
await _channel_slove.Writer.WriteAsync(new PasteEventModel { action = PasteSloveEvent.findnode });
}
}
}
/// <summary>
/// 開始查找自己 需要有節點集合(通過AddNodeToList賦值) 需要有當前節點得node_code(通過SetNodeCode賦值)
/// </summary>
/// <returns></returns>
public async Task<bool> StartFindMe()
{
if (!String.IsNullOrEmpty(node_code))
{
if (node_list != null && node_list.Count > 0)
{
await _channel_slove.Writer.WriteAsync(new PasteEventModel { action = PasteSloveEvent.findnode });
return true;
}
}
return false;
}
/// <summary>
/// 如果消息來自Master則更新我和Master的最後交互時間
/// 這個功能用於減少不必要的心跳資源浪費
/// </summary>
/// <param name="host"></param>
public void TargetMasterLast(string host)
{
if (_current_master != null)
{
if (_current_master.host == host)
{
_current_master.last_time = DateTime.Now.ToUnixTimeSeconds();
//Console.WriteLine($"{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")} Node:{CurrentHost} Update Master Last Time!");
}
}
}
/// <summary>
/// 確認對方的Master和我的是否一致!
/// 如果不一致,則我進去查找Master階段
/// </summary>
/// <param name="host"></param>
public void ConfirmMaster(string host)
{
if (_current_master != null)
{
if (_current_master.host != host)
{
_channel_slove.Writer.WriteAsync(new PasteEventModel { action = PasteSloveEvent.scanmaster });
}
}
}
/// <summary>
/// 擁有集群的地址列表,去找到自己
/// 這個是給內部調用的,請勿調用
/// </summary>
/// <param name="input"></param>
public async Task<PasteApiResponse> Find(PasteNodeModel input)
{
if (!String.IsNullOrEmpty(input.node_code))
{
if (node_code == input.node_code)
{
//找到自己了
_current_node = new PasteNodeModel
{
node_code = node_code,
group = input.group,
host = input.host,
id = input.id,
};
await AddNodeToList(_current_node);
await _channel_slove.Writer.WriteAsync(new PasteEventModel { action = PasteSloveEvent.scanmaster, message = "", time = DateTime.Now.ToUnixTimeMilliseconds() });
}
}
return new PasteApiResponse { success = true, master = _current_master, node = _current_node };
}
/// <summary>
/// 定時檢查
/// </summary>
/// <param name="sender"></param>
/// <param name="e"></param>
private async void _timer_Elapsed(object sender, System.Timers.ElapsedEventArgs e)
{
try
{
if (_current_node != null)
{
_tick_index++;
//健康檢查 被檢查等!
if (_tick_index % _config.TickSloveHealth == 0)
{
var _ago = DateTime.Now.ToUnixTimeSeconds() - _config.TickScanSloveHealth;
if (CurrentIsMaster)
{
//健康檢查 檢查節點多久沒通訊了
if (node_list != null && node_list.Count > 0)
{
var _post = new PasteMasterResponse { };
_post.master = _current_node;
_post.nodes = node_list?.ToArray();
var _postBody = Newtonsoft.Json.JsonConvert.SerializeObject(_post);
try
{
await _semaphoreSlim.WaitAsync();
foreach (var _node in node_list)
{
if (_node.host != _current_node.host)
{
if (_node.last_time < _ago)
{
var _api = await slove_get(_node, _config.ApiHealth);
if (_api.success)
{
if (_api.node_count != node_list.Count || _api.master?.host != _current_master?.host)
{
//告知對方,你的集群數據有誤!
await slove_post(_node, _postBody, _config.ApiMaster);
}
}
}
}
else
{
_node.last_time = DateTime.Now.ToUnixTimeSeconds();//自己直接就是最新的
}
}
}
catch (Exception exl)
{
_logger.LogError("line.341" + exl.Message);
}
finally
{
_semaphoreSlim.Release();
}
//錯誤次數超過多少次的 執行移除操作!
var removes = node_list.Where(x => x.error_time > _config.RemoveByTime).ToList();
if (removes != null && removes.Count > 0)
{
foreach (var _node in removes)
{
await _channel_slove.Writer.WriteAsync(new PasteEventModel { action = PasteSloveEvent.removenode, message = Newtonsoft.Json.JsonConvert.SerializeObject(_node) });
}
}
}
}
else
{
if (_current_master != null)
{
if (_current_master.last_time < _ago)
{
//Console.WriteLine($"{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")} node:{_current_node?.host} master:{_current_master?.host} timeout!");
var _api = await slove_get(_current_master, _config.ApiHealth);
if (_api.success)
{
//我記錄的master和master記錄的master不一致
if (_api.node_count != node_list.Count || _api.master?.host != _current_master?.host)
{
//告知對方,你的集群數據有誤!
await _channel_slove.Writer.WriteAsync(new PasteEventModel { action = PasteSloveEvent.scanmaster });
}
}
else
{
//master失聯了
await _channel_slove.Writer.WriteAsync(new PasteEventModel { action = PasteSloveEvent.scanmaster });
}
}
}
else
{
//選舉?
await _channel_slove.Writer.WriteAsync(new PasteEventModel { action = PasteSloveEvent.scanmaster });
}
}
}
}
if (_tick_index > 3600)
{
_tick_index = 0;
}
}
catch (Exception exlelapsed)
{
_logger.LogError("line.396" + exlelapsed.Message);
}
}
/// <summary>
/// 業務端調用 表示把消息打入到集群 發給所有節點或者某些節點
/// </summary>
/// <param name="msg"></param>
/// <param name="node_id"></param>
/// <param name="group"></param>
public void PushMsgToNode(string msg, int msg_type = 0, int node_id = 0, int group = 0, string name = "")
{
var _msg = new PasteSloveMessage
{
body = msg,
msg_type = msg_type,
from = _current_node
};
var _msgbody = Newtonsoft.Json.JsonConvert.SerializeObject(_msg);
if (!voting)
{
//如果當前在選舉中,則暫存!
_channel_slove.Writer.WriteAsync(new PasteEventModel { action = PasteSloveEvent.msg_to_node, message = _msgbody, msg_type = msg_type, node_id = node_id, group = group, name = name });
}
else
{
ziplist.Add(new PasteEventModel { action = PasteSloveEvent.msg_to_node, message = _msgbody, msg_type = msg_type, node_id = node_id, group = group, name = name });
}
}
/// <summary>
/// 把消息發送給所有節點,包括自己
/// </summary>
/// <param name="msg"></param>
/// <param name="node_id"></param>
/// <param name="group"></param>
public void PushMsgToAll(string msg, int msg_type = 0)
{
var _msg = new PasteSloveMessage
{
body = msg,
msg_type = msg_type,
from = _current_node
};
var _msgbody = Newtonsoft.Json.JsonConvert.SerializeObject(_msg);
if (!voting)
{
//如果當前在選舉中,則暫存!
_channel_slove.Writer.WriteAsync(new PasteEventModel { action = PasteSloveEvent.msg_to_all_node, message = _msgbody, msg_type = msg_type });
}
else
{
ziplist.Add(new PasteEventModel { action = PasteSloveEvent.msg_to_all_node, message = _msgbody, msg_type = msg_type });
}
}
/// <summary>
/// 發送消息給master所在的節點的業務處理,如果當前為master則變更channel,切換到給業務處理
/// </summary>
/// <param name="msg"></param>
public void PushMsgToMaster(string msg, int msg_type = 0)
{
var _msg = new PasteSloveMessage
{
body = msg,
from = _current_node,
msg_type = msg_type
};
var _msgbody = Newtonsoft.Json.JsonConvert.SerializeObject(_msg);
if (CurrentIsMaster || _current_master == null)
{
//切換隊列
ChannelCluster.Writer.WriteAsync(new PasteSloveMessage { body = _msgbody });
}
else
{
if (!voting)
{
_channel_slove.Writer.WriteAsync(new PasteEventModel { action = PasteSloveEvent.msg_to_master, message = _msgbody, msg_type = msg_type });
}
else
{
ziplist.Add(new PasteEventModel { action = PasteSloveEvent.msg_to_master, message = _msgbody, msg_type = msg_type });
}
}
}
/// <summary>
/// 直接推送消息,主要作用在於立馬反饋
/// 如果沒有對象,則會發送給master
/// 可以從from中判斷是不是迴環了
/// </summary>
/// <param name="msg"></param>
/// <param name="msg_type"></param>
/// <param name="id"></param>
/// <param name="name"></param>
/// <param name="group"></param>
/// <param name="host"></param>
/// <returns></returns>
public async Task<bool> PushDirectionMsg(string msg, int msg_type = 0, int id = 0, string name = "", int group = 0, string host = "")
{
var _message = new PasteSloveMessage
{
body = msg,
msg_type = msg_type,
};
return await PushDirectionMsg(_message, id, name, group, host);
}
/// <summary>
/// 直接發送消息給某一個節點
/// 如果沒有命中則發送給Master
/// 也就是說,可能發送給當前!註意不要迴環了!!!
/// </summary>
/// <param name="message"></param>
/// <param name="id"></param>
/// <param name="name"></param>
/// <param name="group"></param>
/// <param name="host">示例http://192.168.1.5</param>
/// <returns>是否發送成功,發送給某一個節點了?</returns>
public async Task<bool> PushDirectionMsg(PasteSloveMessage message, int id = 0, string name = "", int group = 0, string host = "")
{
message.from = _current_node;
var _send = false;
if (id != 0)
{
if (CurrentId == id)
{
message.to = _current_node;
var _postBody = Newtonsoft.Json.JsonConvert.SerializeObject(message);
var _message = Newtonsoft.Json.JsonConvert.DeserializeObject<PasteSloveMessage>(_postBody);
await ChannelCluster.Writer.WriteAsync(_message);
return true;
}
else
{
var find = node_list.Where(x => x.id == id).FirstOrDefault();
if (find != null && find != default)
{
message.to = find;
var _postBody = Newtonsoft.Json.JsonConvert.SerializeObject(message);
var _api = await slove_post(find, _postBody, _config.ApiMsg);
return _api.success;
}
}
}
if (!string.IsNullOrEmpty(host) && node_list != null)
{
var find = node_list.Where(x => x.host == host).FirstOrDefault();
if (find != null && find != default)
{
message.to = find;
var _postBody = Newtonsoft.Json.JsonConvert.SerializeObject(message);
var _api = await slove_post(find, _postBody, _config.ApiMsg);
return _api.success;
}
}
if (!string.IsNullOrEmpty(name) && node_list != null)
{
var find = node_list.Where(x => x.name == name).FirstOrDefault();
if (find != null && find != default)
{
message.to = find;
var _postBody = Newtonsoft.Json.JsonConvert.SerializeObject(message);
var _api = await slove_post(find, _postBody, _config.ApiMsg);
return _api.success;
}
}
if (group != 0 && node_list != null)
{
var find = node_list.Where(x => x.group == group).FirstOrDefault();
if (find != null && find != default)
{
message.to = find;
var _postBody = Newtonsoft.Json.JsonConvert.SerializeObject(message);
var _api = await slove_post(find, _postBody, _config.ApiMsg);
return _api.success;
}
}
if (!_send)
{
if (CurrentIsMaster || IsSingle)
{
message.to = _current_node;
await ChannelCluster.Writer.WriteAsync(message);
return true;
}
else
{
if (_current_master != null)
{
message.to = _current_master;
var _postBody = Newtonsoft.Json.JsonConvert.SerializeObject(message);
var _api = await slove_post(_current_master, _postBody, _config.ApiMsg);
return _api.success;
}
}
}
return _send;
}
/// <summary>
/// 如果當前不是master則不執行,直接拋棄,如果當前是master則執行
/// </summary>
/// <param name="msg"></param>
public async Task<bool> PushMsgToOnlyMaster(string msg, int msg_type = 0)
{
var _msg = new PasteSloveMessage
{
body = msg,
msg_type = msg_type,
from = _current_node
};
var _msgbody = Newtonsoft.Json.JsonConvert.SerializeObject(_msg);
if (CurrentIsMaster || _current_master == null)
{
await ChannelCluster.Writer.WriteAsync(new PasteSloveMessage { body = _msgbody });
return true;
}
return false;
}
/// <summary>
/// 處理節點中隊列的消息
/// </summary>
private async void ActionSloveEvent()
{
try
{
var _read = await _channel_slove.Reader.ReadAsync();
if (_read != null && _read != default)
{
//_logger.LogInformation("Cluster.Event:" + Newtonsoft.Json.JsonConvert.SerializeObject(_read));
switch (_read.action)
{
case PasteSloveEvent.votemaster:
{
//告知別人,我是master
if (_current_node == null)
{
return;
}
_current_node.vote_time = DateTime.Now.ToUnixTimeMilliseconds();
if (node_list != null && node_list.Count > 0)
{
var _postBody = Newtonsoft.Json.JsonConvert.SerializeObject(_current_node);
//必須大於1,有2個及以上節點,才有master的說法
foreach (var _node in node_list)
{
if (_node.host != _current_node.host)
{
var _api = await slove_post(_node, _postBody, _config.ApiVote);
if (_api.success)
{
//_node.last_time = DateTime.Now.ToUnixTimeSeconds();
if (_api.master != null)
{
if (_api.master.host != _current_node.host)
{
//找到一個競爭者? 有節點說他才是master 而且時間比我小 我只好退位讓賢
_current_master = _api.master;
break;
}
}
}
}
}
//選舉完成了,我是不是master?
if (_current_master == null)
{
_current_master = _current_node;
await _channel_slove.Writer.WriteAsync(new PasteEventModel { action = PasteSloveEvent.suremaster });
}
}
}
break;
case PasteSloveEvent.scanmaster:
{
if (_current_node == null)
{
return;
}
_current_node.vote_time = DateTime.Now.ToUnixTimeMilliseconds();
if (node_list != null && node_list.Count > 0)
{
voting = true;
//必須大於1,有2個及以上節點,才有master的說法
foreach (var _node in node_list)
{
var _api = await slove_get(_node, _config.ApiAsk);
if (_api.success && _api.master != null)
{
_current_master = _api.master;
if (_current_node.host == _current_master.host)
{
await _channel_slove.Writer.WriteAsync(new PasteEventModel { action = PasteSloveEvent.suremaster });
}
break;
}
}
if (_current_master != null)
{
if (_current_node?.host != _current_master?.host)
{
//告知 master 我要加入集群
var _join = await slove_post(_current_master, Newtonsoft.Json.JsonConvert.SerializeObject(_current_node), _config.ApiJoinSlove);
if (!_join.success)
{
//這裡失敗了,咋辦??? 未完待續
voting = true;
_current_master = null;
}
}
}
else
{
//饒了一圈,全部不能訪問
_current_master = _current_node;
voting = false;
await _channel_slove.Writer.WriteAsync(new PasteEventModel { action = PasteSloveEvent.suremaster });
}
}
else
{
//節點信息不完整 還是只有這麼一個節點? 未完待續
_current_master = _current_node;
voting = false;
await _channel_slove.Writer.WriteAsync(new PasteEventModel { action = PasteSloveEvent.suremaster });
}
}
break;
case PasteSloveEvent.suremaster:
{
if (_current_node == null)
{
return;
}
if (node_list != null && node_list.Count > 0)
{
var _post = new PasteMasterResponse { };
_post.master = _current_node;
_post.nodes = node_list?.ToArray();
var _postBody = Newtonsoft.Json.JsonConvert.SerializeObject(_post);
//必須大於1,有2個及以上節點,才有master的說法
foreach (var _node in node_list)
{
if (_node.host != _current_node.host)
{
var _api = await slove_post(_node, _postBody, _config.ApiMaster);
}
}
await ChannelCluster.Writer.WriteAsync(new PasteSloveMessage { msg_type = 1, body = _postBody, from = _current_node, time = DateTime.Now.ToUnixTimeSeconds() });
}
}
break;
case PasteSloveEvent.msg_to_master:
{
var _message = Newtonsoft.Json.JsonConvert.DeserializeObject<PasteSloveMessage>(_read.message);
if (CurrentIsMaster)
{
_message.from = _current_node;
_message.to = _current_node;
await ChannelCluster.Writer.WriteAsync(_message);
}
else
{
//通過 api 發送給 master
if (_current_master != null)
{
_message.from = _current_node;
_message.to = _current_master;
var _postBody = Newtonsoft.Json.JsonConvert.SerializeObject(_message);
//如果發送失敗,暫停業務,進行vote階段
var _join = await slove_post(_current_master, _postBody, _config.ApiMsg);
if (!_join.success)
{
if (_read.try_time > 0)
{
_read.try_time = _read.try_time - 1;
//這裡失敗了,咋辦??? 未完待續
await _channel_slove.Writer.WriteAsync(_read);