DotNet Core中使用RabbitMQ

来源:https://www.cnblogs.com/taotaozhuanyong/archive/2019/11/05/11765444.html
-Advertisement-
Play Games

上一篇隨筆記錄到RabbitMQ的安裝,安裝完成,我們就開始使用吧。 RabbitMQ簡介 AMQP,即Advanced Message Queuing Protocol,高級消息隊列協議,是應用層協議的一個開放標準,為面向消息的中間件設計。消息中間件主要用於組件之間的解耦,消息的發送者無需知道消息 ...


  上一篇隨筆記錄到RabbitMQ的安裝,安裝完成,我們就開始使用吧。

RabbitMQ簡介

  AMQP,即Advanced Message Queuing Protocol,高級消息隊列協議,是應用層協議的一個開放標準,為面向消息的中間件設計。消息中間件主要用於組件之間的解耦,消息的發送者無需知道消息使用者的存在,反之亦然。

  AMQP的主要特征是面向消息、隊列、路由(包括點對點和發佈/訂閱)、可靠性、安全。
RabbitMQ是一個開源的AMQP實現,伺服器端用Erlang語言編寫,支持多種客戶端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX。用於在分散式系統中存儲轉發消息,在易用性、擴展性、高可用性等方面表現不俗。

  RabbitMQ提供了可靠的消息機制、跟蹤機制和靈活的消息路由,支持消息集群和分散式部署。適用於排隊演算法、秒殺活動、消息分發、非同步處理、數據同步、處理耗時任務、CQRS等應用場景。

DotNet Core使用RabbitMQ

通過nuget安裝:https://www.nuget.org/packages/RabbitMQ.Client/

定義生產者:

//創建連接工廠
ConnectionFactory factory = new ConnectionFactory
{
    UserName = "guest",//用戶名
    Password = "guest",//密碼
    HostName = "127.0.0.1"//rabbitmq ip
};

//創建連接
var connection = factory.CreateConnection();
//創建通道
var channel = connection.CreateModel();
//聲明一個隊列
channel.QueueDeclare("hello", false, false, false, null);

Console.WriteLine("\nRabbitMQ連接成功,請輸入消息,輸入exit退出!");

string input;
do
{
    input = Console.ReadLine();

    var sendBytes = Encoding.UTF8.GetBytes(input);
    //發佈消息
    channel.BasicPublish("", "hello", null, sendBytes);

} while (input.Trim().ToLower() != "exit");
channel.Close();
connection.Close();

定義消費者:

//創建連接工廠
ConnectionFactory factory = new ConnectionFactory
{
    UserName = "guest",//用戶名
    Password = "guest",//密碼
    HostName = "127.0.0.1"//rabbitmq ip
};

//創建連接
var connection = factory.CreateConnection();
//創建通道
var channel = connection.CreateModel();

//事件基本消費者
EventingBasicConsumer consumer = new EventingBasicConsumer(channel);

//接收到消息事件
consumer.Received += (ch, ea) =>
{
    var message = Encoding.UTF8.GetString(ea.Body);
    Console.WriteLine($"收到消息: {message}");
    //確認該消息已被消費
    channel.BasicAck(ea.DeliveryTag, false);
};
//啟動消費者 設置為手動應答消息
channel.BasicConsume("hello", false, consumer);
Console.WriteLine("消費者已啟動");
Console.ReadKey();
channel.Dispose();
connection.Close();

演示如下:

啟動了一個生產者,兩個消費者,可以看見兩個消費者都能接收到消息,消息投遞到哪個消費者是由RabbitMQ決定的。

RabbitMQ消費失敗的處理

  RabbitMQ採用消息應答機制,即消費者收到一個消息之後,需要發送一個應答,然後RabbitMQ才會將這個消息從隊列中刪除,如果消費者在消費過程中出現異常,斷開連接切沒有發送應答,那麼RabbitMQ會將這個消息重新投遞。

我們來修改一下消費者的代碼:

 //接收到消息事件
 consumer.Received += (ch, ea) =>
 {
     var message = Encoding.UTF8.GetString(ea.Body);

     Console.WriteLine($"收到消息: {message}");

     Console.WriteLine($"收到該消息[{ea.DeliveryTag}] 延遲10s發送回執");
     Thread.Sleep(10000);
     //確認該消息已被消費
     channel.BasicAck(ea.DeliveryTag, false);
     Console.WriteLine($"已發送回執[{ea.DeliveryTag}]");
 };

演示如下:

從圖中可以看出,設置了消息應答延遲10s,如果在這10s中,該消費者斷開了連接,那麼消息會被RabbitMQ重新投遞。

使用RabbitMQ的Exchange

前面的例子,我們可以看到生產者將消息投遞到Queue中,實際上這種方式在RabbitMQ中永遠都不會發生的。實際的情況是,生產者將消息發送到Exchange(交換器),下圖中的X,由Exchange(交換器)將消息路由到一個或多個Queue中(或者丟棄)。

 

AMQP協議中的核心思想就是生產者和消費者隔離,生產者從不直接將消息發送給隊列。生產者通常不知道是否一個消息會被髮送到隊列中,只是將消息發送到一個交換機。先由Exchange來接收,然後Exchange按照特定的策略轉發到Queue進行存儲。同理,消費者也是如此。Exchange 就類似於一個交換機,轉發各個消息分發到相應的隊列中。

Exchange Types(交換器類型)

RabbitMQ常用的Exchange Type有Fanout、Direct、Topic、Headers這四種

1、Fanout:

  這種類型的Exchange路由規則非常簡單,它會把所有發送到該Exchange的消息路由到所有與它綁定的Queue中,這時Routing key不起作用

 

 

 

Fanout Exchange 不需要處理RouteKey 。只需要簡單的將隊列綁定到exchange 上。這樣發送到exchange的消息都會被轉發到與該交換機綁定的所有隊列上。類似子網廣播,每檯子網內的主機都獲得了一份複製的消息。

所以,Fanout Exchange 轉發消息是最快的。

為了演示效果,定義了兩個隊列,分別為hello1,hello2,每個隊列都擁有一個消費者。

static void Main(string[] args)
{
    string exchangeName = "TestFanoutChange";
    string queueName1 = "hello1";
    string queueName2 = "hello2";
    string routeKey = "";

    //創建連接工廠
    ConnectionFactory factory = new ConnectionFactory
    {
        UserName = "guest",//用戶名
        Password = "guest",//密碼
        HostName = "127.0.0.1"//rabbitmq ip
    };

    //創建連接
    var connection = factory.CreateConnection();
    //創建通道
    var channel = connection.CreateModel();

    //定義一個Direct類型交換機
    channel.ExchangeDeclare(exchangeName, ExchangeType.Fanout, false, false, null);

    //定義隊列1
    channel.QueueDeclare(queueName1, false, false, false, null);
    //定義隊列2
    channel.QueueDeclare(queueName2, false, false, false, null);

    //將隊列綁定到交換機
    channel.QueueBind(queueName1, exchangeName, routeKey, null);
    channel.QueueBind(queueName2, exchangeName, routeKey, null);

    //生成兩個隊列的消費者
    ConsumerGenerator(queueName1);
    ConsumerGenerator(queueName2);


    Console.WriteLine($"\nRabbitMQ連接成功,\n\n請輸入消息,輸入exit退出!");

    string input;
    do
    {
        input = Console.ReadLine();

        var sendBytes = Encoding.UTF8.GetBytes(input);
        //發佈消息
        channel.BasicPublish(exchangeName, routeKey, null, sendBytes);

    } while (input.Trim().ToLower() != "exit");
    channel.Close();
    connection.Close();
}
 /// <summary>
 /// 根據隊列名稱生成消費者
 /// </summary>
 /// <param name="queueName"></param>
 static void ConsumerGenerator(string queueName)
 {
     //創建連接工廠
     ConnectionFactory factory = new ConnectionFactory
     {
         UserName = "guest",//用戶名
         Password = "guest",//密碼
         HostName = "127.0.0.1"//rabbitmq ip
     };

     //創建連接
     var connection = factory.CreateConnection();
     //創建通道
     var channel = connection.CreateModel();

     //事件基本消費者
     EventingBasicConsumer consumer = new EventingBasicConsumer(channel);

     //接收到消息事件
     consumer.Received += (ch, ea) =>
     {
         var message = Encoding.UTF8.GetString(ea.Body);

         Console.WriteLine($"Queue:{queueName}收到消息: {message}");
         //確認該消息已被消費
         channel.BasicAck(ea.DeliveryTag, false);
     };
     //啟動消費者 設置為手動應答消息
     channel.BasicConsume(queueName, false, consumer);
     Console.WriteLine($"Queue:{queueName},消費者已啟動");
 }

運行效果如下:

2、Direct

  這種類型的Exchange路由規則也很簡單,它會把消息路由到哪些binding key與routingkey完全匹配的Queue中。

 

   Direct模式,可以使用rabbitMQ自帶的Exchange:default Exchange 。所以不需要將Exchange進行任何綁定(binding)操作 。消息傳遞時,RouteKey必須完全匹配,才會被隊列接收,否則該消息會被拋棄。

static void Main(string[] args)
{
    string exchangeName = "TestChange";
    string queueName = "hello";
    string routeKey = "helloRouteKey";

    //創建連接工廠
    ConnectionFactory factory = new ConnectionFactory
    {
        UserName = "guest",//用戶名
        Password = "guest",//密碼
        HostName = "127.0.0.1"//rabbitmq ip
    };

    //創建連接
    var connection = factory.CreateConnection();
    //創建通道
    var channel = connection.CreateModel();

    //定義一個Direct類型交換機
    channel.ExchangeDeclare(exchangeName, ExchangeType.Direct, false, false, null);

    //定義一個隊列
    channel.QueueDeclare(queueName, false, false, false, null);

    //將隊列綁定到交換機
    channel.QueueBind(queueName, exchangeName, routeKey, null);

    Console.WriteLine($"\nRabbitMQ連接成功,Exchange:{exchangeName},Queue:{queueName},Route:{routeKey},\n\n請輸入消息,輸入exit退出!");

    string input;
    do
    {
        input = Console.ReadLine();

        var sendBytes = Encoding.UTF8.GetBytes(input);
        //發佈消息
        channel.BasicPublish(exchangeName, routeKey, null, sendBytes);

    } while (input.Trim().ToLower() != "exit");
    channel.Close();
    connection.Close();

運行效果如下:

3、Topic

  這種類型的Exchange的路由規則支持 binding key 和 routing key 的模糊匹配,會把消息路由到滿足條件的Queue。 binding key 中可以存在兩種特殊字元 *與 #,用於做模糊匹配,其中 * 用於匹配一個單詞,# 用於匹配0個或多個單詞,單詞以符號“.”為分隔符。

  以上圖中的配置為例,routingKey=”quick.orange.rabbit”的消息會同時路由到Q1與Q2,routingKey=”lazy.orange.fox”的消息會路由到Q1與Q2,routingKey=”lazy.brown.fox”的消息會路由到Q2,routingKey=”lazy.pink.rabbit”的消息會路由到Q2(只會投遞給Q2一次,雖然這個routingKey與Q2的兩個bindingKey都匹配);routingKey=”quick.brown.fox”、routingKey=”orange”、routingKey=”quick.orange.male.rabbit”的消息將會被丟棄,因為它們沒有匹配任何bindingKey。

  所以,Topic Exchange使用非常靈活。
static void Main(string[] args)
{
    string exchangeName = "TestTopicChange";
    string queueName = "hello";
    string routeKey = "TestRouteKey.*";

    //創建連接工廠
    ConnectionFactory factory = new ConnectionFactory
    {
        UserName = "guest",//用戶名
        Password = "guest",//密碼
        HostName = "127.0.0.1"//rabbitmq ip
    };

    //創建連接
    var connection = factory.CreateConnection();
    //創建通道
    var channel = connection.CreateModel();

    //定義一個Direct類型交換機
    channel.ExchangeDeclare(exchangeName, ExchangeType.Topic, false, false, null);

    //定義隊列1
    channel.QueueDeclare(queueName, false, false, false, null);

    //將隊列綁定到交換機
    channel.QueueBind(queueName, exchangeName, routeKey, null);



    Console.WriteLine($"\nRabbitMQ連接成功,\n\n請輸入消息,輸入exit退出!");

    string input;
    do
    {
        input = Console.ReadLine();

        var sendBytes = Encoding.UTF8.GetBytes(input);
        //發佈消息
        channel.BasicPublish(exchangeName, "TestRouteKey.one", null, sendBytes);

    } while (input.Trim().ToLower() != "exit");
    channel.Close();
    connection.Close();
}

運行效果如下:

 4、Headers

  這種類型的Exchange不依賴於 routing key 與 binding key 的匹配規則來路由消息,而是根據發送的消息內容中的 headers 屬性進行匹配。

參考:

  官網:https://www.rabbitmq.com/tutorials/tutorial-one-dotnet.html

    https://www.cnblogs.com/stulzq/p/7551819.html

    https://www.jianshu.com/p/e55e971aebd8


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • using System; namespace program { class program1 { static void Main(string[] args) { int a = 100; int b = 100; Console.WriteLine("下麵使用豎式計算結果"); Consol ...
  • 前言 併發、並行。同步、非同步、互斥、多線程。我太難了。被這些詞搞懵了。前面我們在寫.Net基礎系列的時候寫過了關於.Net的非同步編程。那麼其他的都是些什麼東西呀。今天我們首先就來解決這個問題。把這些詞搞懂搞透。理清邏輯。然後最後我們進入並行編程的介紹。 概念初識 首先我們看併發和並行: 併發:併發指 ...
  • 鴿了好久,終於有個時間繼續寫了,繼上一篇之後,又寫(水)了一篇,有什麼不足之處請大家指出,多謝各位了。 下麵有兩個需要用到的軟體,putty和pscp,我已經上傳到博客園了,下載請點擊這裡。 一、準備伺服器 首先和之前一樣,先去騰訊雲整了個雲伺服器,選擇CentOS的鏡像。 然後跟之前一樣完成購買, ...
  • 1.阻止from提交:在按鈕的click事件中加入$("#btnSubmit").attr("disabled", "disabled");; 2.使用ajaxfrom提交不刷新頁面 必須要引<script src="~/Scripts/jquery.validate.min.js"></scrip ...
  • 總結下麵幾點 1.與下位機的連接儘量保持長連接,每次用到的時候去連接的話,過一段時間速度明顯下降,什麼問題並沒有找到 2.C#中的BitConverter 類可以非常方便的在位元組與其他類型之間進行轉換 3.周期性操作使用while迴圈,避免使用timer定時器 4.操作一些標誌位的操作,儘量放到一個 ...
  • VS2017打開項目時提示未能正確載入CSharpPackage包, 可以使用 devenv命令工具來解決,操作如下 打開vs2017開發人員命令提示符(請使用管理員身份運行),如圖 敲入 devenv /setup 回車執行 最後重啟vs解決。 有的再重啟vs時還會出現 未能正確載入“Micros ...
  • KindEditor使用JavaScript編寫,可以無縫的於Java、.NET、PHP、ASP等程式接合。 KindEditor非常適合在CMS、商城、論壇、博客、Wiki、電子郵件等互聯網應用上使用,2006年7月首次發佈2.0以來,KindEditor依靠出色的用戶體驗和領先的技術不斷擴大編輯 ...
  • 一、簡介 方法可以稱為函數,函數又可以稱為方法,方法主要的作用是將一堆代碼進行重用的一種機制,避免太多的冗餘的代碼,還有方便後期維護。 二、語法 函數的語法: 描述: public:訪問修飾符,公開的; static:靜態的; 返回值類型:比如int 、string 、double等的類型,如果不寫 ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...