RabbitMQ學習第二記:工作隊列的兩種分發方式,輪詢分發(Round-robin)和 公平分發(Fair dispatch)

来源:https://www.cnblogs.com/wy697495/archive/2018/09/09/9611648.html
-Advertisement-
Play Games

1、什麼是RabbitMQ工作隊列 我們在應用程式使用消息系統時,一般情況下生產者往隊列里插入數據時速度是比較快的,但是消費者消費數據往往涉及到一些業務邏輯處理導致速度跟不上生產者生產數據。因此如果一個生產者對應一個消費者的話,很容易導致很多消息堆積在隊列里。這時,就得使用工作隊列了。一個隊列有多個 ...


1、什麼是RabbitMQ工作隊列

  我們在應用程式使用消息系統時,一般情況下生產者往隊列里插入數據時速度是比較快的,但是消費者消費數據往往涉及到一些業務邏輯處理導致速度跟不上生產者生產數據。因此如果一個生產者對應一個消費者的話,很容易導致很多消息堆積在隊列里。這時,就得使用工作隊列了。一個隊列有多個消費者同時消費數據。

  下圖取自於官方網站(RabbitMQ)的工作隊列的圖例

P:消息的生產者

C1:消息的消費者1

C2:消息的消費者2

紅色:隊列 

生產者將消息發送到隊列,多個消費者同時從隊列中獲取消息。

  工作隊列有兩種分發數據的方式:輪詢分發(Round-robin)和  公平分發(Fair dispatch)。輪詢分發:隊列給每一個消費者發送數量一樣的數據。公平分發:消費者設置每次從隊列中取一條數據,並且消費完後手動應答,繼續從隊列取下一個數據。下麵分別是兩種分發方式不同的寫法。

2、輪詢分發(Round-robin)

  生產者(Send)生產10條數據,消費者1(Receive1)接收數據並假設處理業務邏輯1s,消費者2(Receive2)接收數據並假設處理業務邏輯2s(生產者先運行,兩個消費者同時運行)。

2.1、生產者(Send)代碼

public class Send
{
    //隊列名稱
    private static final String QUEUE_NAME = "test_work_round_robin_queue";
    
    public static void main(String[] args)
    {
        try
        {
            //獲取連接
            Connection connection = ConnectionUtil.getConnection();
            //從連接中獲取一個通道
            Channel channel = connection.createChannel();
            //聲明隊列
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            for (int i = 0; i < 10; i++)
            {
                String message = "this is work_round_robin queue message" + i;
                System.out.println("[send]:" + message);
                //發送消息
                channel.basicPublish("", QUEUE_NAME, null, message.getBytes("utf-8"));
                Thread.sleep(20 * i);
            }
            channel.close();
            connection.close();
        }
        catch (IOException | TimeoutException | InterruptedException e)
        {
            e.printStackTrace();
        }
    }
}

運行結果:

[send]:this is work_round_robin queue message0
[send]:this is work_round_robin queue message1
[send]:this is work_round_robin queue message2
[send]:this is work_round_robin queue message3
[send]:this is work_round_robin queue message4
[send]:this is work_round_robin queue message5
[send]:this is work_round_robin queue message6
[send]:this is work_round_robin queue message7
[send]:this is work_round_robin queue message8
[send]:this is work_round_robin queue message9

2.2、消費者1(Receive1)代碼

public class Receive1
{
    //隊列名稱
    private static final String QUEUE_NAME = "test_work_round_robin_queue";
    
    public static void main(String[] args)
    {
        try
        {
            //獲取連接
            Connection connection = ConnectionUtil.getConnection();
            //從連接中獲取一個通道
            Channel channel = connection.createChannel();
            //聲明隊列
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            //定義消費者
            DefaultConsumer consumer = new DefaultConsumer(channel)
            {
                //當消息到達時執行回調方法
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
                        byte[] body) throws IOException
                {
                    String message = new String(body, "utf-8");
                    System.out.println("[1] Receive message:" + message);
                    try
                    {
                        //消費者休息1s處理業務
                        Thread.sleep(1000);
                    }
                    catch (InterruptedException e)
                    {
                        e.printStackTrace();
                    }
                    finally
                    {
                        System.out.println("[1] done");
                    }
                }
            };
            //監聽隊列
            channel.basicConsume(QUEUE_NAME, true, consumer);
        }
        catch (IOException e)
        {
            e.printStackTrace();
        }
        
    }
}

運行結果:

[1] Receive message:this is work_round_robin queue message0
[1] done
[1] Receive message:this is work_round_robin queue message2
[1] done
[1] Receive message:this is work_round_robin queue message4
[1] done
[1] Receive message:this is work_round_robin queue message6
[1] done
[1] Receive message:this is work_round_robin queue message8
[1] done

2.3、消費者2(Receive2)代碼

public class Receive2
{
    //隊列名稱
    private static final String QUEUE_NAME = "test_work_round_robin_queue";
    
    public static void main(String[] args)
    {
        
        try
        {
            //獲取連接
            Connection connection = ConnectionUtil.getConnection();
            //從連接中獲取一個通道
            Channel channel = connection.createChannel();
            //聲明隊列
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            //定義消費者
            DefaultConsumer consumer = new DefaultConsumer(channel)
            {
                //當消息到達時執行回調方法
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
                        byte[] body) throws IOException
                {
                    String message = new String(body, "utf-8");
                    System.out.println("[2] Receive message:" + message);
                    try
                    {
                        //消費者休息2s處理業務
                        Thread.sleep(2000);
                    }
                    catch (InterruptedException e)
                    {
                        e.printStackTrace();
                    }
                    finally
                    {
                        System.out.println("[2] done");
                    }
                }
            };
            //監聽隊列
            channel.basicConsume(QUEUE_NAME, true, consumer);
        }
        catch (IOException e)
        {
            e.printStackTrace();
        }
        
    }
}

運行結果:

[2] Receive message:this is work_round_robin queue message1
[2] done
[2] Receive message:this is work_round_robin queue message3
[2] done
[2] Receive message:this is work_round_robin queue message5
[2] done
[2] Receive message:this is work_round_robin queue message7
[2] done
[2] Receive message:this is work_round_robin queue message9
[2] done

總結:兩個消費者得到的數據量一樣的。從運行時可以看到消費者1會先執行完,消費者2會後執行完。並不會因為兩個消費者處理數據速度不一樣使得兩個消費者取得不一樣數量的數據。並且當隊列數量大的時候通過觀察RabbitMQ的管理後臺,可以看到管理界面隊列中的數據很快就沒了,但是這個時候兩個消費者其實並沒有消費完數據。這種分發方式存在著很大的隱患。

3、公平分發(Fair dispatch)

  生產者(Send)生產10條數據,消費者1(Receive1)接收數據並假設處理業務邏輯1s,消費者2(Receive2)接收數據並假設處理業務邏輯2s(生產者先運行,兩個消費者同時運行)。

消費者設置每次從隊列里取一條數據,並且關閉自動回覆機制,每次取完一條數據後,手動回覆並繼續取下一條數據。

3.1、生產者(Send)代碼

public class Send
{
    //隊列名稱
    private static final String QUEUE_NAME = "test_work_fair_dispatch_queue";
    
    public static void main(String[] args)
    {
        try
        {
            //獲取連接
            Connection connection = ConnectionUtil.getConnection();
            //從連接中獲取一個通道
            Channel channel = connection.createChannel();
            //聲明隊列
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            for (int i = 0; i < 10; i++)
            {
                String message = "this is work_fair_dispatch queue message" + i;
                System.out.println("[send]:" + message);
                //發送消息
                channel.basicPublish("", QUEUE_NAME, null, message.getBytes("utf-8"));
                Thread.sleep(20 * i);
            }
            channel.close();
            connection.close();
        }
        catch (IOException | TimeoutException | InterruptedException e)
        {
            e.printStackTrace();
        }
    }
}

運行結果:

[send]:this is work_fair_dispatch queue message0
[send]:this is work_fair_dispatch queue message1
[send]:this is work_fair_dispatch queue message2
[send]:this is work_fair_dispatch queue message3
[send]:this is work_fair_dispatch queue message4
[send]:this is work_fair_dispatch queue message5
[send]:this is work_fair_dispatch queue message6
[send]:this is work_fair_dispatch queue message7
[send]:this is work_fair_dispatch queue message8
[send]:this is work_fair_dispatch queue message9

3.2、消費者1(Receive1)代碼

public class Receive1
{
    //隊列名稱
    private static final String QUEUE_NAME = "test_work_fair_dispatch_queue";
    
    public static void main(String[] args)
    {
        
        try
        {
            //獲取連接
            Connection connection = ConnectionUtil.getConnection();
            //從連接中獲取一個通道
            final Channel channel = connection.createChannel();
            //聲明隊列
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            //設置每次從隊列里取一條數據
            int prefetchCount = 1;
            channel.basicQos(prefetchCount);
            //定義消費者
            DefaultConsumer consumer = new DefaultConsumer(channel)
            {
                //當消息到達時執行回調方法
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
                        byte[] body) throws IOException
                {
                    String message = new String(body, "utf-8");
                    System.out.println("[1] Receive message:" + message);
                    try
                    {
                        //消費者休息1s處理業務
                        Thread.sleep(1000);
                    }
                    catch (InterruptedException e)
                    {
                        e.printStackTrace();
                    }
                    finally
                    {
                        System.out.println("[1] done");
                        //手動應答
                        channel.basicAck(envelope.getDeliveryTag(), false);
                    }
                }
            };
            //設置手動應答
            boolean autoAck = false;
            //監聽隊列
            channel.basicConsume(QUEUE_NAME, autoAck, consumer);
        }
        catch (IOException e)
        {
            e.printStackTrace();
        }
        
    }
}

運行結果:

[1] Receive message:this is work_fair_dispatch queue message1
[1] done
[1] Receive message:this is work_fair_dispatch queue message2
[1] done
[1] Receive message:this is work_fair_dispatch queue message4
[1] done
[1] Receive message:this is work_fair_dispatch queue message5
[1] done
[1] Receive message:this is work_fair_dispatch queue message7
[1] done
[1] Receive message:this is work_fair_dispatch queue message8
[1] done

3.3、消費者2(Receive2)代碼

public class Receive2
{
    //隊列名稱
    private static final String QUEUE_NAME = "test_work_fair_dispatch_queue";
    
    public static void main(String[] args)
    {
        
        try
        {
            //獲取連接
            Connection connection = ConnectionUtil.getConnection();
            //從連接中獲取一個通道
            final Channel channel = connection.createChannel();
            //聲明隊列
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            //保證一次只分發一個  
            int prefetchCount = 1;
            channel.basicQos(prefetchCount);
            //定義消費者
            DefaultConsumer consumer = new DefaultConsumer(channel)
            {
                //當消息到達時執行回調方法
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
                        byte[] body) throws IOException
                {
                    String message = new String(body, "utf-8");
                    System.out.println("[2] Receive message:" + message);
                    try
                    {
                        //消費者休息2s處理業務
                        Thread.sleep(2000);
                    }
                    catch (InterruptedException e)
                    {
                        e.printStackTrace();
                    }
                    finally
                    {
                        System.out.println("[2] done");
                        //手動應答
                        channel.basicAck(envelope.getDeliveryTag(), false);
                    }
                }
            };
            //設置手動應答
            boolean autoAck = false;
            //監聽隊列
            channel.basicConsume(QUEUE_NAME, autoAck, consumer);
        }
        catch (IOException e)
        {
            e.printStackTrace();
        }
        
    }
}

運行結果:

[2] Receive message:this is work_fair_dispatch queue message0
[2] done
[2] Receive message:this is work_fair_dispatch queue message3
[2] done
[2] Receive message:this is work_fair_dispatch queue message6
[2] done
[2] Receive message:this is work_fair_dispatch queue message9
[2] done

總結:消費者1處理了6條數據,消費者2處理了4條數據

  與輪詢分發不同的是,當每個消費都設置了每次只會從隊列取一條數據時,並且關閉自動應答,在每次處理完數據後手動給隊列發送確認收到數據。這樣隊列就會公平給每個消息費者發送數據,消費一條再發第二條,而且可以在管理界面中看到數據是一條條隨著消費者消費完從而減少的,並不是一下子全部分發完了。顯然公平分發更符合系統設計。

  

註意:本文僅代表個人理解和看法喲!和本人所在公司和團體無任何關係!

 


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • todoList 結合之前 Vuejs 基礎與語法 使用 v-model 雙向綁定 input 輸入內容與數據 data 使用 @click 和 methods 關聯事件 使用 v-for 進行數據迴圈展示 <!DOCTYPE html> <html lang="en"> <head> <meta ...
  • 剛開始學JQuery寫的如有錯誤歡迎批評指正 JQuery擁有的選擇器可以讓我們更快更方便找到想要的元素,然後對相應的元素進行操作 簡單介紹一下一些常用的選擇器: 1.基本選擇器: 2.層級選擇器: 層級函數 3.過濾選擇器: 4.內容選擇器: 5.屬性選擇器: 6.子元素選擇器: 7.表單選擇器: ...
  • 全棧工程師也可以叫web 前端 H5主要是網站 app 小程式 公眾號這一塊 HTML篇 html(超文本標記語言,標記通用標記語言下的一個應用。) “超文本”就是指頁面內可以包含圖片、鏈接,甚至音樂、程式等非文字元素。 超文本標記語言的結構包括“頭”部分(英語:Head)、和“主體”部分(英語:B ...
  • 我主要進行對日期數據進行查看使用,有數據的顯示顏色、沒有數據可以不選 更多可以查看官方網站:http://www.daterangepicker.com/#examples ...
  • 在js中,函數本身屬於對象的一種,因此可以定義、賦值,作為對象的屬性或者成為其他函數的參數。函數名只是函數這個對象類的引用。 函數定義 從技術上講,這是一個函數表達式。但不推薦使用,因為這種語法會導致解析兩次代碼。第一次是解析常規javascript代碼,第二次解析傳入構造函數中的字元串,影響性能。 ...
  • 王之泰201771010131《面向對象程式設計(java)》第二周學習總結 第一部分:理論知識學習部分 第三章 第三章內容主要為Java語言的基礎語法,主要內容如下 1.基礎知識 1.1標識符 a)標識符可用作類名、變數名、方法名、數組名、文件名等。 註:第一個符號不能為數字,即不能用數字開頭。 ...
  • 1、類載入器 2、反射構造方法 3、反射成員變數 4、反射成員方法 5、反射配置文件運行類中的方法 ...
  • 本篇文章根據個人理解的知識整理彙總,包含 Cookie、Session 的基礎知識和單點登錄的方法,如有不足之處,請大家多多指正。 ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...