【RocketMQ】順序消息實現總結

来源:https://www.cnblogs.com/shanml/archive/2023/09/20/17706095.html
-Advertisement-
Play Games

全局有序 在RocketMQ中,如果使消息全局有序,可以為Topic設置一個消息隊列,使用一個生產者單線程發送數據,消費者端也使用單線程進行消費,從而保證消息的全局有序,但是這種方式效率低,一般不使用。 局部有序 假設一個Topic分配了兩個消息隊列,生產者在發送消息的時候,可以對消息設置一個路由I ...


全局有序
在RocketMQ中,如果使消息全局有序,可以為Topic設置一個消息隊列,使用一個生產者單線程發送數據,消費者端也使用單線程進行消費,從而保證消息的全局有序,但是這種方式效率低,一般不使用。

局部有序
假設一個Topic分配了兩個消息隊列,生產者在發送消息的時候,可以對消息設置一個路由ID,比如想保證一個訂單的相關消息有序,那麼就使用訂單ID當做路由ID,在發送消息的時候,通過訂單ID對消息隊列的個數取餘,根據取餘結果選擇消息隊列,這樣同一個訂單的數據就可以保證發送到一個消息隊列中,消費者端使用MessageListenerOrderly處理有序消息,這就是RocketMQ的局部有序,保證消息在某個消息隊列中有序。

接下來看RoceketMQ源碼中提供的順序消息例子(稍微做了一些修改):

生產者

public class Producer {
    public static void main(String[] args) throws UnsupportedEncodingException {
        try {
            // 創建生產者
            DefaultMQProducer producer = new DefaultMQProducer("生產者組");
            // 啟動
            producer.start();
            // 創建TAG
            String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
            for (int i = 0; i < 100; i++) {
                // 生成訂單ID
                int orderId = i % 10;
                // 創建消息
                Message msg =
                    new Message("TopicTest", tags[i % tags.length], "KEY" + i,
                        ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
                SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
                    @Override
                    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                        // 獲取訂單ID
                        Integer id = (Integer) arg;
                        // 對消息隊列個數取餘
                        int index = id % mqs.size();
                        // 根據取餘結果選擇消息要發送給哪個消息隊列
                        return mqs.get(index);
                    }
                }, orderId); // 這裡傳入了訂單ID
                System.out.printf("%s%n", sendResult);
            }

            producer.shutdown();
        } catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) {
            e.printStackTrace();
        }
    }
}

消費者

public class Consumer {

    public static void main(String[] args) throws MQClientException {
        // 創建消費者
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("消費者組");
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        // 訂閱主題
        consumer.subscribe("TopicTest", "TagA || TagC || TagD");
        // 註冊消息監聽器,使用的是MessageListenerOrderly
        consumer.registerMessageListener(new MessageListenerOrderly() {
            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
                context.setAutoCommit(true);
                // 列印消息
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                return ConsumeOrderlyStatus.SUCCESS;
            }
        });
        consumer.start();
        System.out.printf("Consumer Started.%n");
    }
}

從例子中可以看出生產者在發送消息的時候,通過訂單ID作為路由信息,將同一個訂單ID的消息發送到了同一個消息隊列中,保證同一個訂單ID相關消息有序發送,接下來就看消費者是如何保證消息的順序消費的。

定時任務對消息隊列加鎖

消費者在啟動的時候,會對是否是順序消費進行判斷(監聽器是否是MessageListenerOrderly類型來判斷),如果是順序消費,會使用ConsumeMessageOrderlyService,並調用它的start方法進行啟動,在集群模式模式下,start方法中會啟動一個定時加鎖的任務,周期性的對該消費者負責的消息隊列進行加鎖。

為什麼集群模式下需要加鎖?
因為廣播模式下,消息隊列會分配給消費者下的每一個消費者,而在集群模式下,一個消息隊列同一時刻只能被同一個消費組下的某一個消費者進行,所以在廣播模式下不存在競爭關係,也就不需要對消息隊列進行加鎖,而在集群模式下,有可能因為負載均衡等原因將某一個消息隊列分配到了另外一個消費者中,因此在順序消費情況下,集群模式下需要對消息隊列加鎖,當某個消息隊列被鎖定時,其他的消費者不能進行消費。

加鎖的具體邏輯如下,首先獲取當前消費者負責的所有消息隊列MessageQueue,返回數據是一個MAP,key為broker名稱,value為broker下的消息隊列,接著對MAP進行遍歷,處理每一個broker下的消息隊列:
(1)根據broker名稱查找broker的詳細信息;
(2)創建加鎖請求,在請求中設置要加鎖的消息隊列,將請求發送給broker,表示要對這些消息隊列進行加鎖;
(3)Broker返回請求處理結果,響應結果中包含了加鎖成功的消息隊列,對於加鎖成功的消息隊列將消息隊列MessageQueue,將其對應的ProcessQueue中的locked屬性置為true表示該消息隊列已加鎖成功,如果響應中未包含某個消息隊列的信息,表示此消息隊列加鎖失敗,需要將其對應的ProcessQueue對象中的locked屬性置為false表示加鎖失敗;

順序消息拉取

上面可知,在使用順序消息時,定時任務會周期性的對當前消費者負責的消息隊列進行加鎖,不過由於負載均衡等原因,有可能給當前消費者分配了新的消息隊列,此時還未來得及通過定時任務加鎖,所以消費者在構建消息拉取請求前會再次進行判斷,如果是新分配到當前消費者的消息隊列,同樣會向Broker發送請求,對MessageQueue進行加鎖,加鎖成功將其對應的ProcessQueue中的locked屬性置為true才可以拉取消息。

順序消息消費

消息拉取成功之後,會將消息提交到線程池中進行處理,對於順序消費處理邏輯如下:

  1. 獲取消息隊列MessageQueue的對象鎖,每個MessageQueue對應了一把Object對象鎖,然後使用synchronized進行加鎖,這裡加鎖的原因是因為順序消費使用的是線程池,由多個線程同時進行消費,所以某個線程在處理某個消息隊列的消息時需要對該消息隊列MessageQueue加鎖,防止其他線程併發消費該消息隊列的鎖,破壞消息的順序性

    public class MessageQueueLock {
        private ConcurrentMap<MessageQueue, Object> mqLockTable = new ConcurrentHashMap<MessageQueue, Object>();
    
        public Object fetchLockObject(final MessageQueue mq) {
            // 獲取消息隊列對應的對象鎖,也就是一個Object類型的對象
            Object objLock = this.mqLockTable.get(mq);
            // 如果獲取為空
            if (null == objLock) {
                // 創建對象
                objLock = new Object();
                // 加入到Map中
                Object prevLock = this.mqLockTable.putIfAbsent(mq, objLock);
                if (prevLock != null) {
                    objLock = prevLock;
                }
            }
            return objLock;
        }
    }
    
  2. 上一步獲取鎖成功之後,會再次校驗該MessageQueue對應的ProcessQueue中的鎖(locked狀態),看是否過期或者已經失效,過期或者失效稍後會重新進行加鎖;

  3. 獲取ProcessQueue的中的consumeLock消費鎖,獲取成功之後調用消息監聽器的consumeMessage方法開始消費消費;

    public class ProcessQueue {
       // 消息消費鎖
       private final Lock consumeLock = new ReentrantLock();
    
       public Lock getConsumeLock() { // 獲取消息消費鎖
             return consumeLock;
       }
    }
    
  4. 消息消費完畢,釋放ProcessQueueconsumeLock消費鎖;

  5. 方法執行完畢,釋放MessageQueue對應的Object對象鎖;

在第1步中就已經獲取了MessageQueue對應的Object對象鎖對消息隊列進行加鎖了,那麼為什麼在第3步消費消息之前還要再加一個消費鎖呢?

猜測有可能是在消費者進行負載均衡時,當前消費者負責的消息隊列發生變化,可能移除某個消息隊列,那麼消費者在進行消費的時候就要獲取ProcessQueueconsumeLock消費鎖進行加鎖,相當於鎖住ProcessQueue,防止正在消費的過程中,ProcessQueue被負載均衡移除。

既然如此,負載均衡的時候為什麼不使用MessageQueue對應的Object對象鎖進行加鎖而要使用ProcessQueue中的consumeLock消費鎖?

這裡應該是為了減小鎖的粒度,因為消費者在MessageQueue對應的Object加鎖後,還進行了一系列的判斷,校驗都成功之後獲取ProcessQueue中的consumeLock加鎖,之後開始消費消息,消費完畢釋放所有的鎖,如果負載均衡使用MessageQueueObject對象鎖需要等待整個過程結束,鎖的粒度較粗,這樣顯然會降低性能,而如果使用消息消費鎖,只需要等待第3步和第4步結束就可以獲取鎖,減少等待的時間,而且消費者在進行消息消費前也會判斷ProcessQueue是否被移除,所以只要保證consumeMessage方法在執行的過程中(消息被消費的過程)ProcessQueue不被移除即可。

總結

消費者端,是通過加鎖來保證消息的順序消費,一共有三把鎖:

  1. 向Broker申請的消息隊列鎖
    集群模式下一個消息隊列同一時刻只能被同一個消費組下的某一個消費者進行,為了避免負載均衡等原因引起的變動,消費者會向Broker發送請求對消息隊列進行加鎖,如果加鎖成功,記錄到消息隊列對應的ProcessQueue中的locked變數中。

  2. 消息隊列鎖
    對應MessageQueue對應的Object對象鎖,消費者在處理拉取到的消息時,由於可以開啟多線程進行處理,所以處理消息前需要對MessageQueue加鎖,鎖住要處理的消息隊列,主要是處理多線程之間的競爭,保證消息的順序性。

  3. 消息消費鎖
    對應ProcessQueue中的consumeLock,消費者在調用consumeMessage方法之前會加消費鎖,主要是為了避免在消費消息時,由於負載均衡等原因,ProcessQueue被刪除

對應的相關源碼可參考:

【RocketMQ】【源碼】順序消息實現原理


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 上一篇提到過類的屬性,但沒有詳細介紹,本篇詳細介紹一下類的屬性 一 、類的屬性 方法是用來操作數據的,而屬性則是建模必不的內容,而且操作的數據,大多數是屬性,比如游戲中的某個boss類,它的生命值就是屬性(不同級別的boss,有不同的生命值),被攻擊方法(不同的攻擊,傷害值不同),當boss被攻擊時 ...
  • 大家好,我是Antvictor,一個勵志要成為架構師的程式員。 閑話少說,讓我們直接開始安裝Python。 Python安裝 從Python官網找到Download下載對應的安裝包,python3.6及以上即可。 Python官網會根據系統預設展示對應系統的最新版本安裝包,下載成功後點擊安裝。 這裡 ...
  • Python庫解析地址PyParsing 人們普遍認為,Python編程語言的pyparsing 模塊是對文本數據進行操作的一個寶貴工具。 用於解析和修改文本數據的pyparsing 包,簡化了對地址的操作。這是因為該模塊可以轉換和幫助解析地址。 在這篇文章中,我們將討論PyParsing 模塊在處 ...
  • 所有的面試題目都不是一成不變的,面試題目只是給大家一個借鑒作用,最主要的是給自己增加知識的儲備,有備無患。 ...
  • 大家好,我是 Java陳序員,今天給大家介紹一個顏值功能雙線上的 Zookeeper 可視化工具。 項目介紹 PrettyZoo 是一個基於 Apache Curator 和 JavaFX 實現的 Zookeeper 圖形化管理客戶端。 使用了 Java 的模塊化(Jigsaw)技術,並基於 JPa ...
  • 2.1、環境搭建 2.1.1、右擊project創建新module 2.1.2、選擇maven 2.1.3、設置module名稱和路徑 2.1.4、module初始狀態 2.1.5、配置打包方式 註意:預設的打包方式為 jar,為了能配置web資源,需要將打包方式設置為 war <packaging ...
  • 在之前的Java 17新特性中,我們介紹過關於JEP 406: switch的模式匹配,但當時還只是關於此內容的首個預覽版本。之後在JDK 18、JDK 19、JDK 20中又都進行了更新和完善。如今,在JDK 21中,該特性得到了最終確定!下麵,我們就再正式學習一下該功能! 在以往的switch語 ...
  • 本文介紹了Python 非同步編程技術asyncio ,使用場景,介紹了同步編程,非同步編程原理,非同步技術的優勢,非同步語法 async await, 協程,create_task, gather, event loop, asyncio.run() 等,用回調函數callback 來解析響應消息,實... ...
一周排行
    -Advertisement-
    Play Games
  • 示例項目結構 在 Visual Studio 中創建一個 WinForms 應用程式後,項目結構如下所示: MyWinFormsApp/ │ ├───Properties/ │ └───Settings.settings │ ├───bin/ │ ├───Debug/ │ └───Release/ ...
  • [STAThread] 特性用於需要與 COM 組件交互的應用程式,尤其是依賴單線程模型(如 Windows Forms 應用程式)的組件。在 STA 模式下,線程擁有自己的消息迴圈,這對於處理用戶界面和某些 COM 組件是必要的。 [STAThread] static void Main(stri ...
  • 在WinForm中使用全局異常捕獲處理 在WinForm應用程式中,全局異常捕獲是確保程式穩定性的關鍵。通過在Program類的Main方法中設置全局異常處理,可以有效地捕獲並處理未預見的異常,從而避免程式崩潰。 註冊全局異常事件 [STAThread] static void Main() { / ...
  • 前言 給大家推薦一款開源的 Winform 控制項庫,可以幫助我們開發更加美觀、漂亮的 WinForm 界面。 項目介紹 SunnyUI.NET 是一個基於 .NET Framework 4.0+、.NET 6、.NET 7 和 .NET 8 的 WinForm 開源控制項庫,同時也提供了工具類庫、擴展 ...
  • 說明 該文章是屬於OverallAuth2.0系列文章,每周更新一篇該系列文章(從0到1完成系統開發)。 該系統文章,我會儘量說的非常詳細,做到不管新手、老手都能看懂。 說明:OverallAuth2.0 是一個簡單、易懂、功能強大的許可權+可視化流程管理系統。 有興趣的朋友,請關註我吧(*^▽^*) ...
  • 一、下載安裝 1.下載git 必須先下載並安裝git,再TortoiseGit下載安裝 git安裝參考教程:https://blog.csdn.net/mukes/article/details/115693833 2.TortoiseGit下載與安裝 TortoiseGit,Git客戶端,32/6 ...
  • 前言 在項目開發過程中,理解數據結構和演算法如同掌握蓋房子的秘訣。演算法不僅能幫助我們編寫高效、優質的代碼,還能解決項目中遇到的各種難題。 給大家推薦一個支持C#的開源免費、新手友好的數據結構與演算法入門教程:Hello演算法。 項目介紹 《Hello Algo》是一本開源免費、新手友好的數據結構與演算法入門 ...
  • 1.生成單個Proto.bat內容 @rem Copyright 2016, Google Inc. @rem All rights reserved. @rem @rem Redistribution and use in source and binary forms, with or with ...
  • 一:背景 1. 講故事 前段時間有位朋友找到我,說他的窗體程式在客戶這邊出現了卡死,讓我幫忙看下怎麼回事?dump也生成了,既然有dump了那就上 windbg 分析吧。 二:WinDbg 分析 1. 為什麼會卡死 窗體程式的卡死,入口門檻很低,後續往下分析就不一定了,不管怎麼說先用 !clrsta ...
  • 前言 人工智慧時代,人臉識別技術已成為安全驗證、身份識別和用戶交互的關鍵工具。 給大家推薦一款.NET 開源提供了強大的人臉識別 API,工具不僅易於集成,還具備高效處理能力。 本文將介紹一款如何利用這些API,為我們的項目添加智能識別的亮點。 項目介紹 GitHub 上擁有 1.2k 星標的 C# ...