ASP.NET Core知識之RabbitMQ組件使用(二)

来源:https://www.cnblogs.com/LaoPaoEr/archive/2023/02/21/17139763.html
-Advertisement-
Play Games

近期,業務調整,需要內網讀取數據後存入到外網,同時,其他伺服器也需要讀取數據,於是我又盯上了RabbitMQ。在展開業務代碼前,先看下RabbitMQ整體架構,可以看到Exchange和隊列是多對多關係。 下麵,我們詳細說說RabbitMQ的隊列模式:簡單隊列、工作隊列、發佈訂閱模式、路由模式、主題 ...


  近期,業務調整,需要內網讀取數據後存入到外網,同時,其他伺服器也需要讀取數據,於是我又盯上了RabbitMQ。在展開業務代碼前,先看下RabbitMQ整體架構,可以看到Exchange和隊列是多對多關係。

  下麵,我們詳細說說RabbitMQ的隊列模式:簡單隊列、工作隊列、發佈訂閱模式、路由模式、主題模式、RPC模式。其中簡單隊列、工作隊列在前文 組件使用(一)中以提到了 ,感興趣的可以看看,本文主要闡述 發佈訂閱模式、路由模式、主題模式。

  • 發佈訂閱模式

   無選擇接收消息,一個消息生產者,一個交換器,多個消息隊列,多個消費者。稱為發佈/訂閱模式。在應用中,只需要簡單的將隊列綁定到交換機上。

  一個發送到交換機的消息都會被轉發到與該交換機綁定的所有隊列上。像子網廣播,每檯子網內的主機都獲得了一份複製的消息。 可以將消息發送給不同類型的消費者。做到發佈一次,多個消費者來消費。

  P 表示為生產者、 X 表示交換機、C1C2 表示為消費者,紅色表示隊列。

  • 路由模式

  在發佈/訂閱模式的基礎上,有選擇的接收消息,也就是通過 routing 路由進行匹配條件是否滿足接收消息。

  路由模式跟發佈訂閱模式類似,然後在訂閱模式的基礎上加上了類型,訂閱模式是分發到所有綁定到交換機的隊列,路由模式只分發到綁定在交換機上面指定路由鍵的隊列,我們可以看一下下麵這張圖:

  P 表示為生產者、 X 表示交換機、C1C2 表示為消費者,紅色表示隊列。

  上圖是一個結合日誌消費級別的配圖,在路由模式它會把消息路由到那些 binding key 與 routing key 完全匹配的 Queue 中,此模式也就是 Exchange 模式中的direct模式。 以上圖的配置為例,我們以 routingKey=“error” 發送消息到 Exchange,則消息會路由到Queue1(amqp.gen-S9b…,這是由RabbitMQ自動生成的Queue名稱)和Queue2(amqp.gen-Agl…)。如果我們以 routingKey=“info” 或 routingKey=“warning” 來發送消息,則消息只會路由到 Queue2。如果我們以其他 routingKey 發送消息,則消息不會路由到這兩個 Queue 中。

  • 主題模式

   同樣是在發佈/訂閱模式的基礎上,根據主題匹配進行篩選是否接收消息,比第四類更靈活。 topics 主題模式跟 routing 路由模式類似,只不過路由模式是指定固定的路由鍵 routingKey,而主題模式是可以模糊匹配路由鍵 routingKey,類似於SQL中 = 和 like 的關係。

   P 表示為生產者、 X 表示交換機、C1C2 表示為消費者,紅色表示隊列。

  topics 模式與 routing 模式比較相近,topics 模式不能具有任意的 routingKey,必須由一個英文句點號“.”分隔的字元串(我們將被句點號“.”分隔開的每一段獨立的字元串稱為一個單詞),比如 “lazy.orange.fox”。topics routingKey 中可以存在兩種特殊字元“*”與“#”,用於做模糊匹配,其中“*”用於匹配一個單詞,“#”用於匹配多個單詞(可以是零個)。 "*“ 表示任何一個詞 ”#" 表示0或多個詞 以上圖中的配置為例: 如果一個消息的 routingKey 設置為 “xxx.orange.rabbit”,那麼該消息會同時路由到 Q1 與 Q2,routingKey="lazy.orange.fox”的消息會路由到Q1與Q2; routingKey="lazy.brown.fox”的消息會路由到 Q2; routingKey="lazy.pink.rabbit”的消息會路由到 Q2(只會投遞給Q2一次,雖然這個routingKey 與 Q2 的兩個 bindingKey 都匹配); routingKey=“quick.brown.fox”、routingKey=”orange”、routingKey="quick.orange.male.rabbit”的消息將會被丟棄,因為它們沒有匹配任何bindingKey。

  • RPC模式

  與上面3種所不同之處,RPC模式是擁有請求/回覆的。也就是有響應的。RPC是指遠程過程調用,也就是說兩台伺服器A,B,一個應用部署在A伺服器上,想要調用B伺服器上應用提供的函數/方法,由於不在一個記憶體空間,不能直接調用,需要通過網路來表達調用的語義和傳達調用的數據。

  為什麼使用RPC呢?就是無法在一個進程內,甚至一個電腦內通過本地調用的方式完成的需求,比如不同的系統間的通訊,甚至不同的組織間的通訊。由於計算能力需要橫向擴展,需要在多台機器組成的集群上部署應用,RPC的協議有很多,比如最早的CORBA,Java RMI,Web Service的RPC風格,Hessian,Thrift,甚至Rest API。

img

  RPC的處理流程:

    • 當客戶端啟動時,創建一個匿名的回調隊列。
    • 客戶端為RPC請求設置2個屬性:replyTo,設置回調隊列名字;correlationId,標記request。
    • 請求被髮送到rpc_queue隊列中。
    • RPC伺服器端監聽rpc_queue隊列中的請求,當請求到來時,伺服器端會處理並且把帶有結果的消息發送給客戶端。接收的隊列就是replyTo設定的回調隊列。
    • 客戶端監聽回調隊列,當有消息時,檢查correlationId屬性,如果與request中匹配,那就是結果了。

  以上就是多模式的簡介,在實際生產中,我們不同模式需要定義自己交換機,其中:直接交換機、主題交換機、扇形交換機、預設交換機是常用模式。如圖:

  直接交換機、主題交換機、扇形交換機相關源碼不多再贅述,相關代碼如下:

  • 連接類
public class RabbitMQConnectHelper
    {
        /// <summary>
        /// 單方式連接
        /// </summary>
        /// <returns></returns>
        public static IConnection GetConnection()
        {
            var factory = new ConnectionFactory
            {
                HostName = "127.0.0.1",
                Port = 5672,
                UserName = "gerry",
                Password = "gerry",
                VirtualHost = "/",//虛擬主機
            };
            return factory.CreateConnection();
        }
        /// <summary>
        /// 集群方式連接
        /// </summary>
        /// <returns></returns>
        public static IConnection GetClusterConnectiont()
        {
            var factory = new ConnectionFactory
            {
                UserName = "gerry",
                Password = "gerry",
                VirtualHost = "/",//虛擬主機
            };
            List<AmqpTcpEndpoint> host_list = new List<AmqpTcpEndpoint>
            {
                new AmqpTcpEndpoint(){HostName= "127.0.0.1",Port=5672},
                new AmqpTcpEndpoint(){HostName= "127.0.0.1",Port=5673},
                new AmqpTcpEndpoint(){HostName= "127.0.0.1",Port=5674}
            };
            return factory.CreateConnection(host_list);
        }
    }

 

  • 生產者
public class RabbitMQ_Producer
    {
        /// <summary>
        /// Fanout 交換機,扇形隊列,數據同步發送到所有隊列
        /// </summary>
        public void Fanout_SendMessage()
        {
            using (var Connection = RabbitMQConnectHelper.GetConnection())
            {
                using (var channel = Connection.CreateModel())
                {
                    string queueName01 = "testQueue_01";
                    string queueName02 = "testQueue_02";
                    // 聲明交換機
                    channel.ExchangeDeclare("test_fanout_exchange", "fanout");
                    //聲明隊列
                    channel.QueueDeclare(queueName01, durable: true, exclusive: false, autoDelete: false, arguments: null);
                    channel.QueueDeclare(queueName02, durable: true, exclusive: false, autoDelete: false, arguments: null);
                    //綁定到交換機
                    //routingKey: "" 沒有綁定路由key時,消息同步到所有隊列,
                    channel.QueueBind(queue: queueName01, exchange: "test_fanout_exchange", routingKey: "", arguments: null);
                    channel.QueueBind(queue: queueName02, exchange: "test_fanout_exchange", routingKey: "", arguments: null);
                    //聲明基礎屬性
                    var properties = channel.CreateBasicProperties();
                    properties.AppId = Guid.NewGuid().ToString();
                    properties.DeliveryMode = 2;//設置持久性 1-非持久性;2-持久性
                    properties.Persistent = true;//設置持久化
                    properties.Type = queueName01;//消息類型名
                    properties.ContentType = "application/json";
                    properties.ContentEncoding = "utf-8";
                    for (int i = 1; i <= 20; i++)
                    {
                        string msg = $"RabbitMQ Fanout Send {i} Message!";
                        //routingKey: "" 沒有綁定路由key時,消息同步到所有隊列
                        channel.BasicPublish(exchange: "test_fanout_exchange", routingKey: "", basicProperties: properties, body: Encoding.UTF8.GetBytes(msg));
                        Console.WriteLine(msg);
                    }
                }
            }
        }
        /// <summary>
        ///Direct 交換機,直接隊列,數據同步發送到特定的隊列
        /// </summary>
        public void Direct_SendMessage()
        {
            using (var Connection = RabbitMQConnectHelper.GetConnection())
            {
                using (var channel = Connection.CreateModel())
                {
                    string queueName01 = "testQueue_Red";
                    string queueName02 = "testQueue_Yellow";
                    // 聲明交換機
                    channel.ExchangeDeclare("test_direct_exchange", "direct");
                    //聲明隊列  
                    channel.QueueDeclare(queueName01, durable: true, exclusive: false, autoDelete: false, arguments: null);
                    channel.QueueDeclare(queueName02, durable: true, exclusive: false, autoDelete: false, arguments: null);
                    //綁定到交換機
                    //routingKey: queueName 指定消息發送到某個特定隊列
                    channel.QueueBind(queue:queueName01, exchange: "test_direct_exchange", routingKey: queueName01, arguments: null);
                    channel.QueueBind(queue: queueName02, exchange: "test_direct_exchange", routingKey: queueName02, arguments: null);
                    //聲明一個基礎屬性
                    var properties = channel.CreateBasicProperties();
                    properties.AppId = Guid.NewGuid().ToString();
                    properties.DeliveryMode = 2;//設置持久性 1-非持久性;2-持久性
                    properties.Persistent = true;//設置持久化
                    properties.Type = queueName01;//消息類型名
                    properties.ContentType = "application/json";
                    properties.ContentEncoding = "utf-8";
                    for (int i = 1; i <= 20; i++)
                    {
                        string msg = $"RabbitMQ Direct Send {i} Message!";
                        //routingKey: queueName 指定消息發送到某個特定隊列 queueName01
                        channel.BasicPublish(exchange: "test_direct_exchange", routingKey: queueName01, basicProperties: properties, body: Encoding.UTF8.GetBytes(msg));
                        Console.WriteLine(msg);
                    }
                }
            }
        }
        /// <summary>
        /// Topic 交換機,模糊隊列,數據同步發送到模糊匹配隊列
        /// </summary>
        public void Topic_SendMessage()
        {
            using (var Connection = RabbitMQConnectHelper.GetConnection())
            {
                using (var channel = Connection.CreateModel())
                {
                    string queueName01 = "testQueue_01";
                    string queueName02 = "testQueue_02";
                    // 聲明交換機
                    channel.ExchangeDeclare("test_topic_exchange", "topic");
                    //聲明隊列
                    channel.QueueDeclare(queueName01, durable: true, exclusive: false, autoDelete: false, arguments: null);
                    channel.QueueDeclare(queueName02, durable: true, exclusive: false, autoDelete: false, arguments: null);
                    //綁定到交換機
                    //routingKey: queueName 指定消息發送到某個特定隊列
                    channel.QueueBind(queue:queueName01, exchange: "test_topic_exchange", routingKey: "data.*", arguments: null);
                    channel.QueueBind(queue: queueName02, exchange: "test_topic_exchange", routingKey: "data.Red", arguments: null);
                    //聲明一個基礎屬性
                    var properties = channel.CreateBasicProperties();
                    properties.AppId = Guid.NewGuid().ToString();
                    properties.DeliveryMode = 2;//設置持久性 1-非持久性;2-持久性
                    properties.Persistent = true;//設置持久化
                    properties.Type = queueName01;//消息類型名
                    properties.ContentType = "application/json";
                    properties.ContentEncoding = "utf-8";
                    for (int i = 1; i <= 20; i++)
                    {
                        string msg = $"RabbitMQ Topic Send {i} Message!";
                        //routingKey: "data.Red" 指定消息發送到路由是 "data.Red"的特定隊列,"data.*"屬於模糊路由,也會發送數據
                        channel.BasicPublish(exchange: "test_topic_exchange", routingKey: "data.Red", basicProperties: properties, body: Encoding.UTF8.GetBytes(msg));
                        Console.WriteLine(msg);
                    }
                }
            }
        }
    }

 

  • 消費者
public class RabbitMQ_Consumer
    {
        /// <summary>
        /// fanout 消費消息數據
        /// </summary>
        public void fanout_Received_Message()
        {
            var Connection = RabbitMQConnectHelper.GetConnection();
            var channel = Connection.CreateModel();
            string queueName01 = "testQueue_01";
            // 聲明交換機
            channel.ExchangeDeclare("test_fanout_exchange", "fanout");
            //聲明隊列
            channel.QueueDeclare(queueName01, durable: true, exclusive: false, autoDelete: false, arguments: null);
            //綁定到交換機
            //routingKey: "" 沒有綁定路由key時,消息同步到所有隊列,
            channel.QueueBind(queue: queueName01, exchange: "test_fanout_exchange", routingKey: "", arguments: null);
           
            //聲明消費者
            EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
            consumer.Received += (model, ea) =>
            {
                var body = ea.Body;
                var Str = Encoding.UTF8.GetString(body.ToArray());
                //autoAck: false 則手動在接收方法內提交BasicAck 可做成等待SQL執行返回True,以保證消息能消費成功且入庫
                //autoAck: true  不需要在接收方法內使用 BasicAck方法
                channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
            };
            //autoAck 值取決與 channel.BasicAck是否手動提交
            channel.BasicConsume(queue: queueName01, autoAck: false, consumer: consumer);
        }
        /// <summary>
        /// Direct 消費消息數據
        /// </summary>
        public void direct_Received_Message()
        {
            var Connection = RabbitMQConnectHelper.GetConnection();
            var channel = Connection.CreateModel();
            string queueName01 = "testQueue_Red";
            // 聲明交換機
            channel.ExchangeDeclare("test_direct_exchange", "direct");
            //聲明隊列  
            channel.QueueDeclare(queueName01, durable: true, exclusive: false, autoDelete: false, arguments: null);
            //綁定到交換機
            //routingKey: queueName 指定消息發送到某個特定隊列
            channel.QueueBind(queue: queueName01, exchange: "test_direct_exchange", routingKey: queueName01, arguments: null);

            //聲明消費者
            EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
            consumer.Received += (model, ea) =>
            {
                var body = ea.Body;
                var Str = Encoding.UTF8.GetString(body.ToArray());
                //autoAck: false 則手動在接收方法內提交BasicAck 可做成等待SQL執行返回True,以保證消息能消費成功且入庫
                //autoAck: true  不需要在接收方法內使用 BasicAck方法
                channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
            };
            //autoAck 值取決與 channel.BasicAck是否手動提交
            channel.BasicConsume(queue: queueName01, autoAck: false, consumer: consumer);
        }
        /// <summary>
        /// Topic 消費消息數據
        /// </summary>
        public void topic_Received_Message()
        {
            var Connection = RabbitMQConnectHelper.GetConnection();
            var channel = Connection.CreateModel();
            string queueName02 = "testQueue_02";
            // 聲明交換機
            channel.ExchangeDeclare("test_topic_exchange", "topic");
            //聲明隊列
            channel.QueueDeclare(queueName02, durable: true, exclusive: false, autoDelete: false, arguments: null);
            //綁定到交換機
            //routingKey: queueName 指定消息發送到某個特定隊列
            channel.QueueBind(queue: queueName02, exchange: "test_topic_exchange", routingKey: "data.Red", arguments: null);

            //聲明消費者
            EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
            consumer.Received += (model, ea) =>
            {
                var body = ea.Body;
                var Str = Encoding.UTF8.GetString(body.ToArray());
                //autoAck: false 則手動在接收方法內提交BasicAck 可做成等待SQL執行返回True,以保證消息能消費成功且入庫
                //autoAck: true  不需要在接收方法內使用 BasicAck方法
                channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
            };
            //autoAck 值取決與 channel.BasicAck是否手動提交
            channel.BasicConsume(queue: queueName02, autoAck: false, consumer: consumer);
        }
    }

 

使用預設交換機,簡潔使用。請訪問基礎篇 ASP.NET Core知識之RabbitMQ組件使用(一) 。


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • Spire.XLS for C++ 是一個 Excel 庫,供開發人員在任何類型的 C++ 應用程式中操作 Excel 文檔(XLS、XLSX、XLSB 和 XLSM)。 本文演示瞭如何以兩種不同的方式將 Spire.XLS for C++ 集成到您的 C++ 應用程式中。 通過 NuGet 安裝S ...
  • 一、Spring Boot 是什麼 世界上最好的文檔來源自官方的《Spring Boot Reference Guide》,是這樣介紹的: Spring Boot makes it easy to create stand-alone, production-grade Spring based A ...
  • 最近OpenAI的ChatGPT真的是到處都在刷屏,我想你已經看過很多關於ChatGPT的文章或者視頻了,我就不過多介紹了。 不過你碰巧還不知道的話,可以先百度一下,然後再回來繼續。 與ChatGPT對話很有趣,甚至很有啟發性。有人用它聊天,有人用它寫代碼,太多省時省力的工作,都可以由它完成,我們的 ...
  • MD_CHANGE_MRP_DATA:在我看來,它是MRP上最重要的BAdI。它允許您在物料需求計劃或 MD04 交易記錄期間更改任何物料需求計劃元素。最常見的用途是使物料需求計劃元素與物料需求計劃無關,但它也可用於其他更改,例如數量更改。您可以在文檔“使用 BAdI MD_CHANGE_MRP_D ...
  • 這篇文章主要描述分散式通信中的發佈訂閱模式,這是一種非同步通信模式,它可以解耦消息的生產者和消費者,提高系統的穩定性。同時,文章也描述了Kafka的架構以及主題、分區和消費組。 ...
  • 輸入輸出 print()函數 sep=' ' 數據之間一空格分割,預設是空格 end='\n' 在列印後會額外的加一個數據,預設是換行符 print("hello", "world") print("hello", "world", sep=' ') """ 上面兩行代碼輸出語句分別為: hello ...
  • cron 有2種表達形式 6個時間刻度的話 * * * * * * 分別對應 秒 分 時 日 月 星期 ; 7個時間刻度的話 * * * * * * * 分別對應 秒 分 時 日 月 星期 年; >>>註意 時間刻度之間得用 ‘空格’分隔 對應的通配符有: * ? / - , 星號(*): 指的是 ...
  • 阿裡雲物聯網平臺專用工具基本涵蓋了阿裡雲物聯網平臺提供你主要管理功能,可以方便創建產品、設備、物模型,查看設備實時屬性,事件,發送服務和查看服務日誌等等 ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...