簡介 RabbitMQ是實現了高級消息隊列協議(AMQP)的開源消息中間件,基於Erlang語言編寫。 P:(producling)生產者,生產只意味著發送消息。 Q: (queue_name)隊列,隊列是位於rabbitmq中的post box的名稱 C: (Consuming)消費者,消費者主要 ...
簡介
RabbitMQ是實現了高級消息隊列協議(AMQP)的開源消息中間件,基於Erlang語言編寫。
AMQP是什麼
AMQP 0-9-1(高級消息隊列協議)是一種消息傳遞協議,它允許一致的客戶端應用程式與一致的消息傳遞中間件代理進行通信。
消息傳遞代理接收來自發佈者(發佈它們的應用程式,也稱為生產者)的消息,並將它們路由到消費者(處理它們的應用程式)。
由於它是一個網路協議,發佈者、消費者和代理都可以駐留在不同的機器上。
AMQP 0-9-1模型簡介
AMQP 0-9-1模型具有以下世界視圖:消息發佈到交換,這通常與郵局或郵箱進行比較。交換然後使用名為綁定的規則將消息副本分發到隊列。然後,代理將消息傳遞給訂閱隊列的消費者,或者消費者根據需要從隊列獲取/拉取消息。
發佈消息時,發佈者可以指定各種消息屬性(消息元數據)。有些元數據可以由代理使用,但是,其餘的元數據對代理是完全不透明的,只能由接收消息的應用程式使用。
網路不可靠,應用程式可能無法處理消息,因此AMQP 0-9-1模型具有消息確認的概念:當消息傳遞給消費者時,消費者會自動或在應用程式開發人員選擇時立即通知代理。當消息確認正在使用時,代理將僅在收到消息(或消息組)通知時從隊列中完全刪除消息。
例如,在某些情況下,當消息無法路由時,消息可能會返回給發佈者、丟棄,或者,如果代理實現擴展,則將消息放入所謂的“死信隊列”。發佈者通過使用某些參數發佈消息來選擇如何處理這種情況。
隊列、交換和綁定統稱為AMQP實體。
交換和交換類型
交換機是發送消息的實體,交換機接收消息並將消息路由到零個或者多個隊列當中,使用的路由演算法取決於綁定的交換類型和規則,因此AMQP 0-9-1提供了以下四種交換類型:
- Direct exchange
- Fanout exchange
- Topic exchange
- Headers exchang
除了交換類型之外,還使用許多屬性聲明交換其中最重要的是:
- 耐久性(Durability) :交易所在經紀人重啟後仍能存活
- 自動刪除(Auto-delete):當最後一個隊列與其解除綁定時,將刪除Exchange
- 參數(arguments) :可選,由插件和特定於代理的功能使用
交換可以是持久的,也可以是暫時的。持久性交易所能在經紀重啟後存活下來,而短暫性交易所則不能(當經紀重新上線時,必須重新申報)。並非所有的場景和用例都需要持久的交換。
本文主要記錄了Direct模式學習RabbitMQ
P:(producling)生產者,生產只意味著發送消息。
Q: (queue_name)隊列,隊列是位於rabbitmq中的post box的名稱
C: (Consuming)消費者,消費者主要是等待接收消息的程式
開發準備
- netCoreTset.core:該工程主要封裝了RabbitMQ的公用方法
- RabbitMQClient :該工程為生產者
- RabbitMQServer :該工程為消費者
1.創建netCoreTset.core類庫項目
1.1 安裝項目依賴
2.定義介面
using netCoreTest.core.Model; using System; using System.Collections.Generic; using System.Text; namespace netCoreTest.core.Iserver { public interface IConnectionServer { /// <summary> /// 連接服務 /// </summary> void Connection(); /// <summary> /// 創建消息隊列 /// </summary> /// <param name="queName">隊列名稱</param> void CreateQueueDir(); /// <summary> /// 關閉連接 /// </summary> void CloseConnection(); /// <summary> /// 關閉通道 /// </summary> void CloseChannel(); } }
using System; using System.Collections.Generic; using System.Text; namespace netCoreTest.core.Iserver { public interface IMessageService { /// <summary> /// 發送消息 /// </summary> /// <param name="msg">消息內容</param> void SendMsg(string msg); /// <summary> /// 獲取消息 /// </summary> /// <returns></returns> string GetMsg(); } }
using System; using System.Collections.Generic; using System.Text; namespace netCoreTest.core.Iserver { public interface IRabbitMqService:IMessageService,IConnectionServer { } }
3.編寫RabbitMQ輔助類
using netCoreTest.core.Iserver; using netCoreTest.core.Model; using RabbitMQ.Client; using RabbitMQ.Client.Events; using System; using System.Collections.Generic; using System.Text; namespace netCoreTest.core { public class RabbitMQModel : IRabbitMqService { private readonly ConnectionFactory factory = null; private IModel channel; private IConnection connetction; readonly string exchangeName;//交換機名稱 readonly string routeKey;//路由名稱 readonly string queueName;///隊列名稱 public RabbitMQModel(HostModel model) { /// <summary> /// 創建連接工廠 /// </summary> factory = new ConnectionFactory { UserName = model.UserName, Password = model.PassWord, HostName = "localhost", Port = model.Port, }; exchangeName = model.ExChangeModel.ExChangeName; routeKey = model.ExChangeModel.RouteKey; queueName = model.ExChangeModel.QueueName; } /// <summary> /// 創建連接 /// </summary> public void Connection() { try { //創建連接 connetction = factory.CreateConnection(); //創建通道 channel = connetction.CreateModel(); } catch (Exception ex) { Console.WriteLine(ex.ToString()); } } public void CreateQueueDir() { //定義一個direct類型的交換機 channel.ExchangeDeclare(exchangeName, ExchangeType.Direct, false, false, null); //定義一個隊列 channel.QueueDeclare(queueName, false, false, false, null); //將隊列綁定交換機 channel.QueueBind(queueName, exchangeName, routeKey, null); }public void SendMsg(string msg) { var sendBytes = Encoding.UTF8.GetBytes(msg); channel.BasicPublish(exchangeName, routeKey, null, sendBytes); } public void CloseChannel() { channel.Close(); } public void CloseConnection() { connetction.Close(); } public string GetMsg() { //事件基本消費者 EventingBasicConsumer consumer = new EventingBasicConsumer(channel); string msg = null; //接收到消息事件 consumer.Received += (ch, ea) => { var message = Encoding.UTF8.GetString(ea.Body); msg = message; Console.WriteLine($"收到消息: {message}"); //確認該消息已被消費 channel.BasicAck(ea.DeliveryTag, false); }; //啟動消費者 設置為手動應答消息 channel.BasicConsume(queueName, false, consumer); Console.WriteLine("消費者已啟動"); Console.ReadKey(); CloseConnection(); CloseChannel(); return msg; } } }
4.創建direct模式發送類
using netCoreTest.core.Model; using System; using System.Collections.Generic; using System.Text; namespace netCoreTest.core.ExchangeTypeModel { /// <summary> /// Direct模式發送 /// </summary> public class DirectPost { RabbitMQModel rabbitMQModel; public DirectPost() { HostModel hostModel = new HostModel(); hostModel.UserName = "admin"; hostModel.PassWord = "admin"; hostModel.Host = "127.0.0.1"; hostModel.Port = 5672; hostModel.ExChangeModel =new ExChangeModel { ExChangeName = "ClentName", QueueName = "Clent", RouteKey = "ClentRoute" }; rabbitMQModel = new RabbitMQModel(hostModel); rabbitMQModel.Connection(); } public void CreateQueue() { rabbitMQModel.CreateQueueDir(); } public void SendMsg(string msg) { rabbitMQModel.SendMsg(msg); } public void GetMsg() { rabbitMQModel.GetMsg(); } } }
5.創建RabbitMQClient控制台應用程式
using netCoreTest.core; using netCoreTest.core.ExchangeTypeModel; using netCoreTest.core.Model; using RabbitMQ.Client; using System; namespace RabbitMQClient { class Program { static void Main(string[] args) { Console.WriteLine("消息生產者開始生產數據!"); Console.WriteLine("輸入exit退出!"); DirectPost directPost = new DirectPost(); directPost.CreateQueue(); string input; do { input = Console.ReadLine(); directPost.SendMsg(input); } while (input.Trim().ToLower() != "exit"); } } }
6.創建RabbitMQService控制台應用程式
using netCoreTest.core; using netCoreTest.core.ExchangeTypeModel; using netCoreTest.core.Model; using System; using System.Text; namespace RabbitMQServer { class Program { static void Main(string[] args) { Console.WriteLine("Hello World!"); DirectPost directPost = new DirectPost(); directPost.GetMsg(); } } }
7.執行RabbitMQclient和RabbitMQserver