RabbitMQ入門教程——工作隊列

来源:http://www.cnblogs.com/AlvinLee/archive/2016/12/08/6145641.html
-Advertisement-
Play Games

什麼是工作隊列 工作隊列是為了避免等待一些占用大量資源或時間操作的一種處理方式。我們把任務封裝為消息發送到隊列中,消費者在後臺不停的取出任務並且執行。當運行了多個消費者工作進程時,隊列中的任務將會在每個消費者間進行共用。 使用工作隊列的好處就是能夠並行的處理任務。如果隊列中堆積了很多任務,只要添加更... ...


什麼是工作隊列

工作隊列是為了避免等待一些占用大量資源或時間操作的一種處理方式。我們把任務封裝為消息發送到隊列中,消費者在後臺不停的取出任務並且執行。當運行了多個消費者工作進程時,隊列中的任務將會在每個消費者間進行共用。

使用工作隊列的好處就是能夠並行的處理任務。如果隊列中堆積了很多任務,只要添加更多的消費著就可以了,拓展非常方便。

準備工作

1.創建生產者和消費者客戶端

2.在消費者中使用Thread.Sleep()模擬耗時操作

 

生產者 TaskQueuesProducer.cs

 

using RabbitMQ.Client;

using System;

using System.Collections.Generic;

using System.Diagnostics;

using System.Linq;

using System.Text;

using System.Threading.Tasks;

 

namespace RabbitMQProducer

{

    public class TaskQueuesProducer

    {

        static int processId = Process.GetCurrentProcess().Id;

        public static void Send()

        {

            Console.WriteLine($"我是生產者{processId}");

            var factory = new ConnectionFactory()

            {

                HostName = "127.0.0.1"

            };

            using (var connection = factory.CreateConnection())

            {

                using (var channel = connection.CreateModel())

                {

                    channel.QueueDeclare(queue: "taskqueue", durable: false, exclusive: false, autoDelete: false, arguments: null);

                    for (int item = 0; item < 20; item++)

                    {

                        string message = $"我是生產者{processId}發送的消息:{item}";

                        channel.BasicPublish(exchange: "", routingKey: "taskqueue", basicProperties: null, body: Encoding.UTF8.GetBytes(message));

                        Console.WriteLine(message);

                    }

 

                    Console.WriteLine(" Press [enter] to exit.");

                    Console.ReadLine();

                }

            }

        }

    }

}

 

消費者 TaskQueuesConsumer.cs

 

using System;

using System.Collections.Generic;

using System.Linq;

using System.Text;

using System.Threading.Tasks;

using RabbitMQ.Client;

using RabbitMQ.Client.Events;

using System.Diagnostics;

using System.Threading;

 

namespace RabbitMQConsumer

{

    public class TaskQueuesConsumer

    {

        static int processId = 0;

        static TaskQueuesConsumer()

        {

            processId = Process.GetCurrentProcess().Id;

        }

        public static void Receive()

        {

            Console.WriteLine($"我是消費者{processId}");

            var factory = new ConnectionFactory()

            {

                HostName = "127.0.0.1"

            };

            using (var connection = factory.CreateConnection())

            {

                using (var channel = connection.CreateModel())

                {

                    channel.QueueDeclare(queue: "taskqueue", durable: false, exclusive: false, autoDelete: false, arguments: null);

                    EventingBasicConsumer consumer = new EventingBasicConsumer(channel);

                    consumer.Received += Consumer_Received;

                    //noack=false 不自動消息確認 這時需要手動調用   channel.BasicAck(); 進行消息確認

                    //noack=true 自動消息確認,當消息被RabbitMQ發送給消費者(consumers)之後,馬上就會在記憶體中移除

                    channel.BasicConsume(queue: "taskqueue", noAck: false, consumer: consumer);

 

                    Console.WriteLine(" Press [enter] to exit.");

                    Console.ReadLine();

                }

            }

        }

 

        private static void Consumer_Received(object sender, BasicDeliverEventArgs e)

        {

            string message = Encoding.UTF8.GetString(e.Body);

            Console.WriteLine($"接收到消息:{message}");

            //Thread.Sleep(new Random().Next(1000, 1000 * 5)); //模擬消息處理耗時操作

            Console.WriteLine($"已處理完消息");

 

            //對應前面的 BasicConsume noack=false 發送消息確認回執

            //EventingBasicConsumer consumer = sender as EventingBasicConsumer;

            //consumer.Model.BasicAck(e.DeliveryTag, false);

        }

    }

}

 

當我們運行了三個consumer客戶端,一個producer客戶端後,發現producer發送的20條消息,被三個客戶端依次平均接收並處理了。

這是RabbitMQ預設的消息分發機制——輪詢( round-robin),預設情況下RabbitMQ會按順序把消息發送給每個消費者,平均每個消費者都會收到同等數量的消息。

 

消息確認

當前的代碼中當消息被RabbitMQ發送到consumer後,就會被立即刪除,這種情況給下,如果其中一個consumer客戶端被停止,那麼正在處理的消息就會丟失,同時所有發送到這個工作者並且還沒處理的消息也會丟失。這不是我們希望看到的,我們希望如果一個consumer客戶端掛掉後,希望把重新發送任務到其它的consumer客戶端。

為了防止消息丟失,RabbbitMQ提供了消息確認機制,消費者會通過一個ack,告訴RabbitMQ已經收到並處理了某條消息,然後RabbitMQ就會釋放並刪除這條消息。

如果consumer掛掉了,沒有發送相應,RabbitMQ就會認為消息沒有被處理,然後重新發送給其他消費者,這樣即使某個consumer掛掉,也不會丟失消息。

消息沒有超市的概念,當工作者和它斷開連接時,RabbitMQ會重新發送消息,這樣在處理耗時較長任務時就不會出現問題了。

之前的代碼中我們開啟了自動消息確認,這樣一旦consumer掛掉,就會發生消息丟失的情況,現在我們來修改兩處代碼,開啟消息確認機制。

 

修改參數noack為false,關閉自動消息確認

 

channel.BasicConsume(queue: "taskqueue", noAck: false, consumer: consumer);

 

取消下麵代碼的註釋,進行消息確認回執

 

EventingBasicConsumer consumer = sender as EventingBasicConsumer;

consumer.Model.BasicAck(e.DeliveryTag, false);

 

註意:一旦忘記消息確認,消息會在你程式推出之後就會重新發送,如果不能釋放沒響應的消息,RabbitMQ將會占用越來越來越多的記憶體

可通過以下指令檢查忘記確認的消息信息或在 RabbitMQWebweb管理頁面中查看

 

rabbitmqctl list_queues name messages_ready messages_unacknowledged

 

修改完成後再次運行,就不用擔心消息丟失的問題了

 

消息持久化

如果沒有特殊的設置,那麼在RabbitMQ服務關閉或崩潰的情況下將會丟失所有的隊列和消息。為了確保消息不會丟失需要做兩個事情,把隊列和消息設置為持久化

設置隊列為持久化,producer和consumer兩處都要修改

 

channel.QueueDeclare(queue: "taskqueue", durable: true, exclusive: false, autoDelete: false, arguments: null);

設置消息持久化

 

var properties = channel.CreateBasicProperties();

properties.DeliveryMode = 2; //DeliveryMode 消息的投遞模式,預設為1 非持久化的,DeliveryMode=2 持久化存儲消息內容

channel.BasicPublish(exchange: "", routingKey: "taskqueue", basicProperties: properties, body: Encoding.UTF8.GetBytes(message));

 

註意:

  1. RabbitMQ不允許你使用不同的參數重新定義一個隊列,也就是說我們之前定義了taskqueue隊列為非持久化,現在再定義為持久化將會返回失敗。
  2. 將消息設置為持久化並不能100%保證消息不會丟失,因為RabbitMQ保存到系統磁碟也需要時間,雖然時間很短,但是確實消耗一定的時間,另外RabbitMQ並不是對每個消息都做fsync,它可能僅僅是保存在cache中,還沒來得及保存到磁碟。因此即使我們做了以上幾個操作消息持久化的問題還是存在的。如果必須要保證持久化,可以通過使用transaction(事務)來做支持。

公平調度

RabbitMQ只管分發進入隊列的消息,而不關心那些consumer比較繁忙或空閑,這樣容易導致一些consumer比較繁忙,一些比較空閑,不能使資源被最大化的使用。

為瞭解決這樣的問題,RabbitMQ提供了basicQos方法,傳遞參數為prefetchCount = 1。這樣告訴RabbitMQ不要在同一時間給一個消費者超過一條消息。就是只有在消費者空閑的時候會發送下一條信息。

 

channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);

 

執行以上設置之後會發現並沒有按照之前的輪詢(Round-robin)進行消息轉發,而是在消費者不忙時才進行轉發。

由於消息並沒有發出去,在動態添加了consumer後能夠立即投入工作,而預設的輪詢轉發機制則不支持動態添加消費者,因為此時消息已經分配完畢,無法立即加入工作即使還有很多未完成的任務。

註意:

這種方法可能會導致queue滿。當然,這種情況下你可能需要添加更多的Consumer,或者創建更多的virtualHost來細化你的設計。

完整代碼

 

生產者 TaskQueuesProducer.cs

 

using RabbitMQ.Client;

using System;

using System.Collections.Generic;

using System.Diagnostics;

using System.Linq;

using System.Text;

using System.Threading.Tasks;

 

namespace RabbitMQProducer

{

    public class TaskQueuesProducer

    {

        static int processId = Process.GetCurrentProcess().Id;

        public static void Send()

        {

            Console.WriteLine($"我是生產者{processId}");

            var factory = new ConnectionFactory()

            {

                HostName = "127.0.0.1"

            };

            using (var connection = factory.CreateConnection())

            {

                using (var channel = connection.CreateModel())

                {

                    //durable:持久化存儲隊列

                    //autoDelete:自動刪除,如果該隊列沒有任何訂閱的消費者的話,該隊列會被自動刪除。這種隊列適用於臨時隊列。

                    //exclusive:排他隊列,如果一個隊列被聲明為排他隊列,該隊列僅對首次聲明它的連接可見,併在連接斷開時自動刪除。註意事項:1,排他隊列是基於連接可見的,同一連接的不同通道是可以同時訪問同一個連接創建的排他隊列的。2"首次",如果一個連接已經聲明瞭一個排他隊列,其他連接是不允許建立同名的排他隊列的,這個與普通隊列不同。3,即使該隊列是持久化的,一旦連接關閉或者客戶端退出,該排他隊列都會被自動刪除的。這種隊列適用於只限於一個客戶端發送讀取消息的應用場景。

                    channel.QueueDeclare(queue: "taskqueue", durable: true, exclusive: false, autoDelete: false, arguments: null);

                    for (int item = 0; item < 200000; item++)

                    {

                        string message = $"我是生產者{processId}發送的消息:{item}";

                        var properties = channel.CreateBasicProperties();

                        properties.DeliveryMode = 2; //DeliveryMode 消息的投遞模式,預設為1 非持久化的,DeliveryMode=2 持久化存儲消息內容

                        channel.BasicPublish(exchange: "", routingKey: "taskqueue", basicProperties: properties, body: Encoding.UTF8.GetBytes(message));

                        Console.WriteLine(message);

                    }

 

                    Console.WriteLine(" Press [enter] to exit.");

                    Console.ReadLine();

                }

            }

        }

    }

}

 

消費者 TaskQueuesConsumer.cs

 

using System;

using System.Collections.Generic;

using System.Linq;

using System.Text;

using System.Threading.Tasks;

using RabbitMQ.Client;

using RabbitMQ.Client.Events;

using System.Diagnostics;

using System.Threading;

 

namespace RabbitMQConsumer

{

    public class TaskQueuesConsumer

    {

        static int processId = 0;

        static TaskQueuesConsumer()

        {

            processId = Process.GetCurrentProcess().Id;

        }

        public static void Receive()

        {

            Console.WriteLine($"我是消費者{processId}");

            var factory = new ConnectionFactory()

            {

                HostName = "127.0.0.1"

            };

            using (var connection = factory.CreateConnection())

            {

                using (var channel = connection.CreateModel())

                {

                    channel.QueueDeclare(queue: "taskqueue", durable: true, exclusive: false, autoDelete: false, arguments: null);

                    // BasicQos 方法設置prefetchCount = 1。這樣RabbitMQ就會使得每個Consumer在同一個時間點最多處理一個Message。換句話說,在接收到該Consumerack前,他它不會將新的Message分發給它

                    channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);

                    EventingBasicConsumer consumer = new EventingBasicConsumer(channel);

                    consumer.Received += Consumer_Received;

                    //noack=false 不自動消息確認 這時需要手動調用   channel.BasicAck(); 進行消息確認

                    //noack=true 自動消息確認,當消息被RabbitMQ發送給消費者(consumers)之後,馬上就會在記憶體中移除

                    channel.BasicConsume(queue: "taskqueue", noAck: false, consumer: consumer);

 

                    Console.WriteLine(" Press [enter] to exit.");

                    Console.ReadLine();

                }

            }

        }

 

        private static void Consumer_Received(object sender, BasicDeliverEventArgs e)

        {

            string message = Encoding.UTF8.GetString(e.Body);

            Console.WriteLine($"接收到消息:{message}");

            Thread.Sleep(new Random().Next(1000, 1000 * 5)); //模擬消息處理耗時操作

            Console.WriteLine($"已處理完消息");

 

            //對應前面的 BasicConsume noack=false 發送消息確認回執

            EventingBasicConsumer consumer = sender as EventingBasicConsumer;

            consumer.Model.BasicAck(e.DeliveryTag, false);

        }

    }

}


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • CentOS 6.5 x86_64系統下安裝JDK-1.7,並且根據生產環境需要做了一定的配置。 ...
  • CentOS 6.5 x86_64系統下安裝PHP-5.6.4,並且根據生產環境需要做了一定的配置,比如增加了memcache、memcached和redis的擴展支持,對PHP環境安全做了一定配置等。 ...
  • 記憶體管理單元MMU(memory management unit)的主要功能是虛擬地址(virtual memory addresses)到物理地址(physical addresses)的轉換。除此之外,它還可以實現記憶體保護(memory protection)、緩存控制(cache contro... ...
  • 上次創建了欄目模型,這次主要做欄目的前臺顯示。涉及到數據存儲層、業務邏輯層和Web層。用到了遷移,更新資料庫和註入的一些內容。 一、添加數據存儲層 1、添加Ninesky.DataLibrary(與上次添加方法相同) 在解決方案(Ninesky)上點右鍵->添加->新建項目 選擇.NET Core ... ...
  • 本篇博文介紹了#define條件編譯的用途、用法,並結合具體實例進行說明;本文還說明瞭使用條件編譯時需要註意的事項,以及環境變數(或條件編譯符號)的設置方法。 ...
  • 返回目錄 題目有點意思,大家都知道Dictionary<K,V>不是線程安全的類型,而List<T>是線程安全的嗎?在今天之前大叔沒有去測試過,而就在今天也是一個VIP問我,說在我的代碼中使用了並行,然後為一個List賦值,說的直接一點就是:List元素是全局的,在各個線程里分別去操作它,測試數據是 ...
  • 在項目的web.config文件中添加 <connectionStrings> <add name="SQLConnectionString" connectionString="資料庫連接字元串"/> </connectionStrings> 頁面上使用需要添加命名空間 using System. ...
  • 1.許可權控制使用controller和 action來實現,許可權方式有很多種,最近開發項目使用控制控制器方式實現代碼如下 二.單點登錄方式使用application方式來實現 1.用戶登錄成功後記錄當前信息 2.使用ActionFilter來實現單點登錄,每次點擊控制器都去查詢過濾是否在其它地方登錄 ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...