發佈/訂閱 在之前的案例中我們創建了一個工作隊列,這個工作隊列的實現思想就是一個把每一個任務平均分配給每一個執行者,在這個篇文章我們會做一些不一樣的東西,把一個消息發送給多個消費者,這種模式就被稱作"發佈/訂閱". 為了說明這個模式,我們將要創建一個簡單的日誌系統,一個負責發佈消息,另外一個負責接收 ...
發佈/訂閱
在之前的案例中我們創建了一個工作隊列,這個工作隊列的實現思想就是一個把每一個任務平均分配給每一個執行者,在這個篇文章我們會做一些不一樣的東西,把一個消息發送給多個消費者,這種模式就被稱作"發佈/訂閱".
為了說明這個模式,我們將要創建一個簡單的日誌系統,一個負責發佈消息,另外一個負責接收列印他們.
在我們的日誌系統中,每一個運行中的接收者副本將都會獲得消息,這種方式可以讓我們在運行一個接收者直接把消息保存在磁碟的同時,另外一個消費者可以把消息列印到屏幕上.
本質上,發佈一個日誌消息將會廣播給所有的接收者
交換機(Exchanges)
在之前的文章中,我們接受和發送消息都是通過一個隊列來完成了,現在是時候引入RabbitMQ的全部工作模型了.
讓我們快速回憶一下之前涉及到的模型
--生產者(發佈者),是一個負責發送消息的用戶應用程式.
--隊列,負責存儲消息
--消費者(接收者),負責接收消息的用戶程式.
RabbitMQ的核心思想是生產者永遠不會直接把消息發送給隊列,事實上生產者甚至經常不知道一個發出去的消息是否可以有隊列去接收它.
相應的,生產者只能消息發送給交換機,交換機的工作機制非常簡單,一方面它從生產者那裡接收到消息,另一方面它會把消息發送給相應的隊列上.交換機必須要知道怎麼處理接收到的消息,它應該被放入一個特殊的隊列嗎?它是否應該被放入多個隊列?或者它是否需要被忽略.
處理這工作的方式是通過交換機類型來實現的.
這裡有幾個可用的交換機類型:direct,topic,headers,fanout 我們將會關註最後一個(fanout),讓我們創建一個fanout的交換機,名字叫做'logs'
channel.ExchangeDeclare("logs", "fanout");
這個fanout的交換機功能非常簡單(你也許已經從名字中猜到了他的方式),把接收到的消息廣播給所有已知的隊列,這個這是我們的日誌系統需要的.
列出RabbitMQ已添加的交換機:
cmd:rabbitmqctl list_exchanges
無命名的交換機:在之前的案例中我們對於交換機一無所知,但是仍然可以把消息發送到隊列上,這是因為我們使用的是一個預設的交互機,名字為空(""),回顧一下我們之前發送消息的方式
var message = GetMessage(args); var body = Encoding.UTF8.GetBytes(message); channel.BasicPublish(exchange: "",routingKey: "hello", basicProperties: null,body: body);
第一個參數就是交換機的名稱,空字元串表示預設的無命名的交換機:消息通過存在的RoutingKey被髮送到隊列上.
現在我們發送命名的交換機代替:
var message = GetMessage(args); var body = Encoding.UTF8.GetBytes(message); channel.BasicPublish(exchange: "logs", routingKey: "", basicProperties: null, body: body);
臨時隊列
在之前的案例中,我們使用的隊列是一個指定了名字的隊列(記得hello 和task_queue 嗎),給一個隊名命名是嚴格的,我們需要執行者連接的同樣的隊列來工作,當你想在生產者和消費者之間共用隊列的時候指定一個隊列名是非常重要的.但是我們的日誌系統則不在此列,
我們想要監聽到所有的日誌消息,而不僅僅是他們的子集,我們也僅僅對當前正在流轉的消息感興趣,而不是老的消息,結局這個問題我們需要2件事情.
首先,無論何時我們連接到隊列,我們都需要一個新鮮的,空的隊列,為了實現這個目標我們可以每次創建一個隨機名稱的隊列,或者更加便捷的方式--讓服務為我們的隊列隨機命名.
第二,一旦我們斷開到消費者到隊列的連接,我們需要自動刪除隊列.
在.Net客戶端,我們使用無參的queueDeclare()方法來創建一個隨機命名的非持久的,自動刪除的排他隊列.
var queueName = channel.QueueDeclare().QueueName;
queueName就是一個隨機的隊列名,如:amq.gen-JzTY20BRgKO-HjmUJj0wLg.
綁定
我們已經創建了一個fanout的交換機和一個隊列,現在我們需要告訴我們交換機發送消息到我們的隊列,交換機和隊列之間的關係叫做綁定.
channel.QueueBind(queue: queueName,exchange: "logs", routingKey: "");
從現在開始logs 交換機將會把消息放入我們的隊列當中.
列出隊列cmd: rabbitmqctl list_bindings
彙總
負責發送消息的生產者可之前案例基本上是一樣的,最大的不同是我們將消息發送到了我們的命名隊列logs上而不是預設的隊列上,發送的時候我們需要使用routingKey,但是它的值是被fanout交換機忽略的.
EmitLog.cs
class EmitLog { public static void Main(string[] args) { var factory = new ConnectionFactory() { HostName = "localhost" }; using(var connection = factory.CreateConnection()) using(var channel = connection.CreateModel()) { channel.ExchangeDeclare(exchange: "logs", type: "fanout"); var message = GetMessage(args); var body = Encoding.UTF8.GetBytes(message); channel.BasicPublish(exchange: "logs", routingKey: "", basicProperties: null, body: body); Console.WriteLine(" [x] Sent {0}", message); } Console.WriteLine(" Press [enter] to exit."); Console.ReadLine(); } private static string GetMessage(string[] args) { return ((args.Length > 0) ? string.Join(" ", args) : "info: Hello World!"); } }
正如你看到的,我們在建立連接之後創建了一個隊列,這一步是必須的,因為發送到一個不存在的交換機是不被允許的。
當隊列還沒有綁定到交換機是發送的消息將會丟失,但是這對我們日誌系統來說沒有問題,當沒有消費者監聽時我們可以安全的忽略這個消息。
ReceiveLogs.cs:
class ReceiveLogs { public static void Main() { var factory = new ConnectionFactory() { HostName = "localhost" }; using(var connection = factory.CreateConnection()) using(var channel = connection.CreateModel()) { channel.ExchangeDeclare(exchange: "logs", type: "fanout"); var queueName = channel.QueueDeclare().QueueName; channel.QueueBind(queue: queueName,exchange: "logs",routingKey: ""); Console.WriteLine(" [*] Waiting for logs."); var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { var body = ea.Body; var message = Encoding.UTF8.GetString(body); Console.WriteLine(" [x] {0}", message); }; channel.BasicConsume(queue: queueName, noAck: true, consumer: consumer); Console.WriteLine(" Press [enter] to exit."); Console.ReadLine(); } } }
同時運行兩個receive,可以看到兩個接收端可以同時接收到一個消息。