什麼是發佈訂閱 發佈訂閱是一種設計模式定義了一對多的依賴關係,讓多個訂閱者對象同時監聽某一個主題對象。這個主題對象在自身狀態變化時,會通知所有的訂閱者對象,使他們能夠自動更新自己的狀態。 為了描述這種模式,我們將會構建一個簡單的日誌系統。它包括兩個程式——第一個程式負責發送日誌消息,第二個程式負責獲... ...
發佈訂閱是一種設計模式定義了一對多的依賴關係,讓多個訂閱者對象同時監聽某一個主題對象。這個主題對象在自身狀態變化時,會通知所有的訂閱者對象,使他們能夠自動更新自己的狀態。
為了描述這種模式,我們將會構建一個簡單的日誌系統。它包括兩個程式——第一個程式負責發送日誌消息,第二個程式負責獲取消息並輸出內容。在我們的這個日誌系統中,所有正在運行的接收方程式都會接受消息。我們用其中一個接收者(receiver)把日誌寫入硬碟中,另外一個接受者(receiver)把日誌輸出到屏幕上。最終,日誌消息被廣播給所有的接受者(receivers)。
Exchanges
RabbitMQ消息模型的核心理念是生產者永遠不會直接發送任何消息給隊列,生產者只能發送消息給到exchange,exchange比較簡單,一邊從生產者就收消息,一邊把消息推送到隊列中。exchange必須清楚的知道消息應該按照什麼規則路由到對應的隊列中,而具體使用那種路由演算法是由exchange type決定的。AMQP協議提供了四種交換機類型:
Name(交換機類型) | Default pre-declared names(預聲明的預設名稱) |
Direct exchange(直連交換機) | (Empty string) and amq.direct |
Fanout exchange(扇型交換機) | amq.fanout |
Topic exchange(主題交換機) | amq.topic |
Headers exchange(頭交換機) | amq.match (and amq.headers in RabbitMQ) |
除交換機類型外,在聲明交換機時還可以附帶許多其他的屬性,其中最重要的幾個分別是:
- Name
- Durability (消息代理重啟後,交換機是否還存在)
- Auto-delete (當所有與之綁定的消息隊列都完成了對此交換機的使用後,刪掉它)
- Arguments(依賴代理本身)
交換機可以有兩個狀態:持久(durable)、暫存(transient)。持久化的交換機會在消息代理(broker)重啟後依舊存在,而暫存的交換機則不會(它們需要在代理再次上線後重新被聲明)。然而並不是所有的應用場景都需要持久化的交換機。
本文中具體講解下以下兩種交換機:直連交換機(前面幾個例子中使用的交換機類型),扇形交換機(本文中要使用的交換機類型)
直連交換機
直連交換機(direct exchange)可以使用消息攜帶的路由鍵(routing key)將消息投遞給對應的隊列中。用來處理消息的單播路由(unicast routing),也可以處理多播路由。
那麼它具體是如何工作的呢
- 將一個隊列綁定到某個交換機上,同時給該綁定指定一個路由鍵(routing key)
- 當一個攜帶路由鍵為R的消息被髮送到直連交換機時,交換機會把它路由給綁定值同樣為R的隊列。
直連交換機經常用來迴圈分發任務給多個工作者,當這樣做時,一定要明白,這時消息的負載均衡是發生在消費者(consumer)之間的,而不是隊列(queue)中。
直連交換機圖例:
扇形交換機
扇形交換機(funout exchange)將消息路由給綁定到它身上的所有隊列,不關心所綁定的路由鍵(routing key)。扇形交換機用來處理消息的廣播路由(broadcast routing)。
由於扇形交換機投遞消息到所有綁定他的隊列,以下幾個場景比較適合使用扇形交換機:
- 大規模多用戶線上(MMO)游戲可以使用它來處理排行榜更新等全局事件
- 體育新聞網站可以用它來近乎實時地將比分更新分發給移動客戶端
- 分發系統使用它來廣播各種狀態和配置更新
- 在群聊的時候,它被用來分發消息給參與群聊的用戶。(AMQP沒有內置presence的概念,因此XMPP可能會是個更好的選擇)
扇形交換機圖例
創建exchange
channel.ExchangeDeclare(exchange: "log_exchange", //exchange 名稱
type: ExchangeType.Fanout, //exchange 類型
durable: false,
autoDelete: false,
arguments: null);
臨時隊列
之前的幾個示例中我們在為每一個聲名的隊列都指定了一個名字,因為我們希望consumer指向正確的隊列。當我們希望在生產者和消費者之間共用隊列時,為隊列命名就非常的重要了。
不過我們要實現的日誌系統只是想要得到所有的消息,而且只對當前正在傳遞的消息感興趣,並不關心隊列的名稱,所以為了滿足我們的需求,要做兩件事情:
無論什麼時間連接到RabbitMQ我們都需要一個新的空的隊列。為了達到目的我們可以使用隨機數創建隊列,或讓伺服器給我們提供一個隨機的名稱。
一旦消費者與RabbitMQ斷開,消費者所接受的隊列都應該被自動刪除。
創建臨時隊列
//創建一個未命名的新的消息隊列,
QueueDeclareOk queue = channel.QueueDeclare(queue: "", //隊列名稱,為空時有系統自動分配
durable: false,
exclusive: false,
autoDelete: true,//自動刪除,如果該隊列沒有任何訂閱的消費者的話,該隊列會被自動刪除。這種隊列適用於臨時隊列。
arguments: null);
//或
//queue = channel.QueueDeclare();
綁定
我們已經創建了一個扇型交換機(fanout)和一個隊列。現在我們需要告訴交換機如何發送消息給我們的隊列。交換器和隊列之間的聯繫我們稱之為綁定(binding)
創建交換機與隊列的關係
//扇形交換機(funout exchange)將消息路由給綁定到它身上的所有隊列,不關心所綁定的路由鍵(routing key)
//fanout exchange不需要指定routing key 指定了也沒用
//通過綁定告訴exchange 需要發送消息到哪些消息隊列
channel.QueueBind(queue: queueName, exchange: EXCHANGE_NAME, routingKey: ROUTING_KEY, arguments: null);
完整代碼:
生產者 Pub_SubProducer.cs
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using RabbitMQ.Client;
namespace RabbitMQProducer
{
public class Pub_SubProducer
{
const string EXCHANGE_NAME = "log_exchange";
const string ROUTING_KEY = "";
//直接發送消息到交換機
public static void Publish()
{
var factory = new ConnectionFactory()
{
HostName = "127.0.0.1"
};
using (var connection = factory.CreateConnection())
{
using (IModel channel = connection.CreateModel())
{
channel.ExchangeDeclare(exchange: EXCHANGE_NAME, //exchange 名稱
type: ExchangeType.Fanout, //exchange 類型
durable: false,
autoDelete: false,
arguments: null);
Parallel.For(1, 100, item =>
{
string message = $"日誌內容{DateTime.Now.ToString()}";
channel.BasicPublish(exchange: EXCHANGE_NAME, routingKey: ROUTING_KEY, basicProperties: null, body: Encoding.UTF8.GetBytes(message));
Console.WriteLine(message);
});
Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();
}
}
}
}
}
消費者 Pub_SubConsumer.cs
using RabbitMQ.Client;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using RabbitMQ.Client.Events;
using System.IO;
namespace RabbitMQConsumer
{
public class Pub_SubConsumer
{
const string EXCHANGE_NAME = "log_exchange";
const string ROUTING_KEY = "";
//輸出到屏幕
public static void Subscribe()
{
var factory = new ConnectionFactory()
{
HostName = "127.0.0.1"
};
using (var connection = factory.CreateConnection())
{
using (IModel channel = connection.CreateModel())
{
channel.ExchangeDeclare(exchange: EXCHANGE_NAME, //exchange 名稱
type: ExchangeType.Fanout, //exchange 類型
durable: false,
autoDelete: false,
arguments: null);
//創建一個未命名的新的消息隊列,
QueueDeclareOk queue = channel.QueueDeclare(queue: "", //隊列名稱,為空時有系統自動分配
durable: false,
exclusive: false,
autoDelete: true,//自動刪除,如果該隊列沒有任何訂閱的消費者的話,該隊列會被自動刪除。這種隊列適用於臨時隊列。
arguments: null);
//或
//queue = channel.QueueDeclare();
string queueName = queue.QueueName;
//扇形交換機(funout exchange)將消息路由給綁定到它身上的所有隊列,不關心所綁定的路由鍵(routing key)
//fanout exchange不需要指定routing key 指定了也沒用
//通過綁定告訴exchange 需要發送消息到哪些消息隊列
channel.QueueBind(queue: queueName, exchange: EXCHANGE_NAME, routingKey: ROUTING_KEY, arguments: null);
EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
consumer.Received += (sender, args) =>
{
string message = Encoding.UTF8.GetString(args.Body);
Console.WriteLine(message);
};
channel.BasicConsume(queue: queueName, noAck: true, consumer: consumer);
Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();
}
}
}
/// <summary>
/// 輸出到文件
/// </summary>
public static void SubscribeFile()
{
var factory = new ConnectionFactory()
{
HostName = "127.0.0.1"
};
using (var connection = factory.CreateConnection())
{
using (IModel channel = connection.CreateModel())
{
channel.ExchangeDeclare(exchange: EXCHANGE_NAME, //exchange 名稱
type: ExchangeType.Fanout, //exchange 類型
durable: false,
autoDelete: false,
arguments: null);
//創建一個未命名的新的消息隊列,
QueueDeclareOk queue = channel.QueueDeclare(queue: "", //隊列名稱,為空時有系統自動分配
durable: false,
exclusive: false,
autoDelete: true,//自動刪除,如果該隊列沒有任何訂閱的消費者的話,該隊列會被自動刪除。這種隊列適用於臨時隊列。
arguments: null);
//或
//queue = channel.QueueDeclare();
string queueName = queue.QueueName;
//扇形交換機(funout exchange)將消息路由給綁定到它身上的所有隊列,不關心所綁定的路由鍵(routing key)
//fanout exchange不需要指定routing key 指定了也沒用
//通過綁定告訴exchange 需要發送消息到哪些消息隊列
channel.QueueBind(queue: queueName, exchange: EXCHANGE_NAME, routingKey: ROUTING_KEY, arguments: null);
EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
consumer.Received += (sender, args) =>
{
string message = Encoding.UTF8.GetString(args.Body);
//寫入日誌到txt文件
using (StreamWriter writer = new StreamWriter(@"c:\log\log.txt", true, Encoding.UTF8))
{
writer.WriteLine(message);
writer.Close();
}
Console.WriteLine(message);
};
channel.BasicConsume(queue: queueName, noAck: true, consumer: consumer);
Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();
}
}
}
}
}
運行以上實例代碼發現,每個訂閱者實例 都能得到相同的內容。