什麼是工作隊列 工作隊列是為了避免等待一些占用大量資源或時間操作的一種處理方式。我們把任務封裝為消息發送到隊列中,消費者在後臺不停的取出任務並且執行。當運行了多個消費者工作進程時,隊列中的任務將會在每個消費者間進行共用。 使用工作隊列的好處就是能夠並行的處理任務。如果隊列中堆積了很多任務,只要添加更... ...
工作隊列是為了避免等待一些占用大量資源或時間操作的一種處理方式。我們把任務封裝為消息發送到隊列中,消費者在後臺不停的取出任務並且執行。當運行了多個消費者工作進程時,隊列中的任務將會在每個消費者間進行共用。
使用工作隊列的好處就是能夠並行的處理任務。如果隊列中堆積了很多任務,只要添加更多的消費著就可以了,拓展非常方便。
準備工作
1.創建生產者和消費者客戶端
2.在消費者中使用Thread.Sleep()模擬耗時操作
生產者 TaskQueuesProducer.cs
using RabbitMQ.Client;
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace RabbitMQProducer
{
public class TaskQueuesProducer
{
static int processId = Process.GetCurrentProcess().Id;
public static void Send()
{
Console.WriteLine($"我是生產者{processId}");
var factory = new ConnectionFactory()
{
HostName = "127.0.0.1"
};
using (var connection = factory.CreateConnection())
{
using (var channel = connection.CreateModel())
{
channel.QueueDeclare(queue: "taskqueue", durable: false, exclusive: false, autoDelete: false, arguments: null);
for (int item = 0; item < 20; item++)
{
string message = $"我是生產者{processId}發送的消息:{item}";
channel.BasicPublish(exchange: "", routingKey: "taskqueue", basicProperties: null, body: Encoding.UTF8.GetBytes(message));
Console.WriteLine(message);
}
Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();
}
}
}
}
}
消費者 TaskQueuesConsumer.cs
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System.Diagnostics;
using System.Threading;
namespace RabbitMQConsumer
{
public class TaskQueuesConsumer
{
static int processId = 0;
static TaskQueuesConsumer()
{
processId = Process.GetCurrentProcess().Id;
}
public static void Receive()
{
Console.WriteLine($"我是消費者{processId}");
var factory = new ConnectionFactory()
{
HostName = "127.0.0.1"
};
using (var connection = factory.CreateConnection())
{
using (var channel = connection.CreateModel())
{
channel.QueueDeclare(queue: "taskqueue", durable: false, exclusive: false, autoDelete: false, arguments: null);
EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
consumer.Received += Consumer_Received;
//noack=false 不自動消息確認 這時需要手動調用 channel.BasicAck(); 進行消息確認
//noack=true 自動消息確認,當消息被RabbitMQ發送給消費者(consumers)之後,馬上就會在記憶體中移除
channel.BasicConsume(queue: "taskqueue", noAck: false, consumer: consumer);
Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();
}
}
}
private static void Consumer_Received(object sender, BasicDeliverEventArgs e)
{
string message = Encoding.UTF8.GetString(e.Body);
Console.WriteLine($"接收到消息:{message}");
//Thread.Sleep(new Random().Next(1000, 1000 * 5)); //模擬消息處理耗時操作
Console.WriteLine($"已處理完消息");
//對應前面的 BasicConsume 中 noack=false 發送消息確認回執
//EventingBasicConsumer consumer = sender as EventingBasicConsumer;
//consumer.Model.BasicAck(e.DeliveryTag, false);
}
}
}
當我們運行了三個consumer客戶端,一個producer客戶端後,發現producer發送的20條消息,被三個客戶端依次平均接收並處理了。
這是RabbitMQ預設的消息分發機制——輪詢( round-robin),預設情況下RabbitMQ會按順序把消息發送給每個消費者,平均每個消費者都會收到同等數量的消息。
消息確認
當前的代碼中當消息被RabbitMQ發送到consumer後,就會被立即刪除,這種情況給下,如果其中一個consumer客戶端被停止,那麼正在處理的消息就會丟失,同時所有發送到這個工作者並且還沒處理的消息也會丟失。這不是我們希望看到的,我們希望如果一個consumer客戶端掛掉後,希望把重新發送任務到其它的consumer客戶端。
為了防止消息丟失,RabbbitMQ提供了消息確認機制,消費者會通過一個ack,告訴RabbitMQ已經收到並處理了某條消息,然後RabbitMQ就會釋放並刪除這條消息。
如果consumer掛掉了,沒有發送相應,RabbitMQ就會認為消息沒有被處理,然後重新發送給其他消費者,這樣即使某個consumer掛掉,也不會丟失消息。
消息沒有超市的概念,當工作者和它斷開連接時,RabbitMQ會重新發送消息,這樣在處理耗時較長任務時就不會出現問題了。
之前的代碼中我們開啟了自動消息確認,這樣一旦consumer掛掉,就會發生消息丟失的情況,現在我們來修改兩處代碼,開啟消息確認機制。
修改參數noack為false,關閉自動消息確認
channel.BasicConsume(queue: "taskqueue", noAck: false, consumer: consumer);
取消下麵代碼的註釋,進行消息確認回執
EventingBasicConsumer consumer = sender as EventingBasicConsumer;
consumer.Model.BasicAck(e.DeliveryTag, false);
註意:一旦忘記消息確認,消息會在你程式推出之後就會重新發送,如果不能釋放沒響應的消息,RabbitMQ將會占用越來越來越多的記憶體
可通過以下指令檢查忘記確認的消息信息或在 RabbitMQWebweb管理頁面中查看
rabbitmqctl list_queues name messages_ready messages_unacknowledged
修改完成後再次運行,就不用擔心消息丟失的問題了
消息持久化
如果沒有特殊的設置,那麼在RabbitMQ服務關閉或崩潰的情況下將會丟失所有的隊列和消息。為了確保消息不會丟失需要做兩個事情,把隊列和消息設置為持久化
設置隊列為持久化,producer和consumer兩處都要修改
channel.QueueDeclare(queue: "taskqueue", durable: true, exclusive: false, autoDelete: false, arguments: null);
設置消息持久化
var properties = channel.CreateBasicProperties();
properties.DeliveryMode = 2; //DeliveryMode 消息的投遞模式,預設為1 非持久化的,DeliveryMode=2 持久化存儲消息內容
channel.BasicPublish(exchange: "", routingKey: "taskqueue", basicProperties: properties, body: Encoding.UTF8.GetBytes(message));
註意:
- RabbitMQ不允許你使用不同的參數重新定義一個隊列,也就是說我們之前定義了taskqueue隊列為非持久化,現在再定義為持久化將會返回失敗。
- 將消息設置為持久化並不能100%保證消息不會丟失,因為RabbitMQ保存到系統磁碟也需要時間,雖然時間很短,但是確實消耗一定的時間,另外RabbitMQ並不是對每個消息都做fsync,它可能僅僅是保存在cache中,還沒來得及保存到磁碟。因此即使我們做了以上幾個操作消息持久化的問題還是存在的。如果必須要保證持久化,可以通過使用transaction(事務)來做支持。
公平調度
RabbitMQ只管分發進入隊列的消息,而不關心那些consumer比較繁忙或空閑,這樣容易導致一些consumer比較繁忙,一些比較空閑,不能使資源被最大化的使用。
為瞭解決這樣的問題,RabbitMQ提供了basicQos方法,傳遞參數為prefetchCount = 1。這樣告訴RabbitMQ不要在同一時間給一個消費者超過一條消息。就是只有在消費者空閑的時候會發送下一條信息。
channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
執行以上設置之後會發現並沒有按照之前的輪詢(Round-robin)進行消息轉發,而是在消費者不忙時才進行轉發。
由於消息並沒有發出去,在動態添加了consumer後能夠立即投入工作,而預設的輪詢轉發機制則不支持動態添加消費者,因為此時消息已經分配完畢,無法立即加入工作即使還有很多未完成的任務。
註意:
這種方法可能會導致queue滿。當然,這種情況下你可能需要添加更多的Consumer,或者創建更多的virtualHost來細化你的設計。
完整代碼
生產者 TaskQueuesProducer.cs
using RabbitMQ.Client;
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace RabbitMQProducer
{
public class TaskQueuesProducer
{
static int processId = Process.GetCurrentProcess().Id;
public static void Send()
{
Console.WriteLine($"我是生產者{processId}");
var factory = new ConnectionFactory()
{
HostName = "127.0.0.1"
};
using (var connection = factory.CreateConnection())
{
using (var channel = connection.CreateModel())
{
//durable:持久化存儲隊列
//autoDelete:自動刪除,如果該隊列沒有任何訂閱的消費者的話,該隊列會被自動刪除。這種隊列適用於臨時隊列。
//exclusive:排他隊列,如果一個隊列被聲明為排他隊列,該隊列僅對首次聲明它的連接可見,併在連接斷開時自動刪除。註意事項:1,排他隊列是基於連接可見的,同一連接的不同通道是可以同時訪問同一個連接創建的排他隊列的。2,"首次",如果一個連接已經聲明瞭一個排他隊列,其他連接是不允許建立同名的排他隊列的,這個與普通隊列不同。3,即使該隊列是持久化的,一旦連接關閉或者客戶端退出,該排他隊列都會被自動刪除的。這種隊列適用於只限於一個客戶端發送讀取消息的應用場景。
channel.QueueDeclare(queue: "taskqueue", durable: true, exclusive: false, autoDelete: false, arguments: null);
for (int item = 0; item < 200000; item++)
{
string message = $"我是生產者{processId}發送的消息:{item}";
var properties = channel.CreateBasicProperties();
properties.DeliveryMode = 2; //DeliveryMode 消息的投遞模式,預設為1 非持久化的,DeliveryMode=2 持久化存儲消息內容
channel.BasicPublish(exchange: "", routingKey: "taskqueue", basicProperties: properties, body: Encoding.UTF8.GetBytes(message));
Console.WriteLine(message);
}
Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();
}
}
}
}
}
消費者 TaskQueuesConsumer.cs
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System.Diagnostics;
using System.Threading;
namespace RabbitMQConsumer
{
public class TaskQueuesConsumer
{
static int processId = 0;
static TaskQueuesConsumer()
{
processId = Process.GetCurrentProcess().Id;
}
public static void Receive()
{
Console.WriteLine($"我是消費者{processId}");
var factory = new ConnectionFactory()
{
HostName = "127.0.0.1"
};
using (var connection = factory.CreateConnection())
{
using (var channel = connection.CreateModel())
{
channel.QueueDeclare(queue: "taskqueue", durable: true, exclusive: false, autoDelete: false, arguments: null);
// BasicQos 方法設置prefetchCount = 1。這樣RabbitMQ就會使得每個Consumer在同一個時間點最多處理一個Message。換句話說,在接收到該Consumer的ack前,他它不會將新的Message分發給它
channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
consumer.Received += Consumer_Received;
//noack=false 不自動消息確認 這時需要手動調用 channel.BasicAck(); 進行消息確認
//noack=true 自動消息確認,當消息被RabbitMQ發送給消費者(consumers)之後,馬上就會在記憶體中移除
channel.BasicConsume(queue: "taskqueue", noAck: false, consumer: consumer);
Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();
}
}
}
private static void Consumer_Received(object sender, BasicDeliverEventArgs e)
{
string message = Encoding.UTF8.GetString(e.Body);
Console.WriteLine($"接收到消息:{message}");
Thread.Sleep(new Random().Next(1000, 1000 * 5)); //模擬消息處理耗時操作
Console.WriteLine($"已處理完消息");
//對應前面的 BasicConsume 中 noack=false 發送消息確認回執
EventingBasicConsumer consumer = sender as EventingBasicConsumer;
consumer.Model.BasicAck(e.DeliveryTag, false);
}
}
}