消息隊列 神馬是消息隊列,看看某度的原話“在項目中,將一些無需即時返回且耗時的操作提取出來,進行了非同步處理,而這種非同步處理的方式大大的節省了伺服器的請求響應時間,從而提高了系統的吞吐量”。 其實消息隊列還可以用於解耦,在多層項目模型或中型項目以上,都會用到消息隊列,減少層與層之間的耦合;還可以做跨進 ...
消息隊列
神馬是消息隊列,看看某度的原話“在項目中,將一些無需即時返回且耗時的操作提取出來,進行了非同步處理,而這種非同步處理的方式大大的節省了伺服器的請求響應時間,從而提高了系統的吞吐量”。
其實消息隊列還可以用於解耦,在多層項目模型或中型項目以上,都會用到消息隊列,減少層與層之間的耦合;還可以做跨進程間的通訊(傳輸率顯然比不上RPC)。
上一節說道最終需要採用消息隊列來進行分離前級和後級,並且採用非同步方式,用於提高業務伺服器的吞吐率,不過,雖然分離了,如果後級伺服器的處理能力達不到請求數或接近平衡,那麼分離也是無用的,甚至會影響整個系統的執行效率。比如這樣
1台業務邏輯伺服器 => 生產消息 => 消息伺服器 => 消費者(處理)
其實就等同於:
1台業務邏輯伺服器 => 消費者(處理)
或者換一種場景:
一個銀行有多個視窗,但目前只打開了一個視窗進行服務,我們假設這個視窗的服務人員是每半小時完成一個用戶,如果有10個用戶,那麼就是10*30=300分鐘,最後一個用戶需要排隊對待270分鐘後才輪到他到視窗,這是多麼荒唐的事情(很多服務行業的通病),用戶肯定會非常的不耐煩。如果我們再增開3個閑置的視窗,並且配上相應的服務人員,一次接待4位客人,那麼這個時間將會縮短3倍,變成只需要90分鐘即可輪到他。
在這個場景中,增設視窗就屬於水平擴展,而不是督促服務人員提高工作效率、這種垂直擴展來提高整體效率(畢竟不管是機器還是人,都有極限)。伺服器消息隊列中的消費者也是如此,並且相同類型(或處理邏輯相同)的擴展完全屬於傻瓜化的,可比增設視窗簡單多了。
在來看一下上一節中的最後一張圖片:
“FFmpeg伺服器...n”就屬於傻瓜式的水平擴展,想想一下:同一份代碼,部署到不同的伺服器上面,是不是特別的輕鬆。
使用RabbitMQ進行水平擴展
rabbitmq的安裝這裡就不介紹了,先搞清楚他是一個AMQP標準即可,由於我們這個項目只涉及到一個處理邏輯——音頻處理,而不討論與其他項目相關,所以我們將交換機Exchange,隊列Queue,路由關鍵字Routing Key均設為直連一根線通到底,無需中間做任何交換,當然也不需要交換機進行廣播fanout,完全的direct即可。
去重(重覆消費)的問題:
ribbitmq利用ack機制來確定消息的可靠性,但是需要消費端完全完成這條消息後才會做出應答,這樣便會造成消費不等,即一個還在處理消費,而另一也緊跟著處理這個消費。一般出在任務超時,或者沒有及時返回狀態,引起任務重新入隊列,重新消費,在rabbtimq里連接的斷開也會觸發消息重新入隊列,解決方案有很多,也可以參考冪等性方法。
將一條消息做一個唯一的標簽,例如GUID,每次在處理前先判斷這個標簽的狀態是否被處理,如果已被處理,該消費端就放棄這條消息。
廢話不多,開始:
建立任務併發送消息
首先我們需要創建一個任務,這個任務可以是個標識,也可以是一個存儲,但任務名稱必須是唯一(ID)的,用隨機字元串生成一組唯一ID,筆者提供一個方法,供大家參考:
1 ///<summary> 2 ///生成隨機字元串 3 ///</summary> 4 ///<param name="length">目標字元串的長度</param> 5 ///<param name="useNum">是否包含數字,1=包含,預設為包含</param> 6 ///<param name="useLow">是否包含小寫字母,1=包含,預設為包含</param> 7 ///<param name="useUpp">是否包含大寫字母,1=包含,預設為包含</param> 8 ///<param name="useSpe">是否包含特殊字元,1=包含,預設為不包含</param> 9 ///<param name="custom">要包含的自定義字元,直接輸入要包含的字元列表</param> 10 ///<returns>指定長度的隨機字元串</returns> 11 public static string GetRandomString(int length, bool useNum, bool useLow, bool useUpp, bool useSpe, 12 string custom) 13 { 14 byte[] b = new byte[4]; 15 new System.Security.Cryptography.RNGCryptoServiceProvider().GetBytes(b); 16 Random r = new Random(BitConverter.ToInt32(b, 0)); 17 string s = null, str = custom; 18 if (useNum == true) 19 { 20 str += "0123456789"; 21 } 22 23 if (useLow == true) 24 { 25 str += "abcdefghijklmnopqrstuvwxyz"; 26 } 27 28 if (useUpp == true) 29 { 30 str += "ABCDEFGHIJKLMNOPQRSTUVWXYZ"; 31 } 32 33 if (useSpe == true) 34 { 35 str += "!\"#$%&'()*+,-./:;<=>?@[\\]^_`{|}~"; 36 } 37 38 for (int i = 0; i < length; i++) 39 { 40 s += str.Substring(r.Next(0, str.Length - 1), 1); 41 } 42 43 return s; 44 }View Code
再建立一個介面,用於接受來自客戶端的請求,根據請求非同步創建一個任務,並將任務名稱返回到請求客戶端。
1 var taskName = AudioParamFactory.GetRandomString(8, true, true, true, false, null); 2 3 _iMsgBusService.Pubilsh(JsonConvert.SerializeObject(new 4 { 5 frontFileUrl, 6 backgounedAudioIndex, 7 taskName 8 }), DispatchEndpoint.Media);View Code
上述代碼中直接就兩句話,一:建立一個任務名稱;二:將消息發送到名為“media”的隊列中。
為何創建連接,創建通道,配置等等都沒有呢,這是因為在easyHub的框架中已經做好了,偷會懶吧o(∩_∩)o 。
通過請求8次,那麼Media隊列中將存在8條消息,如圖所示:
存儲消費者處理後的狀態
當消費完成,處理應答是必須的,否則這條消息會永遠的存在消息伺服器中。
1 public void DoStart() 2 { 3 // 1:從消息隊列中取得需要處理的音頻消息 4 Consumer consumer = new Consumer(MqConfig.MeidaQueueName); 5 var channel = consumer.Channel; 6 consumer.ReceivedEvent += (sender, args) => 7 { 8 var msg = Encoding.UTF8.GetString(args.Body); 9 Console.WriteLine(args.RoutingKey + "\r\n" + msg); 10 Console.WriteLine(); 11 12 // 2:執行同步處理(一次只調用一個同步處理單元) 13 var nonObj = JsonConvert.DeserializeObject<Dictionary<string, object>>(msg); 14 var nonBoy = JsonConvert.DeserializeObject<dynamic>(nonObj["Body"].ToString()); 15 string forntFileUrl = nonBoy.frontFileUrl; 16 int backgounedAudioIndex = nonBoy.backgounedAudioIndex; 17 string taskName = nonBoy.taskName; 18 // 調用同步方法 19 var r = SynthesisAudio(forntFileUrl, backgounedAudioIndex, taskName); 20 Console.WriteLine(r.GetType()); 21 Console.WriteLine(typeof(AudioSynthesisSyncResult)); 22 if (r.GetType() == typeof(AudioSynthesisSyncResult)) 23 { 24 // 3:處理完成,應答隊列伺服器 25 channel.BasicAck(args.DeliveryTag, false); 26 Console.WriteLine(taskName); 27 Console.WriteLine("handler done, wait for the next message..."); 28 } 29 else 30 { 31 // 出現處理錯誤,則該條消息不做應答,併發送錯誤 32 var error = ((JsonResult) r); 33 Console.WriteLine(error.StatusCode); 34 Console.WriteLine(error.Value); 35 } 36 }; 37 }View Code
當任務進入到消息隊列,其實就和當時的請求是沒有任何聯繫的了,這樣來理解非同步也不錯,所以我們需要將任務的狀態進行分類存儲,以告訴客戶端在查詢的時候,當前的任務進行到哪一步了,我們可以用枚舉的方式來羅列:
1 public enum AudioProcessingState 2 { 3 EmptyHandler = 0, 4 StartHandler = 1, 5 DownloadAudio = 2, 6 SynthesisAudio = 3, 7 UploadAudio = 4, 8 UpdateDatabase = 5, 9 HandlerException = 6, 10 InCompleted = 7 11 }
筆者提供的任務狀態有8種,具體時候請根據自己的業務邏輯進行區分,很簡單,就是前面畫的那張垂直流程圖,不解釋。
當然,如果你把所有任務狀態都存到資料庫,那麼將會有個問題,這資料庫面對輪詢的壓力有點吃力,所以最好還是放到緩存中,至於喜歡放什麼緩存,這個根據業務場景和現有的而定,千萬別放本地緩存就行。
對了,狀態放緩存,而結果需要放資料庫,這是原則問題。
客戶端輪詢結果介面
接下來我們在創建一個提供查詢的介面,這裡實際就是查詢緩存而已,如果狀態是InCompleted,就直接從資料庫取結果,因為非常的簡單,筆者就不放代碼上來了。
不過有朋友喜歡將結果進行推送到客戶端,這也是非常好的,而且相比輪詢,推送更能減少伺服器壓力。
測試結果
為了驗證結果,筆者前前後後進行了多次的測試,在I7-2700K的WIN10上面模擬了多台伺服器,看看這截圖:
能分離的全都分離,包括請求和查詢也單列一臺伺服器。
經過測試,筆者通過模擬請求8個任務,採用逐級增加服務的方式,得到瞭如下的結果:
單機 | 最快(最早入隊)/ms |
最慢(最晚入隊)/ms |
第一次 | 3241 | 19430 |
第二次 | 3271 | 19592 |
第三次 | 4564 | 19227 |
兩台 | ||
第一次 | 4058 | 9819 |
第二次 | 3146 | 9014 |
第三次 | 4033 | 8798 |
三台 | ||
第一次 | 3880 | 9830 |
第二次 | 3477 | 7700 |
第三次 | 3182 | 6993 |
六台 | ||
第一次 | 3709 | 4800 |
第二次 | 3313 | 4773 |
第三次 | 3182 | 4793 |
最早入隊的任務時間基本鎖定在3-4s,為何會有這麼大的波動,畢竟筆者的電腦不是真正的伺服器電腦。而反觀最晚入隊的任務,在單機模式上,達到了19s,隨著逐級的增加服務(筆者電腦開6個已經吃不消了),達到了不到5s,整體時間縮短了近4倍,結果非常令人滿意。
下一節將介紹在NETCORE中如何使用中間件自動啟動任務調度,而不是採用quartz中間件。
感謝閱讀