C#隊列學習筆記:RabbitMQ使用多線程提高消費吞吐率

来源:https://www.cnblogs.com/atomy/archive/2020/04/14/12680782.html
-Advertisement-
Play Games

一、引言 使用工作隊列的一個好處就是它能夠並行的處理隊列。如果堆積了很多任務,我們只需要添加更多的工作者(workers)就可以了,擴展很簡單。本例使用多線程來創建多通道並綁定隊列,達到多workers的目的。 二、示例 2.1、環境準備 在NuGet上安裝RabbitMQ.Client。 2.2、 ...


    一、引言

    使用工作隊列的一個好處就是它能夠並行的處理隊列。如果堆積了很多任務,我們只需要添加更多的工作者(workers)就可以了,擴展很簡單。本例使用多線程來創建多通道並綁定隊列,達到多workers的目的。

    二、示例

    2.1、環境準備

    在NuGet上安裝RabbitMQ.Client。

    2.2、工廠類

    添加一個工廠類RabbitMQFactory:

    /// <summary>
    /// 多路復用技術(Multiplexing)目的:為了避免創建多個TCP而造成系統資源的浪費和超載,從而有效地利用TCP連接。
    /// </summary>
    public static class RabbitMQFactory
    {
        private static IConnection sharedConnection;
        private static int ChannelCount { get; set; }
        private static readonly object _locker = new object();

        public static IConnection SharedConnection
        {
            get
            {
                if (ChannelCount >= 1000)
                {
                    if (sharedConnection != null && sharedConnection.IsOpen)
                    {
                        sharedConnection.Close();
                    }
                    sharedConnection = null;
                    ChannelCount = 0;
                }
                if (sharedConnection == null)
                {
                    lock (_locker)
                    {
                        if (sharedConnection == null)
                        {
                            sharedConnection = GetConnection();
                            ChannelCount++;
                        }
                    }
                }
                return sharedConnection;
            }
        }

        private static IConnection GetConnection()
        {
            var factory = new ConnectionFactory
            {
                HostName = "192.168.2.242",
                UserName = "hello",
                Password = "world",
                Port = AmqpTcpEndpoint.UseDefaultPort,//5672
                VirtualHost = ConnectionFactory.DefaultVHost,//使用預設值:"/"
                Protocol = Protocols.DefaultProtocol,
                AutomaticRecoveryEnabled = true
            };
            return factory.CreateConnection();
        }
    }
View Code

    2.3、主窗體

    代碼如下:

    public partial class RabbitMQMultithreading : Form
    {
        public delegate void ListViewDelegate<T>(T obj);

        public RabbitMQMultithreading()
        {
            InitializeComponent();
        }

        /// <summary>
        /// ShowMessage重載
        /// </summary>
        /// <param name="msg"></param>
        private void ShowMessage(string msg)
        {
            if (InvokeRequired)
            {
                BeginInvoke(new ListViewDelegate<string>(ShowMessage), msg);
            }
            else
            {
                ListViewItem item = new ListViewItem(new string[] { DateTime.Now.ToString("yyyy/MM/dd HH:mm:ss ffffff"), msg });
                lvwMsg.Items.Insert(0, item);
            }
        }

        /// <summary>
        /// ShowMessage重載
        /// </summary>
        /// <param name="format"></param>
        /// <param name="args"></param>
        private void ShowMessage(string format, params object[] args)
        {
            if (InvokeRequired)
            {
                BeginInvoke(new MethodInvoker(delegate ()
                {
                    ListViewItem item = new ListViewItem(new string[] { DateTime.Now.ToString("yyyy/MM/dd HH:mm:ss ffffff"), string.Format(format, args) });
                    lvwMsg.Items.Insert(0, item);
                }));
            }
            else
            {
                ListViewItem item = new ListViewItem(new string[] { DateTime.Now.ToString("yyyy/MM/dd HH:mm:ss ffffff"), string.Format(format, args) });
                lvwMsg.Items.Insert(0, item);
            }
        }

        /// <summary>
        /// 生產者
        /// </summary>
        /// <param name="sender"></param>
        /// <param name="e"></param>
        private void btnSend_Click(object sender, EventArgs e)
        {
            int messageCount = 100;
            var factory = new ConnectionFactory
            {
                HostName = "192.168.2.242",
                UserName = "hello",
                Password = "world",
                Port = AmqpTcpEndpoint.UseDefaultPort,//5672
                VirtualHost = ConnectionFactory.DefaultVHost,//使用預設值:"/"
                Protocol = Protocols.DefaultProtocol,
                AutomaticRecoveryEnabled = true
            };
            using (var connection = factory.CreateConnection())
            {
                using (var channel = connection.CreateModel())
                {
                    channel.QueueDeclare(queue: "hello", durable: true, exclusive: false, autoDelete: false, arguments: null);
                    string message = "Hello World";
                    var body = Encoding.UTF8.GetBytes(message);
                    for (int i = 1; i <= messageCount; i++)
                    {
                        channel.BasicPublish(exchange: "", routingKey: "hello", basicProperties: null, body: body);
                        ShowMessage($"Send {message}");
                    }
                }
            }
        }

        /// <summary>
        /// 消費者
        /// </summary>
        /// <param name="sender"></param>
        /// <param name="e"></param>
        private async void btnReceive_Click(object sender, EventArgs e)
        {
            Random random = new Random();
            int rallyNumber = random.Next(1, 1000);
            int channelCount = 0;

            await Task.Run(() =>
            {
                try
                {
                    int asyncCount = 10;
                    List<Task<bool>> tasks = new List<Task<bool>>();
                    var connection = RabbitMQFactory.SharedConnection;
                    for (int i = 1; i <= asyncCount; i++)
                    {
                        tasks.Add(Task.Factory.StartNew(() => MessageWorkItemCallback(connection, rallyNumber)));
                    }
                    Task.WaitAll(tasks.ToArray());

                    string syncResultMsg = $"集結號 {rallyNumber} 已吹起號角--" +
                        $"本次開啟通道成功數:{tasks.Count(s => s.Result == true)}," +
                        $"本次開啟通道失敗數:{tasks.Count() - tasks.Count(s => s.Result == true)}" +
                        $"累計開啟通道成功數:{channelCount + tasks.Count(s => s.Result == true)}";
                    ShowMessage(syncResultMsg);
                }
                catch (Exception ex)
                {
                    ShowMessage($"集結號 {rallyNumber} 消費異常:{ex.Message}");
                }
            });
        }

        /// <summary>
        /// 非同步方法
        /// </summary>
        /// <param name="state"></param>
        /// <param name="rallyNumber"></param>
        /// <returns></returns>
        private bool MessageWorkItemCallback(object state, int rallyNumber)
        {
            bool syncResult = false;
            IModel channel = null;
            try
            {
                IConnection connection = state as IConnection;
                //不能使用using (channel = connection.CreateModel())來創建通道,讓RabbitMQ自動回收channel。
                channel = connection.CreateModel();
                channel.QueueDeclare(queue: "hello", durable: true, exclusive: false, autoDelete: false, arguments: null);
                channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
                var consumer = new EventingBasicConsumer(channel);
                consumer.Received += (model, ea) =>
                {
                    var message = Encoding.UTF8.GetString(ea.Body);
                    Thread.Sleep(1000);
                    ShowMessage($"集結號 {rallyNumber} Received {message}");
                    channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
                };
                channel.BasicConsume(queue: "hello", autoAck: false, consumer: consumer);
                syncResult = true;
            }
            catch (Exception ex)
            {
                syncResult = false;
                ShowMessage(ex.Message);
            }
            return syncResult;
        }
    }
View Code

    2.4、運行結果

    多點幾次消費者即可增加通道,提升消費能力。


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 這篇文章是關於我的GBA庫lib_hl中數學庫的定點數部分。 定點數是什麼?為什麼要用定點數? 在之前的文章中,我已經介紹了GBA的硬體,它的CPU竟然居然理所當然沒有浮點數運算單元! 我要寫的是光線追蹤程式,基本上都在做精確的數學運算,而這個CPU卻連浮點數都不支持,那不是沒得玩? 當然是有方法的 ...
  • 整理docker中常用的命令,方便大家學習和命令查詢。最後分享一個.NET Core docker部署的示例 ...
  • 在這篇文章中,我將通過一個示例,來講解ASP.NET Core中的請求處理管道。在這篇文章中,我們將討論下麵幾個點:理解ASP.NET Core請求處理管道怎樣在ASP.NET中創建並註冊多個中間件組件?請求管道中,中間件執行的順序是咋樣的?理解ASP.NET Core請求處理管道為了理解ASP.N... ...
  • 在netcore開發中,最常見的就是註入,比如想獲取appsettings.json的內容,我們就需要去註入,然後在controller裡面去獲取,但是我們如果想要在service中使用appsettings.json的內容,這樣就是一個問題,並且每個controller去註入也是非常麻煩的事情 下 ...
  • 如果你要問我WebApi是幹嘛,我只能說它是的給數據。哈哈哈哈哈,這幾天也才剛剛瞭解瞭解關於WebApi的知識,今天就來談談吧。 1.創建WebApi項目 第一步:選擇ASP.NET Web應用程式 第二步:選擇WebApi,記得要取消勾選Https配置,點擊創建 第三步:創建完成後形成的項目結構 ...
  • 讓 .NET 輕鬆構建中間件模式代碼 Intro 在 asp.net core 中中間件的設計令人嘆為觀止,如此高大上的設計何不集成到自己的代碼里呢。 於是就有了封裝了一個簡單通用的中間件模板的想法,以後有需要的時候就可以拿來即用。 介面定義 這裡按執行的委托是同步還是非同步分為了同步和非同步兩種構建方 ...
  • 前言 目前在開發abp電商模塊,打算做一步,寫一步,算是對自己的記錄,主要是參考nopcommoner 並結合abp模塊開發 知識都是連貫的,如果你熟悉asp.net core 3.x、abp(非vNext) 並且需要做電商功能,也許可以做個參考。即使不做電商,可能裡面的其它功能也可以作為參考,如: ...
  • 本文告訴大家一個有趣的動畫,在滑鼠點擊的時候,在點擊所在的點顯示一個圓圈,然後這個圓圈做動畫變大,但是顏色變淡的效果。本文的控制項可以讓大家將對應的容器放在自己應用裡面就能實現這個效果 這個效果特別簡單,屬於入門級的動畫,代碼也很少,請看效果 本文的控制項只是一個簡單的 Canvas 控制項,可以將本文的 ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...