Hello 大家好,我是TANZAME,我們又見面了。今天我們來聊聊怎麼手擼一個 Redis Cluster 集群客戶端,純手工有乾貨,您細品。 隨著業務增長,線上環境的QPS暴增,自然而然將當前的單機 Redis 切換到群集模式。燃鵝,我們悲劇地發現,ServiceStack.Redis這個官方推 ...
Hello 大家好,我是TANZAME,我們又見面了。今天我們來聊聊怎麼手擼一個 Redis Cluster 集群客戶端,純手工有乾貨,您細品。
隨著業務增長,線上環境的QPS暴增,自然而然將當前的單機 Redis 切換到群集模式。燃鵝,我們悲劇地發現,ServiceStack.Redis這個官方推薦的 .NET 客戶端並沒有支持集群模式。一通度娘翻牆無果後,決定自己強擼一個基於ServiceStack.Redis的Redis集群訪問組件。
話不多說,先上運行效果圖:
Redis-Cluster集群使用 hash slot 演算法對每個key計算CRC16值,然後對16383取模,可以獲取key對應的 hash slot。Redis-Cluster中每個master都會持有部分 slot,在訪問key時根據計算出來的hash slot去找到具體的master節點,再由當前找到的節點去執行具體的 Redis 命令(具體可查閱官方說明文檔)。
由於 ServiceStack.Redis已經實現了單個實例的Redis命令,因此我們可以將即將要實現的 Redis 集群客戶端當做一個代理,它只負責計算 key 落在哪一個具體節點(定址)然後將Redis命令轉發給對應的節點執行即可。
ServiceStack.Redis的RedisClient是非線程安全的,ServiceStack.Redis 使用緩存客戶端管理器(PooledRedisClientManager)來提高性能和併發能力,我們的Redis Cluster集群客戶端也應集成PooledRedisClientManager來獲取 RedisClient 實例。
同時,Redis-Cluster集群支持線上動態擴容和slot遷移,我們的Redis集群客戶端也應具備自動智能發現新節點和自動刷新 slot 分佈的能力。
總結起來,要實現一個Redis-Cluster客戶端,需要實現以下幾個要點:
如下麵類圖所示,接下來我們詳細分析具體的代碼實現。
一、CRC16
CRC即迴圈冗餘校驗碼,是信息系統中一種常見的檢錯碼。CRC校驗碼不同的機構有不同的標準,這裡Redis遵循的標準是CRC-16-CCITT標準,這也是被XMODEM協議使用的CRC標準,所以也常用XMODEM CRC代指,是比較經典的“基於位元組查表法的CRC校驗碼生成演算法”。
1 /// <summary> 2 /// 根據 key 計算對應的哈希槽 3 /// </summary> 4 public static int GetSlot(string key) 5 { 6 key = CRC16.ExtractHashTag(key); 7 // optimization with modulo operator with power of 2 equivalent to getCRC16(key) % 16384 8 return GetCRC16(key) & (16384 - 1); 9 } 10 11 /// <summary> 12 /// 計算給定位元組組的 crc16 檢驗碼 13 /// </summary> 14 public static int GetCRC16(byte[] bytes, int s, int e) 15 { 16 int crc = 0x0000; 17 18 for (int i = s; i < e; i++) 19 { 20 crc = ((crc << 8) ^ LOOKUP_TABLE[((crc >> 8) ^ (bytes[i] & 0xFF)) & 0xFF]); 21 } 22 return crc & 0xFFFF; 23 }
二、讀取集群節點
從集群中的任意節點使用 CLUSTER NODES 命令可以讀取到集群中所有的節點信息,包括連接狀態,它們的標誌,屬性和分配的槽等等。CLUSTER NODES 以串列格式提供所有這些信息,輸出示例:
d99b65a25ef726c64c565901e345f98c496a1a47 127.0.0.1:7007 master - 0 1592288083308 8 connected 2d71879d6529d1edbfeed546443051986245c58e 127.0.0.1:7003 master - 0 1592288084311 11 connected 10923-16383 654cdc25a5fa11bd44b5b716cdf07d4ce176efcd 127.0.0.1:7005 slave 484e73948d8aacd8327bf90b89469b52bff464c5 0 1592288085313 10 connected ed65d52dad7ef6854e0e261433b56a551e5e11cb 127.0.0.1:7004 slave 754d0ec7a7f5c7765f784a6a2c370ea38ea0c089 0 1592288081304 9 connected 754d0ec7a7f5c7765f784a6a2c370ea38ea0c089 127.0.0.1:7001 master - 0 1592288080300 9 connected 0-5460 484e73948d8aacd8327bf90b89469b52bff464c5 127.0.0.1:7002 master - 0 1592288082306 10 connected 5461-10922 2223bc6d099bd9838e5d2f1fbd9a758f64c554c4 127.0.0.1:7006 myself,slave 2d71879d6529d1edbfeed546443051986245c58e 0 0 6 connected
每個欄位的含義如下:
1. id
:節點 ID,一個40個字元的隨機字元串,當一個節點被創建時不會再發生變化(除非CLUSTER RESET HARD
被使用)。
2. ip:port
:客戶端應該聯繫節點以運行查詢的節點地址。
3. flags
:逗號列表分隔的標誌:myself
,master
,slave
,fail?
,fail
,handshake
,noaddr
,noflags
。標誌在下一節詳細解釋。
4. master
:如果節點是從屬節點,並且主節點已知,則節點ID為主節點,否則為“ - ”字元。
5. ping-sent
:以毫秒為單位的當前激活的ping發送的unix時間,如果沒有掛起的ping,則為零。
6. pong-recv
:毫秒 unix 時間收到最後一個乒乓球。
7. config-epoch
:當前節點(或當前主節點,如果該節點是從節點)的配置時期(或版本)。每次發生故障切換時,都會創建一個新的,唯一的,單調遞增的配置時期。如果多個節點聲稱服務於相同的哈希槽,則具有較高配置時期的節點將獲勝。
8. link-state
:用於節點到節點集群匯流排的鏈路狀態。我們使用此鏈接與節點進行通信。可以是connected
或disconnected
。
9. slot
:散列槽號或範圍。從參數9開始,但總共可能有16384個條目(限制從未達到)。這是此節點提供的散列槽列表。如果條目僅僅是一個數字,則被解析為這樣。如果它是一個範圍,它是在形式start-end
,並且意味著節點負責所有散列時隙從start
到end
包括起始和結束值。
標誌的含義(欄位編號3):
myself
:您正在聯繫的節點。
master
:節點是主人。
slave
:節點是從屬的。
fail?
:節點處於PFAIL
狀態。對於正在聯繫的節點無法訪問,但仍然可以在邏輯上訪問(不處於FAIL
狀態)。
fail
:節點處於FAIL
狀態。對於將PFAIL
狀態提升為FAIL
的多個節點而言,這是無法訪問的。
handshake
:不受信任的節點,我們握手。
noaddr
:此節點沒有已知的地址。
noflags
:根本沒有標誌。
1 // 讀取集群上的節點信息 2 static IList<InternalClusterNode> ReadClusterNodes(IEnumerable<ClusterNode> source) 3 { 4 RedisClient c = null; 5 StringReader reader = null; 6 IList<InternalClusterNode> result = null; 7 8 int index = 0; 9 int rowCount = source.Count(); 10 11 foreach (var node in source) 12 { 13 try 14 { 15 // 從當前節點讀取REDIS集群節點信息 16 index += 1; 17 c = new RedisClient(node.Host, node.Port, node.Password); 18 RedisData data = c.RawCommand("CLUSTER".ToUtf8Bytes(), "NODES".ToUtf8Bytes()); 19 string info = Encoding.UTF8.GetString(data.Data); 20 21 // 將讀回的字元文本轉成強類型節點實體 22 reader = new StringReader(info); 23 string line = reader.ReadLine(); 24 while (line != null) 25 { 26 if (result == null) result = new List<InternalClusterNode>(); 27 InternalClusterNode n = InternalClusterNode.Parse(line); 28 n.Password = node.Password; 29 result.Add(n); 30 31 line = reader.ReadLine(); 32 } 33 34 // 只要任意一個節點拿到集群信息,直接退出 35 if (result != null && result.Count > 0) break; 36 } 37 catch (Exception ex) 38 { 39 // 出現異常,如果還沒到最後一個節點,則繼續使用下一下節點讀取集群信息 40 // 否則拋出異常 41 if (index < rowCount) 42 Thread.Sleep(100); 43 else 44 throw new RedisClusterException(ex.Message, c != null ? c.GetHostString() : string.Empty, ex); 45 } 46 finally 47 { 48 if (reader != null) reader.Dispose(); 49 if (c != null) c.Dispose(); 50 } 51 } 52 53 54 if (result == null) 55 result = new List<InternalClusterNode>(0); 56 return result; 57 } 58 59 /// <summary> 60 /// 從 cluster nodes 的每一行命令里讀取出集群節點的相關信息 61 /// </summary> 62 /// <param name="line">集群命令</param> 63 /// <returns></returns> 64 public static InternalClusterNode Parse(string line) 65 { 66 if (string.IsNullOrEmpty(line)) 67 throw new ArgumentException("line"); 68 69 InternalClusterNode node = new InternalClusterNode(); 70 node._nodeDescription = line; 71 string[] segs = line.Split(' '); 72 73 node.NodeId = segs[0]; 74 node.Host = segs[1].Split(':')[0]; 75 node.Port = int.Parse(segs[1].Split(':')[1]); 76 node.MasterNodeId = segs[3] == "-" ? null : segs[3]; 77 node.PingSent = long.Parse(segs[4]); 78 node.PongRecv = long.Parse(segs[5]); 79 node.ConfigEpoch = int.Parse(segs[6]); 80 node.LinkState = segs[7]; 81 82 string[] flags = segs[2].Split(','); 83 node.IsMater = flags[0] == MYSELF ? flags[1] == MASTER : flags[0] == MASTER; 84 node.IsSlave = !node.IsMater; 85 int start = 0; 86 if (flags[start] == MYSELF) 87 start = 1; 88 if (flags[start] == SLAVE || flags[start] == MASTER) 89 start += 1; 90 node.NodeFlag = string.Join(",", flags.Skip(start)); 91 92 if (segs.Length > 8) 93 { 94 string[] slots = segs[8].Split('-'); 95 node.Slot.Start = int.Parse(slots[0]); 96 if (slots.Length > 1) node.Slot.End = int.Parse(slots[1]); 97 98 for (int index = 9; index < segs.Length; index++) 99 { 100 if (node.RestSlots == null) 101 node.RestSlots = new List<HashSlot>(); 102 103 slots = segs[index].Split('-'); 104 105 int s1 = 0; 106 int s2 = 0; 107 bool b1 = int.TryParse(slots[0], out s1); 108 bool b2 = int.TryParse(slots[1], out s2); 109 if (!b1 || !b2) 110 continue; 111 else 112 node.RestSlots.Add(new HashSlot(s1, slots.Length > 1 ? new Nullable<int>(s2) : null)); 113 } 114 } 115 116 return node; 117 }View Code
三、為節點分配緩存客戶端管理器
在單實例的Redis中,我們通過 PooledRedisClientManager 這個管理器來獲取RedisClient。借鑒這個思路,在Redis Cluster集群中,我們為每一個主節點實例化一個 PooledRedisClientManager,並且該主節點持有的 slot 都共用一個 PooledRedisClientManager 實例。以 slot 做為 key 將 slot 與 PooledRedisClientManager 一一映射並緩存起來。
1 // 初始化集群管理 2 void Initialize(IList<InternalClusterNode> clusterNodes = null) 3 { 4 // 從 redis 讀取集群信息 5 IList<InternalClusterNode> nodes = clusterNodes == null ? RedisCluster.ReadClusterNodes(_source) : clusterNodes; 6 7 // 生成主節點,每個主節點的 slot 對應一個REDIS客戶端緩衝池管理器 8 IList<InternalClusterNode> masters = null; 9 IDictionary<int, PooledRedisClientManager> managers = null; 10 foreach (var n in nodes) 11 { 12 // 節點無效或者 13 if (!(n.IsMater && 14 !string.IsNullOrEmpty(n.Host) && 15 string.IsNullOrEmpty(n.NodeFlag) && 16 (string.IsNullOrEmpty(n.LinkState) || n.LinkState == InternalClusterNode.CONNECTED))) continue; 17 18 n.SlaveNodes = nodes.Where(x => x.MasterNodeId == n.NodeId); 19 if (masters == null) 20 masters = new List<InternalClusterNode>(); 21 masters.Add(n); 22 23 // 用每一個主節點的哈希槽做鍵,導入REDIS客戶端緩衝池管理器 24 // 然後,方法表指針(又名類型對象指針)上場,占據 4 個位元組。 4 * 16384 / 1024 = 64KB 25 if (managers == null) 26 managers = new Dictionary<int, PooledRedisClientManager>(); 27 28 string[] writeHosts = new[] { n.HostString }; 29 string[] readHosts = n.SlaveNodes.Where(n => false).Select(n => n.HostString).ToArray(); 30 var pool = new PooledRedisClientManager(writeHosts, readHosts, _config); 31 managers.Add(n.Slot.Start, pool); 32 if (n.Slot.End != null) 33 { 34 // 這個範圍內的哈希槽都用同一個緩衝池 35 for (int s = n.Slot.Start + 1; s <= n.Slot.End.Value; s++) 36 managers.Add(s, pool); 37 } 38 if (n.RestSlots != null) 39 { 40 foreach (var slot in n.RestSlots) 41 { 42 managers.Add(slot.Start, pool); 43 if (slot.End != null) 44 { 45 // 這個範圍內的哈希槽都用同一個緩衝池 46 for (int s = slot.Start + 1; s <= slot.End.Value; s++) 47 managers.Add(s, pool); 48 } 49 } 50 } 51 } 52 53 _masters = masters; 54 _redisClientManagers = managers; 55 _clusterNodes = nodes != null ? nodes : null; 56 57 if (_masters == null) _masters = new List<InternalClusterNode>(0); 58 if (_clusterNodes == null) _clusterNodes = new List<InternalClusterNode>(0); 59 if (_redisClientManagers == null) _redisClientManagers = new Dictionary<int, PooledRedisClientManager>(0); 60 61 if (_masters.Count > 0) 62 _source = _masters.Select(n => new ClusterNode(n.Host, n.Port, n.Password)).ToList(); 63 }View Code
四、將 hash slot 路由到正確的節點
在訪問一個 key 時,根據第三步緩存起來的 PooledRedisClientManager ,用 key 計算出來的 hash slot 值可以快速找出這個 key 對應的 PooledRedisClientManager 實例,調用 PooledRedisClientManager.GetClient() 即可將 hash slot 路由到正確的主節點。
1 // 執行指定動作並返回值 2 private T DoExecute<T>(string key, Func<RedisClient, T> action) => this.DoExecute(() => this.GetRedisClient(key), action); 3 4 // 執行指定動作並返回值 5 private T DoExecute<T>(Func<RedisClient> slot, Func<RedisClient, T> action, int tryTimes = 1) 6 { 7 RedisClient c = null; 8 try 9 { 10 c = slot(); 11 return action(c); 12 } 13 catch (Exception ex) 14 { 15 // 此處省略 ... 16 } 17 finally 18 { 19 if (c != null) 20 c.Dispose(); 21 } 22 } 23 24 // 獲取指定key對應的主設備節點 25 private RedisClient GetRedisClient(string key) 26 { 27 if (string.IsNullOrEmpty(key)) 28 throw new ArgumentNullException("key"); 29 30 int slot = CRC16.GetSlot(key); 31 if (!_redisClientManagers.ContainsKey(slot)) 32 throw new SlotNotFoundException(string.Format("No reachable node in cluster for slot {{{0}}}", slot), slot, key); 33 34 var pool = _redisClientManagers[slot]; 35 return (RedisClient)pool.GetClient(); 36 }
五、自動發現新節點和自動刷新slot分佈
在實際生產環境中,Redis 集群經常會有添加/刪除節點、遷移 slot 、主節點宕機從節點轉主節點等,針對這些情況,我們的 Redis Cluster 組件必須具備自動發現節點和刷新在 第三步 緩存起來的 slot 的能力。在這裡我的實現思路是當節點執行 Redis 命令時返回 RedisException 異常時就強制刷新集群節點信息並重新緩存 slot 與 節點之間的映射。
1 // 執行指定動作並返回值 2 private T DoExecute<T>(Func<RedisClient> slot, Func<RedisClient, T> action, int tryTimes = 1) 3 { 4 RedisClient c = null; 5 try 6 { 7 c = slot(); 8 return action(c); 9 } 10 catch (Exception ex) 11 { 12 if (!(ex is RedisException) || tryTimes == 0) throw new RedisClusterException(ex.Message, c != null ? c.GetHostString() : string.Empty, ex); 13 else 14 { 15 tryTimes -= 1; 16 // 嘗試重新刷新集群信息 17 bool isRefresh = DiscoveryNodes(_source, _config); 18 if (isRefresh) 19 // 集群節點有更新過,重新執行 20 return this.DoExecute(slot, action, tryTimes); 21 else 22 // 集群節點未更新過,直接拋出異常 23 throw new RedisClusterException(ex.Message, c != null ? c.GetHostString() : string.Empty, ex); 24 } 25 } 26 finally 27 { 28 if (c != null) 29 c.Dispose(); 30 } 31 } 32 33 // 重新刷新集群信息 34 private bool DiscoveryNodes(IEnumerable<ClusterNode> source, RedisClientManagerConfig config) 35 { 36 bool lockTaken = false; 37 try 38 { 39 // noop 40 if (_isDiscoverying) { } 41 42 Monitor.Enter(_objLock, ref lockTaken); 43 44 _source = source; 45 _config = config; 46 _isDiscoverying = true; 47 48 // 跟上次同步時間相隔 {MONITORINTERVAL} 秒鐘以上才需要同步 49 if ((DateTime.Now - _lastDiscoveryTime).TotalMilliseconds >= MONITORINTERVAL) 50 { 51 bool isRefresh = false; 52 IList<InternalClusterNode> newNodes = RedisCluster.ReadClusterNodes(_source); 53 foreach (var node in newNodes) 54 { 55 var n = _clusterNodes.FirstOrDefault(x => x.HostString == node.HostString); 56 isRefresh = 57 n == null || // 新節點 58 n.Password != node.Password || // 密碼變了 59 n.IsMater != node.IsMater || // 主變從或者從變主 60 n.IsSlave != node.IsSlave || // 主變從或者從變主 61 n.NodeFlag != node.NodeFlag || // 節點標記位變了 62 n.LinkState != node.LinkState || // 節點狀態位變了 63 n.Slot.Start != node.Slot.Start || // 哈希槽變了 64 n.Slot.End != node.Slot.End || // 哈希槽變了 65 (n.RestSlots == null && node.RestSlots != null) || 66 (n.RestSlots != null && node.RestSlots == null); 67 if (!isRefresh && n.RestSlots != null && node.RestSlots != null) 68 { 69 var slots1 = n.RestSlots.OrderBy(x => x.Start).ToList(); 70 var slots2 = node.RestSlots.OrderBy(x => x.Start).ToList(); 71 for (int index = 0; index < slots1.Count; index++) 72 { 73 isRefresh = 74 slots1[index].Start != slots2[index].Start || // 哈希槽變了 75 slots1[index].End != slots2[index].End; // 哈希槽變了 76 if (isRefresh) break; 77 } 78 } 79 80 if (isRefresh) break; 81 } 82 83 if (isRefresh) 84 { 85 // 重新初始化集群 86 this.Dispose(); 87 this.Initialize(newNodes); 88 this._lastDiscoveryTime = DateTime.Now; 89 } 90 } 91 92 // 最後刷新時間在 {MONITORINTERVAL} 內,表示是最新群集信息 newest 93 return (DateTime.Now - _lastDiscoveryTime).TotalMilliseconds < MONITORINTERVAL; 94 } 95 finally 96 { 97 if (lockTaken) 98 { 99 _isDiscoverying = false; 100 Monitor.Exit(_objLock); 101 } 102 } 103 }View Code
六、配置訪問組件調用入口
最後我們需要為組件提供訪問入口,我們用 RedisCluster 類實現 字元串、列表、哈希、集合、有序集合和Keys的基本操作,並且用 RedisClusterFactory 工廠類對外提供單例操作,這樣就可以像單實例 Redis 那樣調用 Redis Cluster 集群。調用示例:
var node = new ClusterNode("127.0.0.1", 7001); var redisCluster = RedisClusterFactory.Configure(node, co