C#隊列學習筆記:RabbitMQ使用多線程提高消費吞吐率

来源:https://www.cnblogs.com/atomy/archive/2020/04/14/12680782.html
-Advertisement-
Play Games

一、引言 使用工作隊列的一個好處就是它能夠並行的處理隊列。如果堆積了很多任務,我們只需要添加更多的工作者(workers)就可以了,擴展很簡單。本例使用多線程來創建多通道並綁定隊列,達到多workers的目的。 二、示例 2.1、環境準備 在NuGet上安裝RabbitMQ.Client。 2.2、 ...


    一、引言

    使用工作隊列的一個好處就是它能夠並行的處理隊列。如果堆積了很多任務,我們只需要添加更多的工作者(workers)就可以了,擴展很簡單。本例使用多線程來創建多通道並綁定隊列,達到多workers的目的。

    二、示例

    2.1、環境準備

    在NuGet上安裝RabbitMQ.Client。

    2.2、工廠類

    添加一個工廠類RabbitMQFactory:

    /// <summary>
    /// 多路復用技術(Multiplexing)目的:為了避免創建多個TCP而造成系統資源的浪費和超載,從而有效地利用TCP連接。
    /// </summary>
    public static class RabbitMQFactory
    {
        private static IConnection sharedConnection;
        private static int ChannelCount { get; set; }
        private static readonly object _locker = new object();

        public static IConnection SharedConnection
        {
            get
            {
                if (ChannelCount >= 1000)
                {
                    if (sharedConnection != null && sharedConnection.IsOpen)
                    {
                        sharedConnection.Close();
                    }
                    sharedConnection = null;
                    ChannelCount = 0;
                }
                if (sharedConnection == null)
                {
                    lock (_locker)
                    {
                        if (sharedConnection == null)
                        {
                            sharedConnection = GetConnection();
                            ChannelCount++;
                        }
                    }
                }
                return sharedConnection;
            }
        }

        private static IConnection GetConnection()
        {
            var factory = new ConnectionFactory
            {
                HostName = "192.168.2.242",
                UserName = "hello",
                Password = "world",
                Port = AmqpTcpEndpoint.UseDefaultPort,//5672
                VirtualHost = ConnectionFactory.DefaultVHost,//使用預設值:"/"
                Protocol = Protocols.DefaultProtocol,
                AutomaticRecoveryEnabled = true
            };
            return factory.CreateConnection();
        }
    }
View Code

    2.3、主窗體

    代碼如下:

    public partial class RabbitMQMultithreading : Form
    {
        public delegate void ListViewDelegate<T>(T obj);

        public RabbitMQMultithreading()
        {
            InitializeComponent();
        }

        /// <summary>
        /// ShowMessage重載
        /// </summary>
        /// <param name="msg"></param>
        private void ShowMessage(string msg)
        {
            if (InvokeRequired)
            {
                BeginInvoke(new ListViewDelegate<string>(ShowMessage), msg);
            }
            else
            {
                ListViewItem item = new ListViewItem(new string[] { DateTime.Now.ToString("yyyy/MM/dd HH:mm:ss ffffff"), msg });
                lvwMsg.Items.Insert(0, item);
            }
        }

        /// <summary>
        /// ShowMessage重載
        /// </summary>
        /// <param name="format"></param>
        /// <param name="args"></param>
        private void ShowMessage(string format, params object[] args)
        {
            if (InvokeRequired)
            {
                BeginInvoke(new MethodInvoker(delegate ()
                {
                    ListViewItem item = new ListViewItem(new string[] { DateTime.Now.ToString("yyyy/MM/dd HH:mm:ss ffffff"), string.Format(format, args) });
                    lvwMsg.Items.Insert(0, item);
                }));
            }
            else
            {
                ListViewItem item = new ListViewItem(new string[] { DateTime.Now.ToString("yyyy/MM/dd HH:mm:ss ffffff"), string.Format(format, args) });
                lvwMsg.Items.Insert(0, item);
            }
        }

        /// <summary>
        /// 生產者
        /// </summary>
        /// <param name="sender"></param>
        /// <param name="e"></param>
        private void btnSend_Click(object sender, EventArgs e)
        {
            int messageCount = 100;
            var factory = new ConnectionFactory
            {
                HostName = "192.168.2.242",
                UserName = "hello",
                Password = "world",
                Port = AmqpTcpEndpoint.UseDefaultPort,//5672
                VirtualHost = ConnectionFactory.DefaultVHost,//使用預設值:"/"
                Protocol = Protocols.DefaultProtocol,
                AutomaticRecoveryEnabled = true
            };
            using (var connection = factory.CreateConnection())
            {
                using (var channel = connection.CreateModel())
                {
                    channel.QueueDeclare(queue: "hello", durable: true, exclusive: false, autoDelete: false, arguments: null);
                    string message = "Hello World";
                    var body = Encoding.UTF8.GetBytes(message);
                    for (int i = 1; i <= messageCount; i++)
                    {
                        channel.BasicPublish(exchange: "", routingKey: "hello", basicProperties: null, body: body);
                        ShowMessage($"Send {message}");
                    }
                }
            }
        }

        /// <summary>
        /// 消費者
        /// </summary>
        /// <param name="sender"></param>
        /// <param name="e"></param>
        private async void btnReceive_Click(object sender, EventArgs e)
        {
            Random random = new Random();
            int rallyNumber = random.Next(1, 1000);
            int channelCount = 0;

            await Task.Run(() =>
            {
                try
                {
                    int asyncCount = 10;
                    List<Task<bool>> tasks = new List<Task<bool>>();
                    var connection = RabbitMQFactory.SharedConnection;
                    for (int i = 1; i <= asyncCount; i++)
                    {
                        tasks.Add(Task.Factory.StartNew(() => MessageWorkItemCallback(connection, rallyNumber)));
                    }
                    Task.WaitAll(tasks.ToArray());

                    string syncResultMsg = $"集結號 {rallyNumber} 已吹起號角--" +
                        $"本次開啟通道成功數:{tasks.Count(s => s.Result == true)}," +
                        $"本次開啟通道失敗數:{tasks.Count() - tasks.Count(s => s.Result == true)}" +
                        $"累計開啟通道成功數:{channelCount + tasks.Count(s => s.Result == true)}";
                    ShowMessage(syncResultMsg);
                }
                catch (Exception ex)
                {
                    ShowMessage($"集結號 {rallyNumber} 消費異常:{ex.Message}");
                }
            });
        }

        /// <summary>
        /// 非同步方法
        /// </summary>
        /// <param name="state"></param>
        /// <param name="rallyNumber"></param>
        /// <returns></returns>
        private bool MessageWorkItemCallback(object state, int rallyNumber)
        {
            bool syncResult = false;
            IModel channel = null;
            try
            {
                IConnection connection = state as IConnection;
                //不能使用using (channel = connection.CreateModel())來創建通道,讓RabbitMQ自動回收channel。
                channel = connection.CreateModel();
                channel.QueueDeclare(queue: "hello", durable: true, exclusive: false, autoDelete: false, arguments: null);
                channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
                var consumer = new EventingBasicConsumer(channel);
                consumer.Received += (model, ea) =>
                {
                    var message = Encoding.UTF8.GetString(ea.Body);
                    Thread.Sleep(1000);
                    ShowMessage($"集結號 {rallyNumber} Received {message}");
                    channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
                };
                channel.BasicConsume(queue: "hello", autoAck: false, consumer: consumer);
                syncResult = true;
            }
            catch (Exception ex)
            {
                syncResult = false;
                ShowMessage(ex.Message);
            }
            return syncResult;
        }
    }
View Code

    2.4、運行結果

    多點幾次消費者即可增加通道,提升消費能力。


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 這篇文章是關於我的GBA庫lib_hl中數學庫的定點數部分。 定點數是什麼?為什麼要用定點數? 在之前的文章中,我已經介紹了GBA的硬體,它的CPU竟然居然理所當然沒有浮點數運算單元! 我要寫的是光線追蹤程式,基本上都在做精確的數學運算,而這個CPU卻連浮點數都不支持,那不是沒得玩? 當然是有方法的 ...
  • 整理docker中常用的命令,方便大家學習和命令查詢。最後分享一個.NET Core docker部署的示例 ...
  • 在這篇文章中,我將通過一個示例,來講解ASP.NET Core中的請求處理管道。在這篇文章中,我們將討論下麵幾個點:理解ASP.NET Core請求處理管道怎樣在ASP.NET中創建並註冊多個中間件組件?請求管道中,中間件執行的順序是咋樣的?理解ASP.NET Core請求處理管道為了理解ASP.N... ...
  • 在netcore開發中,最常見的就是註入,比如想獲取appsettings.json的內容,我們就需要去註入,然後在controller裡面去獲取,但是我們如果想要在service中使用appsettings.json的內容,這樣就是一個問題,並且每個controller去註入也是非常麻煩的事情 下 ...
  • 如果你要問我WebApi是幹嘛,我只能說它是的給數據。哈哈哈哈哈,這幾天也才剛剛瞭解瞭解關於WebApi的知識,今天就來談談吧。 1.創建WebApi項目 第一步:選擇ASP.NET Web應用程式 第二步:選擇WebApi,記得要取消勾選Https配置,點擊創建 第三步:創建完成後形成的項目結構 ...
  • 讓 .NET 輕鬆構建中間件模式代碼 Intro 在 asp.net core 中中間件的設計令人嘆為觀止,如此高大上的設計何不集成到自己的代碼里呢。 於是就有了封裝了一個簡單通用的中間件模板的想法,以後有需要的時候就可以拿來即用。 介面定義 這裡按執行的委托是同步還是非同步分為了同步和非同步兩種構建方 ...
  • 前言 目前在開發abp電商模塊,打算做一步,寫一步,算是對自己的記錄,主要是參考nopcommoner 並結合abp模塊開發 知識都是連貫的,如果你熟悉asp.net core 3.x、abp(非vNext) 並且需要做電商功能,也許可以做個參考。即使不做電商,可能裡面的其它功能也可以作為參考,如: ...
  • 本文告訴大家一個有趣的動畫,在滑鼠點擊的時候,在點擊所在的點顯示一個圓圈,然後這個圓圈做動畫變大,但是顏色變淡的效果。本文的控制項可以讓大家將對應的容器放在自己應用裡面就能實現這個效果 這個效果特別簡單,屬於入門級的動畫,代碼也很少,請看效果 本文的控制項只是一個簡單的 Canvas 控制項,可以將本文的 ...
一周排行
    -Advertisement-
    Play Games
  • 前言 本文介紹一款使用 C# 與 WPF 開發的音頻播放器,其界面簡潔大方,操作體驗流暢。該播放器支持多種音頻格式(如 MP4、WMA、OGG、FLAC 等),並具備標記、實時歌詞顯示等功能。 另外,還支持換膚及多語言(中英文)切換。核心音頻處理採用 FFmpeg 組件,獲得了廣泛認可,目前 Git ...
  • OAuth2.0授權驗證-gitee授權碼模式 本文主要介紹如何筆者自己是如何使用gitee提供的OAuth2.0協議完成授權驗證並登錄到自己的系統,完整模式如圖 1、創建應用 打開gitee個人中心->第三方應用->創建應用 創建應用後在我的應用界面,查看已創建應用的Client ID和Clien ...
  • 解決了這個問題:《winForm下,fastReport.net 從.net framework 升級到.net5遇到的錯誤“Operation is not supported on this platform.”》 本文內容轉載自:https://www.fcnsoft.com/Home/Sho ...
  • 國內文章 WPF 從裸 Win 32 的 WM_Pointer 消息獲取觸摸點繪製筆跡 https://www.cnblogs.com/lindexi/p/18390983 本文將告訴大家如何在 WPF 裡面,接收裸 Win 32 的 WM_Pointer 消息,從消息裡面獲取觸摸點信息,使用觸摸點 ...
  • 前言 給大家推薦一個專為新零售快消行業打造了一套高效的進銷存管理系統。 系統不僅具備強大的庫存管理功能,還集成了高性能的輕量級 POS 解決方案,確保頁面載入速度極快,提供良好的用戶體驗。 項目介紹 Dorisoy.POS 是一款基於 .NET 7 和 Angular 4 開發的新零售快消進銷存管理 ...
  • ABP CLI常用的代碼分享 一、確保環境配置正確 安裝.NET CLI: ABP CLI是基於.NET Core或.NET 5/6/7等更高版本構建的,因此首先需要在你的開發環境中安裝.NET CLI。這可以通過訪問Microsoft官網下載並安裝相應版本的.NET SDK來實現。 安裝ABP ...
  • 問題 問題是這樣的:第三方的webapi,需要先調用登陸介面獲取Cookie,訪問其它介面時攜帶Cookie信息。 但使用HttpClient類調用登陸介面,返回的Headers中沒有找到Cookie信息。 分析 首先,使用Postman測試該登陸介面,正常返回Cookie信息,說明是HttpCli ...
  • 國內文章 關於.NET在中國為什麼工資低的分析 https://www.cnblogs.com/thinkingmore/p/18406244 .NET在中國開發者的薪資偏低,主要因市場需求、技術棧選擇和企業文化等因素所致。歷史上,.NET曾因微軟的閉源策略發展受限,儘管後來推出了跨平臺的.NET ...
  • 在WPF開發應用中,動畫不僅可以引起用戶的註意與興趣,而且還使軟體更加便於使用。前面幾篇文章講解了畫筆(Brush),形狀(Shape),幾何圖形(Geometry),變換(Transform)等相關內容,今天繼續講解動畫相關內容和知識點,僅供學習分享使用,如有不足之處,還請指正。 ...
  • 什麼是委托? 委托可以說是把一個方法代入另一個方法執行,相當於指向函數的指針;事件就相當於保存委托的數組; 1.實例化委托的方式: 方式1:通過new創建實例: public delegate void ShowDelegate(); 或者 public delegate string ShowDe ...