一、引言 在具體業務中可能會遇到一些要提前處理的消息,比如普通客戶的消息按先進先出的順序處理,Vip客戶的消息要提前處理。在RabbitMQ中,消息優先順序的實現方式是:在聲明queue時設置隊列的x-max-priority屬性,然後在publish消息時,設置消息的優先順序即可。 RabbitMQ優 ...
一、引言
在具體業務中可能會遇到一些要提前處理的消息,比如普通客戶的消息按先進先出的順序處理,Vip客戶的消息要提前處理。在RabbitMQ中,消息優先順序的實現方式是:在聲明queue時設置隊列的x-max-priority屬性,然後在publish消息時,設置消息的優先順序即可。
RabbitMQ優先順序隊列註意事項:
1)RabbitMQ3.5以後才支持優先順序隊列。
2)只有當消費者不足,不能及時進行消費的情況下,優先順序隊列才會生效。
3)優先順序取值範圍在0~9之間,數值越大則優先順序越高。
二、示例
2.1、發送端(生產端)
新建一個控制台項目Send,並添加一個類RabbitMQConfig。
class RabbitMQConfig { public static string Host { get; set; } public static string VirtualHost { get; set; } public static string UserName { get; set; } public static string Password { get; set; } public static int Port { get; set; } static RabbitMQConfig() { Host = "192.168.2.242"; VirtualHost = "/"; UserName = "hello"; Password = "world"; Port = 5672; } }RabbitMQConfig.cs
class Program { static void Main(string[] args) { Console.WriteLine("按任意鍵開始生產。"); Console.ReadLine(); PriorityMessagePublish(); Console.ReadLine(); } private static void PriorityMessagePublish() { const string MessagePrefix = "message_"; const int PublishMessageCount = 6; byte messagePriority = 0; var factory = new ConnectionFactory() { HostName = RabbitMQConfig.Host, Port = RabbitMQConfig.Port, VirtualHost = RabbitMQConfig.VirtualHost, UserName = RabbitMQConfig.UserName, Password = RabbitMQConfig.Password, Protocol = Protocols.DefaultProtocol }; using (var connection = factory.CreateConnection()) { using (var channel = connection.CreateModel()) { //設置隊列優先順序,取值範圍在0~255之間。 Dictionary<string, object> dict = new Dictionary<string, object> { { "x-max-priority", 255 } }; //聲明隊列 channel.QueueDeclare(queue: "priority", durable: true, exclusive: false, autoDelete: false, arguments: dict); //向該消息隊列發送消息message Random random = new Random(); for (int i = 0; i < PublishMessageCount; i++) { var properties = channel.CreateBasicProperties(); messagePriority = (byte)random.Next(0, 9); properties.Priority = messagePriority;//設置消息優先順序,取值範圍在0~9之間。 var message = MessagePrefix + i.ToString(); var body = Encoding.UTF8.GetBytes(message); channel.BasicPublish(exchange: "", routingKey: "priority", basicProperties: properties, body: body); Console.WriteLine($"{DateTime.Now.ToString()} Send {message} , Priority {messagePriority}"); } } } } }Program.cs
2.2、接收端(消費端)
新建一個控制台項目Receive,按住Alt鍵,將發送端RabbitMQConfig類拖一個快捷方式到Receive項目中。
class Program { static void Main(string[] args) { Console.WriteLine("按任意鍵開始消費。"); Console.ReadLine(); PriorityMessageSubscribe(); } public static void PriorityMessageSubscribe() { var factory = new ConnectionFactory() { HostName = RabbitMQConfig.Host, UserName = RabbitMQConfig.UserName, Password = RabbitMQConfig.Password }; using (var connection = factory.CreateConnection()) { using (var channel = connection.CreateModel()) { channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false); var consumer = new EventingBasicConsumer(channel); consumer.Received += async (model, ea) => { await Task.Run(() => { var message = Encoding.UTF8.GetString(ea.Body); Thread.Sleep(1000 * 2); channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);//手動消息確認 Console.WriteLine($"{DateTime.Now.ToString()} Received {message}"); }); }; channel.BasicConsume(queue: "priority", noAck: false, consumer: consumer);//需要啟用消息響應,否則priority無效。 Console.ReadKey(); } } } }Program.cs
2.3、運行結果
從消費情況可以看出,message_2及message_3由於priority優先順序最高都是7,所以它們會被最早消費,而message_5的priority是0,所以最後才被消費。