上一篇我們講了關於direct類型的Exchange,這一片我們來瞭解一下fanout類型的Exchange。 1.Exchange的fanout類型 fanout類型的Exchange的特點是會把消息發送給與之綁定的所有Queue中,我們來測試一下。代碼如下 運行代碼,去可視化工具中查看一下 消費 ...
上一篇我們講了關於direct類型的Exchange,這一片我們來瞭解一下fanout類型的Exchange。
1.Exchange的fanout類型
fanout類型的Exchange的特點是會把消息發送給與之綁定的所有Queue中,我們來測試一下。代碼如下
using RabbitMQ.Client; using System; using System.Text; using System.Threading; using System.Threading.Tasks; namespace RabbitMQConsole { class Program { static void Main(string[] args) { ConnectionFactory factory = new ConnectionFactory(); factory.HostName = "39.**.**.**"; factory.Port = 5672; factory.VirtualHost = "/"; factory.UserName = "root"; factory.Password = "root"; var exchange = "change3"; var route = "route2"; var queue3 = "queue3"; var queue4 = "queue4"; var queue5 = "queue5"; using (var connection = factory.CreateConnection()) { using (var channel = connection.CreateModel()) { channel.ExchangeDeclare(exchange, type: "fanout", durable: true, autoDelete: false); channel.QueueDeclare(queue3, durable: true, exclusive: false, autoDelete: false); channel.QueueBind(queue3, exchange, queue3); channel.QueueDeclare(queue4, durable: true, exclusive: false, autoDelete: false); channel.QueueBind(queue4, exchange, queue4); channel.QueueDeclare(queue5, durable: true, exclusive: false, autoDelete: false); channel.QueueBind(queue5, exchange, queue5); var props = channel.CreateBasicProperties(); props.Persistent = true; channel.BasicPublish(exchange, route, true, props, Encoding.UTF8.GetBytes("hello rabbit")); } } } } }
運行代碼,去可視化工具中查看一下
消費其中的一個
using RabbitMQ.Client; using RabbitMQ.Client.Events; using System; using System.Text; using System.Threading; namespace RabbitMQClient { class Program { private static readonly ConnectionFactory rabbitMqFactory = new ConnectionFactory() { HostName = "39.**.**.**", Port = 5672, UserName = "root", Password = "root", VirtualHost = "/" }; static void Main(string[] args) { var exchange = "change3"; var route = "route2"; var queue = "queue3"; using (IConnection conn = rabbitMqFactory.CreateConnection()) using (IModel channel = conn.CreateModel()) { channel.ExchangeDeclare(exchange, "fanout", durable: true, autoDelete: false); channel.QueueDeclare(queue, durable: true, exclusive: false, autoDelete: false); channel.QueueBind(queue, exchange, route); channel.BasicQos(prefetchSize: 0, prefetchCount: 50, global: false); EventingBasicConsumer consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { Byte[] body = ea.Body; String message = Encoding.UTF8.GetString(body); Console.WriteLine( message+Thread.CurrentThread.ManagedThreadId); channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false); }; channel.BasicConsume(queue: queue, autoAck: false, consumer: consumer); Console.ReadLine(); } } } }
結果如下
大家可以依次消費其他兩個Queue,這裡就不演示了
2.消息的過期時間
我們在發送一些消息的時候,有時希望給消息設置一下過期時間,我們可以通過兩種方式來設置
2.1設置隊列的過期時間
using RabbitMQ.Client; using System; using System.Collections.Generic; using System.Text; using System.Threading; using System.Threading.Tasks; namespace RabbitMQConsole { class Program { static void Main(string[] args) { ConnectionFactory factory = new ConnectionFactory(); factory.HostName = "39.**.**.**"; factory.Port = 5672; factory.VirtualHost = "/"; factory.UserName = "root"; factory.Password = "root"; var exchange = "change4"; var route = "route2"; var queue7 = "queue7"; using (var connection = factory.CreateConnection()) { using (var channel = connection.CreateModel()) { channel.ExchangeDeclare(exchange, type: "fanout", durable: true, autoDelete: false); //隊列過期時間,單位毫秒 channel.QueueDeclare(queue7, durable: true, exclusive: false, autoDelete: false,arguments:new Dictionary<string, object> { { "x-message-ttl", 8000 } }); channel.QueueBind(queue7, exchange, queue7); var props = channel.CreateBasicProperties(); props.Persistent = true; channel.BasicPublish(exchange, route, true, props, Encoding.UTF8.GetBytes("hello rabbit")); } } } } }
這樣過8秒去Queue就看不到該消息了
2.2設置message的過期時間
using RabbitMQ.Client; using System; using System.Collections.Generic; using System.Text; using System.Threading; using System.Threading.Tasks; namespace RabbitMQConsole { class Program { static void Main(string[] args) { ConnectionFactory factory = new ConnectionFactory(); factory.HostName = "39.**.**.**"; factory.Port = 5672; factory.VirtualHost = "/"; factory.UserName = "root"; factory.Password = "root"; var exchange = "change4"; var route = "route2"; var queue7 = "queue7"; using (var connection = factory.CreateConnection()) { using (var channel = connection.CreateModel()) { channel.ExchangeDeclare(exchange, type: "fanout", durable: true, autoDelete: false); channel.QueueDeclare(queue7, durable: true, exclusive: false, autoDelete: false,arguments:new Dictionary<string, object> { { "x-message-ttl", 8000 } }); channel.QueueBind(queue7, exchange, queue7); var props = channel.CreateBasicProperties(); //message過期時間,單位毫秒 props.Expiration = "30000"; props.Persistent = true; channel.BasicPublish(exchange, route, true, props, Encoding.UTF8.GetBytes("hello rabbit")); } } } } }
我們發現還是8秒就過期了,說明如果同時設置了隊列與消息的過期時間,則按照隊列的時間過期。我們把隊列的過期時間去掉重新試一下。
using RabbitMQ.Client; using System; using System.Collections.Generic; using System.Text; using System.Threading; using System.Threading.Tasks; namespace RabbitMQConsole { class Program { static void Main(string[] args) { ConnectionFactory factory = new ConnectionFactory(); factory.HostName = "39.**.**.**"; factory.Port = 5672; factory.VirtualHost = "/"; factory.UserName = "root"; factory.Password = "root"; var exchange = "change4"; var route = "route2"; var queue7 = "queue7"; using (var connection = factory.CreateConnection()) { using (var channel = connection.CreateModel()) { channel.ExchangeDeclare(exchange, type: "fanout", durable: true, autoDelete: false); channel.QueueDeclare(queue7, durable: true, exclusive: false, autoDelete: false); channel.QueueBind(queue7, exchange, queue7); var props = channel.CreateBasicProperties(); props.Expiration = "30000"; props.Persistent = true; channel.BasicPublish(exchange, route, true, props, Encoding.UTF8.GetBytes("hello rabbit")); } } } } }
3.隊列生存時間
我們還可以設置一個隊列的生存時間
using RabbitMQ.Client; using System; using System.Collections.Generic; using System.Text; using System.Threading; using System.Threading.Tasks; namespace RabbitMQConsole { class Program { static void Main(string[] args) { ConnectionFactory factory = new ConnectionFactory(); factory.HostName = "39.**.**.**"; factory.Port = 5672; factory.VirtualHost = "/"; factory.UserName = "root"; factory.Password = "root"; var exchange = "change4"; var route = "route2"; var queue8 = "queue8"; using (var connection = factory.CreateConnection()) { using (var channel = connection.CreateModel()) { channel.ExchangeDeclare(exchange, type: "fanout", durable: true, autoDelete: false); channel.QueueDeclare(queue8, durable: true, exclusive: false, autoDelete: false,arguments: new Dictionary<string, object> { { "x-expires",10000} //設置當前隊列的過期時間為10000毫秒 }); channel.QueueBind(queue8, exchange, queue8); var props = channel.CreateBasicProperties(); props.Persistent = true; channel.BasicPublish(exchange, route, true, props, Encoding.UTF8.GetBytes("hello rabbit")); } } } } }
這樣10秒後隊列就消失了