RocketMQ4.3.x對順序消息的理解

来源:https://www.cnblogs.com/zhyg/archive/2019/01/09/10244729.html
-Advertisement-
Play Games

1、RocketMQ消息隊列簡單介紹 這裡簡單介紹一下RocketMQ的消息隊列的模型 一個topic對應多個隊列如下圖: 生產者和消費者分別向隊列中發送和消費消息,生產者和消費者都可以是多個,通過組名進行群組約束。由於負載因素造成生產消息會生產到各個queue中。 消費群組進行queue消費時首先 ...


1、RocketMQ消息隊列簡單介紹

  這裡簡單介紹一下RocketMQ的消息隊列的模型

  一個topic對應多個隊列如下圖:

  

  生產者和消費者分別向隊列中發送和消費消息,生產者和消費者都可以是多個,通過組名進行群組約束。由於負載因素造成生產消息會生產到各個queue中。

  消費群組進行queue消費時首先因為負載因素,queue會分配給各自的消費實例中,如果消費組有變化會重新分配,導致queue分配亂序。

  另外一個消費者實例消費對應的queue時,消費者使用線程池進行處理消息。

  以上各種操作都會導致消息不一定先處理就會先完成,所以造成消息消費不是嚴格順序處理的。

  所以在之前的版本中假如我們要求要嚴格按照順序進行消息處理的話就必須進行單隊列單線程進行消息消費處理

  4.0.0之後版本支持順序消費處理,我們看一看他是如何處理的。。。

2、順序消費原來介紹

  根據上面的簡單介紹我們知道一個topic對應多個隊列,我們生產消息的時候就可以針對隊列的數量和消息的有效標識進行取模進行隊列選擇發送,如下示例代碼

package org.apache.rocketmq.example.ordermessage;

import java.io.UnsupportedEncodingException;
import java.util.List;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingException;

public class Producer {
    public static void main(String[] args) throws UnsupportedEncodingException {
        try {
            MQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
            producer.start();

            String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
            for (int i = 0; i < 100; i++) {
                int orderId = i % 10;
                Message msg =
                    new Message("TopicTestjjj", tags[i % tags.length], "KEY" + i,
                        ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
                SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
                    @Override
                    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                        Integer id = (Integer) arg;
                        int index = id % mqs.size();
                        return mqs.get(index);
                    }
                }, orderId);

                System.out.printf("%s%n", sendResult);
            }

            producer.shutdown();
        } catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) {
            e.printStackTrace();
        }
    }
}

  比如我們使用訂單號作為和queue的ID取模的關鍵欄位,我們就可以保證這個訂單號的消息都會發送給一個隊列,這樣我們就保證了生產者生產消息在業務上是有序的

  接下來我們看看消費端是怎樣實現的。

  1、消費者與broker建立連接分配隊列的時候會嘗試給隊列加鎖,如果成功則獲取queue消費權利,否則嘗試下一個queue。

  2、如果消費模式為集群每20秒對分配給自己的隊列自動加鎖

  3、消息消費時對queue進行加鎖,同一時刻只允許一個線程對一個queue進行消費

  4、根據消費時間進行隊列和線程的切換預設60s(這個時間就是鎖住隊列的時間)

  5、消息重試次數超過最大次數之後將消息移入死信queue

  

  根據以上幾點可以保證一個隊列中的消息可以按順序進行消費。

3、總結

  RocketMQ4.0.0對順序消息做了升級,但是犧牲了部分消費性能。因為要給隊列加鎖,並且只能一個隊列同一時間只能有一個線程處理消息。

  至於為什麼是60秒(時間可設置)進行線程切換可能是更好的利用cpu或者不因為某個隊列消息異常拖慢其他隊列消息處理吧,還有待深入研究。

  

  本人理解如有誤請廣大網友指正,謝謝!


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 公司項目之前一個需求,需要用戶一進頁面觸摸手機後就自動幫他複製一個串碼。。wtf? 還有這種操作?好吧,需求出來了,那就想實現吧。。。 用戶進來觸摸手機 會產生 三個事件,我們肯定不能直接寫這三個事件去複製,這樣會影響它的預設事件,我們還要做的神不知鬼不覺。。。 所以,在函數內部我們就需要用到下麵代 ...
  • <script src="http://libs.baidu.com/jquery/1.10.2/jquery.min.js"></script> <script> $(document).ready(function(){ //獲得文本框對象 var t = $("#text_box"); //初 ...
  • 經常在項目中需要寫到切換當前欄目的展示效果,定義公共方法 ...
  •  對象拷貝的方法是一個難點,尤其是深拷貝。建議把代碼都運行下,幫助理解拷貝。 一. json方法 1. 適合情況 :  JSON對象的深度克隆。方法是先JSON.stringify() 轉為json字元串, 再JSON.parse() 轉為json數組 2. 缺點:   ...
  • 一、樣本 地址:http://js.zhuamimi.cn/choujiang/index.htm 源碼:https://pan.baidu.com/s/15KhesfcLf1WMOom6PhzCjA 二、實現方法 1:構建環形鏈表 構建環形鏈表主要是為了無限迴圈子節點 環形鏈表數據結構與演算法裡面有 ...
  • 本文由雲+社區發表 作者:paulzeng 導語: Lottie是Airbnb開源的一個面向 iOS、Android、React Native 的動畫庫,可實現非常複雜的動畫,使用也及其簡單,極大釋放人力,值得一試。 一、簡介 Lottie 是Airbnb開源的一個面向 iOS、Android、Re ...
  • 單例模式是軟體工程中最著名的模式之一。從本質上講,單例是一個只允許創建自身的單個實例的類,並且通常可以簡單地訪問該實例。最常見的是,單例不允許在創建實例時指定任何參數——否則對實例的第二個請求但具有不同的參數可能會有問題!(如果對於具有相同參數的所有請求都應訪問相同的實例,則工廠模式更合適。)本文... ...
  • 個人博客原文: "開閉原則" 設計模式六大原則之六:開閉原則。 簡介 姓名 :開閉原則 英文名 :Open Closed Principle 價值觀 :老頑童就是我,休想改變我 個人介紹 : Software entities (classes, modules, functions, etc.) ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...