下載安裝 "Erlang" "RabbitMQ" 啟動RabbitMQ管理平臺插件 DOS下進入到安裝目錄\sbin,執行以下命令 當出現以下結果時,重啟RabbitMQ服務 訪問 "http://localhost:15672" (賬號密碼:guest) 註意:以下為C 代碼,請引用NuGet包: ...
下載安裝
啟動RabbitMQ管理平臺插件
DOS下進入到安裝目錄\sbin,執行以下命令
rabbitmq-plugins enable rabbitmq_management
當出現以下結果時,重啟RabbitMQ服務
set 3 plugins.
Offline change; changes will take effect at broker restart.
訪問http://localhost:15672(賬號密碼:guest)
註意:以下為C#代碼,請引用NuGet包:RabbitMQ.Client
參考文章
名詞解析
P(Publisher):生產者
C(Consumer):消費者
Channel:通道
Queue:隊列
Exchange:信息交換機
簡單演示
信息發送端
static void Send()
{
//1. 實例化連接工廠
var factory = new ConnectionFactory() { HostName = "localhost" };
//2. 建立連接
using (var connection = factory.CreateConnection())
{
//3. 創建通道
using (var channel = connection.CreateModel())
{
//4. 聲明隊列
channel.QueueDeclare(queue: "rabbitmq",
durable: false,
exclusive: false,
autoDelete: false,
arguments: null);
//5. 構建位元組數據包
var message = "Hello RabbitMQ!";
var body = Encoding.UTF8.GetBytes(message);
//6. 發送數據包
channel.BasicPublish(exchange: "",
routingKey: "rabbitmq",
basicProperties: null,
body: body);
Console.WriteLine(" [x] Sent {0}", message);
}
}
}
信息接收端
static void Receive()
{
//1. 實例化連接工廠
var factory = new ConnectionFactory() { HostName = "localhost" };
//2. 建立連接
using (var connection = factory.CreateConnection())
{
//3. 創建通道
using (var channel = connection.CreateModel())
{
//4. 聲明隊列
channel.QueueDeclare(queue: "rabbitmq",
durable: false,
exclusive: false,
autoDelete: false,
arguments: null);
//5. 構造消費者實例
var consumer = new EventingBasicConsumer(channel);
//6. 綁定消息接收後的事件委托
consumer.Received += (model, ea) =>
{
var message = Encoding.UTF8.GetString(ea.Body);
Console.WriteLine(" [x] Received {0}", message);
};
//7. 啟動消費者
channel.BasicConsume(queue: "rabbitmq",
autoAck: true,
consumer: consumer);
Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();
}
}
}
輪詢調度
P生產的多個任務進入到隊列中,多個C間可以並行處理任務。預設情況下,RabbitMQ把信息按順序發送給每一個C。平均每個C將獲得同等數量的信息。
信息確認
按照最簡單的演示來說,信息一旦發送到C中,則該信息就會從隊列中移除。一旦中間信息處理異常/失敗,C端程式退出等,都將會導致信息未處理完成,而此時隊列中已將信息移除了,那麼就會導致一系列的問題。我們可以在C端設置手動確認信息,從而解決上述問題的發生。
Receive代碼塊
//6. 綁定消息接收後的事件委托
consumer.Received += (model, ea) =>
{
var message = Encoding.UTF8.GetString(ea.Body);
Console.WriteLine(" [x] Received {0}", message);
Thread.Sleep(5000);//模擬耗時
Console.WriteLine(" [x] Done");
// 發送信息確認信號(手動信息確認)
channel.BasicAck(ea.DeliveryTag, false);
};
//7. 啟動消費者
/*
autoAck參數屬性
true:自動信息確認,當C端接收到信息後,自動發送ack信號,不管信息是否處理完畢
false:關閉自動信息確認,通過調用BasicAck方法手動進行信息確認
*/
channel.BasicConsume(queue: "rabbitmq",
autoAck: false,
consumer: consumer);
信息持久化
當RabbitMQ退出或死機時會清空隊列和信息。通過將隊列和信息標記為持久的,來告知RabbitMQ將信息持久化。
Send代碼塊
//4. 聲明隊列
//durable設置為true,表示此隊列為持久的。
//註意:RabbitMQ不允許你使用不同的參數重新定義一個已經存在的隊列,所以你需要重啟服務/更改隊列名稱
channel.QueueDeclare(queue: "rabbitmq",
durable: true, //標記隊列持久
exclusive: false,
autoDelete: false,
arguments: null);
//設置IbasicProperties.SetPersistent屬性值為true來標記我們的消息持久化
var properties = channel.CreateBasicProperties();
properties.Persistent = true;
//5. 構建位元組數據包
var message = "Hello RabbitMQ!";
var body = Encoding.UTF8.GetBytes(message);
//6. 發送數據包
channel.BasicPublish(exchange: "",
routingKey: "rabbitmq",
basicProperties: properties, //指定BasicProperties
body: body);
公平調度
上述演示中,如果隊列中存在多個信息,在開啟多個C的情況下,只有一個C忙個不停,另外的卻一直處於空閑狀態。通過調用BasicQos,告知RabbitMQ在某個C信息處理完畢,並且已經收到信息確認之後,才可以繼續發送信息到這個C。否則,將會把信息分發到另外空閑的C。
Receive代碼塊
//4. 聲明隊列
channel.QueueDeclare(queue: "rabbitmq",
durable: true,
exclusive: false,
autoDelete: false,
arguments: null);
////設置prefetchCount為1來告知RabbitMQ在未收到消費端的消息確認時,不再分發消息
channel.BasicQos(prefetchSize: 0,
prefetchCount: 1,
global: false);
發佈/訂閱
上述中的演示,P推送信息至隊列中,C從隊列中處理信息。但是如果需要將P推送的信息至每個訂閱的C中處理信息,那麼我們就可以使用Exchange。
fanout(將信息分發到exchange上綁定的所有隊列上)
Send代碼塊
//1. 實例化連接工廠
var factory = new ConnectionFactory() { HostName = "localhost" };
//2. 建立連接
using (var connection = factory.CreateConnection())
{
//3. 創建通道
using (var channel = connection.CreateModel())
{
//4. 聲明信息交換機
channel.ExchangeDeclare(exchange: "fanoutDemo",
type: "fanout");
//5. 構建位元組數據包
var message = "Hello RabbitMQ!";
var body = Encoding.UTF8.GetBytes(message);
//6. 發佈到指定exchange,fanout類型的會忽視routingKey的值,所以無需填寫
channel.BasicPublish(exchange: "fanoutDemo",
routingKey: "",
basicProperties: null,
body: body);
Console.WriteLine(" [x] Sent {0}", message);
}
}
Receive代碼塊
//1. 實例化連接工廠
var factory = new ConnectionFactory() { HostName = "localhost" };
//2. 建立連接
using (var connection = factory.CreateConnection())
{
//3. 創建通道
using (var channel = connection.CreateModel())
{
//4. 聲明信息交換機
channel.ExchangeDeclare(exchange: "fanoutDemo",
type: "fanout");
//生成隨機隊列名稱
var queueName = channel.QueueDeclare().QueueName;
//綁定隊列到指定fanout類型exchange
channel.QueueBind(queue: queueName,
exchange: "fanoutDemo",
routingKey: "");
//5. 構造消費者實例
var consumer = new EventingBasicConsumer(channel);
//6. 綁定消息接收後的事件委托
consumer.Received += (model, ea) =>
{
var message = Encoding.UTF8.GetString(ea.Body);
Console.WriteLine(" [x] Received {0}", message);
};
channel.BasicConsume(queue: queueName,
autoAck: true,
consumer: consumer);
Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();
}
}
direct(C綁定的隊列名稱須和P發佈指定的路由名稱一致)
Send代碼塊
//4. 聲明信息交換機
channel.ExchangeDeclare(exchange: "directDemo",
type: "direct");
//5. 構建位元組數據包
var message = "Hello RabbitMQ!";
var body = Encoding.UTF8.GetBytes(message);
//6. 發佈到指定exchange
channel.BasicPublish(exchange: "directDemo",
routingKey: "a",
basicProperties: null,
body: body);
Receive代碼塊
//4. 聲明信息交換機
channel.ExchangeDeclare(exchange: "directDemo",
type: "direct");
//生成隨機隊列名稱
var queueName = channel.QueueDeclare().QueueName;
//綁定隊列到指定direct類型exchange
channel.QueueBind(queue: queueName,
exchange: "directDemo",
routingKey: "b");
topic(支持通配符的路由規則)
通配字元:
- *:匹配一個單詞
- #:匹配0個或多個單詞
- .:僅作為分隔符
Send代碼塊
//4. 聲明信息交換機
channel.ExchangeDeclare(exchange: "topicDemo",
type: "topic");
//5. 構建位元組數據包
var message = "Hello RabbitMQ!";
var body = Encoding.UTF8.GetBytes(message);
//6. 發佈到指定exchange
channel.BasicPublish(exchange: "topicDemo",
routingKey: "admin.user.error", //模擬後臺用戶錯誤
basicProperties: null,
body: body);
Receive代碼塊
//4. 聲明信息交換機
channel.ExchangeDeclare(exchange: "topicDemo",
type: "topic");
//生成隨機隊列名稱
var queueName = channel.QueueDeclare().QueueName;
//綁定隊列到指定topic類型exchange
channel.QueueBind(queue: queueName,
exchange: "topicDemo",
routingKey: "admin.*.#"); //訂閱所有後臺異常錯誤
RPC(遠程過程調用)
- 進行遠程調用的客戶端需要指定接收遠程回調的隊列,並申明消費者監聽此隊列。
- 遠程調用的服務端除了要申明消費端接收遠程調用請求外,還要將結果發送到客戶端用來監聽的結果的隊列中去。
客戶端代碼塊
var factory = new ConnectionFactory() { HostName = "localhost" };
using (var connection = factory.CreateConnection())
{
using (var channel = connection.CreateModel())
{
var correlationId = Guid.NewGuid().ToString();
var replyQueue = channel.QueueDeclare().QueueName;
var properties = channel.CreateBasicProperties();
properties.ReplyTo = replyQueue;
properties.CorrelationId = correlationId;
string number = args.Length > 0 ? args[0] : "30";
var body = Encoding.UTF8.GetBytes(number);
//發佈消息
channel.BasicPublish(exchange: "", routingKey: "rpc_queue", basicProperties: properties, body: body);
Console.WriteLine($"[*] Request fib({number})");
// //創建消費者用於消息回調
var callbackConsumer = new EventingBasicConsumer(channel);
channel.BasicConsume(queue: replyQueue, autoAck: true, consumer: callbackConsumer);
callbackConsumer.Received += (model, ea) =>
{
if (ea.BasicProperties.CorrelationId == correlationId)
{
var responseMsg = $"Get Response: {Encoding.UTF8.GetString(ea.Body)}";
Console.WriteLine($"[x]: {responseMsg}");
}
};
Console.ReadLine();
}
}
服務端代碼塊
static void Main(string[] args)
{
var factory = new ConnectionFactory() { HostName = "localhost" };
using (var conection = factory.CreateConnection())
{
using (var channel = conection.CreateModel())
{
channel.QueueDeclare(queue: "rpc_queue", durable: false,
exclusive: false, autoDelete: false, arguments: null);
var consumer = new EventingBasicConsumer(channel);
Console.WriteLine("[*] Waiting for message.");
consumer.Received += (model, ea) =>
{
var message = Encoding.UTF8.GetString(ea.Body);
int n = int.Parse(message);
Console.WriteLine($"Receive request of Fib({n})");
int result = Fib(n);
var properties = ea.BasicProperties;
var replyProerties = channel.CreateBasicProperties();
replyProerties.CorrelationId = properties.CorrelationId;
channel.BasicPublish(exchange: "", routingKey: properties.ReplyTo,
basicProperties: replyProerties, body: Encoding.UTF8.GetBytes(result.ToString()));
channel.BasicAck(ea.DeliveryTag, false);
Console.WriteLine($"Return result: Fib({n})= {result}");
};
channel.BasicConsume(queue: "rpc_queue", autoAck: false, consumer: consumer);
Console.ReadLine();
}
}
}
private static int Fib(int n)
{
if (n == 0 || n == 1)
{
return n;
}
return Fib(n - 1) + Fib(n - 2);
}