一文總結 MetaQ/RocketMQ 原理

来源:https://www.cnblogs.com/88223100/archive/2023/08/22/One-article-summarizes-the-principles-of-MetaQ_RocketMQ.html
-Advertisement-
Play Games

本文介紹的 MetaQ/RocketMQ 是側重於維持消息一致性和高可靠性的消息隊列中間件,幫助大家對隊列設計的理解。 ...


本文介紹的 MetaQ/RocketMQ 是側重於維持消息一致性和高可靠性的消息隊列中間件,幫助大家對隊列設計的理解。

簡介—— 消息隊列中間件 MetaQ/RocketMQ

中間件 MetaQ 是一種基於隊列模型的消息中間件,MetaQ 據說最早是受 Kafka 的影響開發的,第一版的名字 "metamorphosis",是奧地利作家卡夫卡的名作——《變形記》。RocketMQ 是 MetaQ 的開源版本。
消息隊列中間件一般用於在分散式場景下解決集群單機瓶頸的問題。在傳統的分散式計算環境中,常常會出現由於某個單機節點的性能瓶頸,即使其他節點仍有餘力,仍然會導致整個系統的性能無法進一步提升的情況,這一現象通常是由於任務負載不均衡,網路延遲等常見且難以解決的問題。消息隊列本質上是提供了一種非常合理的任務分配策略,通過將任務分給消費者實現非同步和分散式處理,提高整個集群的性能。
消息隊列(mq)的核心思想是將耗時的任務非同步化,通過消息隊列緩存任務,從而實現消息發送方和接收方的解耦,使得任務的處理能夠非同步、並行,從而提高系統或集群的吞吐量和可擴展性。在這個過程中,整個系統強依賴於消息隊列,起到類似橋梁的作用。消息隊列有著經典的三大應用場景:解耦、非同步削峰填谷
解耦場景:消息隊列一般使用發佈/訂閱的模型,如果服務 B C D 依賴服務 A 的消息,此時新增服務 E 也需要依賴 A ,而 B 服務不再需要消息,需要頻繁且複雜的業務改造,效率低,穩定性差,此時引入消息隊列進行解耦,服務 A 只需要將產生的消息發佈到 mq 中,就不用管了,其它服務會自己根據需要訂閱 mq 中的消息,或者說去 mq 中消費,這就使得每個服務可以更多地關註自身業務,而不需要把精力用在維護服務之間的關係上,可擴展性提高。

圖片

非同步場景:如用戶的業務需要一系列的服務進行處理,按順序處理的話,用戶需要等待的時間過長。例如電商平臺的用戶下單、支付、積分、郵件、簡訊通知等流程,長時間等待用戶無法接受,就可以通過 mq 進行服務的非同步處理,例如積分、郵件和簡訊通知服務訂閱了支付服務的消息,將支付完成作為消息發佈到 mq ,這些服務就可以同時對這一訂單進行處理,降低了請求等待時間(rt) 。

圖片

削峰填穀場景:削峰表示的含義是,流量如果太大,就控制伺服器處理的 QPS,不要讓大流量打掛資料庫等導致伺服器宕機,讓服務處理請求更加平緩,節省伺服器資源,其本質上是控制用戶的請求速率,或是延緩或是直接拒絕。填谷的含義是將階段性的大流量請求緩存起來,在流量平緩的時候慢慢處理,防止過多的請求被拒絕後的重試導致更大的流量。mq 很適合這一場景,QPS 超出服務端接收請求的能力時,服務端仍然保持在安全範圍內地從消息隊列中獲取消息進行處理,多餘的消息會積壓在消息隊列中,或由於超時直接拒絕,到 QPS 低於這一閾值的時候,這些積壓的消息就會被逐漸消費掉。相當於在系統前修建了一個流量蓄水池。

除此之外還可以利用消息隊列進行消息通信,日誌處理等業務,但消息隊列也會引入系統可用性,系統複雜度,數據一致性等問題(強依賴消息隊列的正確執行,需要確保消息不會丟失,確保消息的順序性等)。這意味著如果系統中的消息隊列承擔著重要的角色,那麼消息隊列的可靠性和穩定性也至關重要,本文介紹的 MetaQ/RocketMQ 是側重於維持消息一致性和高可靠性的消息隊列中間件。

物理架構

MetaQ 的高可用性是基於其物理部署架構實現的,在生產者為消息定義了一個 topic 之後,消費者可以訂閱這個 topic ,於是消息就有了從生產到消費的路由指向。

圖片

NameServer 負責暴露消息的 topic ,因此可以以將 NameServer 理解成一個註冊中心,用來關聯 topic 和對應的 broker ,即消息的存儲位置。NameServer 的每個節點都維護著 topic 和 broker 的映射關係,每個節點彼此獨立,無同步。在每個NameServer節點內部都維護著所有 Broker 的地址列表,所有 Topic 和 Topic 對應 Queue 的信息等。消息生產者在發送消息之前先與任意一臺 NameServer 建立連接,獲取 Broker 伺服器的地址列表,然後根據負載均衡演算法從列表中選擇一臺消息伺服器發送消息。

Broker 主要負責消息的存儲和轉發,分為 master 和 slave,是一寫多讀的關係。broker 節點可以按照處理的數據相同劃分成副本組,同一組 master 和 slave 的關係可以通過指定相同 brokerName,不同的 brokerId 來定義,brokerId 為 0 標識 master,非 0 是 slave。每個 broker 伺服器會與 NameServer 集群建立長連接(註意是跟所有的 NameServer 伺服器,因為 NameServer 彼此之間獨立不同步),並且會註冊 topic 信息到 NameServer 中。複製策略是 Broker 的 Master 與 Slave 間的數據同步方式,分為同步複製與非同步複製。由於非同步複製、非同步刷盤可能會丟失少量消息,因此 Broker 預設採用的是同步雙寫的方式,消息寫入 master 成功後,master 會等待 slave 同步數據成功後才向 Producer 返回成功 ACK ,即 Master 與 Slave 都要寫入成功後才會返回成功 ACK 。這樣可以保證消息發送時消息不丟失。副本組中,各個節點處理的速度不同,也就有了日誌水位的概念 (高水位對消費者不可見)。在 master 宕機時,同步副本集中的其餘節點會自動選舉出新的 master 代替工作(Raft 協議)。

圖片

Producer,消息生產者,與 NameServer 隨機一個節點建立長連接,定時從 NameServer 獲取 topic 路由信息,與 master broker 建立長連接,定時發送心跳,Producer 只與 master 建立連接產生通信,不與 slave 建立連接。生產者和消費者都有組(Group)的概念,同一組節點的生產/消費邏輯相同。

Consumer,消息消費者,與 NameServer 隨機一個節點建立長連接,定時從 NameServer 獲取 topic 的路由信息,並獲取想要消費的 queue 。可以和提供服務的 master 或 slave 建立長連接,定時向 master 和 slave 發送心跳,既可以從 master 訂閱消息,也可以從 slave 訂閱消息。

圖片

消息的存儲

MetaQ 將消息存儲(持久化)到位於生產者和消費者之間的一個消息代理(Message Broker)上。

圖片

MetaQ 消息模型:

  • Message 單位消息;

  • Topic 消息的類型,生產者對應消費者的分區標識;

  • Tag 消息在相同 Topic 時的二級分類標識,可用於消息的篩選;

  • Queue 物理分區,一個 Topic 對應多個 Queue;

  • Group 生產者或消費者的邏輯分組,同一個 Group 的 生產者/消費者 通常 生產/消費 同一類消息,並且 生產/消費 的邏輯一致;
  • Offset:偏移值, 表示消費到的位置或待消費的消息位置;

圖片

消息的存儲方式對消息隊列的性能有很大影響,如 ActiveMQ 會使用隊列表來存儲消息,依靠輪訓、加鎖等方式檢查和處理消息,但對於 QPS 很高的系統來說,一下子積壓龐大的數據量在表中會導致 B+ 樹索引層級加深,影響查詢效率。KV 資料庫採用如 LSM 樹作為索引結構,對讀性能有較大的犧牲,這對於消息隊列而言很難接受,因為消息隊列常常需要面對消費失敗需要重試的情況。

RocketMQ/Kafka/RabbitMQ 等消息隊列會採用順序寫的日誌結構,將消息刷盤至文件系統作持久化。順序寫日誌文件可以避免頻繁的隨機訪問而導致的性能問題,而且利於延遲寫入等優化手段,能夠快速保存日誌。Kafka 會為每個 topic (事件的組織和存儲單位,一個 topic 可以對應多個生產者和多個消費者) 劃分出一個分區日誌,便於根據 topic 順序消費,消息被讀取後不會立刻刪除,可以持久存儲,但 topic 數量增加的時候,broker 的分區文件數量增大,會使得本來速度很快的順序寫變成隨機寫(不同文件之間移動),性能大幅下降。

圖片

MetaQ 2.0 對這部分進行重新設計,其存儲結構主要包括 CommitLog 和 Consume queue 兩部分。

CommitLog 是物理存儲,存儲不定長的完整消息記錄,邏輯上是完全連續的一個文件,物理上單個文件大小是 1 GB,文件名是當前文件首地址在 CommitLog 中的偏移量。只要 CommitLog 落盤,就可以認為已經接收到消息,即使 Cosume queue 丟失,也可以從 CommitLog 恢復。而所有 topic 的消息都會存儲在同一個 CommitLog 中來保證順序寫。這樣的結構會導致 CommitLog 讀取完全變成隨機讀,所以需要 Consume queue 作為索引隊列 (offset, size, tag),每個 topic-queue 的消息在寫完 CommitLog 之後,都會寫到獨立的 Consume queue ,隊列里的每個元素都是定長的元數據,內容包含該消息在對應 CommitLog 的 offset 和 size ,還包括 tagcode 可支持消息按照指定 tag 進行過濾。順序寫是 MetaQ 實現高性能的基礎。

圖片

基於這樣的存儲結構,MetaQ 對客戶端暴露的主要是 Consume queue 邏輯視圖,提供隊列訪問介面。消費者通過指定 Consume queue 的位點來讀取消息,通過提交 Consume queue 的位點來維護消費進度。Concume queue 每個條目長度固定(8個位元組CommitLog物理偏移量、4位元組消息長度、8位元組tag哈希碼),單個 ConsumeQueue 文件預設最多包括 30 萬個條目。這樣做的好處是隊列非常輕量級,Consume Queue 非常小,且在消費過程中都是順序讀取,其速度幾乎能與記憶體讀寫相比,而在 page cache 和良好的空間局部性作用下,CommitLog 的訪問也非常快速。

圖片

MetaQ 會啟動一個定時服務 ReputMessageService 定時調用(間隔 1ms)來生成 Consume queue 和 其它索引文件。

Consume queue 解決了順序消費的問題,但如果需要根據屬性進行篩選,就必須用到 index 索引

圖片

index 索引支持根據 key 值進行篩選,查找時,可以根據消息的 key 計算 hash 槽的位置,hash 槽中存儲著 Index 條目的位置,可以根據這個 index 條目獲得一個鏈表(尾),每個 index 條目包含在 CommitLog 上的消息主體的物理偏移量。

消息鏈路

MetaQ 的消息可以根據 topic-queue 劃分出確定的從生產者到消費者路由指向。

圖片

1.producer 指定 broker 和 queue 發送消息 msg ;
2.broker 接收消息,並完成緩存、刷盤和生成摘要(同時根據 tag 和 user properties 對 msg 進行打標)等操作;
3.consumer 每隔一段時間( pullInterval )從 broker 端的(根據服務端消息過濾模式 tag 或 sql 過濾後)獲取一定量的消息到本地消息隊列中(單線程)
4.consumer 按照配置併發分配上述隊列消息並執行消費方法;

5.consumer 返回 broker 消費結果並重置消費位點;

 

生產者

Topic 是消息的主題,每個 topic 對應多個隊列,多個隊列會均勻的分佈在多個 broker 上,Producer 發送的消息在 broker 上會均衡的分佈在多個隊列中,Producer 發送消息時在多個隊列間輪詢確保消息的均衡。

圖片

發送消息的具體操作如下:

1、查詢本地緩存是否存儲了 TopicPublishInfo ,否則從 NameServer 獲取
2、根據負載均衡選擇策略獲取待發送隊列並輪訓訪問
3、獲取消息隊列對應的 broker 實際 IP
4、設置消息 Unique ID ,zip 壓縮消息

5、消息校驗(長度等),發送消息

Producer 發送的每條消息都包含一個 Topic,表示一類消息的集合。同時還有一個 Tag,用於區分同一Topic 下不同類型的消息。一個 Topic 包括多個 Queue,每個 Queue 中存放該 Topic 對應消息的位置。一個 Topic 的 Queue 相當於該 Topic 中消息的分區,Queue 可以存儲在不同的 Broker 上。發送消息時,Producer 通過負載均衡模塊選擇相應的 Broker 集群隊列進行消息投遞。

消息發送時如果出現失敗,預設會重試 2 次,在重試時會儘量避開剛剛接收失敗的 Broker,而是選擇其它 Broker 上的隊列進行發送,從而提高消息發送的成功率。

 

消費者

消費方式

  • 廣播消費:Producer 向一些隊列輪流發送消息,隊列集合稱為 Topic,每一個 Consumer 實例消費這個 Topic 對應的所有隊列。
  • 集群消費:多個 Consumer 實例平均消費這個 Topic 對應的隊列集合。

MetaQ 消費者端有多套負載均衡演算法的實現,比較常見的是平均分配和平均迴圈分配,預設使用平均分配演算法,給每個 Consumer 分配均等的隊列。一個 Consumer 可以對應多個隊列,而一個隊列只能給一個 Consumer 進行消費,Consumer 和隊列之間是一對多的關係。

集群模式下有一點需要註意:消費隊列負載機制遵循一個通用的思想,一個消息隊列同時只允許被一個消費者消費,一個消費者可以消費多個消費隊列。因此當 Consumer 的數量大於隊列的數量,會有部分 Consumer 分配不到隊列,這些分配不到隊列的 Consumer 機器不會有消息到達。

平均分配演算法舉例:

  • 如果有 5 個隊列,2 個 consumer,consumer1 會分配 3 個隊列,consumer2 分配 2 個隊列;
  • 如果有 6 個隊列,2 個 consumer,consumer1 會分配 3 個隊列,consumer2 也會分配 3 個隊列;
  • 如果 10 個隊列,11 個 consumer,consumer1~consumer10 各分配一個隊列,consumer11 無隊列分配;

如果消費集群規模較大:例如 topic 隊列資源是 128 個,而消費機器數有 160 台,按照一個隊列只會被一個消費集群中一臺機器處理的原則,會有 32 台機器不會收到消息,此種情況需要聯繫 MetaQ 人員進行擴容評估。

消費重試:當出現消費失敗的消息時,Broker 會為每個消費者組設置一個重試隊列。當一條消息初次消費失敗,消息隊列會自動進行消費重試。達到最大重試次數後,若消費仍然失敗,此時會將該消息發送到死信隊列。對於死信消息,通常需要開發人員進行手動處理。

圖片

在消費時間過程中可能會遇到消息消費隊列增加和減少、消息消費者增加或減少,此時需要對消息消費隊列進行重新平衡,既重新分配 (rebalance),這就是所謂的重平衡機制。在 RocketMQ 中,每隔 20s 會根據當前隊列數量、消費者數量重新進行隊列負載計算,如果計算出來的結果與當前不一樣,則觸發消息消費隊列的重分配。

Consumer 啟動時會啟動定時器,還執行一些定時同步任務,包括:同步 nameServer 地址,從 nameServer 同步 topic 的路由信息,清理 offline 的 broker,並向所有 broker 發送心跳,分配給當前 consumer 的每個隊列將最新消費的 offset 同步給 broker。

 

消息消費過程淺析

三個關鍵服務: RebalanceService、PullMessageService、MessageConsumeService    

RebalanceService 負載均衡服務

定時執行一次負載均衡(20 s)分配消息隊列給消費者。負載均衡針對每個 topic 獨立進行,具體如下:


private void rebalanceByTopic(final String topic, final boolean isOrder) {
        switch (messageModel) {
            case BROADCASTING: {
                Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
                if (mqSet != null) {
                    boolean changed = this.updateProcessQueueTableInRebalance(topic, mqSet, isOrder);//廣播模式下每個消費者要消費所有 queue 的消息
                    if (changed) {
                        this.messageQueueChanged(topic, mqSet, mqSet);
                        log.info("messageQueueChanged {} {} {} {}",
                            consumerGroup,
                            topic,
                            mqSet,
                            mqSet);
                    }
                } else {
                    log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);
                }
                break;
            }
            case CLUSTERING: {
                Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);//找到該topic下的消息隊列集合
                List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);//找到給消費者組下的所有消費者id
                if (null == mqSet) {
                    if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                        log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);
                    }
                }
                if (null == cidAll) {
                    log.warn("doRebalance, {} {}, get consumer id list failed", consumerGroup, topic);
                }
                
                if (mqSet != null && cidAll != null) {
                    List<MessageQueue> mqAll = new ArrayList<MessageQueue>();
                    mqAll.addAll(mqSet);

                    Collections.sort(mqAll);
                    Collections.sort(cidAll);
                    
                    AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;
                    
                    List<MessageQueue> allocateResult = null;
                    try {
                        allocateResult = strategy.allocate(
                            this.consumerGroup,
                            this.mQClientFactory.getClientId(),
                            mqAll,
                            cidAll);// 根據分配策略進行分配
                    } catch (Throwable e) {
                        log.error("AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName={}", strategy.getName(),
                            e);
                        return;
                    }
                    
                    Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>();
                    if (allocateResult != null) {
                        allocateResultSet.addAll(allocateResult);
                    }
                    boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);// 更新處理隊列表
                    
                    if (changed) {
                        log.info(
                            "rebalanced result changed. allocateMessageQueueStrategyName={}, group={}, topic={}, clientId={}, mqAllSize={}, cidAllSize={}, rebalanceResultSize={}, rebalanceResultSet={}",
                            strategy.getName(), consumerGroup, topic, this.mQClientFactory.getClientId(), mqSet.size(), cidAll.size(),
                            allocateResultSet.size(), allocateResultSet);
                        this.messageQueueChanged(topic, mqSet, allocateResultSet);
                    }
                }
                break;
            }
            default:
                break;
        }
    }

這裡主要做了幾件事:

  • 判斷消費模式
  • 廣播模式

i.找到 topic 下的消息隊列(queue)集合

ii.更新處理隊列表
  • 集群模式

i.找到 topic 下的消息隊列集合

ii.找到消費者組下所有消費者 id

iii.根據分配策略進行分配

iv.更新處理隊列表,開始真正拉取消息請求

消費者會將消費位點更新到 NameServer 上,Rebalance 發生時,讀取消費者的消費位點信息,需要註意在消費者數量大於隊列數量的情況下,如果消費者不及時更新消費位點信息,可能會導致消息被重覆消費。因此,消費者需要及時更新消費位點信息,確保消費進度正確。

Consumer 創建的時候 Rebalance 會被執行。整個 rebalanceService 的作用就是不斷的通過負載均衡,重新分配隊列的過程。根據分配好的隊列構建拉取消息的請求,然後放到 pullRequestQueue 中。

PullMessageService 拉取消息服務

首先拉取消息時最重要的是確定偏移量 offset,這存儲在消費者端的 OffsetStore 對象中。


if (this.defaultMQPushConsumer.getOffsetStore() != null) {
          this.offsetStore = this.defaultMQPushConsumer.getOffsetStore();
        } else {
          switch (this.defaultMQPushConsumer.getMessageModel()) {
            case BROADCASTING:
              this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
              break;
            case CLUSTERING:
              this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
              break;
            default:
              break;
          }
}
this.offsetStore.load();
可以看到廣播模式和集群模式的對象類型不同,這是因為對 offset 的維護的方式不一樣,在 load 的時候 LocalFileOffsetStore 會從本地文件載入這個 offset,而 RemoteBrokerOffsetStore 的 load 函數是空的。

兩種對象類型分別有 readOffset 函數支持從記憶體中獲取 offset 值,以及分別從本地文件存儲和 broker 獲取 offset。需要註意集群模式下消費者只需要關心 broker 上維護的消費進度,因為不論 queue 和 消費者的映射關係如何切換, 只有 offset 之後的未消費消息是消費者需要關心的。

圖片

消息的拉取過程是一個不斷迴圈的生產者消費者模型,一個 PullRequest 就對應一個拉取任務,並和一對MessageQueue(保存 Consume queue 的信息)和 ProcessQueue 關聯,消息拉取的過程中,PullMessageService 拉取線程不停的讀取 PullRequestQueue 根據 PullRequest 拉取消息。拉取到消息時,消息提交到 ProcessQueue 中並新建 ConsumeRequest 提交到 ConsumeService 處理, 然後生成下一批的 PullRequest 丟到 PullRequestQueue。如果沒有拉取到消息或出現異常,則會重新將請求放回拉取隊列。ProcessQueue 中以 TreeMap 形式保存待處理的消息, key 為消息對應的 offset ,並自動進行排序。

圖片

消息拉取過程:

1.PullMessageService 不斷迴圈遍歷,從 PullRequestQueue 中提取 PullRequest,根據 nextOffset 去 broker 拉取消息,若該隊列 已經 dropped 則更新 offset 到 broker 並丟棄此拉消息請求。
2.PullMessageService 非同步拉取消息,同時將 PullRequest 封裝在 PullCallback 中,PullCallback 封裝在 ResponseFuture中,並以自增的請求 id 為鍵,ResponseFuture 為值放入 ResponseTable 中。
3.Broker 收到請求,如果 offset 之後有新的消息會立即發送非同步響應;否則等待直到 producer 有新的消息發送後返回或者超時。如果通信異常或者 Broker 超時未返迴響應,nettyClient 會定時清理超時的請求,釋放 PullRequest 回到 PullRequestQueue。

4.用最新的 offset 更新 ResponseFuture 里的 PullRequest 並推送給 PullRequestQueue 里以進行下一次拉取。批量拉取到的消息分批提交給 consumeExecutor 線程處理。

 

消費控速

MetaQ 為消費者端拉取消息提供了消費控速的能力:

  • 主動控速,在整個消費過程中我們可以發現,如果想要做到流控,一個是控制生成 PullRequest 的時間間隔,一個是控制生成新一批的請求數量,因此 MetaQ 提供了兩個參數給我們 pullInterval、pullBatchSize ,主動控速的邏輯是通過控制消息的拉取速度來達到降低速率的效果。
  • 被動控速,這種流量控制的方式要複雜得多,需要用戶在消費消息時控制流量 (sentinel),由於消費線程池的待消費隊列的消息達到一定閾值之後,MetaQ 會被動降低 PullRequest 的產生的速率,因此當採用流量控制手段通過埋點降低消費速度時,待消費隊列會逐漸占滿,觸發降速機制;為什麼不直接用 sentinel ?因為 sentinel 快速失敗等策略觸發限流後會產生大量重試,重試消息會進入重試隊列,當重試的量逐漸增大,broker 上重試隊列中消息量也越來越多,並且重試消息再次投遞時還可能再次發生重試,又重新進入重試隊列,同一條消息反覆進出隊列,這種無意義的重覆動作會增加 broker 的壓力。

 

消息種類

普通消息

可選擇同步、非同步或單向發送。同步:Producer 發出一條消息後,會在收到 MQ 返回的 ACK 之後再發送下一條消息。非同步:Producer 發出消息後無需等待 MQ 返回 ACK ,直接發送下一條消息。單向: Producer 僅負責發送消息,不等待,MQ 也不返回 ACK。

 

順序消息

消息的順序性分為兩種:

  • 全局順序:對於指定的一個 Topic ,所有消息按照嚴格的先入先出的順序進行發佈和消費 (同一個 queue)。
  • 分區順序:對於一個指定的 Topic ,所有消息根據 sharding key 進行分區,同一個分區內的消息按照嚴格的 FIFO 順序進行發佈和消費,分區之間彼此獨立。

MetaQ 只支持同一個 queue 的順序消息,且同一個 queue 只能被一臺機器的一個線程消費,如果想要支持全局消息,那需要將該 topic 的 queue 的數量設置為 1,犧牲了可用性。

 

消息事務

圖片

1.發送方向 MQ 服務端發送消息。
2.MQ Server 將消息持久化成功之後,向發送方 ACK 確認消息已經發送成功,此時消息為半消息。
3.發送方開始執行本地事務邏輯。
4.發送方根據本地事務執行結果向 MQ Server 提交二次確認(Commit 或是 Rollback),MQ Server 收到 Commit 狀態則將半消息標記為可投遞,訂閱方最終將收到該消息;MQ Server 收到 Rollback 狀態則刪除半消息,訂閱方將不會接受該消息。
5.在斷網或者是應用重啟的特殊情況下,上述步驟4提交的二次確認最終未到達 MQ Server,經過固定時間後 MQ Server 將對該消息發起消息回查。
6.發送方收到消息回查後,需要檢查對應消息的本地事務執行的最終結果。

7.發送方根據檢查得到的本地事務的最終狀態再次提交二次確認,MQ Server 仍按照步驟 4 對半消息進行操作。

MetaQ 3.0 以後,新的版本提供更加豐富的功能,支持消息屬性、無序消息、延遲消息、廣播消息、長輪詢消費、高可用特性,這些功能基本上覆蓋了大部分應用對消息中間件的需求。除了功能豐富之外,MetaQ 基於順序寫,大概率順序讀的隊列存儲結構和 pull 模式的消費方式,使得 MetaQ 具備了最快的消息寫入速度和百億級的堆積能力,特別適合用來削峰填谷。在 MetaQ 3.0 版本的基礎上,衍生了開源版本 RocketMQ。

 

高可用

如何做到不重覆消費也不丟失消息?

 

重覆消費問題

  • 發送時消息重覆【消息 Message ID 不同】:MQ Producer 發送消息時,消息已成功發送到服務端並完成持久化,此時網路閃斷或者客戶端宕機導致服務端應答給客戶端失敗。如果此時 MQ Producer 意識到消息發送失敗並嘗試再次發送消息,MQ 消費者後續會收到兩條內容相同但是 Message ID 不同的消息。
  • 投遞時消息重覆【消息 Message ID 相同】:MQ Consumer 消費消息場景下,消息已投遞到消費者並完成業務處理,當客戶端給服務端反饋應答的時候網路閃斷。為了保證消息至少被消費一次,MQ 服務端將在網路恢復後再次嘗試投遞之前已被處理過的消息,MQ 消費者後續會收到兩條內容相同並且 Message ID 也相同的消息。

MetaQ 不能保證消息不重覆,因此對於重覆消費情況,需要業務自定義唯一標識作為冪等處理的依據。

 

消息丟失問題

MetaQ 避免消息丟失的機制主要包括:重試、冗餘消息存儲。在生產者的消息投遞失敗時,預設會重試兩次。消費者消費失敗時,在廣播模式下,消費失敗僅會返回 ConsumeConcurrentlyStatus.RECONSUME_LATER ,而不會重試。在未指定順序消息的集群模式下,消費失敗的消息會進入重試隊列自動重試,預設最大重試次數為 16 。在順序消費的集群模式下,消費失敗會使得當前隊列暫停消費,並重試到成功為止。

主從同步

RocketMQ/MetaQ 為每個存儲數據的 Broker 節點配置 ClusterName,BrokerName 標識來更好的進行資源管理。多個 BrokerName 相同的節點構成一個副本組。每個副本還擁有一個從 0 開始編號,不重覆也不一定連續的 BrokerId 用來表示身份,編號為 0 的節點是這個副本組的 Leader / Primary / Master,故障時通過選舉來重新對 Broker 編號標識新的身份。例如 BrokerId = {0, 1, 3},則 0 為主,其他兩個為備。

從模型的角度來看,RocketMQ /MetaQ 單節點上 Topic 數量較多,如果像 kafka 以 topic 粒度維護狀態機,節點宕機會導致上萬個狀態機切換,這種驚群效應會帶來很多潛在風險,因此新版本的 RocketMQ/MetaQ 選擇以單個 Broker 作為切換的最小粒度來管理,相比於其他更細粒度的實現,副本身份切換時只需要重分配 Broker 編號,對元數據節點壓力最小。由於通信的數據量少,可以加快主備切換的速度,單個副本下線的影響被限制在副本組內,減少管理和運維成本。這種實現也存在一些缺點,例如存儲節點的負載無法以最佳狀態在集群上進行負載均衡。

圖片

RocketMQ/MetaQ 採用物理複製的方法,存儲層的 CommitLog 通過鏈表和內核的 MappedFile 機制抽象出一條 append only 的數據流。主副本將未提交的消息按序傳輸給其他副本(相當於 redo log),並根據一定規則計算確認位點(confirm offset)判斷日誌流是否被提交。最終一致性通過數據水位對齊的方式來實現(越近期的消息價值越高):

圖片

  • 1-1 情況下滿足備 Max <= 主 Min,一般是備新上線或下線較久,備跳過存量日誌,從主的 Min 開始複製。
  • 1-2,2-2 兩種情況下滿足 主 Min < 備 Max <= 主 Max,一般是由於備網路閃斷導致日誌水位落後,通過 HA 連接追隨主即可。
  • 1-3,2-3 兩種情況下備 Max > 主 Max,可能由於主非同步寫磁碟宕機後又成為主,或者網路分區時雙主寫入造成 CommitLog 分叉。由於新主落後於備,在確認位點對齊後少量未確認的消息丟失,這種非正常模式的選舉是應該儘量避免的。
  • 3-3 理論上不會出現,備的數據長於主,原因可能是主節點數據丟失又疊加了非正常選舉,因此這種情況需要人工介入處理。

副本組的消息複製也支持同步和非同步的模式。

複製方式

優點

缺點

同步複製

成功寫入的消息不會丟失,可靠性高

寫入延遲更高

非同步複製

slave 宕機不影響 master 性能更高

可能丟失消息


slave broker 會定時(60 s)從 master 同步信息

 public void syncAll() {
        this.syncTopicConfig();
        this.syncConsumerOffset();
        this.syncDelayOffset();
        this.syncSubscriptionGroupConfig();
        this.syncMessageRequestMode();
        if (brokerController.getMessageStoreConfig().isTimerWheelEnable()) {
            this.syncTimerMetrics();
        }
    }
主從切換

RocketMQ 衍生出了很多不同的主從切換架構。

無切換架構

最早的時候,RocketMQ 基於 Master-Slave 模式提供了主備部署的架構,這種模式提供了一定的高可用能力,在 Master 節點負載較高情況下,讀流量可以被重定向到備機。由於沒有選主機制,在 Master 節點不可用時,這個副本組的消息發送將會完全中斷,還會出現延遲消息、事務消息等無法消費或者延遲。此外,備機在正常工作場景下資源使用率較低,造成一定的資源浪費。為瞭解決這些問題,社區提出了在一個 Broker 進程內運行多個 BrokerContainer,這個設計類似於 Flink 的 slot,讓一個 Broker 進程上可以以 Container 的形式運行多個節點,復用傳輸層的連接,業務線程池等資源,通過單節點主備交叉部署來同時承擔多份流量,無外部依賴,自愈能力強。這種方式下隔離性弱於使用原生容器方式進行隔離,同時由於架構的複雜度增加導致了自愈流程較為複雜。

切換架構

另一條演進路線則是基於可切換的,RocketMQ 也嘗試過依托於 Zookeeper 的分散式鎖和通知機制進行 HA 狀態的管理。引入外部依賴的同時給架構帶來了複雜性,不容易做小型化部署,部署運維和診斷的成本較高。另一種方式就是基於 Raft 在集群內自動選主,Raft 中的副本身份被透出和復用到 Broker Role 層面去除外部依賴,然而強一致的 Raft 版本並未支持靈活的降級策略,無法在 C(Consistency)和 A (Availability)之間靈活調整。兩種切換方案都是 CP 設計,犧牲高可用優先保證一致性。主副本下線時選主和路由定時更新策略導致整個故障轉移時間依然較長,Raft 本身對三副本的要求也會面臨較大的成本壓力。

RocketMQ DLedger 融合模式

RocketMQ DLedger (基於 Raft 的分散式日誌存儲)融合模式是 RocketMQ 5.0 演進中結合上述兩條路線後的一個系統的解決方案。

 

模式

優點

缺點

無切換

Master-Slave 模式

實現簡單,適用於中小型用戶,人工管控力強

故障需要人工處理,故障時寫入消息失敗,導致消息消費暫停

 

Broker Container 模式

無需選主,無外部依賴,故障轉移非常快 (< 3 秒)

增加單節點運維的複雜度,機器故障的風險增加,自愈流程複雜

切換架構

Raft 自動選主模式

自動主備切換

故障轉移時間較長,強一致無法靈活降級,三副本成本壓力較大

融合架構

基於 Dledger Controller 的可切換模式

可支持無切換和切換架構之間的轉換,複製協議更簡單,靈活降級

提高了部署和系統的複雜度

 

總結

相比較於 RocketMQ/MetaQ,Kafka 具有更高的吞吐量。Kafka 預設採用非同步發送的機制,並且還擁有消息收集和批量發送的機制,這樣的設置可以顯著提高其吞吐量。由於 Kafka 的高吞吐量,因此通常被用於日誌採集、大數據等領域。

RocketMQ/MetaQ 不採用非同步的方式發送消息。因為當採用非同步的方式發送消息時,Producer 發送的消息到達 Broker 就會返回成功。此時如果 Producer 宕機,而消息在 Broker 刷盤失敗時,就會導致消息丟失,從而降低系統的可靠性。

RocketMQ/MetaQ 單機可以支持更多的 topic 數量。因為 Kafka 在 Broker 端是將一個分區存儲在一個文件中的,當 topic 增加時,分區的數量也會增加,就會產生過多的文件。當消息刷盤時,就會出現性能下降的情況。而 RocketMQ/MetaQ 是將所有消息順序寫入文件的,因此不會出現這種情況。

當 Kafka 單機的 topic 數量從幾十到幾百個時,就會出現吞吐量大幅度下降、load 增高、響應時間變長等現象。而 RocketMQ/MetaQ 的 topic 數量達到幾千,甚至上萬時,也只是會出現小幅度的性能下降。

綜上所述,Kafka 具有更高的吞吐量,適合應用於日誌採集、大數據等領域。而 RocketMQ/MetaQ 單機支持更多的 topic,且具有更高的可靠性(一致性支持),因此適用於淘寶這樣複雜的業務處理。

 

作者| 陽禮

本文來自博客園,作者:古道輕風,轉載請註明原文鏈接:https://www.cnblogs.com/88223100/p/One-article-summarizes-the-principles-of-MetaQ_RocketMQ.html


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • ##### 10 CSS邊框屬性 1. border-style(邊框風格) 定義邊框的風格,值可以有: ``` /* none:沒有邊框,當border的值為none的時候,系統將會忽略[border-color] hidden:隱藏邊框,低版本瀏覽器不支持。 dotted:點狀邊框。 dashe ...
  • ##### 15 JavaScript ES6中的箭頭函數 1. 什麼是箭頭函數 ES6中允許使用=>來定義函數。箭頭函數相當於匿名函數,並簡化了函數定義。 2. 基本語法 ```js // 箭頭函數 let fn = (name) => { // 函數體 return `Hello ${name} ...
  • 13 JavaScript關於prototype(超重點) prototype是js裡面給類增加功能擴展的一種模式. 寫個面向對象來看看. ```js function People(name, age){ this.name = name; this.age = age; this.run = f ...
  • 本文,我們將一起學習,使用純 CSS,實現如下所示的動畫效果: ![](https://img2023.cnblogs.com/blog/608782/202308/608782-20230822102547750-742841232.gif) 上面的動畫效果,非常有意思,核心有兩點: 1. 小球隨 ...
  • ##### 12 eval函數 eval本身在js裡面正常情況下使用的並不多. 但是很多網站會利用eval的特性來完成反爬操作. 我們來看看eval是個什麼鬼? 從功能上講, eval非常簡單. 它和python裡面的eval是一樣的. 它可以動態的把字元串當成js代碼進行運行. ```js s = ...
  • 在 Infinispan 配置文件中切換髮現協議從廣播到組播,需要修改 JGroups 的配置,因為 Infinispan 使用 JGroups 來處理集群通信和發現。下麵是一個示例,展示如何將配置從廣播切換到組播。 首先,確保您已經有一個 Infinispan 配置文件,比如 `infinispa ...
  • Lora晶元的PCB板受力接收信號有問題可能有電路板設計問題、電路板受潮或受損、外部干擾、設備相容性問題等原因及其解決辦法... ...
  • 小程式平臺是怎麼保證商家業務的穩定、健康發展,服務好這些外部商家的呢?這裡面非常重要的是我們平臺對小程式基本流量的運營與監控。如何不讓業務的小程式線上上裸奔?如何幫助業務對自身小程式流量的沖高回落有一種直觀的把握和監測?如何基於海量數據指導業務去進行一個精細化的運營?實際上,京東小程式數據中心就扮演... ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...