RabbitMQ是一個由erlang開發的AMQP(Advanved Message Queue)的開源實現,是實現消息隊列應用的一個中間件,消息隊列中間件是分散式系統中重要的組件,主要解決應用耦合,非同步消息,流量削鋒等問題。實現高性能,高可用,可伸縮和最終一致性架構。是大型分散式系統不可缺少的中間... ...
RabbitMQ是一個由erlang開發的AMQP(Advanved Message Queue)的開源實現,是實現消息隊列應用的一個中間件,消息隊列中間件是分散式系統中重要的組件,主要解決應用耦合,非同步消息,流量削鋒等問題。實現高性能,高可用,可伸縮和最終一致性架構。是大型分散式系統不可缺少的中間件。EasyNetQ則是基於官方.NET組件RabbitMQ.Client 的又一層封裝,使用起來更加方便。本篇隨筆主要大概介紹下RabbitMQ的基礎知識和環境的準備,以及使用EasyNetQ的相關開發調用。
1、RabbitMQ基礎知識
AMQP,即Advanced Message Queuing Protocol,高級消息隊列協議,是應用層協議的一個開放標準,為面向消息的中間件設計。消息中間件主要用於組件之間的解耦,消息的發送者無需知道消息使用者的存在,反之亦然。
AMQP的主要特征是面向消息、隊列、路由(包括點對點和發佈/訂閱)、可靠性、安全。
RabbitMQ 是一個開源的AMQP實現,伺服器端用Erlang語言編寫,支持多種客戶端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX。用於在分散式系統中存儲轉發消息,在易用性、擴展性、高可用性等方面表現不俗。
RabbitMQ的特點:強大的應用程式消息傳遞;使用方便;運行在所有主要操作系統上;支持大量開發人員平臺;開源和商業支持。消息隊列的模式有兩種模式:P2P(Point to Point),P2P模式包含三個角色:消息隊列(Queue),發送者(Sender),接收者(Receiver)。每個消息都被髮送到一個特定的隊列,接收者從隊列中獲取消息。隊列保留著消息,直到他們被消費或超時。Publish/Subscribe(Pub/Sub),包含三個角色主題(Topic),發佈者(Publisher),訂閱者(Subscriber) 。多個發佈者將消息發送到Topic,系統將這些消息傳遞給多個訂閱者。
EasyNetQ 的目標是提供一個使.NET中的RabbitMQ儘可能簡單的庫。在EasyNetQ中消息應由.NET類型表示,消息應通過其.NET類型進行路由。EasyNetQ按消息類型進行路由。發佈消息時,EasyNetQ會檢查其類型,並根據類型名稱,命名空間和裝配體給出一個路由密鑰。在消費方面,用戶訂閱類型。訂閱類型後,該類型的消息將路由到訂戶。預設情況下,EasyNetQ使用Newtonsoft.Json庫將.NET類型序列化為JSON。這具有消息是人類可讀的優點,因此您可以使用RabbitMQ管理應用程式等工具來調試消息問題。
EasyNetQ是在RabbitMQ.Client庫之上提供服務的組件集合。這些操作可以像序列化,錯誤處理,線程編組,連接管理等。它們由mini-IoC容器組成。您可以輕鬆地用自己的實現替換任何組件。因此,如果您希望XML序列化而不是內置的JSON,只需編寫一個ISerializer的實現並將其註冊到容器。以下是官方提供的一個結構圖,這個結構圖可以很好的解析該組件的結構:
2、RabbitMQ的環境準備
本處主要介紹在Windows系統中安裝RabbitMQ。
1. 下載安裝erlang
下載地址 http://www.erlang.org/downloads(根據操作系統選擇32還64位)
2. 下載安裝rabbitmq-server
下載地址 http://www.rabbitmq.com/install-windows.html
下載後獲得兩個安裝文件,按照順序安裝即可
安裝erlang環境後,一般會添加了一個ERLANG_HOME的系統變數,指向erlang的安裝目錄路徑,如下所示(一般都添加了,確認下)
安裝RabbitMQ後,在程式裡面可以看到
我們使用它的命令行來啟動RabbitMQ的服務
查看安裝是否成功命令 :rabbitmqctl status
安裝成功,在瀏覽器中輸入 http://127.0.0.1:15672/,可以看到如下界面,使用預設的賬號密碼均為guest登陸進行管理
guest 賬號是管理員賬號,可以添加Exchanges,Queues,Admin。但我們一般不使用guest賬號,可以選擇用命令來添加賬號和許可權,也可以使用管理界面進行添加相應的內容。
例如我添加相應的用戶賬號
一般我們還需要添加虛擬機,預設的虛擬機為/,我這裡添加了一個虛擬機myvhost。
然後綁定賬號到虛擬機上即可。
3、EasyNetQ組件的使用
EasyNetQ組件的使用方式比較簡單,跟很多組件都類似,例如:建立連接,進行操作做等等,對於EasyNetQ組件也是如此。
在.NET中使用EasyNetQ,要求至少基於 .NET4.5的框架基礎上進行開發,可以直接在VS項目上使用NuGet的程式包進行添加EasyNetQ的引用。
一般添加引用後,至少包含了下麵圖示的幾個引用DLL。
1)創建連接:
使用EasyNetQ連接RabbitMQ,是在應用程式啟動時創建一個IBus對象,並且,在應用程式關閉時釋放該對象。
RabbitMQ連接是基於IBus介面的,當IBus中的方法被調用,連接才會開啟。創建一個IBus對象的方法如下:
var bus = RabbitHutch.CreateBus(“host=myServer;virtualHost=myVirtualHost;username=admin;password=123456”);
與RabbitMQ伺服器的延遲連接由IBus介面表示,創建連接的方式連接字元串由格式為key = value的鍵/值對組成,每一個用分號(;)分隔。
- host,host=localhost 或者host =192.168.1.102或者host=my.rabbitmq.com,如果用到集群配置的話,那麼可以用逗號將服務地址隔開,例如host=a.com,b.com,c.com
- virtualHost,虛擬主機,預設為'/'
- username,用戶登錄名
- password,用戶登錄密碼
- requestedHeartbeat,心跳設置,預設是10秒
- prefetchcount,預設是50
- pubisherConfirms,預設為false
- persistentMessages,消息持久化,預設為true
- product,產品名
- platform,平臺
- timeout,預設為10秒
一般我們在代碼裡面測試的話,簡化連接代碼如下所示。
//初始化bus對象 bus = RabbitHutch.CreateBus("host=localhost");
2)關閉連接:
bus.Dispose();
要關閉連接,只需簡單地處理匯流排,這將關閉EasyNetQ使用的連接,渠道,消費者和所有其他資源。
如果我們在Winform窗體裡面初始化一個IBus對象,那麼在窗體關閉的時候,關閉這個介面即可。
private void FrmPublisher_FormClosed(object sender, FormClosedEventArgs e) { //關閉IBus介面 if(bus != null) { bus.Dispose(); } }
3)發佈消息:
EasyNetQ支持最簡單的消息模式是發佈和訂閱。發佈消息後,任意消費者可以訂閱該消息,也可以多個消費者訂閱。並且不需要額外配置。首先,如上文中需要先創建一個IBus對象,然後,在創建一個可序列化的.NET對象。調用Publish方法即可。
var message = new MyMessage { Text = "Hello Rabbit" }; bus.Publish(message);
4)訂閱消息:
EasyNetQ提供了消息訂閱,當調用Subscribe方法時候,EasyNetQ會創建一個用於接收消息的隊列,不過與消息發佈不同的是,消息訂閱增加了一個參數,subscribe_id.代碼如下:
bus.Subscribe<MyMessage>("my_subscription_id", msg => Console.WriteLine(msg.Text));
第一個參數是訂閱id,另外一個是delegate參數,用於處理接收到的消息。這裡要註意的是,subscribe_id參數很重要,假如開發者用同一個subscribeid訂閱了同一種消息類型兩次或者多次,RabbitMQ會以輪訓的方式給每個訂閱的隊列發送消息。接收到之後,其他隊列就接收不到該消息。如果用不同的subscribeid訂閱同一種消息類型,那麼生成的每一個隊列都會收到該消息。
需要註意的是,在收到消息處理消息時候,不要占用太多的時間,會影響消息的處理效率,所以,遇到占用長時間的處理方法,最好用非同步處理。
為了測試發佈和訂閱消息,我們可以建立幾個不同的項目來進行測試,如發佈放在一個Winform項目,訂閱放在一個Winform項目,另外一個項目放置共用的消息對象定義,如下所示。
定義消息對象類如下所示。
/// <summary> /// 定義的MQ消息類型 /// </summary> public class TextMessage { public string Text { get; set; } }
然後在發佈消息的Winform項目上創建一個處理的窗體,並添加如下代碼。
namespace MyRabbitMQ.Publisher { /// <summary> /// 測試RabbitMQ消息隊列的發佈 /// </summary> public partial class FrmPublisher : DevExpress.XtraEditors.XtraForm { //構建一個IBus公用介面對象 private IBus bus = null; public FrmPublisher() { InitializeComponent(); //初始化bus對象 bus = RabbitHutch.CreateBus("host=localhost"); //對指定消息類型進行回應 bus.Respond<MyRequest, MyResponse>(request => new MyResponse { Text = "Responding to: "+ request.Text}); //收到消息後輸出到控制臺上顯示 bus.Receive("my.queue", x => x .Add<MyMessage>(message => Console.WriteLine(message.ToJson())) .Add<MyOtherMessage>(message => Console.WriteLine(message.ToJson()))); }
發佈消息的處理代碼,如下代碼所示。
private void btnSend_Click(object sender, EventArgs e) { if (bus != null) { bus.Publish(new TextMessage { Text = this.txtContent.Text }); } }
然後在創建一個類似窗體,用來訂閱消息的處理窗體,如下所示代碼和窗體。
namespace MyRabbitMQ.Subcriber { /// <summary> /// 測試RabbitMQ消息隊列的訂閱 /// </summary> public partial class FrmSubcriber : DevExpress.XtraEditors.XtraForm { //構建一個IBus公用介面對象 private IBus bus = null; public FrmSubcriber() { InitializeComponent(); //初始化bus對象 bus = RabbitHutch.CreateBus("host=localhost"); if(bus != null) { //訂閱一個消息,並對接收到的消息進行處理,展示在控制項上 bus.Subscribe<TextMessage>("test", (msg) => { StringBuilder sb = new StringBuilder(); sb.AppendLine(msg.Text + "," + DateTime.Now.ToString()); sb.AppendLine(this.txtContent.Text); this.txtContent.Invoke(new MethodInvoker(delegate() { this.txtContent.Text = sb.ToString(); })); }); } //使用消息發送介面發送消息 bus.Send("my.queue", new MyMessage { Text = "Hello Widgets!" }); bus.Send("my.queue", new MyOtherMessage { Text = "Hello wuhuacong!" }); }
發送請求獲取響應的代碼如下所示。
private void btnRequest_Click(object sender, EventArgs e) { //定義請求消息的對象 var request = new MyRequest() { Text = string.Format("請求消息,{0}", DateTime.Now) }; //非同步獲取請求消息的結果併進行處理,展示應答消息在窗體中的 var task = bus.RequestAsync<MyRequest, MyResponse>(request); task.ContinueWith(response => { StringBuilder sb = new StringBuilder(); sb.AppendLine(response.Result.Text); sb.AppendLine(this.txtContent.Text); this.txtContent.Invoke(new MethodInvoker(delegate() { this.txtContent.Text = sb.ToString(); })); }); }
兩個項目聯合進行測試如下界面所示。
發佈者多次發送消息的情況下,訂閱者中,會進行消息的輪訓處理,也就是進行均勻分配。
5)消息發送(Send)和接收(Receive)
與Publish/Subscribe略有不同的是,Send/Receive 可以自己定義隊列名稱。
//發送端代碼 bus.Send("my.queue", new MyMessage{ Text = "Hello Widgets!" }); //接收端代碼 bus.Receive<MyMessage>("my.queue", message => Console.WriteLine("MyMessage: {0}", message.Text));
並且,也可以在同一個隊列上發送不同的消息類型,Receive方法可以這麼寫:
bus.Receive("my.queue", x => x .Add<MyMessage>(message => deliveredMyMessage = message) .Add<MyOtherMessage>(message => deliveredMyOtherMessage = message));
如果消息到達隊列,但是沒有發現相應消息類型的處理時,EasyNetQ會發送一條消息到error隊列,並且,帶上一個異常信息:No handler found for message type <message type>。與Subscribe類型,如果在同一個隊列,同一個消息類型,多次調用Receive方法時,消息會通過輪詢的形式發送給每個Receive端。
6)遠程過程調用:
var request = new TestRequestMessage {Text = "Hello from the client! "}; bus.Request<TestRequestMessage, TestResponseMessage>(request, response => Console.WriteLine("Got response: '{0}'", response.Text));
7)RPC伺服器:
bus.Respond<TestRequestMessage, TestResponseMessage>(request => new TestResponseMessage{ Text = request.Text + " all done!" });
8)記錄器:
var logger = new MyLogger() ; var bus = RabbitHutch.CreateBus(“my connection string”, x => x.Register<IEasyNetQLogger>(_ => logger));
9)路由:
Publish方法,可以加一個topic參數。
bus.Publish(message, "X.A");
消息訂閱方可以通過路由來過濾相應的消息。
* 匹配一個字元
#匹配0個或者多個字元
所以 X.A.2 會匹配到 "#", "X.#", "*.A.*" 但不會匹配 "X.B.*" 或者 "A". 當消息訂閱需要用到topic時候,需要調用Subscribe的重載方法
bus.Subscribe("my_id", handlerOfXDotStar, x => x.WithTopic("X.*")); bus.Subscribe("my_id", handlerOfStarDotB, x => x.WithTopic("*.B"));
上述這種方式,會將消息輪詢發送給兩個訂閱者,如果只需要一個訂閱者的話,可以這麼調用:
bus.Subscribe("my_id", handler, x => x.WithTopic("X.*").WithTopic("*.B"));
RabbitMQ具有非常好的功能,基於主題的路由,允許訂閱者基於多個標準過濾消息。*(星號)匹配一個字。#(哈希)匹配為零個或多個單詞。
RabbitMQ的應用場景,一般在快速處理訂單,以及非同步的多任務處理中可以得到很好的體現,下麵是幾個應用場景。
郵件和短消息的處理
訂單的解耦處理
RabbitMQ的伺服器架構
3、RabbitMQ查詢狀態出現錯誤的處理
安裝成功之後使用rabbitmqctl status命令之後出現如下錯誤。
Status of node rabbit@WUHUACONG ... Error: unable to perform an operation on node 'rabbit@WUHUACONG'. Please see diagnostics information and suggestions below. Most common reasons for this are: * Target node is unreachable (e.g. due to hostname resolution, TCP connection or firewall issues) * CLI tool fails to authenticate with the server (e.g. due to CLI tool's Erlang cookie not matching that of the server) * Target node is not running In addition to the diagnostics info below: * See the CLI, clustering and networking guides on http://rabbitmq.com/documentation.html to learn more * Consult server logs on node rabbit@WUHUACONG DIAGNOSTICS =========== attempted to contact: [rabbit@WUHUACONG] rabbit@WUHUACONG: * connected to epmd (port 4369) on WUHUACONG * epmd reports node 'rabbit' uses port 25672 for inter-node and CLI tool traffic * TCP connection succeeded but Erlang distribution failed * Authentication failed (rejected by the remote node), please check the Erlang cookie Current node details: * node name: rabbitmqcli100@WUHUACONG * effective user's home directory: C:\Users\Administrator * Erlang cookie hash: RgaUM2coc+rxIhJrfLS7Jw==
這個問題出現比較常見,主要原因是兩個目錄的.erlang.cookie文件內容不一樣。
要確保.erlang.cookie文件的一致性,不知道什麼原因導致了C:\Users\{UserName}\.erlang.cookie和預設情況下C:\WINDOWS\System32\config\systemprofile\.erlang.cookie不一致了,將Windows目錄下的拷貝到用戶目錄下就可以了。
反正無論如何,兩個地址的Cookie內容一致就可以了,然後重啟下RabbitMQ伺服器即可正常運行,並可以正常獲取它的狀態信息。