RabbitMQ入門教程——工作隊列

来源:http://www.cnblogs.com/AlvinLee/archive/2016/12/08/6145641.html
-Advertisement-
Play Games

什麼是工作隊列 工作隊列是為了避免等待一些占用大量資源或時間操作的一種處理方式。我們把任務封裝為消息發送到隊列中,消費者在後臺不停的取出任務並且執行。當運行了多個消費者工作進程時,隊列中的任務將會在每個消費者間進行共用。 使用工作隊列的好處就是能夠並行的處理任務。如果隊列中堆積了很多任務,只要添加更... ...


什麼是工作隊列

工作隊列是為了避免等待一些占用大量資源或時間操作的一種處理方式。我們把任務封裝為消息發送到隊列中,消費者在後臺不停的取出任務並且執行。當運行了多個消費者工作進程時,隊列中的任務將會在每個消費者間進行共用。

使用工作隊列的好處就是能夠並行的處理任務。如果隊列中堆積了很多任務,只要添加更多的消費著就可以了,拓展非常方便。

準備工作

1.創建生產者和消費者客戶端

2.在消費者中使用Thread.Sleep()模擬耗時操作

 

生產者 TaskQueuesProducer.cs

 

using RabbitMQ.Client;

using System;

using System.Collections.Generic;

using System.Diagnostics;

using System.Linq;

using System.Text;

using System.Threading.Tasks;

 

namespace RabbitMQProducer

{

    public class TaskQueuesProducer

    {

        static int processId = Process.GetCurrentProcess().Id;

        public static void Send()

        {

            Console.WriteLine($"我是生產者{processId}");

            var factory = new ConnectionFactory()

            {

                HostName = "127.0.0.1"

            };

            using (var connection = factory.CreateConnection())

            {

                using (var channel = connection.CreateModel())

                {

                    channel.QueueDeclare(queue: "taskqueue", durable: false, exclusive: false, autoDelete: false, arguments: null);

                    for (int item = 0; item < 20; item++)

                    {

                        string message = $"我是生產者{processId}發送的消息:{item}";

                        channel.BasicPublish(exchange: "", routingKey: "taskqueue", basicProperties: null, body: Encoding.UTF8.GetBytes(message));

                        Console.WriteLine(message);

                    }

 

                    Console.WriteLine(" Press [enter] to exit.");

                    Console.ReadLine();

                }

            }

        }

    }

}

 

消費者 TaskQueuesConsumer.cs

 

using System;

using System.Collections.Generic;

using System.Linq;

using System.Text;

using System.Threading.Tasks;

using RabbitMQ.Client;

using RabbitMQ.Client.Events;

using System.Diagnostics;

using System.Threading;

 

namespace RabbitMQConsumer

{

    public class TaskQueuesConsumer

    {

        static int processId = 0;

        static TaskQueuesConsumer()

        {

            processId = Process.GetCurrentProcess().Id;

        }

        public static void Receive()

        {

            Console.WriteLine($"我是消費者{processId}");

            var factory = new ConnectionFactory()

            {

                HostName = "127.0.0.1"

            };

            using (var connection = factory.CreateConnection())

            {

                using (var channel = connection.CreateModel())

                {

                    channel.QueueDeclare(queue: "taskqueue", durable: false, exclusive: false, autoDelete: false, arguments: null);

                    EventingBasicConsumer consumer = new EventingBasicConsumer(channel);

                    consumer.Received += Consumer_Received;

                    //noack=false 不自動消息確認 這時需要手動調用   channel.BasicAck(); 進行消息確認

                    //noack=true 自動消息確認,當消息被RabbitMQ發送給消費者(consumers)之後,馬上就會在記憶體中移除

                    channel.BasicConsume(queue: "taskqueue", noAck: false, consumer: consumer);

 

                    Console.WriteLine(" Press [enter] to exit.");

                    Console.ReadLine();

                }

            }

        }

 

        private static void Consumer_Received(object sender, BasicDeliverEventArgs e)

        {

            string message = Encoding.UTF8.GetString(e.Body);

            Console.WriteLine($"接收到消息:{message}");

            //Thread.Sleep(new Random().Next(1000, 1000 * 5)); //模擬消息處理耗時操作

            Console.WriteLine($"已處理完消息");

 

            //對應前面的 BasicConsume noack=false 發送消息確認回執

            //EventingBasicConsumer consumer = sender as EventingBasicConsumer;

            //consumer.Model.BasicAck(e.DeliveryTag, false);

        }

    }

}

 

當我們運行了三個consumer客戶端,一個producer客戶端後,發現producer發送的20條消息,被三個客戶端依次平均接收並處理了。

這是RabbitMQ預設的消息分發機制——輪詢( round-robin),預設情況下RabbitMQ會按順序把消息發送給每個消費者,平均每個消費者都會收到同等數量的消息。

 

消息確認

當前的代碼中當消息被RabbitMQ發送到consumer後,就會被立即刪除,這種情況給下,如果其中一個consumer客戶端被停止,那麼正在處理的消息就會丟失,同時所有發送到這個工作者並且還沒處理的消息也會丟失。這不是我們希望看到的,我們希望如果一個consumer客戶端掛掉後,希望把重新發送任務到其它的consumer客戶端。

為了防止消息丟失,RabbbitMQ提供了消息確認機制,消費者會通過一個ack,告訴RabbitMQ已經收到並處理了某條消息,然後RabbitMQ就會釋放並刪除這條消息。

如果consumer掛掉了,沒有發送相應,RabbitMQ就會認為消息沒有被處理,然後重新發送給其他消費者,這樣即使某個consumer掛掉,也不會丟失消息。

消息沒有超市的概念,當工作者和它斷開連接時,RabbitMQ會重新發送消息,這樣在處理耗時較長任務時就不會出現問題了。

之前的代碼中我們開啟了自動消息確認,這樣一旦consumer掛掉,就會發生消息丟失的情況,現在我們來修改兩處代碼,開啟消息確認機制。

 

修改參數noack為false,關閉自動消息確認

 

channel.BasicConsume(queue: "taskqueue", noAck: false, consumer: consumer);

 

取消下麵代碼的註釋,進行消息確認回執

 

EventingBasicConsumer consumer = sender as EventingBasicConsumer;

consumer.Model.BasicAck(e.DeliveryTag, false);

 

註意:一旦忘記消息確認,消息會在你程式推出之後就會重新發送,如果不能釋放沒響應的消息,RabbitMQ將會占用越來越來越多的記憶體

可通過以下指令檢查忘記確認的消息信息或在 RabbitMQWebweb管理頁面中查看

 

rabbitmqctl list_queues name messages_ready messages_unacknowledged

 

修改完成後再次運行,就不用擔心消息丟失的問題了

 

消息持久化

如果沒有特殊的設置,那麼在RabbitMQ服務關閉或崩潰的情況下將會丟失所有的隊列和消息。為了確保消息不會丟失需要做兩個事情,把隊列和消息設置為持久化

設置隊列為持久化,producer和consumer兩處都要修改

 

channel.QueueDeclare(queue: "taskqueue", durable: true, exclusive: false, autoDelete: false, arguments: null);

設置消息持久化

 

var properties = channel.CreateBasicProperties();

properties.DeliveryMode = 2; //DeliveryMode 消息的投遞模式,預設為1 非持久化的,DeliveryMode=2 持久化存儲消息內容

channel.BasicPublish(exchange: "", routingKey: "taskqueue", basicProperties: properties, body: Encoding.UTF8.GetBytes(message));

 

註意:

  1. RabbitMQ不允許你使用不同的參數重新定義一個隊列,也就是說我們之前定義了taskqueue隊列為非持久化,現在再定義為持久化將會返回失敗。
  2. 將消息設置為持久化並不能100%保證消息不會丟失,因為RabbitMQ保存到系統磁碟也需要時間,雖然時間很短,但是確實消耗一定的時間,另外RabbitMQ並不是對每個消息都做fsync,它可能僅僅是保存在cache中,還沒來得及保存到磁碟。因此即使我們做了以上幾個操作消息持久化的問題還是存在的。如果必須要保證持久化,可以通過使用transaction(事務)來做支持。

公平調度

RabbitMQ只管分發進入隊列的消息,而不關心那些consumer比較繁忙或空閑,這樣容易導致一些consumer比較繁忙,一些比較空閑,不能使資源被最大化的使用。

為瞭解決這樣的問題,RabbitMQ提供了basicQos方法,傳遞參數為prefetchCount = 1。這樣告訴RabbitMQ不要在同一時間給一個消費者超過一條消息。就是只有在消費者空閑的時候會發送下一條信息。

 

channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);

 

執行以上設置之後會發現並沒有按照之前的輪詢(Round-robin)進行消息轉發,而是在消費者不忙時才進行轉發。

由於消息並沒有發出去,在動態添加了consumer後能夠立即投入工作,而預設的輪詢轉發機制則不支持動態添加消費者,因為此時消息已經分配完畢,無法立即加入工作即使還有很多未完成的任務。

註意:

這種方法可能會導致queue滿。當然,這種情況下你可能需要添加更多的Consumer,或者創建更多的virtualHost來細化你的設計。

完整代碼

 

生產者 TaskQueuesProducer.cs

 

using RabbitMQ.Client;

using System;

using System.Collections.Generic;

using System.Diagnostics;

using System.Linq;

using System.Text;

using System.Threading.Tasks;

 

namespace RabbitMQProducer

{

    public class TaskQueuesProducer

    {

        static int processId = Process.GetCurrentProcess().Id;

        public static void Send()

        {

            Console.WriteLine($"我是生產者{processId}");

            var factory = new ConnectionFactory()

            {

                HostName = "127.0.0.1"

            };

            using (var connection = factory.CreateConnection())

            {

                using (var channel = connection.CreateModel())

                {

                    //durable:持久化存儲隊列

                    //autoDelete:自動刪除,如果該隊列沒有任何訂閱的消費者的話,該隊列會被自動刪除。這種隊列適用於臨時隊列。

                    //exclusive:排他隊列,如果一個隊列被聲明為排他隊列,該隊列僅對首次聲明它的連接可見,併在連接斷開時自動刪除。註意事項:1,排他隊列是基於連接可見的,同一連接的不同通道是可以同時訪問同一個連接創建的排他隊列的。2"首次",如果一個連接已經聲明瞭一個排他隊列,其他連接是不允許建立同名的排他隊列的,這個與普通隊列不同。3,即使該隊列是持久化的,一旦連接關閉或者客戶端退出,該排他隊列都會被自動刪除的。這種隊列適用於只限於一個客戶端發送讀取消息的應用場景。

                    channel.QueueDeclare(queue: "taskqueue", durable: true, exclusive: false, autoDelete: false, arguments: null);

                    for (int item = 0; item < 200000; item++)

                    {

                        string message = $"我是生產者{processId}發送的消息:{item}";

                        var properties = channel.CreateBasicProperties();

                        properties.DeliveryMode = 2; //DeliveryMode 消息的投遞模式,預設為1 非持久化的,DeliveryMode=2 持久化存儲消息內容

                        channel.BasicPublish(exchange: "", routingKey: "taskqueue", basicProperties: properties, body: Encoding.UTF8.GetBytes(message));

                        Console.WriteLine(message);

                    }

 

                    Console.WriteLine(" Press [enter] to exit.");

                    Console.ReadLine();

                }

            }

        }

    }

}

 

消費者 TaskQueuesConsumer.cs

 

using System;

using System.Collections.Generic;

using System.Linq;

using System.Text;

using System.Threading.Tasks;

using RabbitMQ.Client;

using RabbitMQ.Client.Events;

using System.Diagnostics;

using System.Threading;

 

namespace RabbitMQConsumer

{

    public class TaskQueuesConsumer

    {

        static int processId = 0;

        static TaskQueuesConsumer()

        {

            processId = Process.GetCurrentProcess().Id;

        }

        public static void Receive()

        {

            Console.WriteLine($"我是消費者{processId}");

            var factory = new ConnectionFactory()

            {

                HostName = "127.0.0.1"

            };

            using (var connection = factory.CreateConnection())

            {

                using (var channel = connection.CreateModel())

                {

                    channel.QueueDeclare(queue: "taskqueue", durable: true, exclusive: false, autoDelete: false, arguments: null);

                    // BasicQos 方法設置prefetchCount = 1。這樣RabbitMQ就會使得每個Consumer在同一個時間點最多處理一個Message。換句話說,在接收到該Consumerack前,他它不會將新的Message分發給它

                    channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);

                    EventingBasicConsumer consumer = new EventingBasicConsumer(channel);

                    consumer.Received += Consumer_Received;

                    //noack=false 不自動消息確認 這時需要手動調用   channel.BasicAck(); 進行消息確認

                    //noack=true 自動消息確認,當消息被RabbitMQ發送給消費者(consumers)之後,馬上就會在記憶體中移除

                    channel.BasicConsume(queue: "taskqueue", noAck: false, consumer: consumer);

 

                    Console.WriteLine(" Press [enter] to exit.");

                    Console.ReadLine();

                }

            }

        }

 

        private static void Consumer_Received(object sender, BasicDeliverEventArgs e)

        {

            string message = Encoding.UTF8.GetString(e.Body);

            Console.WriteLine($"接收到消息:{message}");

            Thread.Sleep(new Random().Next(1000, 1000 * 5)); //模擬消息處理耗時操作

            Console.WriteLine($"已處理完消息");

 

            //對應前面的 BasicConsume noack=false 發送消息確認回執

            EventingBasicConsumer consumer = sender as EventingBasicConsumer;

            consumer.Model.BasicAck(e.DeliveryTag, false);

        }

    }

}


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • CentOS 6.5 x86_64系統下安裝JDK-1.7,並且根據生產環境需要做了一定的配置。 ...
  • CentOS 6.5 x86_64系統下安裝PHP-5.6.4,並且根據生產環境需要做了一定的配置,比如增加了memcache、memcached和redis的擴展支持,對PHP環境安全做了一定配置等。 ...
  • 記憶體管理單元MMU(memory management unit)的主要功能是虛擬地址(virtual memory addresses)到物理地址(physical addresses)的轉換。除此之外,它還可以實現記憶體保護(memory protection)、緩存控制(cache contro... ...
  • 上次創建了欄目模型,這次主要做欄目的前臺顯示。涉及到數據存儲層、業務邏輯層和Web層。用到了遷移,更新資料庫和註入的一些內容。 一、添加數據存儲層 1、添加Ninesky.DataLibrary(與上次添加方法相同) 在解決方案(Ninesky)上點右鍵->添加->新建項目 選擇.NET Core ... ...
  • 本篇博文介紹了#define條件編譯的用途、用法,並結合具體實例進行說明;本文還說明瞭使用條件編譯時需要註意的事項,以及環境變數(或條件編譯符號)的設置方法。 ...
  • 返回目錄 題目有點意思,大家都知道Dictionary<K,V>不是線程安全的類型,而List<T>是線程安全的嗎?在今天之前大叔沒有去測試過,而就在今天也是一個VIP問我,說在我的代碼中使用了並行,然後為一個List賦值,說的直接一點就是:List元素是全局的,在各個線程里分別去操作它,測試數據是 ...
  • 在項目的web.config文件中添加 <connectionStrings> <add name="SQLConnectionString" connectionString="資料庫連接字元串"/> </connectionStrings> 頁面上使用需要添加命名空間 using System. ...
  • 1.許可權控制使用controller和 action來實現,許可權方式有很多種,最近開發項目使用控制控制器方式實現代碼如下 二.單點登錄方式使用application方式來實現 1.用戶登錄成功後記錄當前信息 2.使用ActionFilter來實現單點登錄,每次點擊控制器都去查詢過濾是否在其它地方登錄 ...
一周排行
    -Advertisement-
    Play Games
  • 示例項目結構 在 Visual Studio 中創建一個 WinForms 應用程式後,項目結構如下所示: MyWinFormsApp/ │ ├───Properties/ │ └───Settings.settings │ ├───bin/ │ ├───Debug/ │ └───Release/ ...
  • [STAThread] 特性用於需要與 COM 組件交互的應用程式,尤其是依賴單線程模型(如 Windows Forms 應用程式)的組件。在 STA 模式下,線程擁有自己的消息迴圈,這對於處理用戶界面和某些 COM 組件是必要的。 [STAThread] static void Main(stri ...
  • 在WinForm中使用全局異常捕獲處理 在WinForm應用程式中,全局異常捕獲是確保程式穩定性的關鍵。通過在Program類的Main方法中設置全局異常處理,可以有效地捕獲並處理未預見的異常,從而避免程式崩潰。 註冊全局異常事件 [STAThread] static void Main() { / ...
  • 前言 給大家推薦一款開源的 Winform 控制項庫,可以幫助我們開發更加美觀、漂亮的 WinForm 界面。 項目介紹 SunnyUI.NET 是一個基於 .NET Framework 4.0+、.NET 6、.NET 7 和 .NET 8 的 WinForm 開源控制項庫,同時也提供了工具類庫、擴展 ...
  • 說明 該文章是屬於OverallAuth2.0系列文章,每周更新一篇該系列文章(從0到1完成系統開發)。 該系統文章,我會儘量說的非常詳細,做到不管新手、老手都能看懂。 說明:OverallAuth2.0 是一個簡單、易懂、功能強大的許可權+可視化流程管理系統。 有興趣的朋友,請關註我吧(*^▽^*) ...
  • 一、下載安裝 1.下載git 必須先下載並安裝git,再TortoiseGit下載安裝 git安裝參考教程:https://blog.csdn.net/mukes/article/details/115693833 2.TortoiseGit下載與安裝 TortoiseGit,Git客戶端,32/6 ...
  • 前言 在項目開發過程中,理解數據結構和演算法如同掌握蓋房子的秘訣。演算法不僅能幫助我們編寫高效、優質的代碼,還能解決項目中遇到的各種難題。 給大家推薦一個支持C#的開源免費、新手友好的數據結構與演算法入門教程:Hello演算法。 項目介紹 《Hello Algo》是一本開源免費、新手友好的數據結構與演算法入門 ...
  • 1.生成單個Proto.bat內容 @rem Copyright 2016, Google Inc. @rem All rights reserved. @rem @rem Redistribution and use in source and binary forms, with or with ...
  • 一:背景 1. 講故事 前段時間有位朋友找到我,說他的窗體程式在客戶這邊出現了卡死,讓我幫忙看下怎麼回事?dump也生成了,既然有dump了那就上 windbg 分析吧。 二:WinDbg 分析 1. 為什麼會卡死 窗體程式的卡死,入口門檻很低,後續往下分析就不一定了,不管怎麼說先用 !clrsta ...
  • 前言 人工智慧時代,人臉識別技術已成為安全驗證、身份識別和用戶交互的關鍵工具。 給大家推薦一款.NET 開源提供了強大的人臉識別 API,工具不僅易於集成,還具備高效處理能力。 本文將介紹一款如何利用這些API,為我們的項目添加智能識別的亮點。 項目介紹 GitHub 上擁有 1.2k 星標的 C# ...