.Net使用RabbitMQ詳解

来源:http://www.cnblogs.com/knowledgesea/archive/2016/03/21/5296008.html
-Advertisement-
Play Games

這幾天呢,公司風波再起,去年一年公司CTO換啦4任,CEO換啦三個,這不剛來個新老大,感覺還不錯,卻沒乾過3個月又要走,索性趁老大們走來走去的時候,就給自己空出來,稍稍總結一下剛寫的一個日誌服務組件中用到的RabbitMQ,在.net中的實戰中應用。 首先不去討論我的日誌組件怎麼樣。因為有些日誌需要


序言

這幾天呢,公司風波再起,去年一年公司CTO換啦4任,CEO換啦三個,這不剛來個新老大,感覺還不錯,卻沒乾過3個月又要走,索性趁老大們走來走去的時候,就給自己空出來,稍稍總結一下剛寫的一個日誌服務組件中用到的RabbitMQ,在.net中的實戰中應用。

首先不去討論我的日誌組件怎麼樣。因為有些日誌需要走網路,有的又不需要走網路,也是有性能與業務場景的多般變化在其中,就把他拋開,我們只談消息RabbitMQ。

那麼什麼是RabbitMQ,它是用來解決什麼問題的,性能如何,又怎麼用?我會在下麵一一闡述,如有錯誤,不到之處,還望大家不吝賜教。

RabbitMQ簡介

必須一提的是rabbitmq是由LShift提供的一個消息隊列協議(AMQP)的開源實現,由以高性能、健壯以及可伸縮性出名的Erlang寫成(因此也是繼承了這些優點)。

百度百科對RabbitMQ闡述也非常明確,建議去看下,還有amqp協議。

RabbitMQ官網:http://www.rabbitmq.com/ 如果你要下載安裝,那麼必須先把Erlang語言裝上。

RabbitMQ的.net客戶端,可以在nuget中輸入rabbitmq輕鬆獲得。

RabbitMQ與其他消息隊列的對比,早有仙人給寫出來。 Message Queue Shootout

這篇文章中的測試案例為:1百萬條1k的消息,每秒種的收發情況如下圖。

 

如果你安裝好啦,rabbitmq,他會提供一個操作監控頁面,頁面如下,他幾乎提供啦,對rabbitmq的所有操作,與監控,所以,你裝上後,自己多看看,多操作下。

 

RabbitMQ中的一些名詞闡述與消息從投遞到消費的整個過程

從上圖的標題中可以看到一些陌生的英文單詞,讓我們感覺一無所知,更無從操作,那麼我給大家弄啦一個圖片大家可以看下,或許對您理解這些新鮮的單詞有所幫助。

 

看過這些名詞,之後,或許你還毫無頭緒,那麼我把消息從生產到消費的整個流程給大家說一下,或許會更深入一點,其中Exchange,與Queue都是可以設置相關屬性,隊列的持久化,交換器類型制定。

 

Note:首先這個過程走分三個部分,1、客戶端(生產消息隊列),2、RabbitMQ服務端(負責路由規則的綁定與消息的分發),3、客戶端(消費消息隊列中的消息)

 

 

Note:由圖可以看出,一個消息可以走一次網路卻被分發到不同的消息隊列中,然後被多個的客戶端消費,那麼這個過程就是RabbitMQ的核心機制,RabbitMQ的路由類型與消費模式。

RabbitMQ中Exchange的類型

類型有4種,direct,fanout,topic,headers。其中headers不常用,本篇不做介紹,其他三種類型,會做詳細介紹。

那麼這些類型是什麼意思呢?就是Exchange與隊列進行綁定後,消息根據exchang的類型,按照不同的綁定規則分發消息到消息隊列中,可以是一個消息被分發給多個消息隊列,也可以是一個消息分發到一個消息隊列。具體請看下文。

介紹之初還要說下RoutingKey,這是個什麼玩意呢?他是exchange與消息隊列綁定中的一個標識。有些路由類型會按照標識對應消息隊列,有些路由類型忽略routingkey。具體看下文。

1、Exchange類型direct

他是根據交換器名稱與routingkey來找隊列的。

 

Note:消息從client發出,傳送給交換器ChangeA,RoutingKey為routingkey.ZLH,那麼不管你發送給Queue1,還是Queue2一個消息都會保存在Queue1,Queue2,Queue3,三個隊列中。這就是交換器的direct類型的路由規則。只要找到路由器與routingkey綁定的隊列,那麼他有多少隊列,他就分發給多少隊列。

2、Exchange類型fanout

這個類型忽略Routingkey,他為廣播模式。

 

Note:消息從客戶端發出,只要queue與exchange有綁定,那麼他不管你的Routingkey是什麼他都會將消息分發給所有與該exchang綁定的隊列中。

3、Exchange類型topic

這個類型的路由規則如果你掌握啦,那是相當的好用,與靈活。他是根據RoutingKey的設置,來做匹配的,其中這裡還有兩個通配符為:

*,代表任意的一個詞。例如topic.zlh.*,他能夠匹配到,topic.zlh.one ,topic.zlh.two ,topic.zlh.abc, ....

#,代表任意多個詞。例如topic.#,他能夠匹配到,topic.zlh.one ,topic.zlh.two ,topic.zlh.abc, ....

 

Note:這個圖看上去很亂,但是他是根據匹配符做匹配的,這裡我建議你自己做下消息隊列的具體操作。

具體操作如下

 public static void Producer(int value)
        {
            try
            {
                var qName = "lhtest1";
                var exchangeName = "fanoutchange1";
                var exchangeType = "fanout";//topic、fanout
                var routingKey = "*";
                var uri = new Uri("amqp://192.168.10.121:5672/");
                var factory = new ConnectionFactory
                {
                    UserName = "123",
                    Password = "123",
                    RequestedHeartbeat = 0,
                    Endpoint = new AmqpTcpEndpoint(uri)
                };
                using (var connection = factory.CreateConnection())
                {
                    using (var channel = connection.CreateModel())
                    {
                        //設置交換器的類型
                        channel.ExchangeDeclare(exchangeName, exchangeType);
                        //聲明一個隊列,設置隊列是否持久化,排他性,與自動刪除
                        channel.QueueDeclare(qName, true, false, false, null);
                        //綁定消息隊列,交換器,routingkey
                        channel.QueueBind(qName, exchangeName, routingKey);
                        var properties = channel.CreateBasicProperties();
                        //隊列持久化
                        properties.Persistent = true;
                        var m = new QMessage(DateTime.Now, value+"");
                        var body = Encoding.UTF8.GetBytes(DoJson.ModelToJson<QMessage>(m));
                        //發送信息
                        channel.BasicPublish(exchangeName, routingKey, properties, body);
                    }
                }
            }
            catch (Exception ex)
            {
                Console.WriteLine(ex.Message);
            }
        }

消息隊列的消費與消息確認Ack

1、消息隊列的消費

Note:如果一個消息隊列中有大量消息等待操作時,我們可以用多個客戶端來處理消息,這裡的分發機制是採用負載均衡演算法中的輪詢。第一個消息給A,下一個消息給B,下下一個消息給A,下下下一個消息給B......以此類推。

2、為啦保證消息的安全性,保證此消息被正確處理後才能在服務端的消息隊列中刪除。那麼rabbitmq提供啦ack應答機制,來實現這一功能。

ack應答有兩種方式:1、自動應答,2、手動應答。具體實現如下。

 public static void Consumer()
        {
            try
            {
                var qName = "lhtest1";
                var exchangeName = "fanoutchange1";
                var exchangeType = "fanout";//topic、fanout
                var routingKey = "*";
                var uri = new Uri("amqp://192.168.10.121:5672/");
                var factory = new ConnectionFactory
                {
                    UserName = "123",
                    Password = "123",
                    RequestedHeartbeat = 0,
                    Endpoint = new AmqpTcpEndpoint(uri)
                };
                using (var connection = factory.CreateConnection())
                {
                    using (var channel = connection.CreateModel())
                    {
                        channel.ExchangeDeclare(exchangeName, exchangeType);
                        channel.QueueDeclare(qName, true, false, false, null);
                        channel.QueueBind(qName, exchangeName, routingKey);
                        //定義這個隊列的消費者
                        QueueingBasicConsumer consumer = new QueueingBasicConsumer(channel);
                        //false為手動應答,true為自動應答
                        channel.BasicConsume(qName, false, consumer);
                        while (true)
                        {
                            BasicDeliverEventArgs ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue();                           
                            byte[] bytes = ea.Body;
                            var messageStr = Encoding.UTF8.GetString(bytes);
                            var message = DoJson.JsonToModel<QMessage>(messageStr);
                            Console.WriteLine("Receive a Message, DateTime:" + message.DateTime.ToString("yyyy-MM-dd HH:mm:ss") + " Title:" + message.Title);
                            //如果是自動應答,下下麵這句代碼不用寫啦。
                            if ((Convert.ToInt32(message.Title) % 2) == 1)
                            {
                                channel.BasicAck(ea.DeliveryTag, false);
                            }
                        }
                    }
                }
            }
            catch (Exception ex)
            {
                Console.WriteLine(ex.Message);
            }
        }

總結

RabbitMQ個人感覺比較簡易,文章寫的也可能比較簡易,呵呵,見諒。如果您開發中用到啦RabbitMQ,或者開發中有什麼疑惑,歡迎加入左上方的群,我們一起討論學習。


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • scala如果你想實現像if ,while等類似的控制結構,根本沒有值要傳入花括弧{}直接的代碼里。要怎麼實現。 比如說,我們要實現一個斷言的控制結構,myAssert(), myAssert函數參考自帶參數和一個標示變數,如果標示位為false,則什麼都不做,如果為true則進行正常判斷。 下麵是
  • 用autohotkey腳本,調整常用按鍵,另外定義常用代碼塊為巨集。來實現快速輸入powershell腳本。
  • scala是一種函數式編程風格的語言,除了常見的if......else ,for ,while等傳統的流程式控制制結構,也可以自定義流程式控制制的控制結構。 再瞭解scala如何實現編寫新的流程結構,我們瞭解一下頭等函數的概念; scala的函數是頭等函數(first-class function).你不
  • 輸出為1
  • 一、順序結構(從上往下依次執行) 順序結構語法比較簡單,從上往下依次執行即可。 二、選擇結構(選擇性執行,如果....則.....) 1.if 語句 if語句,作用是根據判斷結果為真或假,選擇其中一個分支執行。 if (條件判斷,結果為布爾值){ 條件判斷為真時的執行語句(只能是一句語句,如果是多句
  • 迭代器模式,在不需要瞭解內部實現的前提下,遍歷一個聚合對象的內部元素。相比於傳統的編程模式,迭代器模式可以隱藏遍歷元素所需要的操作。 AllHacl.php index.php Hacl類相關內容參考數據對象映射模式。http://www.cnblogs.com/tianxintian22/p/52
  • (一)變數的命名及賦值: var=value (1)、其中var是變數名,value是變數的值。如果value不包含任何空白字元(空格),就不需要包含單引號或雙引號 (2)、var=value不同於var (空格)=(空格)value,前者是賦值操作,後者是相等操作 (3)、在變數名前加$或者$(v
  • 本實驗主要是實現蜂鳴器的操作,蜂鳴器的操作是非常簡單的,只有把簡單的事情做好,方可談其他複雜的事。本實驗部分會利用verilog一些巨集定義語句,其實在VGA實驗部分已經出現過,這裡為了鞏固,再次調用相關巨集定義命令,已達到最大化的可移植性,請讀者務必掌握這種用法,很實用。 談及蜂鳴器或者LED,多多少
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...