使用.NET Core搭建分散式音頻效果處理服務(五)利用消息隊列提升水平擴展靈活性

来源:https://www.cnblogs.com/SteveLee/archive/2018/08/17/9490914.html
-Advertisement-
Play Games

消息隊列 神馬是消息隊列,看看某度的原話“在項目中,將一些無需即時返回且耗時的操作提取出來,進行了非同步處理,而這種非同步處理的方式大大的節省了伺服器的請求響應時間,從而提高了系統的吞吐量”。 其實消息隊列還可以用於解耦,在多層項目模型或中型項目以上,都會用到消息隊列,減少層與層之間的耦合;還可以做跨進 ...


 

消息隊列

神馬是消息隊列,看看某度的原話“在項目中,將一些無需即時返回且耗時的操作提取出來,進行了非同步處理,而這種非同步處理的方式大大的節省了伺服器的請求響應時間,從而提高了系統的吞吐量”。

其實消息隊列還可以用於解耦,在多層項目模型或中型項目以上,都會用到消息隊列,減少層與層之間的耦合;還可以做跨進程間的通訊(傳輸率顯然比不上RPC)。

上一節說道最終需要採用消息隊列來進行分離前級和後級,並且採用非同步方式,用於提高業務伺服器的吞吐率,不過,雖然分離了,如果後級伺服器的處理能力達不到請求數或接近平衡,那麼分離也是無用的,甚至會影響整個系統的執行效率。比如這樣

1台業務邏輯伺服器 => 生產消息 => 消息伺服器 => 消費者(處理)

其實就等同於:

1台業務邏輯伺服器 => 消費者(處理)

或者換一種場景:

一個銀行有多個視窗,但目前只打開了一個視窗進行服務,我們假設這個視窗的服務人員是每半小時完成一個用戶,如果有10個用戶,那麼就是10*30=300分鐘,最後一個用戶需要排隊對待270分鐘後才輪到他到視窗,這是多麼荒唐的事情(很多服務行業的通病),用戶肯定會非常的不耐煩。如果我們再增開3個閑置的視窗,並且配上相應的服務人員,一次接待4位客人,那麼這個時間將會縮短3倍,變成只需要90分鐘即可輪到他。

在這個場景中,增設視窗就屬於水平擴展,而不是督促服務人員提高工作效率、這種垂直擴展來提高整體效率(畢竟不管是機器還是人,都有極限)。伺服器消息隊列中的消費者也是如此,並且相同類型(或處理邏輯相同)的擴展完全屬於傻瓜化的,可比增設視窗簡單多了。

 在來看一下上一節中的最後一張圖片:

“FFmpeg伺服器...n”就屬於傻瓜式的水平擴展,想想一下:同一份代碼,部署到不同的伺服器上面,是不是特別的輕鬆。

 

使用RabbitMQ進行水平擴展

rabbitmq的安裝這裡就不介紹了,先搞清楚他是一個AMQP標準即可,由於我們這個項目只涉及到一個處理邏輯——音頻處理,而不討論與其他項目相關,所以我們將交換機Exchange,隊列Queue,路由關鍵字Routing Key均設為直連一根線通到底,無需中間做任何交換,當然也不需要交換機進行廣播fanout,完全的direct即可。

去重(重覆消費)的問題:

ribbitmq利用ack機制來確定消息的可靠性,但是需要消費端完全完成這條消息後才會做出應答,這樣便會造成消費不等,即一個還在處理消費,而另一也緊跟著處理這個消費。一般出在任務超時,或者沒有及時返回狀態,引起任務重新入隊列,重新消費,在rabbtimq里連接的斷開也會觸發消息重新入隊列,解決方案有很多,也可以參考冪等性方法。

將一條消息做一個唯一的標簽,例如GUID,每次在處理前先判斷這個標簽的狀態是否被處理,如果已被處理,該消費端就放棄這條消息。

廢話不多,開始:

 

建立任務併發送消息

首先我們需要創建一個任務,這個任務可以是個標識,也可以是一個存儲,但任務名稱必須是唯一(ID)的,用隨機字元串生成一組唯一ID,筆者提供一個方法,供大家參考:

 1 ///<summary>
 2         ///生成隨機字元串 
 3         ///</summary>
 4         ///<param name="length">目標字元串的長度</param>
 5         ///<param name="useNum">是否包含數字,1=包含,預設為包含</param>
 6         ///<param name="useLow">是否包含小寫字母,1=包含,預設為包含</param>
 7         ///<param name="useUpp">是否包含大寫字母,1=包含,預設為包含</param>
 8         ///<param name="useSpe">是否包含特殊字元,1=包含,預設為不包含</param>
 9         ///<param name="custom">要包含的自定義字元,直接輸入要包含的字元列表</param>
10         ///<returns>指定長度的隨機字元串</returns>
11         public static string GetRandomString(int length, bool useNum, bool useLow, bool useUpp, bool useSpe,
12             string custom)
13         {
14             byte[] b = new byte[4];
15             new System.Security.Cryptography.RNGCryptoServiceProvider().GetBytes(b);
16             Random r = new Random(BitConverter.ToInt32(b, 0));
17             string s = null, str = custom;
18             if (useNum == true)
19             {
20                 str += "0123456789";
21             }
22 
23             if (useLow == true)
24             {
25                 str += "abcdefghijklmnopqrstuvwxyz";
26             }
27 
28             if (useUpp == true)
29             {
30                 str += "ABCDEFGHIJKLMNOPQRSTUVWXYZ";
31             }
32 
33             if (useSpe == true)
34             {
35                 str += "!\"#$%&'()*+,-./:;<=>?@[\\]^_`{|}~";
36             }
37 
38             for (int i = 0; i < length; i++)
39             {
40                 s += str.Substring(r.Next(0, str.Length - 1), 1);
41             }
42 
43             return s;
44         }
View Code

 

再建立一個介面,用於接受來自客戶端的請求,根據請求非同步創建一個任務,並將任務名稱返回到請求客戶端。

1             var taskName = AudioParamFactory.GetRandomString(8, true, true, true, false, null);
2 
3             _iMsgBusService.Pubilsh(JsonConvert.SerializeObject(new
4             {
5                 frontFileUrl,
6                 backgounedAudioIndex,
7                 taskName
8             }), DispatchEndpoint.Media);
View Code

 

上述代碼中直接就兩句話,一:建立一個任務名稱;二:將消息發送到名為“media”的隊列中。

為何創建連接,創建通道,配置等等都沒有呢,這是因為在easyHub的框架中已經做好了,偷會懶吧o(∩_∩)o 。

通過請求8次,那麼Media隊列中將存在8條消息,如圖所示:

 

存儲消費者處理後的狀態

當消費完成,處理應答是必須的,否則這條消息會永遠的存在消息伺服器中。

 1 public void DoStart()
 2         {
 3             // 1:從消息隊列中取得需要處理的音頻消息
 4             Consumer consumer = new Consumer(MqConfig.MeidaQueueName);
 5             var channel = consumer.Channel;
 6             consumer.ReceivedEvent += (sender, args) =>
 7             {
 8                 var msg = Encoding.UTF8.GetString(args.Body);
 9                 Console.WriteLine(args.RoutingKey + "\r\n" + msg);
10                 Console.WriteLine();
11 
12                 // 2:執行同步處理(一次只調用一個同步處理單元)
13                 var nonObj = JsonConvert.DeserializeObject<Dictionary<string, object>>(msg);
14                 var nonBoy = JsonConvert.DeserializeObject<dynamic>(nonObj["Body"].ToString());
15                 string forntFileUrl = nonBoy.frontFileUrl;
16                 int backgounedAudioIndex = nonBoy.backgounedAudioIndex;
17                 string taskName = nonBoy.taskName;
18                 // 調用同步方法
19                 var r = SynthesisAudio(forntFileUrl, backgounedAudioIndex, taskName);
20                 Console.WriteLine(r.GetType());
21                 Console.WriteLine(typeof(AudioSynthesisSyncResult));
22                 if (r.GetType() == typeof(AudioSynthesisSyncResult))
23                 {
24                     // 3:處理完成,應答隊列伺服器
25                     channel.BasicAck(args.DeliveryTag, false);
26                     Console.WriteLine(taskName);
27                     Console.WriteLine("handler done, wait for the next message...");
28                 }
29                 else 
30                 {
31                     // 出現處理錯誤,則該條消息不做應答,併發送錯誤
32                     var error = ((JsonResult) r);
33                     Console.WriteLine(error.StatusCode);
34                     Console.WriteLine(error.Value);
35                 }
36             };
37         }
View Code

當任務進入到消息隊列,其實就和當時的請求是沒有任何聯繫的了,這樣來理解非同步也不錯,所以我們需要將任務的狀態進行分類存儲,以告訴客戶端在查詢的時候,當前的任務進行到哪一步了,我們可以用枚舉的方式來羅列:

 1     public enum AudioProcessingState
 2     {
 3         EmptyHandler = 0,
 4         StartHandler = 1,
 5         DownloadAudio = 2,
 6         SynthesisAudio = 3,
 7         UploadAudio = 4,
 8         UpdateDatabase = 5,
 9         HandlerException = 6,
10         InCompleted = 7
11     }

 

 筆者提供的任務狀態有8種,具體時候請根據自己的業務邏輯進行區分,很簡單,就是前面畫的那張垂直流程圖,不解釋。

當然,如果你把所有任務狀態都存到資料庫,那麼將會有個問題,這資料庫面對輪詢的壓力有點吃力,所以最好還是放到緩存中,至於喜歡放什麼緩存,這個根據業務場景和現有的而定,千萬別放本地緩存就行。

對了,狀態放緩存,而結果需要放資料庫,這是原則問題。

 

客戶端輪詢結果介面

接下來我們在創建一個提供查詢的介面,這裡實際就是查詢緩存而已,如果狀態是InCompleted,就直接從資料庫取結果,因為非常的簡單,筆者就不放代碼上來了。

不過有朋友喜歡將結果進行推送到客戶端,這也是非常好的,而且相比輪詢,推送更能減少伺服器壓力。

 

測試結果

為了驗證結果,筆者前前後後進行了多次的測試,在I7-2700K的WIN10上面模擬了多台伺服器,看看這截圖:

能分離的全都分離,包括請求和查詢也單列一臺伺服器。

經過測試,筆者通過模擬請求8個任務,採用逐級增加服務的方式,得到瞭如下的結果:

單機  最快(最早入隊)/ms
 最慢(最晚入隊)/ms
第一次  3241  19430
第二次  3271  19592
第三次  4564  19227
兩台    
第一次  4058  9819
第二次  3146  9014
第三次  4033  8798
三台    
第一次  3880  9830
第二次  3477  7700
第三次  3182  6993
六台    
第一次  3709  4800
第二次  3313  4773
第三次  3182  4793

最早入隊的任務時間基本鎖定在3-4s,為何會有這麼大的波動,畢竟筆者的電腦不是真正的伺服器電腦。而反觀最晚入隊的任務,在單機模式上,達到了19s,隨著逐級的增加服務(筆者電腦開6個已經吃不消了),達到了不到5s,整體時間縮短了近4倍,結果非常令人滿意。

下一節將介紹在NETCORE中如何使用中間件自動啟動任務調度,而不是採用quartz中間件。

 

感謝閱讀


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 雖然P圖 最好的還是用要學會使用PS,但是並不是每個人都會PS的,但是如果你會Python的話,也是可以為所欲為! 現代社會,不P圖的人簡直就像是恐龍一樣稀奇,大到瘦臉瘦腿瘦全身,小到濾鏡大眼高鼻梁,五花八門的手段令人應接不暇。那麼程式員作為這個星球的特殊物種,P圖才不會用毀圖秀秀這種軟體,下麵我們 ...
  • #include <stdio.h>#include <conio.h>void main(){ struct stuscore { char name[20]; float score[5]; float average; }x; int i; float sum; char rep; while ...
  • 繼續講枚舉的使用。前文說的是方法參數和pojo屬性定義成枚舉類型的好處。本文講在方法里使用枚舉的妙處。 ...
  • JAVA反射機制是在運行狀態中,對於任意一個類,都能夠知道這個類的所有屬性和方法;對於任意一個對象,都能夠調用它的任意方法和屬性;這種動態獲取信息以及動態調用對象方法的功能稱為java語言的反射機制。 一:Class類 在面向對象的世界里,萬物皆對象。類也是對象,類是java.lang.Class類 ...
  • 簡介 Python 是一種高層次的結合瞭解釋性、編譯性、互動性和麵向對象的腳本語言。Python 由 Guido van Rossum 於 1989 年底在荷蘭國家數學和電腦科學研究所發明,第一個公開發行版發行於 1991 年。 特點 易於學習:Python 有相對較少的關鍵字,結構簡單,和一個明 ...
  • 為何要用中間件來實現音頻處理的監聽服務 當然也可以使用Startup來進行服務的自啟動,或者也可以使用quartz定時調度任務來啟動音頻服務,大家隨意。 筆者認為使用中間件的目的,是為了分離應用和服務,也是一種解耦手段。 我們知道,在NETCORE中的中間件,有點類似像AOP的一種實現形式,他的調用 ...
  • http://cron.qqe2.com/ 如果不會 或者想檢驗自己是否寫的對就 通過這個網站 檢測 或自動生成 * * * * * * * [秒] [分] [小時] [日] [月] [周] [年] 共7個*號 序號 說明 是否必填 允許填寫的值 允許的通配符 1 秒 是 0-59 , - * /  ...
  • 昨天更新了VS到最新版本v15.8.0,但是編譯UWP出現了操蛋的bug。 谷歌一下,vs社區已經有答案了。 打開.csproj文件,在節點 <PropertyGroup> 裡面,加上一行 保存,重新載入項目,編譯即可。 原文參考:https://developercommunity.visuals ...
一周排行
    -Advertisement-
    Play Games
  • 前言 本文介紹一款使用 C# 與 WPF 開發的音頻播放器,其界面簡潔大方,操作體驗流暢。該播放器支持多種音頻格式(如 MP4、WMA、OGG、FLAC 等),並具備標記、實時歌詞顯示等功能。 另外,還支持換膚及多語言(中英文)切換。核心音頻處理採用 FFmpeg 組件,獲得了廣泛認可,目前 Git ...
  • OAuth2.0授權驗證-gitee授權碼模式 本文主要介紹如何筆者自己是如何使用gitee提供的OAuth2.0協議完成授權驗證並登錄到自己的系統,完整模式如圖 1、創建應用 打開gitee個人中心->第三方應用->創建應用 創建應用後在我的應用界面,查看已創建應用的Client ID和Clien ...
  • 解決了這個問題:《winForm下,fastReport.net 從.net framework 升級到.net5遇到的錯誤“Operation is not supported on this platform.”》 本文內容轉載自:https://www.fcnsoft.com/Home/Sho ...
  • 國內文章 WPF 從裸 Win 32 的 WM_Pointer 消息獲取觸摸點繪製筆跡 https://www.cnblogs.com/lindexi/p/18390983 本文將告訴大家如何在 WPF 裡面,接收裸 Win 32 的 WM_Pointer 消息,從消息裡面獲取觸摸點信息,使用觸摸點 ...
  • 前言 給大家推薦一個專為新零售快消行業打造了一套高效的進銷存管理系統。 系統不僅具備強大的庫存管理功能,還集成了高性能的輕量級 POS 解決方案,確保頁面載入速度極快,提供良好的用戶體驗。 項目介紹 Dorisoy.POS 是一款基於 .NET 7 和 Angular 4 開發的新零售快消進銷存管理 ...
  • ABP CLI常用的代碼分享 一、確保環境配置正確 安裝.NET CLI: ABP CLI是基於.NET Core或.NET 5/6/7等更高版本構建的,因此首先需要在你的開發環境中安裝.NET CLI。這可以通過訪問Microsoft官網下載並安裝相應版本的.NET SDK來實現。 安裝ABP ...
  • 問題 問題是這樣的:第三方的webapi,需要先調用登陸介面獲取Cookie,訪問其它介面時攜帶Cookie信息。 但使用HttpClient類調用登陸介面,返回的Headers中沒有找到Cookie信息。 分析 首先,使用Postman測試該登陸介面,正常返回Cookie信息,說明是HttpCli ...
  • 國內文章 關於.NET在中國為什麼工資低的分析 https://www.cnblogs.com/thinkingmore/p/18406244 .NET在中國開發者的薪資偏低,主要因市場需求、技術棧選擇和企業文化等因素所致。歷史上,.NET曾因微軟的閉源策略發展受限,儘管後來推出了跨平臺的.NET ...
  • 在WPF開發應用中,動畫不僅可以引起用戶的註意與興趣,而且還使軟體更加便於使用。前面幾篇文章講解了畫筆(Brush),形狀(Shape),幾何圖形(Geometry),變換(Transform)等相關內容,今天繼續講解動畫相關內容和知識點,僅供學習分享使用,如有不足之處,還請指正。 ...
  • 什麼是委托? 委托可以說是把一個方法代入另一個方法執行,相當於指向函數的指針;事件就相當於保存委托的數組; 1.實例化委托的方式: 方式1:通過new創建實例: public delegate void ShowDelegate(); 或者 public delegate string ShowDe ...