RabbitMQ與.net core(二)Producer與Exchange

来源:https://www.cnblogs.com/chenyishi/archive/2019/01/08/10233629.html
-Advertisement-
Play Games

Producer:消息的生產者,也就是創建消息的對象 Exchange:消息的接受者,也就是用來接收消息的對象,Exchange接收到消息後將消息按照規則發送到與他綁定的Queue中。下麵我們來定義一個Producer與Exchange。 1.新建.netcore console項目,並引入Rabb ...


Producer:消息的生產者,也就是創建消息的對象

Exchange:消息的接受者,也就是用來接收消息的對象,Exchange接收到消息後將消息按照規則發送到與他綁定的Queue中。下麵我們來定義一個Producer與Exchange。

1.新建.netcore console項目,並引入RabbitMQ.Client的Nuget包

2.創建Exchange

using RabbitMQ.Client;

namespace RabbitMQConsole
{
    class Program
    {
        static void Main(string[] args)
        {
            ConnectionFactory factory = new ConnectionFactory();
            factory.HostName = "39.**.**.**";
            factory.Port = 5672;
            factory.VirtualHost = "/";
            factory.UserName = "root";
            factory.Password = "root";

            var exchange = "change2";
            var route = "route2";
            var queue = "queue2";

            using (var connection = factory.CreateConnection())
            {
                using (var channel = connection.CreateModel())
                {
                    channel.ExchangeDeclare(exchange, type:"direct", durable: true, autoDelete: false);   //創建Exchange
                    
                }
            }
        }
    }
}

可以看到Echange的參數有:

type:可選項為,fanout,direct,topic,headers。區別如下:

    fanout:發送到所有與當前Exchange綁定的Queue中

    direct:發送到與消息的routeKey相同的Rueue中

    topic:fanout的模糊版本

    headers:發送到與消息的header屬性相同的Queue中

durable:持久化

autoDelete:當最後一個綁定(隊列或者exchange)被unbind之後,該exchange自動被刪除。

 運行程式,可以在可視化界面看到change2

接下來我們可以創建與change2綁定的queue

3.創建Queue

                using (var channel = connection.CreateModel())
                {
                    channel.ExchangeDeclare(exchange, type: "direct", durable: true, autoDelete: false);
                    channel.QueueDeclare(queue, durable: true, exclusive: false, autoDelete: false);  #創建queue2
                    channel.QueueBind(queue, exchange, route);  #將queue2綁定到exchange2
                }

可以看到Echange的參數有:

durable:持久化

exclusive:如果為true,則queue只在channel存在時存在,channel關閉則queue消失

autoDelete:當最後一個綁定(隊列或者exchange)被unbind之後,該exchange自動被刪除。

去可視化界面看Queue

4.發送消息

                using (var channel = connection.CreateModel())
                {
                    channel.ExchangeDeclare(exchange, type: "direct", durable: true, autoDelete: false);
                    channel.QueueDeclare(queue, durable: true, exclusive: false, autoDelete: false);
                    channel.QueueBind(queue, exchange, route);
                    var props = channel.CreateBasicProperties();
                    props.Persistent = true; #持久化
                    channel.BasicPublish(exchange, route, true, props, Encoding.UTF8.GetBytes("hello rabbit"));
                }

5.消費消息

using RabbitMQ.Client;
using System;
using System.Text;

namespace RabbitMQClient
{
    class Program
    {
        private static readonly ConnectionFactory rabbitMqFactory = new ConnectionFactory()
        {
            HostName = "39.**.**.**",
            Port = 5672,
            UserName = "root",
            Password = "root",
            VirtualHost = "/"
        };
        static void Main(string[] args)
        {
            var exchange = "change2";
            var route = "route2";
            var queue = "queue2";


            using (IConnection conn = rabbitMqFactory.CreateConnection())
            using (IModel channel = conn.CreateModel())
            {
                channel.ExchangeDeclare(exchange, "direct", durable: true, autoDelete: false);
                channel.QueueDeclare(queue, durable: true, exclusive: false, autoDelete: false);
                channel.QueueBind(queue, exchange, route);
                while (true)
                {
                    var message = channel.BasicGet(queue, true);  #第二個參數說明自動釋放消息,如為false需手動釋放消息
                    if(message!=null)
                    {
                        var msgBody = Encoding.UTF8.GetString(message.Body);
                        Console.WriteLine(string.Format("***接收時間:{0},消息內容:{1}", DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss"), msgBody));
                    }
                    System.Threading.Thread.Sleep(TimeSpan.FromSeconds(1));
                }
            }
        }
    }
}

運行查看結果

查看可視化界面

6.手動釋放消息

                while (true)
                {
                    var message = channel.BasicGet(queue, false);#設置為手動釋放
                    if(message!=null)
                    {
                        var msgBody = Encoding.UTF8.GetString(message.Body);
                        Console.WriteLine(string.Format("***接收時間:{0},消息內容:{1}", DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss"), msgBody));
                    }
                    channel.BasicAck(message.DeliveryTag, false); #手動釋放
                    System.Threading.Thread.Sleep(TimeSpan.FromSeconds(1));
                }

我們再發一條消息,然後開始消費,加個斷點調試一下

查看一下Queue中消息狀態

然後直接取消調試,不讓程式走到釋放的那一步,再查看一下消息狀態

這麼說來只要不走到 channel.BasicAck(message.DeliveryTag, false);這一行,消息就不會被釋放掉,我們讓程式直接走到這一行代碼,查看一下消息的狀態

如圖已經被釋放了

7.讓失敗的消息回到隊列中

                while (true)
                {
                    var message = channel.BasicGet(queue, false);
                    if(message!=null)
                    {
                        var msgBody = Encoding.UTF8.GetString(message.Body);
                        Console.WriteLine(string.Format("***接收時間:{0},消息內容:{1}", DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss"), msgBody));
                        Console.WriteLine(message.DeliveryTag);    #當前消息被處理的次序數
                        if (1==1)
                            channel.BasicReject(message.DeliveryTag, true);
                    }
                    
                    System.Threading.Thread.Sleep(TimeSpan.FromSeconds(1));
                }

重新發送4條消息

開始消費

我們可以看到消息一直沒有沒消費,因為消息被處理之後又放到了隊尾

8.監聽消息

 using (IConnection conn = rabbitMqFactory.CreateConnection())
            using (IModel channel = conn.CreateModel())
            {
                channel.ExchangeDeclare(exchange, "direct", durable: true, autoDelete: false);
                channel.QueueDeclare(queue, durable: true, exclusive: false, autoDelete: false);
                channel.QueueBind(queue, exchange, route);

                channel.BasicQos(prefetchSize: 0, prefetchCount: 10, global: false);  #一次接受10條消息,否則rabbit會把所有的消息一次性推到client,會增大client的負荷
                EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
                consumer.Received += (model, ea) =>
                {
                    Byte[] body = ea.Body;
                    String message = Encoding.UTF8.GetString(body);
                    Console.WriteLine( message+Thread.CurrentThread.ManagedThreadId);
                    channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
                };

                channel.BasicConsume(queue: queue, autoAck: false, consumer: consumer);
                Console.ReadLine();
            }

 


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 到了ASP.NET Core,項目結構也發生了一些變化,看下麵的圖片。 共有6點跟.NET Framework不一樣,下麵來一一說明。1,lauchSettings.json該文件用於指定應用程式的啟動設置。2,wwwroot目錄一般用來放靜態資源文件,比如:js,css,img,ico等。3,依賴... ...
  • 最近頻繁使用非同步所以自己綜合的學習了一把非同步相關的知識,自己稍加整理了一下(這也是我試著寫的第一篇,如果有不對的,希望大神來指正!) 首先是 委托實現的非同步 class Program { public delegate int weituo();//定義了個委托 public int xxx() ...
  • 註冊按鈕事件: private void btnRegister_Click(object sender, EventArgs e) { string username = txtUserName.Text; string userpwd = txtUserPwd.Text; string tel ...
  • 上一篇我們講了關於direct類型的Exchange,這一片我們來瞭解一下fanout類型的Exchange。 1.Exchange的fanout類型 fanout類型的Exchange的特點是會把消息發送給與之綁定的所有Queue中,我們來測試一下。代碼如下 運行代碼,去可視化工具中查看一下 消費 ...
  • 主要功能: 所編寫的程式需將串口1、串口2數據互通,即:串口1接收到數據的同時將數據通過串口2發出,串口2接收到數據的同時將數據通過串口1發出。 並根據需要由指定串口發送或獲取數據。 代碼如下: ...
  • Acrobat.dllc#PDFPDFRender4NET.dllpdf轉圖片 GitHub Clone Adress : https://github.com/stone0090/OfficeTools.Pdf2Image.Word2Image.git (you get it). 前段時間公司安排 ...
  • 緣起 哈嘍大家周二好呀,剛剛經歷過了幾天火車搶票,整個人都不好了,不知道小伙伴對今年的春節是否還一如既往的期待呢,眼看都要春節了,本來也想寫篇2018總結篇,但是怕不免會出現雞湯文的窠臼嫌疑,想想還是算了,這幾天和老李聊起來關於寫博客文章,總感覺這一系列還沒有寫完,或者說還有一些沒有收尾好,眼看就要 ...
  • 一.服務的生存期 在容器中每個註冊的服務,根據程式應用需求都可以選擇合適的服務生存期,ASP.NET Core 服務有三種生存期配置: (1) Transient:暫時生存期,在每次請求時被創建。 這種生存期適合輕量級的,無狀態的服務。 (2) Scoped: 作用域生存期,在每次請求被創建一次。 ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...