1.消息的優先順序 假如現在有個需求,我們需要讓一些優先順序最高的通知推送到客戶端,我們可以使用redis的sortedset,也可以使用我們今天要說的rabbit的消息優先順序屬性 Producer代碼 consumer代碼 運行producer 在運行consumer 可以看出消息是按優先順序消費的 2 ...
1.消息的優先順序
假如現在有個需求,我們需要讓一些優先順序最高的通知推送到客戶端,我們可以使用redis的sortedset,也可以使用我們今天要說的rabbit的消息優先順序屬性
Producer代碼
using RabbitMQ.Client; using System; using System.Collections.Generic; using System.Text; using System.Threading; using System.Threading.Tasks; namespace RabbitMQConsole { class Program { static void Main(string[] args) { ConnectionFactory factory = new ConnectionFactory(); factory.HostName = "39.**.**.**"; factory.Port = 5672; factory.VirtualHost = "/"; factory.UserName = "root"; factory.Password = "root"; var exchange = "change4"; var route = "route2"; var queue9 = "queue9"; using (var connection = factory.CreateConnection()) { using (var channel = connection.CreateModel()) { channel.ExchangeDeclare(exchange, type: "fanout", durable: true, autoDelete: false); //x-max-priority屬性必須設置,否則消息優先順序不生效 channel.QueueDeclare(queue9, durable: true, exclusive: false, autoDelete: false,arguments: new Dictionary<string, object> { { "x-max-priority", 50 } }); channel.QueueBind(queue9, exchange, queue9); while(true) { var messagestr = Console.ReadLine(); var messagepri = Console.ReadLine(); var props = channel.CreateBasicProperties(); props.Persistent = true; props.Priority = (byte)int.Parse(messagepri);//設置消息優先順序 channel.BasicPublish(exchange, route, true, props, Encoding.UTF8.GetBytes(messagestr)); } } } } } }
consumer代碼
using RabbitMQ.Client; using RabbitMQ.Client.Events; using System; using System.Collections.Generic; using System.Text; using System.Threading; namespace RabbitMQClient { class Program { private static readonly ConnectionFactory rabbitMqFactory = new ConnectionFactory() { HostName = "39.**.**.**", Port = 5672, UserName = "root", Password = "root", VirtualHost = "/" }; static void Main(string[] args) { var exchange = "change4"; var route = "route2"; var queue9 = "queue9"; using (IConnection conn = rabbitMqFactory.CreateConnection()) using (IModel channel = conn.CreateModel()) { channel.ExchangeDeclare(exchange, "fanout", durable: true, autoDelete: false); channel.QueueDeclare(queue9, durable: true, exclusive: false, autoDelete: false, arguments: new Dictionary<string, object> { { "x-max-priority", 50 } }); channel.QueueBind(queue9, exchange, route); channel.BasicQos(prefetchSize: 0, prefetchCount: 50, global: false); EventingBasicConsumer consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { Byte[] body = ea.Body; String message = Encoding.UTF8.GetString(body); Console.WriteLine( message); channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false); }; channel.BasicConsume(queue: queue9, autoAck: false, consumer: consumer); Console.ReadLine(); } } } }
運行producer
在運行consumer
可以看出消息是按優先順序消費的
2.死信隊列
死信隊列可以用來做容錯機制,當我們的消息處理異常時我們可以把消息放入到死信隊列中,以便後期處理,死信的產生有三種
1.消息被拒(basic.reject or basic.nack)並且沒有重新入隊(requeue=false);
2.當前隊列中的消息數量已經超過最大長度。
3.消息在隊列中過期,即當前消息在隊列中的存活時間已經超過了預先設置的TTL(Time To Live)時間;
看代碼
using RabbitMQ.Client; using System; using System.Collections.Generic; using System.Text; using System.Threading; using System.Threading.Tasks; namespace RabbitMQConsole { class Program { static void Main(string[] args) { ConnectionFactory factory = new ConnectionFactory(); factory.HostName = "39.**.**.**"; factory.Port = 5672; factory.VirtualHost = "/"; factory.UserName = "root"; factory.Password = "root"; var exchangeA = "changeA"; var routeA = "routeA"; var queueA = "queueA"; var exchangeD = "changeD"; var routeD = "routeD"; var queueD = "queueD"; using (var connection = factory.CreateConnection()) { using (var channel = connection.CreateModel()) { channel.ExchangeDeclare(exchangeD, type: "fanout", durable: true, autoDelete: false); channel.QueueDeclare(queueD, durable: true, exclusive: false, autoDelete: false); channel.QueueBind(queueD, exchangeD, routeD); channel.ExchangeDeclare(exchangeA, type: "fanout", durable: true, autoDelete: false); channel.QueueDeclare(queueA, durable: true, exclusive: false, autoDelete: false, arguments: new Dictionary<string, object> { { "x-dead-letter-exchange",exchangeD}, //設置當前隊列的DLX { "x-dead-letter-routing-key",routeD}, //設置DLX的路由key,DLX會根據該值去找到死信消息存放的隊列 { "x-message-ttl",10000} //設置消息的存活時間,即過期時間 }); channel.QueueBind(queueA, exchangeA, routeA); var properties = channel.CreateBasicProperties(); properties.Persistent = true; //發佈消息 channel.BasicPublish(exchange: exchangeA, routingKey: routeA, basicProperties: properties, body: Encoding.UTF8.GetBytes("message")); } } } } }
這樣10秒後消息過期,我們可以看到queueD中有了消息