Producer:消息的生產者,也就是創建消息的對象 Exchange:消息的接受者,也就是用來接收消息的對象,Exchange接收到消息後將消息按照規則發送到與他綁定的Queue中。下麵我們來定義一個Producer與Exchange。 1.新建.netcore console項目,並引入Rabb ...
Producer:消息的生產者,也就是創建消息的對象
Exchange:消息的接受者,也就是用來接收消息的對象,Exchange接收到消息後將消息按照規則發送到與他綁定的Queue中。下麵我們來定義一個Producer與Exchange。
1.新建.netcore console項目,並引入RabbitMQ.Client的Nuget包
2.創建Exchange
using RabbitMQ.Client; namespace RabbitMQConsole { class Program { static void Main(string[] args) { ConnectionFactory factory = new ConnectionFactory(); factory.HostName = "39.**.**.**"; factory.Port = 5672; factory.VirtualHost = "/"; factory.UserName = "root"; factory.Password = "root"; var exchange = "change2"; var route = "route2"; var queue = "queue2"; using (var connection = factory.CreateConnection()) { using (var channel = connection.CreateModel()) { channel.ExchangeDeclare(exchange, type:"direct", durable: true, autoDelete: false); //創建Exchange } } } } }
可以看到Echange的參數有:
type:可選項為,fanout,direct,topic,headers。區別如下:
fanout:發送到所有與當前Exchange綁定的Queue中
direct:發送到與消息的routeKey相同的Rueue中
topic:fanout的模糊版本
headers:發送到與消息的header屬性相同的Queue中
durable:持久化
autoDelete:當最後一個綁定(隊列或者exchange)被unbind之後,該exchange自動被刪除。
運行程式,可以在可視化界面看到change2
接下來我們可以創建與change2綁定的queue
3.創建Queue
using (var channel = connection.CreateModel()) { channel.ExchangeDeclare(exchange, type: "direct", durable: true, autoDelete: false); channel.QueueDeclare(queue, durable: true, exclusive: false, autoDelete: false); #創建queue2 channel.QueueBind(queue, exchange, route); #將queue2綁定到exchange2 }
可以看到Echange的參數有:
durable:持久化
exclusive:如果為true,則queue只在channel存在時存在,channel關閉則queue消失
autoDelete:當最後一個綁定(隊列或者exchange)被unbind之後,該exchange自動被刪除。
去可視化界面看Queue
4.發送消息
using (var channel = connection.CreateModel()) { channel.ExchangeDeclare(exchange, type: "direct", durable: true, autoDelete: false); channel.QueueDeclare(queue, durable: true, exclusive: false, autoDelete: false); channel.QueueBind(queue, exchange, route); var props = channel.CreateBasicProperties(); props.Persistent = true; #持久化 channel.BasicPublish(exchange, route, true, props, Encoding.UTF8.GetBytes("hello rabbit")); }
5.消費消息
using RabbitMQ.Client; using System; using System.Text; namespace RabbitMQClient { class Program { private static readonly ConnectionFactory rabbitMqFactory = new ConnectionFactory() { HostName = "39.**.**.**", Port = 5672, UserName = "root", Password = "root", VirtualHost = "/" }; static void Main(string[] args) { var exchange = "change2"; var route = "route2"; var queue = "queue2"; using (IConnection conn = rabbitMqFactory.CreateConnection()) using (IModel channel = conn.CreateModel()) { channel.ExchangeDeclare(exchange, "direct", durable: true, autoDelete: false); channel.QueueDeclare(queue, durable: true, exclusive: false, autoDelete: false); channel.QueueBind(queue, exchange, route); while (true) { var message = channel.BasicGet(queue, true); #第二個參數說明自動釋放消息,如為false需手動釋放消息 if(message!=null) { var msgBody = Encoding.UTF8.GetString(message.Body); Console.WriteLine(string.Format("***接收時間:{0},消息內容:{1}", DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss"), msgBody)); } System.Threading.Thread.Sleep(TimeSpan.FromSeconds(1)); } } } } }
運行查看結果
查看可視化界面
6.手動釋放消息
while (true) { var message = channel.BasicGet(queue, false);#設置為手動釋放 if(message!=null) { var msgBody = Encoding.UTF8.GetString(message.Body); Console.WriteLine(string.Format("***接收時間:{0},消息內容:{1}", DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss"), msgBody)); } channel.BasicAck(message.DeliveryTag, false); #手動釋放 System.Threading.Thread.Sleep(TimeSpan.FromSeconds(1)); }
我們再發一條消息,然後開始消費,加個斷點調試一下
查看一下Queue中消息狀態
然後直接取消調試,不讓程式走到釋放的那一步,再查看一下消息狀態
這麼說來只要不走到 channel.BasicAck(message.DeliveryTag, false);這一行,消息就不會被釋放掉,我們讓程式直接走到這一行代碼,查看一下消息的狀態
如圖已經被釋放了
7.讓失敗的消息回到隊列中
while (true) { var message = channel.BasicGet(queue, false); if(message!=null) { var msgBody = Encoding.UTF8.GetString(message.Body); Console.WriteLine(string.Format("***接收時間:{0},消息內容:{1}", DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss"), msgBody)); Console.WriteLine(message.DeliveryTag); #當前消息被處理的次序數 if (1==1) channel.BasicReject(message.DeliveryTag, true); } System.Threading.Thread.Sleep(TimeSpan.FromSeconds(1)); }
重新發送4條消息
開始消費
我們可以看到消息一直沒有沒消費,因為消息被處理之後又放到了隊尾
8.監聽消息
using (IConnection conn = rabbitMqFactory.CreateConnection()) using (IModel channel = conn.CreateModel()) { channel.ExchangeDeclare(exchange, "direct", durable: true, autoDelete: false); channel.QueueDeclare(queue, durable: true, exclusive: false, autoDelete: false); channel.QueueBind(queue, exchange, route); channel.BasicQos(prefetchSize: 0, prefetchCount: 10, global: false); #一次接受10條消息,否則rabbit會把所有的消息一次性推到client,會增大client的負荷 EventingBasicConsumer consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { Byte[] body = ea.Body; String message = Encoding.UTF8.GetString(body); Console.WriteLine( message+Thread.CurrentThread.ManagedThreadId); channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false); }; channel.BasicConsume(queue: queue, autoAck: false, consumer: consumer); Console.ReadLine(); }