使用EasyNetQ組件操作RabbitMQ消息隊列服務

来源:https://www.cnblogs.com/wuhuacong/archive/2018/04/24/8927096.html
-Advertisement-
Play Games

RabbitMQ是一個由erlang開發的AMQP(Advanved Message Queue)的開源實現,是實現消息隊列應用的一個中間件,消息隊列中間件是分散式系統中重要的組件,主要解決應用耦合,非同步消息,流量削鋒等問題。實現高性能,高可用,可伸縮和最終一致性架構。是大型分散式系統不可缺少的中間... ...


RabbitMQ是一個由erlang開發的AMQP(Advanved Message Queue)的開源實現,是實現消息隊列應用的一個中間件,消息隊列中間件是分散式系統中重要的組件,主要解決應用耦合,非同步消息,流量削鋒等問題。實現高性能,高可用,可伸縮和最終一致性架構。是大型分散式系統不可缺少的中間件。EasyNetQ則是基於官方.NET組件RabbitMQ.Client 的又一層封裝,使用起來更加方便。本篇隨筆主要大概介紹下RabbitMQ的基礎知識和環境的準備,以及使用EasyNetQ的相關開發調用。

1、RabbitMQ基礎知識

AMQP,即Advanced Message Queuing Protocol,高級消息隊列協議,是應用層協議的一個開放標準,為面向消息的中間件設計。消息中間件主要用於組件之間的解耦,消息的發送者無需知道消息使用者的存在,反之亦然。
AMQP的主要特征是面向消息、隊列、路由(包括點對點和發佈/訂閱)、可靠性、安全。
RabbitMQ 是一個開源的AMQP實現,伺服器端用Erlang語言編寫,支持多種客戶端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX。用於在分散式系統中存儲轉發消息,在易用性、擴展性、高可用性等方面表現不俗。

RabbitMQ的特點強大的應用程式消息傳遞;使用方便;運行在所有主要操作系統上;支持大量開發人員平臺;開源商業支持。消息隊列的模式有兩種模式:P2P(Point to Point),P2P模式包含三個角色:消息隊列(Queue),發送者(Sender),接收者(Receiver)。每個消息都被髮送到一個特定的隊列,接收者從隊列中獲取消息。隊列保留著消息,直到他們被消費或超時。Publish/Subscribe(Pub/Sub),包含三個角色主題(Topic),發佈者(Publisher),訂閱者(Subscriber) 。多個發佈者將消息發送到Topic,系統將這些消息傳遞給多個訂閱者。

 EasyNetQ 的目標是提供一個使.NET中的RabbitMQ儘可能簡單的庫。在EasyNetQ中消息應由.NET類型表示,消息應通過其.NET類型進行路由。EasyNetQ按消息類型進行路由。發佈消息時,EasyNetQ會檢查其類型,並根據類型名稱,命名空間和裝配體給出一個路由密鑰。在消費方面,用戶訂閱類型。訂閱類型後,該類型的消息將路由到訂戶。預設情況下,EasyNetQ使用Newtonsoft.Json庫將.NET類型序列化為JSON。這具有消息是人類可讀的優點,因此您可以使用RabbitMQ管理應用程式等工具來調試消息問題。

   EasyNetQ是在RabbitMQ.Client庫之上提供服務的組件集合。這些操作可以像序列化,錯誤處理,線程編組,連接管理等。它們由mini-IoC容器組成。您可以輕鬆地用自己的實現替換任何組件。因此,如果您希望XML序列化而不是內置的JSON,只需編寫一個ISerializer的實現並將其註冊到容器。以下是官方提供的一個結構圖,這個結構圖可以很好的解析該組件的結構:

 

2、RabbitMQ的環境準備

本處主要介紹在Windows系統中安裝RabbitMQ。

 1. 下載安裝erlang 

      下載地址 http://www.erlang.org/downloads(根據操作系統選擇32還64位)  

  2. 下載安裝rabbitmq-server

     下載地址 http://www.rabbitmq.com/install-windows.html

下載後獲得兩個安裝文件,按照順序安裝即可

 安裝erlang環境後,一般會添加了一個ERLANG_HOME的系統變數,指向erlang的安裝目錄路徑,如下所示(一般都添加了,確認下

 

安裝RabbitMQ後,在程式裡面可以看到

 我們使用它的命令行來啟動RabbitMQ的服務

查看安裝是否成功命令 :rabbitmqctl status

安裝成功,在瀏覽器中輸入 http://127.0.0.1:15672/,可以看到如下界面,使用預設的賬號密碼均為guest登陸進行管理

 guest 賬號是管理員賬號,可以添加Exchanges,Queues,Admin。但我們一般不使用guest賬號,可以選擇用命令來添加賬號和許可權,也可以使用管理界面進行添加相應的內容。

例如我添加相應的用戶賬號

一般我們還需要添加虛擬機,預設的虛擬機為/,我這裡添加了一個虛擬機myvhost。

然後綁定賬號到虛擬機上即可。

 

 3、EasyNetQ組件的使用

EasyNetQ組件的使用方式比較簡單,跟很多組件都類似,例如:建立連接,進行操作做等等,對於EasyNetQ組件也是如此。

在.NET中使用EasyNetQ,要求至少基於 .NET4.5的框架基礎上進行開發,可以直接在VS項目上使用NuGet的程式包進行添加EasyNetQ的引用。

一般添加引用後,至少包含了下麵圖示的幾個引用DLL。

 

  1)創建連接:

使用EasyNetQ連接RabbitMQ,是在應用程式啟動時創建一個IBus對象,並且,在應用程式關閉時釋放該對象。

RabbitMQ連接是基於IBus介面的,當IBus中的方法被調用,連接才會開啟。創建一個IBus對象的方法如下:

var bus = RabbitHutch.CreateBus(“host=myServer;virtualHost=myVirtualHost;username=admin;password=123456”);

與RabbitMQ伺服器的延遲連接由IBus介面表示,創建連接的方式連接字元串由格式為key = value的鍵/值對組成,每一個用分號(;)分隔。

  • host,host=localhost 或者host =192.168.1.102或者host=my.rabbitmq.com,如果用到集群配置的話,那麼可以用逗號將服務地址隔開,例如host=a.com,b.com,c.com
  • virtualHost,虛擬主機,預設為'/'
  • username,用戶登錄名
  • password,用戶登錄密碼
  • requestedHeartbeat,心跳設置,預設是10秒
  • prefetchcount,預設是50
  • pubisherConfirms,預設為false
  • persistentMessages,消息持久化,預設為true
  • product,產品名
  • platform,平臺
  • timeout,預設為10秒

一般我們在代碼裡面測試的話,簡化連接代碼如下所示。

 //初始化bus對象
 bus = RabbitHutch.CreateBus("host=localhost");

 

   2關閉連接:

bus.Dispose();

   要關閉連接,只需簡單地處理匯流排,這將關閉EasyNetQ使用的連接,渠道,消費者和所有其他資源。

如果我們在Winform窗體裡面初始化一個IBus對象,那麼在窗體關閉的時候,關閉這個介面即可。

        private void FrmPublisher_FormClosed(object sender, FormClosedEventArgs e)
        {
            //關閉IBus介面
            if(bus != null)
            {
                bus.Dispose();
            }
        }

 

   3發佈消息:

EasyNetQ支持最簡單的消息模式是發佈和訂閱。發佈消息後,任意消費者可以訂閱該消息,也可以多個消費者訂閱。並且不需要額外配置。首先,如上文中需要先創建一個IBus對象,然後,在創建一個可序列化的.NET對象。調用Publish方法即可。

var message = new MyMessage { Text = "Hello Rabbit" };
bus.Publish(message);

 

 4訂閱消息:

EasyNetQ提供了消息訂閱,當調用Subscribe方法時候,EasyNetQ會創建一個用於接收消息的隊列,不過與消息發佈不同的是,消息訂閱增加了一個參數,subscribe_id.代碼如下:

bus.Subscribe<MyMessage>("my_subscription_id", msg => Console.WriteLine(msg.Text));

第一個參數是訂閱id,另外一個是delegate參數,用於處理接收到的消息。這裡要註意的是,subscribe_id參數很重要,假如開發者用同一個subscribeid訂閱了同一種消息類型兩次或者多次,RabbitMQ會以輪訓的方式給每個訂閱的隊列發送消息。接收到之後,其他隊列就接收不到該消息。如果用不同的subscribeid訂閱同一種消息類型,那麼生成的每一個隊列都會收到該消息。

需要註意的是,在收到消息處理消息時候,不要占用太多的時間,會影響消息的處理效率,所以,遇到占用長時間的處理方法,最好用非同步處理。

為了測試發佈和訂閱消息,我們可以建立幾個不同的項目來進行測試,如發佈放在一個Winform項目,訂閱放在一個Winform項目,另外一個項目放置共用的消息對象定義,如下所示。

定義消息對象類如下所示。

    /// <summary>
    /// 定義的MQ消息類型
    /// </summary>
    public class TextMessage
    {
        public string Text { get; set; }
    }

然後在發佈消息的Winform項目上創建一個處理的窗體,並添加如下代碼。

namespace MyRabbitMQ.Publisher
{
    /// <summary>
    /// 測試RabbitMQ消息隊列的發佈
    /// </summary>
    public partial class FrmPublisher : DevExpress.XtraEditors.XtraForm
    {
        //構建一個IBus公用介面對象
        private IBus bus = null;

        public FrmPublisher()
        {
            InitializeComponent();

            //初始化bus對象
            bus = RabbitHutch.CreateBus("host=localhost");
            //對指定消息類型進行回應
            bus.Respond<MyRequest, MyResponse>(request => new MyResponse { Text = "Responding to: "+ request.Text});

            //收到消息後輸出到控制臺上顯示
            bus.Receive("my.queue", x => x
            .Add<MyMessage>(message => Console.WriteLine(message.ToJson()))
            .Add<MyOtherMessage>(message => Console.WriteLine(message.ToJson())));
        }

發佈消息的處理代碼,如下代碼所示。

        private void btnSend_Click(object sender, EventArgs e)
        {
            if (bus != null)
            {
                bus.Publish(new TextMessage
                {
                    Text = this.txtContent.Text
                });
            }
        }

然後在創建一個類似窗體,用來訂閱消息的處理窗體,如下所示代碼和窗體。

namespace MyRabbitMQ.Subcriber
{   
    /// <summary>
    /// 測試RabbitMQ消息隊列的訂閱
    /// </summary>
    public partial class FrmSubcriber : DevExpress.XtraEditors.XtraForm
    {
        //構建一個IBus公用介面對象
        private IBus bus = null;

        public FrmSubcriber()
        {
            InitializeComponent();

            //初始化bus對象
            bus = RabbitHutch.CreateBus("host=localhost");
            if(bus != null)
            {
                //訂閱一個消息,並對接收到的消息進行處理,展示在控制項上
                bus.Subscribe<TextMessage>("test", (msg) =>
                {
                    StringBuilder sb = new StringBuilder();
                    sb.AppendLine(msg.Text + "," + DateTime.Now.ToString());
                    sb.AppendLine(this.txtContent.Text);

                    this.txtContent.Invoke(new MethodInvoker(delegate()
                    {
                        this.txtContent.Text = sb.ToString();
                    }));
                });
            }

            //使用消息發送介面發送消息
            bus.Send("my.queue", new MyMessage { Text = "Hello Widgets!" });
            bus.Send("my.queue", new MyOtherMessage { Text = "Hello wuhuacong!" });
        }

發送請求獲取響應的代碼如下所示。

        private void btnRequest_Click(object sender, EventArgs e)
        {
            //定義請求消息的對象
            var request = new MyRequest()
            {
                Text = string.Format("請求消息,{0}", DateTime.Now)
            };

            //非同步獲取請求消息的結果併進行處理,展示應答消息在窗體中的
            var task = bus.RequestAsync<MyRequest, MyResponse>(request);
            task.ContinueWith(response =>
            {
                StringBuilder sb = new StringBuilder();
                sb.AppendLine(response.Result.Text);
                sb.AppendLine(this.txtContent.Text);
                this.txtContent.Invoke(new MethodInvoker(delegate()
                {
                    this.txtContent.Text = sb.ToString();
                }));
            });
        }

 

兩個項目聯合進行測試如下界面所示。

 

發佈者多次發送消息的情況下,訂閱者中,會進行消息的輪訓處理,也就是進行均勻分配。

 

  5)消息發送(Send)和接收(Receive)

與Publish/Subscribe略有不同的是,Send/Receive 可以自己定義隊列名稱。

//發送端代碼
bus.Send("my.queue", new MyMessage{ Text = "Hello Widgets!" });

//接收端代碼
bus.Receive<MyMessage>("my.queue", message => Console.WriteLine("MyMessage: {0}", message.Text));

並且,也可以在同一個隊列上發送不同的消息類型,Receive方法可以這麼寫:

bus.Receive("my.queue", x => x
    .Add<MyMessage>(message => deliveredMyMessage = message)
    .Add<MyOtherMessage>(message => deliveredMyOtherMessage = message));

如果消息到達隊列,但是沒有發現相應消息類型的處理時,EasyNetQ會發送一條消息到error隊列,並且,帶上一個異常信息:No handler found for message type <message type>。與Subscribe類型,如果在同一個隊列,同一個消息類型,多次調用Receive方法時,消息會通過輪詢的形式發送給每個Receive端。

 

   6)遠程過程調用:

var request = new TestRequestMessage {Text = "Hello from the client! "};
bus.Request<TestRequestMessage, TestResponseMessage>(request, response => 
    Console.WriteLine("Got response: '{0}'", response.Text));

   7RPC伺服器:

bus.Respond<TestRequestMessage, TestResponseMessage>(request => 
    new TestResponseMessage{ Text = request.Text + " all done!" });

   8記錄器:

var logger = new MyLogger() ;
var bus = RabbitHutch.CreateBus(“my connection string”, x => x.Register<IEasyNetQLogger>(_ => logger));

   9路由:

Publish方法,可以加一個topic參數。

bus.Publish(message, "X.A");

 消息訂閱方可以通過路由來過濾相應的消息。

  * 匹配一個字元

  #匹配0個或者多個字元

  所以 X.A.2 會匹配到 "#", "X.#", "*.A.*" 但不會匹配 "X.B.*" 或者 "A". 當消息訂閱需要用到topic時候,需要調用Subscribe的重載方法

bus.Subscribe("my_id", handlerOfXDotStar, x => x.WithTopic("X.*"));
bus.Subscribe("my_id", handlerOfStarDotB, x => x.WithTopic("*.B"));

上述這種方式,會將消息輪詢發送給兩個訂閱者,如果只需要一個訂閱者的話,可以這麼調用:

bus.Subscribe("my_id", handler, x => x.WithTopic("X.*").WithTopic("*.B"));

RabbitMQ具有非常好的功能,基於主題的路由,允許訂閱者基於多個標準過濾消息。*(星號)匹配一個字。#(哈希)匹配為零個或多個單詞。

 RabbitMQ的應用場景,一般在快速處理訂單,以及非同步的多任務處理中可以得到很好的體現,下麵是幾個應用場景。

郵件和短消息的處理

訂單的解耦處理

RabbitMQ的伺服器架構

 

3、RabbitMQ查詢狀態出現錯誤的處理

安裝成功之後使用rabbitmqctl status命令之後出現如下錯誤。

Status of node rabbit@WUHUACONG ...
Error: unable to perform an operation on node 'rabbit@WUHUACONG'. Please see diagnostics information and suggestions below.

Most common reasons for this are:

 * Target node is unreachable (e.g. due to hostname resolution, TCP connection or firewall issues)
 * CLI tool fails to authenticate with the server (e.g. due to CLI tool's Erlang cookie not matching that of the server)
 * Target node is not running

In addition to the diagnostics info below:

 * See the CLI, clustering and networking guides on http://rabbitmq.com/documentation.html to learn more
 * Consult server logs on node rabbit@WUHUACONG

DIAGNOSTICS
===========

attempted to contact: [rabbit@WUHUACONG]

rabbit@WUHUACONG:
  * connected to epmd (port 4369) on WUHUACONG
  * epmd reports node 'rabbit' uses port 25672 for inter-node and CLI tool traffic
  * TCP connection succeeded but Erlang distribution failed

  * Authentication failed (rejected by the remote node), please check the Erlang cookie


Current node details:
 * node name: rabbitmqcli100@WUHUACONG
 * effective user's home directory: C:\Users\Administrator
 * Erlang cookie hash: RgaUM2coc+rxIhJrfLS7Jw==

這個問題出現比較常見,主要原因是兩個目錄的.erlang.cookie文件內容不一樣。

要確保.erlang.cookie文件的一致性,不知道什麼原因導致了C:\Users\{UserName}\.erlang.cookie和預設情況下C:\WINDOWS\System32\config\systemprofile\.erlang.cookie不一致了,將Windows目錄下的拷貝到用戶目錄下就可以了。

反正無論如何,兩個地址的Cookie內容一致就可以了,然後重啟下RabbitMQ伺服器即可正常運行,並可以正常獲取它的狀態信息。

 


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • Day47筆記Linux+redis入門 Day47 知識講解:Jedis 1、Linux上jdk,mysql,tomcat安裝(看著文檔安裝) 準備工作: 因為JDK,TOMCAT,MYSQL的安裝過程中需要從網上下載部分支持包才可以繼續,所以要求同學們提前安裝下載好依賴 yum install ...
  • Golang的包管理亂得不行,各種工具橫空出世,各顯神通啊。用了幾個下來,發現 Glide 是比較好用的,使用了 vender 來進行管理,多個開發環境的版本不衝突,功能強大,配置文件也足夠簡單。初始化一個已有的工程想要引入glide進行管理→ glide create這時,Glide會掃描工程中所... ...
  • 之前用 Servlet + JSP 實現了一個 "簡易版的學生管理系統" ,在學習了 SSM 框架之後,我們來對之前寫過的項目重構一下! 技術準備 為了完成這個項目,需要掌握如下技術: Java 基礎知識 前端: HTML, CSS, JAVASCRIPT, JQUERY J2EE: Tomcat, ...
  • 從知乎作者Rui L學來的一招。應該用過 IPython 吧?想象一下,拋出異常時自動把你帶到 IPython Shell 是不是很開心?而且和普通的IPython不同,這個時候可以調用 p (print), up(up stack), down(down stack) 之類的命令。還能創建臨時變數... ...
  • 課前預習 1.編譯型語言:將源碼一次性轉化為二進位代碼再執行(嚼碎了再吃,執行效率高)優點:執行效率高;缺點:開發效率低,不能跨平臺。 解釋型語言:程式運行時,從上往下將源碼一句一句轉化為二進位的同時執行(邊解釋邊執行,一心二用所以執行效率低)優點:開發效率高,可跨平臺;缺點:執行效率低。 2.py ...
  • .NET 單元測試的利劍——模擬框架Moq 前言 這篇文章是翻譯文,因為通過自己參與的項目,越發覺得單元測試的重要性,特別是當跟業務數據打交道的時候的,Moq就如雪中送炭,所以想學習這個框架,就從這篇譯文開始吧,順便提升下自己英文閱讀水平吧,由於英語實在不行,藉助有道翻譯有時候還理解不了原文的意思。 ...
  • 1 namespace Test 2 { 3 class Program 4 { 5 static void Main(string[] args) 6 { 7 //作業:橡皮rubber鴨子、木wood鴨子、真實的鴨子realduck。 8 //三個鴨子都會游泳,而橡皮鴨子和真實的鴨... ...
  • 場景: EF底層,獲取完主表,點擊按鈕,添加主表,字表內容時,報以上錯誤 解決方案: 在EF文件的空白處右鍵--屬性,將“應用延遲載入”,改為False ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...