近期,業務調整,需要內網讀取數據後存入到外網,同時,其他伺服器也需要讀取數據,於是我又盯上了RabbitMQ。在展開業務代碼前,先看下RabbitMQ整體架構,可以看到Exchange和隊列是多對多關係。 下麵,我們詳細說說RabbitMQ的隊列模式:簡單隊列、工作隊列、發佈訂閱模式、路由模式、主題 ...
近期,業務調整,需要內網讀取數據後存入到外網,同時,其他伺服器也需要讀取數據,於是我又盯上了RabbitMQ。在展開業務代碼前,先看下RabbitMQ整體架構,可以看到Exchange和隊列是多對多關係。
下麵,我們詳細說說RabbitMQ的隊列模式:簡單隊列、工作隊列、發佈訂閱模式、路由模式、主題模式、RPC模式。其中簡單隊列、工作隊列在前文 組件使用(一)中以提到了 ,感興趣的可以看看,本文主要闡述 發佈訂閱模式、路由模式、主題模式。
- 發佈訂閱模式
無選擇接收消息,一個消息生產者,一個交換器,多個消息隊列,多個消費者。稱為發佈/訂閱模式。在應用中,只需要簡單的將隊列綁定到交換機上。
一個發送到交換機的消息都會被轉發到與該交換機綁定的所有隊列上。像子網廣播,每檯子網內的主機都獲得了一份複製的消息。 可以將消息發送給不同類型的消費者。做到發佈一次,多個消費者來消費。
P 表示為生產者、 X 表示交換機、C1C2 表示為消費者,紅色表示隊列。
- 路由模式
在發佈/訂閱模式的基礎上,有選擇的接收消息,也就是通過 routing 路由進行匹配條件是否滿足接收消息。
路由模式跟發佈訂閱模式類似,然後在訂閱模式的基礎上加上了類型,訂閱模式是分發到所有綁定到交換機的隊列,路由模式只分發到綁定在交換機上面指定路由鍵的隊列,我們可以看一下下麵這張圖:
P 表示為生產者、 X 表示交換機、C1C2 表示為消費者,紅色表示隊列。
上圖是一個結合日誌消費級別的配圖,在路由模式它會把消息路由到那些 binding key 與 routing key 完全匹配的 Queue 中,此模式也就是 Exchange 模式中的direct模式。 以上圖的配置為例,我們以 routingKey=“error” 發送消息到 Exchange,則消息會路由到Queue1(amqp.gen-S9b…,這是由RabbitMQ自動生成的Queue名稱)和Queue2(amqp.gen-Agl…)。如果我們以 routingKey=“info” 或 routingKey=“warning” 來發送消息,則消息只會路由到 Queue2。如果我們以其他 routingKey 發送消息,則消息不會路由到這兩個 Queue 中。
- 主題模式
同樣是在發佈/訂閱模式的基礎上,根據主題匹配進行篩選是否接收消息,比第四類更靈活。 topics 主題模式跟 routing 路由模式類似,只不過路由模式是指定固定的路由鍵 routingKey,而主題模式是可以模糊匹配路由鍵 routingKey,類似於SQL中 = 和 like 的關係。
P 表示為生產者、 X 表示交換機、C1C2 表示為消費者,紅色表示隊列。
topics 模式與 routing 模式比較相近,topics 模式不能具有任意的 routingKey,必須由一個英文句點號“.”分隔的字元串(我們將被句點號“.”分隔開的每一段獨立的字元串稱為一個單詞),比如 “lazy.orange.fox”。topics routingKey 中可以存在兩種特殊字元“*”與“#”,用於做模糊匹配,其中“*”用於匹配一個單詞,“#”用於匹配多個單詞(可以是零個)。 "*“ 表示任何一個詞 ”#" 表示0或多個詞 以上圖中的配置為例: 如果一個消息的 routingKey 設置為 “xxx.orange.rabbit”,那麼該消息會同時路由到 Q1 與 Q2,routingKey="lazy.orange.fox”的消息會路由到Q1與Q2; routingKey="lazy.brown.fox”的消息會路由到 Q2; routingKey="lazy.pink.rabbit”的消息會路由到 Q2(只會投遞給Q2一次,雖然這個routingKey 與 Q2 的兩個 bindingKey 都匹配); routingKey=“quick.brown.fox”、routingKey=”orange”、routingKey="quick.orange.male.rabbit”的消息將會被丟棄,因為它們沒有匹配任何bindingKey。
- RPC模式
與上面3種所不同之處,RPC模式是擁有請求/回覆的。也就是有響應的。RPC是指遠程過程調用,也就是說兩台伺服器A,B,一個應用部署在A伺服器上,想要調用B伺服器上應用提供的函數/方法,由於不在一個記憶體空間,不能直接調用,需要通過網路來表達調用的語義和傳達調用的數據。
為什麼使用RPC呢?就是無法在一個進程內,甚至一個電腦內通過本地調用的方式完成的需求,比如不同的系統間的通訊,甚至不同的組織間的通訊。由於計算能力需要橫向擴展,需要在多台機器組成的集群上部署應用,RPC的協議有很多,比如最早的CORBA,Java RMI,Web Service的RPC風格,Hessian,Thrift,甚至Rest API。
RPC的處理流程:
-
- 當客戶端啟動時,創建一個匿名的回調隊列。
- 客戶端為RPC請求設置2個屬性:replyTo,設置回調隊列名字;correlationId,標記request。
- 請求被髮送到rpc_queue隊列中。
- RPC伺服器端監聽rpc_queue隊列中的請求,當請求到來時,伺服器端會處理並且把帶有結果的消息發送給客戶端。接收的隊列就是replyTo設定的回調隊列。
- 客戶端監聽回調隊列,當有消息時,檢查correlationId屬性,如果與request中匹配,那就是結果了。
以上就是多模式的簡介,在實際生產中,我們不同模式需要定義自己交換機,其中:直接交換機、主題交換機、扇形交換機、預設交換機是常用模式。如圖:
直接交換機、主題交換機、扇形交換機相關源碼不多再贅述,相關代碼如下:
- 連接類
public class RabbitMQConnectHelper
{
/// <summary>
/// 單方式連接
/// </summary>
/// <returns></returns>
public static IConnection GetConnection()
{
var factory = new ConnectionFactory
{
HostName = "127.0.0.1",
Port = 5672,
UserName = "gerry",
Password = "gerry",
VirtualHost = "/",//虛擬主機
};
return factory.CreateConnection();
}
/// <summary>
/// 集群方式連接
/// </summary>
/// <returns></returns>
public static IConnection GetClusterConnectiont()
{
var factory = new ConnectionFactory
{
UserName = "gerry",
Password = "gerry",
VirtualHost = "/",//虛擬主機
};
List<AmqpTcpEndpoint> host_list = new List<AmqpTcpEndpoint>
{
new AmqpTcpEndpoint(){HostName= "127.0.0.1",Port=5672},
new AmqpTcpEndpoint(){HostName= "127.0.0.1",Port=5673},
new AmqpTcpEndpoint(){HostName= "127.0.0.1",Port=5674}
};
return factory.CreateConnection(host_list);
}
}
- 生產者
public class RabbitMQ_Producer
{
/// <summary>
/// Fanout 交換機,扇形隊列,數據同步發送到所有隊列
/// </summary>
public void Fanout_SendMessage()
{
using (var Connection = RabbitMQConnectHelper.GetConnection())
{
using (var channel = Connection.CreateModel())
{
string queueName01 = "testQueue_01";
string queueName02 = "testQueue_02";
// 聲明交換機
channel.ExchangeDeclare("test_fanout_exchange", "fanout");
//聲明隊列
channel.QueueDeclare(queueName01, durable: true, exclusive: false, autoDelete: false, arguments: null);
channel.QueueDeclare(queueName02, durable: true, exclusive: false, autoDelete: false, arguments: null);
//綁定到交換機
//routingKey: "" 沒有綁定路由key時,消息同步到所有隊列,
channel.QueueBind(queue: queueName01, exchange: "test_fanout_exchange", routingKey: "", arguments: null);
channel.QueueBind(queue: queueName02, exchange: "test_fanout_exchange", routingKey: "", arguments: null);
//聲明基礎屬性
var properties = channel.CreateBasicProperties();
properties.AppId = Guid.NewGuid().ToString();
properties.DeliveryMode = 2;//設置持久性 1-非持久性;2-持久性
properties.Persistent = true;//設置持久化
properties.Type = queueName01;//消息類型名
properties.ContentType = "application/json";
properties.ContentEncoding = "utf-8";
for (int i = 1; i <= 20; i++)
{
string msg = $"RabbitMQ Fanout Send {i} Message!";
//routingKey: "" 沒有綁定路由key時,消息同步到所有隊列
channel.BasicPublish(exchange: "test_fanout_exchange", routingKey: "", basicProperties: properties, body: Encoding.UTF8.GetBytes(msg));
Console.WriteLine(msg);
}
}
}
}
/// <summary>
///Direct 交換機,直接隊列,數據同步發送到特定的隊列
/// </summary>
public void Direct_SendMessage()
{
using (var Connection = RabbitMQConnectHelper.GetConnection())
{
using (var channel = Connection.CreateModel())
{
string queueName01 = "testQueue_Red";
string queueName02 = "testQueue_Yellow";
// 聲明交換機
channel.ExchangeDeclare("test_direct_exchange", "direct");
//聲明隊列
channel.QueueDeclare(queueName01, durable: true, exclusive: false, autoDelete: false, arguments: null);
channel.QueueDeclare(queueName02, durable: true, exclusive: false, autoDelete: false, arguments: null);
//綁定到交換機
//routingKey: queueName 指定消息發送到某個特定隊列
channel.QueueBind(queue:queueName01, exchange: "test_direct_exchange", routingKey: queueName01, arguments: null);
channel.QueueBind(queue: queueName02, exchange: "test_direct_exchange", routingKey: queueName02, arguments: null);
//聲明一個基礎屬性
var properties = channel.CreateBasicProperties();
properties.AppId = Guid.NewGuid().ToString();
properties.DeliveryMode = 2;//設置持久性 1-非持久性;2-持久性
properties.Persistent = true;//設置持久化
properties.Type = queueName01;//消息類型名
properties.ContentType = "application/json";
properties.ContentEncoding = "utf-8";
for (int i = 1; i <= 20; i++)
{
string msg = $"RabbitMQ Direct Send {i} Message!";
//routingKey: queueName 指定消息發送到某個特定隊列 queueName01
channel.BasicPublish(exchange: "test_direct_exchange", routingKey: queueName01, basicProperties: properties, body: Encoding.UTF8.GetBytes(msg));
Console.WriteLine(msg);
}
}
}
}
/// <summary>
/// Topic 交換機,模糊隊列,數據同步發送到模糊匹配隊列
/// </summary>
public void Topic_SendMessage()
{
using (var Connection = RabbitMQConnectHelper.GetConnection())
{
using (var channel = Connection.CreateModel())
{
string queueName01 = "testQueue_01";
string queueName02 = "testQueue_02";
// 聲明交換機
channel.ExchangeDeclare("test_topic_exchange", "topic");
//聲明隊列
channel.QueueDeclare(queueName01, durable: true, exclusive: false, autoDelete: false, arguments: null);
channel.QueueDeclare(queueName02, durable: true, exclusive: false, autoDelete: false, arguments: null);
//綁定到交換機
//routingKey: queueName 指定消息發送到某個特定隊列
channel.QueueBind(queue:queueName01, exchange: "test_topic_exchange", routingKey: "data.*", arguments: null);
channel.QueueBind(queue: queueName02, exchange: "test_topic_exchange", routingKey: "data.Red", arguments: null);
//聲明一個基礎屬性
var properties = channel.CreateBasicProperties();
properties.AppId = Guid.NewGuid().ToString();
properties.DeliveryMode = 2;//設置持久性 1-非持久性;2-持久性
properties.Persistent = true;//設置持久化
properties.Type = queueName01;//消息類型名
properties.ContentType = "application/json";
properties.ContentEncoding = "utf-8";
for (int i = 1; i <= 20; i++)
{
string msg = $"RabbitMQ Topic Send {i} Message!";
//routingKey: "data.Red" 指定消息發送到路由是 "data.Red"的特定隊列,"data.*"屬於模糊路由,也會發送數據
channel.BasicPublish(exchange: "test_topic_exchange", routingKey: "data.Red", basicProperties: properties, body: Encoding.UTF8.GetBytes(msg));
Console.WriteLine(msg);
}
}
}
}
}
- 消費者
public class RabbitMQ_Consumer
{
/// <summary>
/// fanout 消費消息數據
/// </summary>
public void fanout_Received_Message()
{
var Connection = RabbitMQConnectHelper.GetConnection();
var channel = Connection.CreateModel();
string queueName01 = "testQueue_01";
// 聲明交換機
channel.ExchangeDeclare("test_fanout_exchange", "fanout");
//聲明隊列
channel.QueueDeclare(queueName01, durable: true, exclusive: false, autoDelete: false, arguments: null);
//綁定到交換機
//routingKey: "" 沒有綁定路由key時,消息同步到所有隊列,
channel.QueueBind(queue: queueName01, exchange: "test_fanout_exchange", routingKey: "", arguments: null);
//聲明消費者
EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var body = ea.Body;
var Str = Encoding.UTF8.GetString(body.ToArray());
//autoAck: false 則手動在接收方法內提交BasicAck 可做成等待SQL執行返回True,以保證消息能消費成功且入庫
//autoAck: true 不需要在接收方法內使用 BasicAck方法
channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
};
//autoAck 值取決與 channel.BasicAck是否手動提交
channel.BasicConsume(queue: queueName01, autoAck: false, consumer: consumer);
}
/// <summary>
/// Direct 消費消息數據
/// </summary>
public void direct_Received_Message()
{
var Connection = RabbitMQConnectHelper.GetConnection();
var channel = Connection.CreateModel();
string queueName01 = "testQueue_Red";
// 聲明交換機
channel.ExchangeDeclare("test_direct_exchange", "direct");
//聲明隊列
channel.QueueDeclare(queueName01, durable: true, exclusive: false, autoDelete: false, arguments: null);
//綁定到交換機
//routingKey: queueName 指定消息發送到某個特定隊列
channel.QueueBind(queue: queueName01, exchange: "test_direct_exchange", routingKey: queueName01, arguments: null);
//聲明消費者
EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var body = ea.Body;
var Str = Encoding.UTF8.GetString(body.ToArray());
//autoAck: false 則手動在接收方法內提交BasicAck 可做成等待SQL執行返回True,以保證消息能消費成功且入庫
//autoAck: true 不需要在接收方法內使用 BasicAck方法
channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
};
//autoAck 值取決與 channel.BasicAck是否手動提交
channel.BasicConsume(queue: queueName01, autoAck: false, consumer: consumer);
}
/// <summary>
/// Topic 消費消息數據
/// </summary>
public void topic_Received_Message()
{
var Connection = RabbitMQConnectHelper.GetConnection();
var channel = Connection.CreateModel();
string queueName02 = "testQueue_02";
// 聲明交換機
channel.ExchangeDeclare("test_topic_exchange", "topic");
//聲明隊列
channel.QueueDeclare(queueName02, durable: true, exclusive: false, autoDelete: false, arguments: null);
//綁定到交換機
//routingKey: queueName 指定消息發送到某個特定隊列
channel.QueueBind(queue: queueName02, exchange: "test_topic_exchange", routingKey: "data.Red", arguments: null);
//聲明消費者
EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var body = ea.Body;
var Str = Encoding.UTF8.GetString(body.ToArray());
//autoAck: false 則手動在接收方法內提交BasicAck 可做成等待SQL執行返回True,以保證消息能消費成功且入庫
//autoAck: true 不需要在接收方法內使用 BasicAck方法
channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
};
//autoAck 值取決與 channel.BasicAck是否手動提交
channel.BasicConsume(queue: queueName02, autoAck: false, consumer: consumer);
}
}
使用預設交換機,簡潔使用。請訪問基礎篇 ASP.NET Core知識之RabbitMQ組件使用(一) 。