使用.NET Core搭建分散式音頻效果處理服務(五)利用消息隊列提升水平擴展靈活性

来源:https://www.cnblogs.com/SteveLee/archive/2018/08/17/9490914.html
-Advertisement-
Play Games

消息隊列 神馬是消息隊列,看看某度的原話“在項目中,將一些無需即時返回且耗時的操作提取出來,進行了非同步處理,而這種非同步處理的方式大大的節省了伺服器的請求響應時間,從而提高了系統的吞吐量”。 其實消息隊列還可以用於解耦,在多層項目模型或中型項目以上,都會用到消息隊列,減少層與層之間的耦合;還可以做跨進 ...


 

消息隊列

神馬是消息隊列,看看某度的原話“在項目中,將一些無需即時返回且耗時的操作提取出來,進行了非同步處理,而這種非同步處理的方式大大的節省了伺服器的請求響應時間,從而提高了系統的吞吐量”。

其實消息隊列還可以用於解耦,在多層項目模型或中型項目以上,都會用到消息隊列,減少層與層之間的耦合;還可以做跨進程間的通訊(傳輸率顯然比不上RPC)。

上一節說道最終需要採用消息隊列來進行分離前級和後級,並且採用非同步方式,用於提高業務伺服器的吞吐率,不過,雖然分離了,如果後級伺服器的處理能力達不到請求數或接近平衡,那麼分離也是無用的,甚至會影響整個系統的執行效率。比如這樣

1台業務邏輯伺服器 => 生產消息 => 消息伺服器 => 消費者(處理)

其實就等同於:

1台業務邏輯伺服器 => 消費者(處理)

或者換一種場景:

一個銀行有多個視窗,但目前只打開了一個視窗進行服務,我們假設這個視窗的服務人員是每半小時完成一個用戶,如果有10個用戶,那麼就是10*30=300分鐘,最後一個用戶需要排隊對待270分鐘後才輪到他到視窗,這是多麼荒唐的事情(很多服務行業的通病),用戶肯定會非常的不耐煩。如果我們再增開3個閑置的視窗,並且配上相應的服務人員,一次接待4位客人,那麼這個時間將會縮短3倍,變成只需要90分鐘即可輪到他。

在這個場景中,增設視窗就屬於水平擴展,而不是督促服務人員提高工作效率、這種垂直擴展來提高整體效率(畢竟不管是機器還是人,都有極限)。伺服器消息隊列中的消費者也是如此,並且相同類型(或處理邏輯相同)的擴展完全屬於傻瓜化的,可比增設視窗簡單多了。

 在來看一下上一節中的最後一張圖片:

“FFmpeg伺服器...n”就屬於傻瓜式的水平擴展,想想一下:同一份代碼,部署到不同的伺服器上面,是不是特別的輕鬆。

 

使用RabbitMQ進行水平擴展

rabbitmq的安裝這裡就不介紹了,先搞清楚他是一個AMQP標準即可,由於我們這個項目只涉及到一個處理邏輯——音頻處理,而不討論與其他項目相關,所以我們將交換機Exchange,隊列Queue,路由關鍵字Routing Key均設為直連一根線通到底,無需中間做任何交換,當然也不需要交換機進行廣播fanout,完全的direct即可。

去重(重覆消費)的問題:

ribbitmq利用ack機制來確定消息的可靠性,但是需要消費端完全完成這條消息後才會做出應答,這樣便會造成消費不等,即一個還在處理消費,而另一也緊跟著處理這個消費。一般出在任務超時,或者沒有及時返回狀態,引起任務重新入隊列,重新消費,在rabbtimq里連接的斷開也會觸發消息重新入隊列,解決方案有很多,也可以參考冪等性方法。

將一條消息做一個唯一的標簽,例如GUID,每次在處理前先判斷這個標簽的狀態是否被處理,如果已被處理,該消費端就放棄這條消息。

廢話不多,開始:

 

建立任務併發送消息

首先我們需要創建一個任務,這個任務可以是個標識,也可以是一個存儲,但任務名稱必須是唯一(ID)的,用隨機字元串生成一組唯一ID,筆者提供一個方法,供大家參考:

 1 ///<summary>
 2         ///生成隨機字元串 
 3         ///</summary>
 4         ///<param name="length">目標字元串的長度</param>
 5         ///<param name="useNum">是否包含數字,1=包含,預設為包含</param>
 6         ///<param name="useLow">是否包含小寫字母,1=包含,預設為包含</param>
 7         ///<param name="useUpp">是否包含大寫字母,1=包含,預設為包含</param>
 8         ///<param name="useSpe">是否包含特殊字元,1=包含,預設為不包含</param>
 9         ///<param name="custom">要包含的自定義字元,直接輸入要包含的字元列表</param>
10         ///<returns>指定長度的隨機字元串</returns>
11         public static string GetRandomString(int length, bool useNum, bool useLow, bool useUpp, bool useSpe,
12             string custom)
13         {
14             byte[] b = new byte[4];
15             new System.Security.Cryptography.RNGCryptoServiceProvider().GetBytes(b);
16             Random r = new Random(BitConverter.ToInt32(b, 0));
17             string s = null, str = custom;
18             if (useNum == true)
19             {
20                 str += "0123456789";
21             }
22 
23             if (useLow == true)
24             {
25                 str += "abcdefghijklmnopqrstuvwxyz";
26             }
27 
28             if (useUpp == true)
29             {
30                 str += "ABCDEFGHIJKLMNOPQRSTUVWXYZ";
31             }
32 
33             if (useSpe == true)
34             {
35                 str += "!\"#$%&'()*+,-./:;<=>?@[\\]^_`{|}~";
36             }
37 
38             for (int i = 0; i < length; i++)
39             {
40                 s += str.Substring(r.Next(0, str.Length - 1), 1);
41             }
42 
43             return s;
44         }
View Code

 

再建立一個介面,用於接受來自客戶端的請求,根據請求非同步創建一個任務,並將任務名稱返回到請求客戶端。

1             var taskName = AudioParamFactory.GetRandomString(8, true, true, true, false, null);
2 
3             _iMsgBusService.Pubilsh(JsonConvert.SerializeObject(new
4             {
5                 frontFileUrl,
6                 backgounedAudioIndex,
7                 taskName
8             }), DispatchEndpoint.Media);
View Code

 

上述代碼中直接就兩句話,一:建立一個任務名稱;二:將消息發送到名為“media”的隊列中。

為何創建連接,創建通道,配置等等都沒有呢,這是因為在easyHub的框架中已經做好了,偷會懶吧o(∩_∩)o 。

通過請求8次,那麼Media隊列中將存在8條消息,如圖所示:

 

存儲消費者處理後的狀態

當消費完成,處理應答是必須的,否則這條消息會永遠的存在消息伺服器中。

 1 public void DoStart()
 2         {
 3             // 1:從消息隊列中取得需要處理的音頻消息
 4             Consumer consumer = new Consumer(MqConfig.MeidaQueueName);
 5             var channel = consumer.Channel;
 6             consumer.ReceivedEvent += (sender, args) =>
 7             {
 8                 var msg = Encoding.UTF8.GetString(args.Body);
 9                 Console.WriteLine(args.RoutingKey + "\r\n" + msg);
10                 Console.WriteLine();
11 
12                 // 2:執行同步處理(一次只調用一個同步處理單元)
13                 var nonObj = JsonConvert.DeserializeObject<Dictionary<string, object>>(msg);
14                 var nonBoy = JsonConvert.DeserializeObject<dynamic>(nonObj["Body"].ToString());
15                 string forntFileUrl = nonBoy.frontFileUrl;
16                 int backgounedAudioIndex = nonBoy.backgounedAudioIndex;
17                 string taskName = nonBoy.taskName;
18                 // 調用同步方法
19                 var r = SynthesisAudio(forntFileUrl, backgounedAudioIndex, taskName);
20                 Console.WriteLine(r.GetType());
21                 Console.WriteLine(typeof(AudioSynthesisSyncResult));
22                 if (r.GetType() == typeof(AudioSynthesisSyncResult))
23                 {
24                     // 3:處理完成,應答隊列伺服器
25                     channel.BasicAck(args.DeliveryTag, false);
26                     Console.WriteLine(taskName);
27                     Console.WriteLine("handler done, wait for the next message...");
28                 }
29                 else 
30                 {
31                     // 出現處理錯誤,則該條消息不做應答,併發送錯誤
32                     var error = ((JsonResult) r);
33                     Console.WriteLine(error.StatusCode);
34                     Console.WriteLine(error.Value);
35                 }
36             };
37         }
View Code

當任務進入到消息隊列,其實就和當時的請求是沒有任何聯繫的了,這樣來理解非同步也不錯,所以我們需要將任務的狀態進行分類存儲,以告訴客戶端在查詢的時候,當前的任務進行到哪一步了,我們可以用枚舉的方式來羅列:

 1     public enum AudioProcessingState
 2     {
 3         EmptyHandler = 0,
 4         StartHandler = 1,
 5         DownloadAudio = 2,
 6         SynthesisAudio = 3,
 7         UploadAudio = 4,
 8         UpdateDatabase = 5,
 9         HandlerException = 6,
10         InCompleted = 7
11     }

 

 筆者提供的任務狀態有8種,具體時候請根據自己的業務邏輯進行區分,很簡單,就是前面畫的那張垂直流程圖,不解釋。

當然,如果你把所有任務狀態都存到資料庫,那麼將會有個問題,這資料庫面對輪詢的壓力有點吃力,所以最好還是放到緩存中,至於喜歡放什麼緩存,這個根據業務場景和現有的而定,千萬別放本地緩存就行。

對了,狀態放緩存,而結果需要放資料庫,這是原則問題。

 

客戶端輪詢結果介面

接下來我們在創建一個提供查詢的介面,這裡實際就是查詢緩存而已,如果狀態是InCompleted,就直接從資料庫取結果,因為非常的簡單,筆者就不放代碼上來了。

不過有朋友喜歡將結果進行推送到客戶端,這也是非常好的,而且相比輪詢,推送更能減少伺服器壓力。

 

測試結果

為了驗證結果,筆者前前後後進行了多次的測試,在I7-2700K的WIN10上面模擬了多台伺服器,看看這截圖:

能分離的全都分離,包括請求和查詢也單列一臺伺服器。

經過測試,筆者通過模擬請求8個任務,採用逐級增加服務的方式,得到瞭如下的結果:

單機  最快(最早入隊)/ms
 最慢(最晚入隊)/ms
第一次  3241  19430
第二次  3271  19592
第三次  4564  19227
兩台    
第一次  4058  9819
第二次  3146  9014
第三次  4033  8798
三台    
第一次  3880  9830
第二次  3477  7700
第三次  3182  6993
六台    
第一次  3709  4800
第二次  3313  4773
第三次  3182  4793

最早入隊的任務時間基本鎖定在3-4s,為何會有這麼大的波動,畢竟筆者的電腦不是真正的伺服器電腦。而反觀最晚入隊的任務,在單機模式上,達到了19s,隨著逐級的增加服務(筆者電腦開6個已經吃不消了),達到了不到5s,整體時間縮短了近4倍,結果非常令人滿意。

下一節將介紹在NETCORE中如何使用中間件自動啟動任務調度,而不是採用quartz中間件。

 

感謝閱讀


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 雖然P圖 最好的還是用要學會使用PS,但是並不是每個人都會PS的,但是如果你會Python的話,也是可以為所欲為! 現代社會,不P圖的人簡直就像是恐龍一樣稀奇,大到瘦臉瘦腿瘦全身,小到濾鏡大眼高鼻梁,五花八門的手段令人應接不暇。那麼程式員作為這個星球的特殊物種,P圖才不會用毀圖秀秀這種軟體,下麵我們 ...
  • #include <stdio.h>#include <conio.h>void main(){ struct stuscore { char name[20]; float score[5]; float average; }x; int i; float sum; char rep; while ...
  • 繼續講枚舉的使用。前文說的是方法參數和pojo屬性定義成枚舉類型的好處。本文講在方法里使用枚舉的妙處。 ...
  • JAVA反射機制是在運行狀態中,對於任意一個類,都能夠知道這個類的所有屬性和方法;對於任意一個對象,都能夠調用它的任意方法和屬性;這種動態獲取信息以及動態調用對象方法的功能稱為java語言的反射機制。 一:Class類 在面向對象的世界里,萬物皆對象。類也是對象,類是java.lang.Class類 ...
  • 簡介 Python 是一種高層次的結合瞭解釋性、編譯性、互動性和麵向對象的腳本語言。Python 由 Guido van Rossum 於 1989 年底在荷蘭國家數學和電腦科學研究所發明,第一個公開發行版發行於 1991 年。 特點 易於學習:Python 有相對較少的關鍵字,結構簡單,和一個明 ...
  • 為何要用中間件來實現音頻處理的監聽服務 當然也可以使用Startup來進行服務的自啟動,或者也可以使用quartz定時調度任務來啟動音頻服務,大家隨意。 筆者認為使用中間件的目的,是為了分離應用和服務,也是一種解耦手段。 我們知道,在NETCORE中的中間件,有點類似像AOP的一種實現形式,他的調用 ...
  • http://cron.qqe2.com/ 如果不會 或者想檢驗自己是否寫的對就 通過這個網站 檢測 或自動生成 * * * * * * * [秒] [分] [小時] [日] [月] [周] [年] 共7個*號 序號 說明 是否必填 允許填寫的值 允許的通配符 1 秒 是 0-59 , - * /  ...
  • 昨天更新了VS到最新版本v15.8.0,但是編譯UWP出現了操蛋的bug。 谷歌一下,vs社區已經有答案了。 打開.csproj文件,在節點 <PropertyGroup> 裡面,加上一行 保存,重新載入項目,編譯即可。 原文參考:https://developercommunity.visuals ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...