DotNet Core中使用RabbitMQ

来源:https://www.cnblogs.com/taotaozhuanyong/archive/2019/11/05/11765444.html
-Advertisement-
Play Games

上一篇隨筆記錄到RabbitMQ的安裝,安裝完成,我們就開始使用吧。 RabbitMQ簡介 AMQP,即Advanced Message Queuing Protocol,高級消息隊列協議,是應用層協議的一個開放標準,為面向消息的中間件設計。消息中間件主要用於組件之間的解耦,消息的發送者無需知道消息 ...


  上一篇隨筆記錄到RabbitMQ的安裝,安裝完成,我們就開始使用吧。

RabbitMQ簡介

  AMQP,即Advanced Message Queuing Protocol,高級消息隊列協議,是應用層協議的一個開放標準,為面向消息的中間件設計。消息中間件主要用於組件之間的解耦,消息的發送者無需知道消息使用者的存在,反之亦然。

  AMQP的主要特征是面向消息、隊列、路由(包括點對點和發佈/訂閱)、可靠性、安全。
RabbitMQ是一個開源的AMQP實現,伺服器端用Erlang語言編寫,支持多種客戶端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX。用於在分散式系統中存儲轉發消息,在易用性、擴展性、高可用性等方面表現不俗。

  RabbitMQ提供了可靠的消息機制、跟蹤機制和靈活的消息路由,支持消息集群和分散式部署。適用於排隊演算法、秒殺活動、消息分發、非同步處理、數據同步、處理耗時任務、CQRS等應用場景。

DotNet Core使用RabbitMQ

通過nuget安裝:https://www.nuget.org/packages/RabbitMQ.Client/

定義生產者:

//創建連接工廠
ConnectionFactory factory = new ConnectionFactory
{
    UserName = "guest",//用戶名
    Password = "guest",//密碼
    HostName = "127.0.0.1"//rabbitmq ip
};

//創建連接
var connection = factory.CreateConnection();
//創建通道
var channel = connection.CreateModel();
//聲明一個隊列
channel.QueueDeclare("hello", false, false, false, null);

Console.WriteLine("\nRabbitMQ連接成功,請輸入消息,輸入exit退出!");

string input;
do
{
    input = Console.ReadLine();

    var sendBytes = Encoding.UTF8.GetBytes(input);
    //發佈消息
    channel.BasicPublish("", "hello", null, sendBytes);

} while (input.Trim().ToLower() != "exit");
channel.Close();
connection.Close();

定義消費者:

//創建連接工廠
ConnectionFactory factory = new ConnectionFactory
{
    UserName = "guest",//用戶名
    Password = "guest",//密碼
    HostName = "127.0.0.1"//rabbitmq ip
};

//創建連接
var connection = factory.CreateConnection();
//創建通道
var channel = connection.CreateModel();

//事件基本消費者
EventingBasicConsumer consumer = new EventingBasicConsumer(channel);

//接收到消息事件
consumer.Received += (ch, ea) =>
{
    var message = Encoding.UTF8.GetString(ea.Body);
    Console.WriteLine($"收到消息: {message}");
    //確認該消息已被消費
    channel.BasicAck(ea.DeliveryTag, false);
};
//啟動消費者 設置為手動應答消息
channel.BasicConsume("hello", false, consumer);
Console.WriteLine("消費者已啟動");
Console.ReadKey();
channel.Dispose();
connection.Close();

演示如下:

啟動了一個生產者,兩個消費者,可以看見兩個消費者都能接收到消息,消息投遞到哪個消費者是由RabbitMQ決定的。

RabbitMQ消費失敗的處理

  RabbitMQ採用消息應答機制,即消費者收到一個消息之後,需要發送一個應答,然後RabbitMQ才會將這個消息從隊列中刪除,如果消費者在消費過程中出現異常,斷開連接切沒有發送應答,那麼RabbitMQ會將這個消息重新投遞。

我們來修改一下消費者的代碼:

 //接收到消息事件
 consumer.Received += (ch, ea) =>
 {
     var message = Encoding.UTF8.GetString(ea.Body);

     Console.WriteLine($"收到消息: {message}");

     Console.WriteLine($"收到該消息[{ea.DeliveryTag}] 延遲10s發送回執");
     Thread.Sleep(10000);
     //確認該消息已被消費
     channel.BasicAck(ea.DeliveryTag, false);
     Console.WriteLine($"已發送回執[{ea.DeliveryTag}]");
 };

演示如下:

從圖中可以看出,設置了消息應答延遲10s,如果在這10s中,該消費者斷開了連接,那麼消息會被RabbitMQ重新投遞。

使用RabbitMQ的Exchange

前面的例子,我們可以看到生產者將消息投遞到Queue中,實際上這種方式在RabbitMQ中永遠都不會發生的。實際的情況是,生產者將消息發送到Exchange(交換器),下圖中的X,由Exchange(交換器)將消息路由到一個或多個Queue中(或者丟棄)。

 

AMQP協議中的核心思想就是生產者和消費者隔離,生產者從不直接將消息發送給隊列。生產者通常不知道是否一個消息會被髮送到隊列中,只是將消息發送到一個交換機。先由Exchange來接收,然後Exchange按照特定的策略轉發到Queue進行存儲。同理,消費者也是如此。Exchange 就類似於一個交換機,轉發各個消息分發到相應的隊列中。

Exchange Types(交換器類型)

RabbitMQ常用的Exchange Type有Fanout、Direct、Topic、Headers這四種

1、Fanout:

  這種類型的Exchange路由規則非常簡單,它會把所有發送到該Exchange的消息路由到所有與它綁定的Queue中,這時Routing key不起作用

 

 

 

Fanout Exchange 不需要處理RouteKey 。只需要簡單的將隊列綁定到exchange 上。這樣發送到exchange的消息都會被轉發到與該交換機綁定的所有隊列上。類似子網廣播,每檯子網內的主機都獲得了一份複製的消息。

所以,Fanout Exchange 轉發消息是最快的。

為了演示效果,定義了兩個隊列,分別為hello1,hello2,每個隊列都擁有一個消費者。

static void Main(string[] args)
{
    string exchangeName = "TestFanoutChange";
    string queueName1 = "hello1";
    string queueName2 = "hello2";
    string routeKey = "";

    //創建連接工廠
    ConnectionFactory factory = new ConnectionFactory
    {
        UserName = "guest",//用戶名
        Password = "guest",//密碼
        HostName = "127.0.0.1"//rabbitmq ip
    };

    //創建連接
    var connection = factory.CreateConnection();
    //創建通道
    var channel = connection.CreateModel();

    //定義一個Direct類型交換機
    channel.ExchangeDeclare(exchangeName, ExchangeType.Fanout, false, false, null);

    //定義隊列1
    channel.QueueDeclare(queueName1, false, false, false, null);
    //定義隊列2
    channel.QueueDeclare(queueName2, false, false, false, null);

    //將隊列綁定到交換機
    channel.QueueBind(queueName1, exchangeName, routeKey, null);
    channel.QueueBind(queueName2, exchangeName, routeKey, null);

    //生成兩個隊列的消費者
    ConsumerGenerator(queueName1);
    ConsumerGenerator(queueName2);


    Console.WriteLine($"\nRabbitMQ連接成功,\n\n請輸入消息,輸入exit退出!");

    string input;
    do
    {
        input = Console.ReadLine();

        var sendBytes = Encoding.UTF8.GetBytes(input);
        //發佈消息
        channel.BasicPublish(exchangeName, routeKey, null, sendBytes);

    } while (input.Trim().ToLower() != "exit");
    channel.Close();
    connection.Close();
}
 /// <summary>
 /// 根據隊列名稱生成消費者
 /// </summary>
 /// <param name="queueName"></param>
 static void ConsumerGenerator(string queueName)
 {
     //創建連接工廠
     ConnectionFactory factory = new ConnectionFactory
     {
         UserName = "guest",//用戶名
         Password = "guest",//密碼
         HostName = "127.0.0.1"//rabbitmq ip
     };

     //創建連接
     var connection = factory.CreateConnection();
     //創建通道
     var channel = connection.CreateModel();

     //事件基本消費者
     EventingBasicConsumer consumer = new EventingBasicConsumer(channel);

     //接收到消息事件
     consumer.Received += (ch, ea) =>
     {
         var message = Encoding.UTF8.GetString(ea.Body);

         Console.WriteLine($"Queue:{queueName}收到消息: {message}");
         //確認該消息已被消費
         channel.BasicAck(ea.DeliveryTag, false);
     };
     //啟動消費者 設置為手動應答消息
     channel.BasicConsume(queueName, false, consumer);
     Console.WriteLine($"Queue:{queueName},消費者已啟動");
 }

運行效果如下:

2、Direct

  這種類型的Exchange路由規則也很簡單,它會把消息路由到哪些binding key與routingkey完全匹配的Queue中。

 

   Direct模式,可以使用rabbitMQ自帶的Exchange:default Exchange 。所以不需要將Exchange進行任何綁定(binding)操作 。消息傳遞時,RouteKey必須完全匹配,才會被隊列接收,否則該消息會被拋棄。

static void Main(string[] args)
{
    string exchangeName = "TestChange";
    string queueName = "hello";
    string routeKey = "helloRouteKey";

    //創建連接工廠
    ConnectionFactory factory = new ConnectionFactory
    {
        UserName = "guest",//用戶名
        Password = "guest",//密碼
        HostName = "127.0.0.1"//rabbitmq ip
    };

    //創建連接
    var connection = factory.CreateConnection();
    //創建通道
    var channel = connection.CreateModel();

    //定義一個Direct類型交換機
    channel.ExchangeDeclare(exchangeName, ExchangeType.Direct, false, false, null);

    //定義一個隊列
    channel.QueueDeclare(queueName, false, false, false, null);

    //將隊列綁定到交換機
    channel.QueueBind(queueName, exchangeName, routeKey, null);

    Console.WriteLine($"\nRabbitMQ連接成功,Exchange:{exchangeName},Queue:{queueName},Route:{routeKey},\n\n請輸入消息,輸入exit退出!");

    string input;
    do
    {
        input = Console.ReadLine();

        var sendBytes = Encoding.UTF8.GetBytes(input);
        //發佈消息
        channel.BasicPublish(exchangeName, routeKey, null, sendBytes);

    } while (input.Trim().ToLower() != "exit");
    channel.Close();
    connection.Close();

運行效果如下:

3、Topic

  這種類型的Exchange的路由規則支持 binding key 和 routing key 的模糊匹配,會把消息路由到滿足條件的Queue。 binding key 中可以存在兩種特殊字元 *與 #,用於做模糊匹配,其中 * 用於匹配一個單詞,# 用於匹配0個或多個單詞,單詞以符號“.”為分隔符。

  以上圖中的配置為例,routingKey=”quick.orange.rabbit”的消息會同時路由到Q1與Q2,routingKey=”lazy.orange.fox”的消息會路由到Q1與Q2,routingKey=”lazy.brown.fox”的消息會路由到Q2,routingKey=”lazy.pink.rabbit”的消息會路由到Q2(只會投遞給Q2一次,雖然這個routingKey與Q2的兩個bindingKey都匹配);routingKey=”quick.brown.fox”、routingKey=”orange”、routingKey=”quick.orange.male.rabbit”的消息將會被丟棄,因為它們沒有匹配任何bindingKey。

  所以,Topic Exchange使用非常靈活。
static void Main(string[] args)
{
    string exchangeName = "TestTopicChange";
    string queueName = "hello";
    string routeKey = "TestRouteKey.*";

    //創建連接工廠
    ConnectionFactory factory = new ConnectionFactory
    {
        UserName = "guest",//用戶名
        Password = "guest",//密碼
        HostName = "127.0.0.1"//rabbitmq ip
    };

    //創建連接
    var connection = factory.CreateConnection();
    //創建通道
    var channel = connection.CreateModel();

    //定義一個Direct類型交換機
    channel.ExchangeDeclare(exchangeName, ExchangeType.Topic, false, false, null);

    //定義隊列1
    channel.QueueDeclare(queueName, false, false, false, null);

    //將隊列綁定到交換機
    channel.QueueBind(queueName, exchangeName, routeKey, null);



    Console.WriteLine($"\nRabbitMQ連接成功,\n\n請輸入消息,輸入exit退出!");

    string input;
    do
    {
        input = Console.ReadLine();

        var sendBytes = Encoding.UTF8.GetBytes(input);
        //發佈消息
        channel.BasicPublish(exchangeName, "TestRouteKey.one", null, sendBytes);

    } while (input.Trim().ToLower() != "exit");
    channel.Close();
    connection.Close();
}

運行效果如下:

 4、Headers

  這種類型的Exchange不依賴於 routing key 與 binding key 的匹配規則來路由消息,而是根據發送的消息內容中的 headers 屬性進行匹配。

參考:

  官網:https://www.rabbitmq.com/tutorials/tutorial-one-dotnet.html

    https://www.cnblogs.com/stulzq/p/7551819.html

    https://www.jianshu.com/p/e55e971aebd8


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • using System; namespace program { class program1 { static void Main(string[] args) { int a = 100; int b = 100; Console.WriteLine("下麵使用豎式計算結果"); Consol ...
  • 前言 併發、並行。同步、非同步、互斥、多線程。我太難了。被這些詞搞懵了。前面我們在寫.Net基礎系列的時候寫過了關於.Net的非同步編程。那麼其他的都是些什麼東西呀。今天我們首先就來解決這個問題。把這些詞搞懂搞透。理清邏輯。然後最後我們進入並行編程的介紹。 概念初識 首先我們看併發和並行: 併發:併發指 ...
  • 鴿了好久,終於有個時間繼續寫了,繼上一篇之後,又寫(水)了一篇,有什麼不足之處請大家指出,多謝各位了。 下麵有兩個需要用到的軟體,putty和pscp,我已經上傳到博客園了,下載請點擊這裡。 一、準備伺服器 首先和之前一樣,先去騰訊雲整了個雲伺服器,選擇CentOS的鏡像。 然後跟之前一樣完成購買, ...
  • 1.阻止from提交:在按鈕的click事件中加入$("#btnSubmit").attr("disabled", "disabled");; 2.使用ajaxfrom提交不刷新頁面 必須要引<script src="~/Scripts/jquery.validate.min.js"></scrip ...
  • 總結下麵幾點 1.與下位機的連接儘量保持長連接,每次用到的時候去連接的話,過一段時間速度明顯下降,什麼問題並沒有找到 2.C#中的BitConverter 類可以非常方便的在位元組與其他類型之間進行轉換 3.周期性操作使用while迴圈,避免使用timer定時器 4.操作一些標誌位的操作,儘量放到一個 ...
  • VS2017打開項目時提示未能正確載入CSharpPackage包, 可以使用 devenv命令工具來解決,操作如下 打開vs2017開發人員命令提示符(請使用管理員身份運行),如圖 敲入 devenv /setup 回車執行 最後重啟vs解決。 有的再重啟vs時還會出現 未能正確載入“Micros ...
  • KindEditor使用JavaScript編寫,可以無縫的於Java、.NET、PHP、ASP等程式接合。 KindEditor非常適合在CMS、商城、論壇、博客、Wiki、電子郵件等互聯網應用上使用,2006年7月首次發佈2.0以來,KindEditor依靠出色的用戶體驗和領先的技術不斷擴大編輯 ...
  • 一、簡介 方法可以稱為函數,函數又可以稱為方法,方法主要的作用是將一堆代碼進行重用的一種機制,避免太多的冗餘的代碼,還有方便後期維護。 二、語法 函數的語法: 描述: public:訪問修飾符,公開的; static:靜態的; 返回值類型:比如int 、string 、double等的類型,如果不寫 ...
一周排行
    -Advertisement-
    Play Games
  • 前言 插件化的需求主要源於對軟體架構靈活性的追求,特別是在開發大型、複雜或需要不斷更新的軟體系統時,插件化可以提高軟體系統的可擴展性、可定製性、隔離性、安全性、可維護性、模塊化、易於升級和更新以及支持第三方開發等方面的能力,從而滿足不斷變化的業務需求和技術挑戰。 一、插件化探索 在WPF中我們想要開 ...
  • 歡迎ReaLTaiizor是一個用戶友好的、以設計為中心的.NET WinForms項目控制項庫,包含廣泛的組件。您可以使用不同的主題選項對項目進行個性化設置,並自定義用戶控制項,以使您的應用程式更加專業。 項目地址:https://github.com/Taiizor/ReaLTaiizor 步驟1: ...
  • EDP是一套集組織架構,許可權框架【功能許可權,操作許可權,數據訪問許可權,WebApi許可權】,自動化日誌,動態Interface,WebApi管理等基礎功能於一體的,基於.net的企業應用開發框架。通過友好的編碼方式實現數據行、列許可權的管控。 ...
  • Channel 是乾什麼的 The System.Threading.Channels namespace provides a set of synchronization data structures for passing data between producers and consume ...
  • efcore如何優雅的實現按年分庫按月分表 介紹 本文ShardinfCore版本 本期主角: ShardingCore 一款ef-core下高性能、輕量級針對分表分庫讀寫分離的解決方案,具有零依賴、零學習成本、零業務代碼入侵適配 距離上次發文.net相關的已經有很久了,期間一直在從事java相關的 ...
  • 前言 Spacesniffer 是一個免費的文件掃描工具,通過使用樹狀圖可視化佈局,可以立即瞭解大文件夾的位置,幫助用戶處理找到這些文件夾 當前系統C盤空間 清理後系統C盤空間 下載 Spacesniffer 下載地址:https://spacesniffer.en.softonic.com/dow ...
  • EDP是一套集組織架構,許可權框架【功能許可權,操作許可權,數據訪問許可權,WebApi許可權】,自動化日誌,動態Interface,WebApi管理等基礎功能於一體的,基於.net的企業應用開發框架。通過友好的編碼方式實現數據行、列許可權的管控。 ...
  • 一、ReZero簡介 ReZero是一款.NET中間件 : 全網唯一開源界面操作就能生成API , 可以集成到任何.NET6+ API項目,無破壞性,也可讓非.NET用戶使用exe文件 免費開源:MIT最寬鬆協議 , 一直從事開源事業十年,一直堅持開源 1.1 純ReZero開發 適合.Net Co ...
  • 一:背景 1. 講故事 停了一個月沒有更新文章了,主要是忙於寫 C#內功修煉系列的PPT,現在基本上接近尾聲,可以回頭繼續更新這段時間分析dump的一些事故報告,有朋友微信上找到我,說他們的系統出現了大量的http超時,程式不響應處理了,讓我幫忙看下怎麼回事,dump也抓到了。 二:WinDbg分析 ...
  • 開始做項目管理了(本人3年java,來到這邊之後真沒想到...),天天開會溝通整理需求,他們講話的時候忙裡偷閑整理一下常用的方法,其實語言還是有共通性的,基本上看到方法名就大概能猜出來用法。出去打水的時候看到外面太陽好好,真想在外面坐著曬太陽,回來的時候好兄弟三年前送給我的鍵盤D鍵不靈了,在打"等待 ...