【RocketMQ】順序消息實現總結

来源:https://www.cnblogs.com/shanml/archive/2023/09/20/17706095.html
-Advertisement-
Play Games

全局有序 在RocketMQ中,如果使消息全局有序,可以為Topic設置一個消息隊列,使用一個生產者單線程發送數據,消費者端也使用單線程進行消費,從而保證消息的全局有序,但是這種方式效率低,一般不使用。 局部有序 假設一個Topic分配了兩個消息隊列,生產者在發送消息的時候,可以對消息設置一個路由I ...


全局有序
在RocketMQ中,如果使消息全局有序,可以為Topic設置一個消息隊列,使用一個生產者單線程發送數據,消費者端也使用單線程進行消費,從而保證消息的全局有序,但是這種方式效率低,一般不使用。

局部有序
假設一個Topic分配了兩個消息隊列,生產者在發送消息的時候,可以對消息設置一個路由ID,比如想保證一個訂單的相關消息有序,那麼就使用訂單ID當做路由ID,在發送消息的時候,通過訂單ID對消息隊列的個數取餘,根據取餘結果選擇消息隊列,這樣同一個訂單的數據就可以保證發送到一個消息隊列中,消費者端使用MessageListenerOrderly處理有序消息,這就是RocketMQ的局部有序,保證消息在某個消息隊列中有序。

接下來看RoceketMQ源碼中提供的順序消息例子(稍微做了一些修改):

生產者

public class Producer {
    public static void main(String[] args) throws UnsupportedEncodingException {
        try {
            // 創建生產者
            DefaultMQProducer producer = new DefaultMQProducer("生產者組");
            // 啟動
            producer.start();
            // 創建TAG
            String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
            for (int i = 0; i < 100; i++) {
                // 生成訂單ID
                int orderId = i % 10;
                // 創建消息
                Message msg =
                    new Message("TopicTest", tags[i % tags.length], "KEY" + i,
                        ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
                SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
                    @Override
                    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                        // 獲取訂單ID
                        Integer id = (Integer) arg;
                        // 對消息隊列個數取餘
                        int index = id % mqs.size();
                        // 根據取餘結果選擇消息要發送給哪個消息隊列
                        return mqs.get(index);
                    }
                }, orderId); // 這裡傳入了訂單ID
                System.out.printf("%s%n", sendResult);
            }

            producer.shutdown();
        } catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) {
            e.printStackTrace();
        }
    }
}

消費者

public class Consumer {

    public static void main(String[] args) throws MQClientException {
        // 創建消費者
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("消費者組");
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        // 訂閱主題
        consumer.subscribe("TopicTest", "TagA || TagC || TagD");
        // 註冊消息監聽器,使用的是MessageListenerOrderly
        consumer.registerMessageListener(new MessageListenerOrderly() {
            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
                context.setAutoCommit(true);
                // 列印消息
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                return ConsumeOrderlyStatus.SUCCESS;
            }
        });
        consumer.start();
        System.out.printf("Consumer Started.%n");
    }
}

從例子中可以看出生產者在發送消息的時候,通過訂單ID作為路由信息,將同一個訂單ID的消息發送到了同一個消息隊列中,保證同一個訂單ID相關消息有序發送,接下來就看消費者是如何保證消息的順序消費的。

定時任務對消息隊列加鎖

消費者在啟動的時候,會對是否是順序消費進行判斷(監聽器是否是MessageListenerOrderly類型來判斷),如果是順序消費,會使用ConsumeMessageOrderlyService,並調用它的start方法進行啟動,在集群模式模式下,start方法中會啟動一個定時加鎖的任務,周期性的對該消費者負責的消息隊列進行加鎖。

為什麼集群模式下需要加鎖?
因為廣播模式下,消息隊列會分配給消費者下的每一個消費者,而在集群模式下,一個消息隊列同一時刻只能被同一個消費組下的某一個消費者進行,所以在廣播模式下不存在競爭關係,也就不需要對消息隊列進行加鎖,而在集群模式下,有可能因為負載均衡等原因將某一個消息隊列分配到了另外一個消費者中,因此在順序消費情況下,集群模式下需要對消息隊列加鎖,當某個消息隊列被鎖定時,其他的消費者不能進行消費。

加鎖的具體邏輯如下,首先獲取當前消費者負責的所有消息隊列MessageQueue,返回數據是一個MAP,key為broker名稱,value為broker下的消息隊列,接著對MAP進行遍歷,處理每一個broker下的消息隊列:
(1)根據broker名稱查找broker的詳細信息;
(2)創建加鎖請求,在請求中設置要加鎖的消息隊列,將請求發送給broker,表示要對這些消息隊列進行加鎖;
(3)Broker返回請求處理結果,響應結果中包含了加鎖成功的消息隊列,對於加鎖成功的消息隊列將消息隊列MessageQueue,將其對應的ProcessQueue中的locked屬性置為true表示該消息隊列已加鎖成功,如果響應中未包含某個消息隊列的信息,表示此消息隊列加鎖失敗,需要將其對應的ProcessQueue對象中的locked屬性置為false表示加鎖失敗;

順序消息拉取

上面可知,在使用順序消息時,定時任務會周期性的對當前消費者負責的消息隊列進行加鎖,不過由於負載均衡等原因,有可能給當前消費者分配了新的消息隊列,此時還未來得及通過定時任務加鎖,所以消費者在構建消息拉取請求前會再次進行判斷,如果是新分配到當前消費者的消息隊列,同樣會向Broker發送請求,對MessageQueue進行加鎖,加鎖成功將其對應的ProcessQueue中的locked屬性置為true才可以拉取消息。

順序消息消費

消息拉取成功之後,會將消息提交到線程池中進行處理,對於順序消費處理邏輯如下:

  1. 獲取消息隊列MessageQueue的對象鎖,每個MessageQueue對應了一把Object對象鎖,然後使用synchronized進行加鎖,這裡加鎖的原因是因為順序消費使用的是線程池,由多個線程同時進行消費,所以某個線程在處理某個消息隊列的消息時需要對該消息隊列MessageQueue加鎖,防止其他線程併發消費該消息隊列的鎖,破壞消息的順序性

    public class MessageQueueLock {
        private ConcurrentMap<MessageQueue, Object> mqLockTable = new ConcurrentHashMap<MessageQueue, Object>();
    
        public Object fetchLockObject(final MessageQueue mq) {
            // 獲取消息隊列對應的對象鎖,也就是一個Object類型的對象
            Object objLock = this.mqLockTable.get(mq);
            // 如果獲取為空
            if (null == objLock) {
                // 創建對象
                objLock = new Object();
                // 加入到Map中
                Object prevLock = this.mqLockTable.putIfAbsent(mq, objLock);
                if (prevLock != null) {
                    objLock = prevLock;
                }
            }
            return objLock;
        }
    }
    
  2. 上一步獲取鎖成功之後,會再次校驗該MessageQueue對應的ProcessQueue中的鎖(locked狀態),看是否過期或者已經失效,過期或者失效稍後會重新進行加鎖;

  3. 獲取ProcessQueue的中的consumeLock消費鎖,獲取成功之後調用消息監聽器的consumeMessage方法開始消費消費;

    public class ProcessQueue {
       // 消息消費鎖
       private final Lock consumeLock = new ReentrantLock();
    
       public Lock getConsumeLock() { // 獲取消息消費鎖
             return consumeLock;
       }
    }
    
  4. 消息消費完畢,釋放ProcessQueueconsumeLock消費鎖;

  5. 方法執行完畢,釋放MessageQueue對應的Object對象鎖;

在第1步中就已經獲取了MessageQueue對應的Object對象鎖對消息隊列進行加鎖了,那麼為什麼在第3步消費消息之前還要再加一個消費鎖呢?

猜測有可能是在消費者進行負載均衡時,當前消費者負責的消息隊列發生變化,可能移除某個消息隊列,那麼消費者在進行消費的時候就要獲取ProcessQueueconsumeLock消費鎖進行加鎖,相當於鎖住ProcessQueue,防止正在消費的過程中,ProcessQueue被負載均衡移除。

既然如此,負載均衡的時候為什麼不使用MessageQueue對應的Object對象鎖進行加鎖而要使用ProcessQueue中的consumeLock消費鎖?

這裡應該是為了減小鎖的粒度,因為消費者在MessageQueue對應的Object加鎖後,還進行了一系列的判斷,校驗都成功之後獲取ProcessQueue中的consumeLock加鎖,之後開始消費消息,消費完畢釋放所有的鎖,如果負載均衡使用MessageQueueObject對象鎖需要等待整個過程結束,鎖的粒度較粗,這樣顯然會降低性能,而如果使用消息消費鎖,只需要等待第3步和第4步結束就可以獲取鎖,減少等待的時間,而且消費者在進行消息消費前也會判斷ProcessQueue是否被移除,所以只要保證consumeMessage方法在執行的過程中(消息被消費的過程)ProcessQueue不被移除即可。

總結

消費者端,是通過加鎖來保證消息的順序消費,一共有三把鎖:

  1. 向Broker申請的消息隊列鎖
    集群模式下一個消息隊列同一時刻只能被同一個消費組下的某一個消費者進行,為了避免負載均衡等原因引起的變動,消費者會向Broker發送請求對消息隊列進行加鎖,如果加鎖成功,記錄到消息隊列對應的ProcessQueue中的locked變數中。

  2. 消息隊列鎖
    對應MessageQueue對應的Object對象鎖,消費者在處理拉取到的消息時,由於可以開啟多線程進行處理,所以處理消息前需要對MessageQueue加鎖,鎖住要處理的消息隊列,主要是處理多線程之間的競爭,保證消息的順序性。

  3. 消息消費鎖
    對應ProcessQueue中的consumeLock,消費者在調用consumeMessage方法之前會加消費鎖,主要是為了避免在消費消息時,由於負載均衡等原因,ProcessQueue被刪除

對應的相關源碼可參考:

【RocketMQ】【源碼】順序消息實現原理


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 上一篇提到過類的屬性,但沒有詳細介紹,本篇詳細介紹一下類的屬性 一 、類的屬性 方法是用來操作數據的,而屬性則是建模必不的內容,而且操作的數據,大多數是屬性,比如游戲中的某個boss類,它的生命值就是屬性(不同級別的boss,有不同的生命值),被攻擊方法(不同的攻擊,傷害值不同),當boss被攻擊時 ...
  • 大家好,我是Antvictor,一個勵志要成為架構師的程式員。 閑話少說,讓我們直接開始安裝Python。 Python安裝 從Python官網找到Download下載對應的安裝包,python3.6及以上即可。 Python官網會根據系統預設展示對應系統的最新版本安裝包,下載成功後點擊安裝。 這裡 ...
  • Python庫解析地址PyParsing 人們普遍認為,Python編程語言的pyparsing 模塊是對文本數據進行操作的一個寶貴工具。 用於解析和修改文本數據的pyparsing 包,簡化了對地址的操作。這是因為該模塊可以轉換和幫助解析地址。 在這篇文章中,我們將討論PyParsing 模塊在處 ...
  • 所有的面試題目都不是一成不變的,面試題目只是給大家一個借鑒作用,最主要的是給自己增加知識的儲備,有備無患。 ...
  • 大家好,我是 Java陳序員,今天給大家介紹一個顏值功能雙線上的 Zookeeper 可視化工具。 項目介紹 PrettyZoo 是一個基於 Apache Curator 和 JavaFX 實現的 Zookeeper 圖形化管理客戶端。 使用了 Java 的模塊化(Jigsaw)技術,並基於 JPa ...
  • 2.1、環境搭建 2.1.1、右擊project創建新module 2.1.2、選擇maven 2.1.3、設置module名稱和路徑 2.1.4、module初始狀態 2.1.5、配置打包方式 註意:預設的打包方式為 jar,為了能配置web資源,需要將打包方式設置為 war <packaging ...
  • 在之前的Java 17新特性中,我們介紹過關於JEP 406: switch的模式匹配,但當時還只是關於此內容的首個預覽版本。之後在JDK 18、JDK 19、JDK 20中又都進行了更新和完善。如今,在JDK 21中,該特性得到了最終確定!下麵,我們就再正式學習一下該功能! 在以往的switch語 ...
  • 本文介紹了Python 非同步編程技術asyncio ,使用場景,介紹了同步編程,非同步編程原理,非同步技術的優勢,非同步語法 async await, 協程,create_task, gather, event loop, asyncio.run() 等,用回調函數callback 來解析響應消息,實... ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...