前言 本人覺得碼農的技術提升應該是從how to do到why do,而項目或產品都是從why do到how to do,按題來,所以呢下麵先從大的方面介紹一下消息隊列。 消息隊列是分散式高併發面目中必不可少的一部分,隨著互聯網、雲計算、大數據的使用,消息隊列的應用越來越多,消息隊列在系統的可伸縮性 ...
前言
本人覺得碼農的技術提升應該是從how to do到why do,而項目或產品都是從why do到how to do,按題來,所以呢下麵先從大的方面介紹一下消息隊列。
消息隊列是分散式高併發面目中必不可少的一部分,隨著互聯網、雲計算、大數據的使用,消息隊列的應用越來越多,消息隊列在系統的可伸縮性、穩定性、提升吞吐量等方面有著顯著的作用;它主要的作用一般如下:
1.通過非同步處理提高系統性能
如上圖,在不使用消息隊列伺服器的時候,用戶的請求數據直接寫入資料庫,在高併發的情況下資料庫壓力劇增,使得響應速度變慢。但是在使用消息隊列之後,用戶的請求數據發送給消息隊列之後立即 返回,再由消息隊列的消費者進程從消息隊列中獲取數據,非同步寫入資料庫。由於消息隊列伺服器處理速度快於資料庫(消息隊列也比資料庫有更好的伸縮性),因此響應速度得到大幅改善。
通過以上分析我們可以得出消息隊列具有很好的削峰作用的功能——即通過非同步處理,將短時間高併發產生的事務消息存儲在消息隊列中,從而削平高峰期的併發事務。 舉例:在電子商務一些秒殺、促銷活動中,合理使用消息隊列可以有效抵禦促銷活動剛開始大量訂單涌入對系統的衝擊。如下圖所示:
因為用戶請求數據寫入消息隊列之後就立即返回給用戶了,但是請求數據在後續的業務校驗、寫資料庫等操作中可能失敗。因此使用消息隊列進行非同步處理之後,需要適當修改業務流程進行配合,比如用戶在提交訂單之後,訂單數據寫入消息隊列,不能立即返回用戶訂單提交成功,需要在消息隊列的訂單消費者進程真正處理完該訂單之後,甚至出庫後,再通過電子郵件或簡訊通知用戶訂單成功,以免交易糾紛。這就類似我們平時手機訂火車票和電影票。
2.降低系統耦合性
我們知道模塊分散式部署以後聚合方式通常有兩種:1.分散式消息隊列和2.分散式服務。先來簡單說一下分散式服務:目前使用比較多的用來構建SOA(Service Oriented Architecture面向服務體繫結構)的分散式服務框架是阿裡巴巴開源的Dubbo.如果想深入瞭解Dubbo的可以看我寫的關於Dubbo的這一篇文章:《高性能優秀的服務框架-dubbo介紹》:https://juejin.im/post/5acadeb1f265da2375072f9c 再來談我們的分散式消息隊列:我們知道如果模塊之間不存在直接調用,那麼新增模塊或者修改模塊就對其他模塊影響較小,這樣系統的可擴展性無疑更好一些。
我們最常見的事件驅動架構類似生產者消費者模式,在大型網站中通常用利用消息隊列實現事件驅動結構。如下圖所示:
消息隊列使利用發佈-訂閱模式工作,消息發送者(生產者)發佈消息,一個或多個消息接受者(消費者)訂閱消息。 從上圖可以看到消息發送者(生產者)和消息接受者(消費者)之間沒有直接耦合,消息發送者將消息發送至分散式消息隊列即結束對消息的處理,消息接受者從分散式消息隊列獲取該消息後進行後續處理,並不需要知道該消息從何而來。對新增業務,只要對該類消息感興趣,即可訂閱該消息,對原有系統和業務沒有任何影響,從而實現網站業務的可擴展性設計。
消息接受者對消息進行過濾、處理、包裝後,構造成一個新的消息類型,將消息繼續發送出去,等待其他消息接受者訂閱該消息。因此基於事件(消息對象)驅動的業務架構可以是一系列流程。
另外為了避免消息隊列伺服器宕機造成消息丟失,會將成功發送到消息隊列的消息存儲在消息生產者伺服器上,等消息真正被消費者伺服器處理後才刪除消息。在消息隊列伺服器宕機後,生產者伺服器會選擇分散式消息隊列伺服器集群中的其他伺服器發佈消息。
前面說了這麼多消息隊列的重要性、使用場景、工作模式,有很多人就可能會說了,現有的ActiveMQ、RabbitMQ、Kafka、RocketMQ等多了去了,那在項目架構的時候選一個用上去就不行了,完全沒有必要重覆造輪子啊!本人認為對於重覆造輪子的事情和其它任何事情都是一樣的——任何事情沒有絕對的好處或者壞處,比如是剛入門的碼農、又或者很急的項目,完全可以選用現有一種通用的、成熟的產品,沒必要去從零開始做;實際上沒有任何一個優秀的產品全部使用三方的產品來組裝完成的,任何一個好一點的項目發展到一定的時候都不約而同的進行底層開發。原因很簡單:第一個任何通用型的產品總用功能覆蓋不到的場景;第二個任何通用型的產品為了實現通用必將做了一些性能或架構的犧牲;現在道理都講完了,開始動手了(都聽你逼半天,能動手就儘量少逼逼!)。
概述
動手前先構思一下,本人需要一個簡單的、可發佈訂閱的、高吞吐量的消息隊列,並將之簡單大的方面分成QServer、QClient;QServer主要有Exchange、Binding、MessageQueue構成;QClient和QServer共用一套相同的傳輸編解碼器QCoder ,主要實現Publish、Subscribe、Unsubcribe、Closes等功能;先想這麼多,開乾!
Exchange
主要在QServer中提供發佈、訂閱、連接、隊列信息等管理
1 /**************************************************************************** 2 *Copyright (c) 2018 Microsoft All Rights Reserved. 3 *CLR版本: 4.0.30319.42000 4 *機器名稱:WENLI-PC 5 *公司名稱:Microsoft 6 *命名空間:SAEA.QueueSocket.Model 7 *文件名: Exchange 8 *版本號: V1.0.0.0 9 *唯一標識:6a576aad-edcc-446d-b7e5-561a622549bf 10 *當前的用戶域:WENLI-PC 11 *創建人: yswenli 12 *電子郵箱:[email protected] 13 *創建時間:2018/3/5 16:36:44 14 *描述: 15 * 16 *===================================================================== 17 *修改標記 18 *修改時間:2018/3/5 16:36:44 19 *修改人: yswenli 20 *版本號: V1.0.0.0 21 *描述: 22 * 23 *****************************************************************************/ 24 25 using SAEA.Commom; 26 using SAEA.Sockets.Interface; 27 using System; 28 using System.Collections.Generic; 29 using System.Linq; 30 using System.Text; 31 using System.Threading; 32 using System.Threading.Tasks; 33 34 namespace SAEA.QueueSocket.Model 35 { 36 class Exchange : ISyncBase 37 { 38 object _syncLocker = new object(); 39 40 public object SyncLocker 41 { 42 get 43 { 44 return _syncLocker; 45 } 46 } 47 48 long _pNum = 0; 49 50 long _cNum = 0; 51 52 long _inNum = 0; 53 54 long _outNum = 0; 55 56 private Binding _binding; 57 58 private MessageQueue _messageQueue; 59 60 public Exchange() 61 { 62 this._binding = new Binding(); 63 64 this._messageQueue = new MessageQueue(); 65 } 66 67 68 public void AcceptPublish(string sessionID, QueueResult pInfo) 69 { 70 lock (_syncLocker) 71 { 72 this._binding.Set(sessionID, pInfo.Name, pInfo.Topic); 73 74 this._messageQueue.Enqueue(pInfo.Topic, pInfo.Data); 75 76 _pNum = this._binding.GetPublisherCount(); 77 78 Interlocked.Increment(ref _inNum); 79 } 80 } 81 82 public void AcceptPublishForBatch(string sessionID, QueueResult[] datas) 83 { 84 if (datas != null) 85 { 86 foreach (var data in datas) 87 { 88 if (data != null) 89 { 90 AcceptPublish(sessionID, data); 91 } 92 } 93 } 94 } 95 96 97 public void GetSubscribeData(string sessionID, QueueResult sInfo, int maxSize = 500, int maxTime = 500, Action<List<string>> callBack = null) 98 { 99 lock (_syncLocker) 100 { 101 var result = this._binding.GetBingInfo(sInfo); 102 103 if (result == null) 104 { 105 this._binding.Set(sessionID, sInfo.Name, sInfo.Topic, false); 106 107 _cNum = this._binding.GetSubscriberCount(); 108 109 Task.Factory.StartNew(() => 110 { 111 while (this._binding.Exists(sInfo)) 112 { 113 var list = this._messageQueue.DequeueForList(sInfo.Topic, maxSize, maxTime); 114 if (list != null) 115 { 116 list.ForEach(i => { Interlocked.Increment(ref _outNum); }); 117 callBack?.Invoke(list); 118 list.Clear(); 119 list = null; 120 } 121 } 122 }); 123 } 124 } 125 } 126 127 public void Unsubscribe(QueueResult sInfo) 128 { 129 Interlocked.Decrement(ref _cNum); 130 this._binding.Del(sInfo.Name, sInfo.Topic); 131 } 132 133 public void Clear(string sessionID) 134 { 135 lock (_syncLocker) 136 { 137 var data = this._binding.GetBingInfo(sessionID); 138 139 if (data != null) 140 { 141 if (data.Flag) 142 { 143 Interlocked.Decrement(ref _pNum); 144 } 145 else 146 { 147 Interlocked.Decrement(ref _cNum); 148 } 149 this._binding.Remove(sessionID); 150 } 151 } 152 } 153 154 public Tuple<long, long, long, long> GetConnectInfo() 155 { 156 return new Tuple<long, long, long, long>(_pNum, _cNum, _inNum, _outNum); 157 } 158 159 public List<Tuple<string, long>> GetQueueInfo() 160 { 161 List<Tuple<string, long>> result = new List<Tuple<string, long>>(); 162 lock (_syncLocker) 163 { 164 var list = this._messageQueue.ToList(); 165 if (list != null) 166 { 167 var tlts = list.Select(b => b.Topic).Distinct().ToList(); 168 169 if (tlts != null) 170 { 171 foreach (var topic in tlts) 172 { 173 var count = this._messageQueue.GetCount(topic); 174 var t = new Tuple<string, long>(topic, count); 175 result.Add(t); 176 } 177 tlts.Clear(); 178 } 179 list.Clear(); 180 } 181 } 182 return result; 183 } 184 185 } 186 }
思維發散:這裡可以增加全局消息隊列、指定連接消息隊列等;將連接通過類型redis cluster模式進行一個均衡分佈等
Binding
主要功能是將連接、主題進行映射管理
1 /**************************************************************************** 2 *Copyright (c) 2018 Microsoft All Rights Reserved. 3 *CLR版本: 4.0.30319.42000 4 *機器名稱:WENLI-PC 5 *公司名稱:Microsoft 6 *命名空間:SAEA.QueueSocket.Model 7 *文件名: Binding 8 *版本號: V1.0.0.0 9 *唯一標識:7472dabd-1b6a-4ffe-b19f-2d1cf7348766 10 *當前的用戶域:WENLI-PC 11 *創建人: yswenli 12 *電子郵箱:[email protected] 13 *創建時間:2018/3/5 17:10:19 14 *描述: 15 * 16 *===================================================================== 17 *修改標記 18 *修改時間:2018/3/5 17:10:19 19 *修改人: yswenli 20 *版本號: V1.0.0.0 21 *描述: 22 * 23 *****************************************************************************/ 24 25 using SAEA.Commom; 26 using SAEA.Sockets.Interface; 27 using System; 28 using System.Collections.Generic; 29 using System.Linq; 30 using System.Text; 31 32 namespace SAEA.QueueSocket.Model 33 { 34 /// <summary> 35 /// 連接與主題的映射 36 /// </summary> 37 class Binding : ISyncBase, IDisposable 38 { 39 List<BindInfo> _list = new List<BindInfo>(); 40 41 object _syncLocker = new object(); 42 43 public object SyncLocker 44 { 45 get 46 { 47 return _syncLocker; 48 } 49 } 50 51 bool _isDisposed = false; 52 53 int _minutes = 10; 54 55 public Binding(int minutes = 10) 56 { 57 _minutes = minutes; 58 59 ThreadHelper.PulseAction(() => 60 { 61 lock (_syncLocker) 62 { 63 var list = _list.Where(b => b.Expired <= DateTimeHelper.Now).ToList(); 64 if (list != null) 65 { 66 list.ForEach(item => 67 { 68 _list.Remove(item); 69 }); 70 list.Clear(); 71 list = null; 72 } 73 } 74 }, new TimeSpan(0, 0, 10), _isDisposed); 75 } 76 77 78 public void Set(string sessionID, string name, string topic, bool isPublisher = true) 79 { 80 81 lock (_syncLocker) 82 { 83 var result = _list.FirstOrDefault(b => b.Name == name && b.Topic == topic); 84 if (result == null) 85 { 86 _list.Add(new BindInfo() 87 { 88 SessionID = sessionID, 89 Name = name, 90 Topic = topic, 91 Flag = isPublisher, 92 Expired = DateTimeHelper.Now.AddMinutes(_minutes) 93 }); 94 } 95 else 96 { 97 result.Expired = DateTimeHelper.Now.AddMinutes(_minutes); 98 } 99 } 100 } 101 102 public void Del(string sessionID, string topic) 103 { 104 lock (_syncLocker) 105 { 106 var result = _list.FirstOrDefault(b => b.Name == sessionID && b.Topic == topic); 107 if (result != null) 108 { 109 _list.Remove(result); 110 } 111 } 112 } 113 114 public void Remove(string sessionID) 115 { 116 lock (_syncLocker) 117 { 118 var result = _list.Where(b => b.SessionID == sessionID).ToList(); 119 if (result != null) 120 { 121 result.ForEach((item) => 122 { 123 _list.Remove(item); 124 }); 125 result.Clear(); 126 } 127 } 128 } 129 130 public BindInfo GetBingInfo(QueueResult sInfo) 131 { 132 lock (_syncLocker) 133 { 134 var bi = _list.FirstOrDefault(b => b.Name == sInfo.Name && b.Topic == sInfo.Topic); 135 136 if (bi != null) 137 { 138 if (bi.Expired <= DateTimeHelper.Now) 139 { 140 Remove(bi.SessionID); 141 } 142 else 143 { 144 return bi; 145 } 146 } 147 return null; 148 } 149 } 150 151 public BindInfo GetBingInfo(string sessionID) 152 { 153 lock (_syncLocker) 154 { 155 return _list.FirstOrDefault(b => b.SessionID == sessionID); 156 } 157 } 158 159 public bool Exists(QueueResult sInfo) 160 { 161 lock (_syncLocker) 162 { 163 var data = _list.FirstOrDefault(b => b.Name == sInfo.Name && b.Topic == sInfo.Topic); 164 165 if (data != null) 166 { 167 if (data.Expired <= DateTimeHelper.Now) 168 { 169 Remove(data.SessionID); 170 171 return false; 172 } 173 174 data.Expired = DateTimeHelper.Now.AddMinutes(_minutes); 175 176 return true; 177 } 178 } 179 return false; 180 } 181 182 183 public IEnumerable<BindInfo> GetPublisher() 184 { 185 lock (_syncLocker) 186 { 187 return _list.Where(b => b.Flag); 188 } 189 } 190 191 public int GetPublisherCount() 192 { 193 lock (_syncLocker) 194 { 195 return _list.Where(b => b.Flag).Count(); 196 } 197 } 198 199 public IEnumerable<BindInfo> GetSubscriber() 200 { 201 lock (_syncLocker) 202 { 203 return _list.Where(b => !b.Flag); 204 } 205 } 206 207 public int GetSubscriberCount() 208 { 209 lock (_syncLocker) 210 { 211 return _list.Where(b => !b.Flag).Count(); 212 } 213 } 214 215 216 public void Dispose() 217 { 218 _isDisposed = true; 219 lock (_syncLocker) 220 { 221 _list.Clear(); 222 _list = null; 223 } 224 } 225 } 226 }
思維發散:實現多個QServer的主題與隊列映射克隆、或者隊列消息轉發實現容災集群或大容量集群等
MessageQueue
將主題與隊列形成一個映射,並對主題映射進行管理
1 /**************************************************************************** 2 *Copyright (c) 2018 Microsoft All Rights Reserved. 3 *CLR版本: 4.0.30319.42000 4 *機器名稱:WENLI-PC 5 *公司名稱:Microsoft 6 *命名空間:SAEA.QueueSocket.Model 7 *文件名: QueueCollection 8 *版本號: V1.0.0.0 9 *唯一標識:89a65c12-c4b3-486b-a933-ad41c3db6621 10 *當前的用戶域:WENLI-PC 11 *創建人: yswenli 12 *電子郵箱:[email protected] 13 *創建時間:2018/3/6 10:31:11 14 *描述: 15 * 16 *===================================================================== 17 *修改標記 18 *修改時間:2018/3/6 10:31:11 19 *修改人: yswenli 20 *版本號: V1.0.0.0 21 *描述: 22 * 23 *****************************************************************************/ 24 25 using SAEA.Commom; 26 using SAEA.Sockets.Interface; 27 using System; 28 using System.Collections.Concurrent; 29 using System.Collections.Generic; 30 using System.Linq; 31 using System.Threading.Tasks; 32 33 namespace SAEA.QueueSocket.Model 34 { 35 public class MessageQueue : ISyncBase, IDisposable 36 { 37 bool _isDisposed = false; 38 39 ConcurrentDictionary<string, QueueBase> _list; 40 41 object _syncLocker = new object(); 42 43 public object SyncLocker 44 { 45 get 46 { 47 return _syncLocker; 48 } 49 } 50