從消息中間件看分散式系統的多種套路

来源:https://www.cnblogs.com/XiaoHDeBlog/archive/2020/06/06/12920754.html
-Advertisement-
Play Games

消息中間件作為分散式系統的重要成員,各大公司及開源均有許多解決方案。目前主流的開源解決方案包括RabbitMQ、RocketMQ、Kafka、ActiveMQ等。消息這個東西說簡單也簡單,說難也難。簡單之處在於好用方便,接入簡單使用簡單,非同步操作能夠解耦系統間的依賴,同時失敗後也能夠追溯重試。難的地 ...


     

 

 

 

  

  消息中間件作為分散式系統的重要成員,各大公司及開源均有許多解決方案。目前主流的開源解決方案包括RabbitMQ、RocketMQ、Kafka、ActiveMQ等。消息這個東西說簡單也簡單,說難也難。簡單之處在於好用方便,接入簡單使用簡單,非同步操作能夠解耦系統間的依賴,同時失敗後也能夠追溯重試。難的地方在於,設計一套可以支撐業務的消息機制,並提供高可用架構,解決消息存儲、消息重試、消息隊列的負載均衡等一系列問題。然而難也不代表沒有方法或者“套路”,熟悉一下原理與實現,多看幾個框架的源碼後多總結勢必能找出一些共性。

  消息框架大同小異,熟練掌握其原理、工作機制是必要的。就拿用的比較多的RocketMQ為引,來說說消息引擎的設計與實現。阿裡的消息引擎經過了從Notify到Napoli、再到MetaQ三代的發展,現在已經非常成熟,在不同部門的代碼中現在沒準都還可以從代碼里看到這一系列演進過程。當前的Apache RocketMQ 就是阿裡將MetaQ項目捐贈給了Apache基金會,而內部還是沿用MetaQ的名稱。

      首先詮釋幾個消息相關的基本概念。

  • 每個消息隊列都必須建立一個Topic。
  • 消息可以分組,每個消息隊列都至少需要一個生產者Producer和一個消費者Consumer。生產者生產發送消息,消費者接收消費消息。
  • 每個消費者和生產者都會分批提個ID。

 

RocketMQ 系統架構

 

    

 

  接下來再來看看RocketMQ的架構,如圖所示,簡要描述一下幾種角色及作用。 

  • NameServer
    • NameServer是消息Topic的註冊中心,用於發現和管理消息生產者、消費者、及路由關係。
  • Broker
    • 消息存儲與轉發的中轉站,使用隊列機制管理數據存儲。Broker中會存儲多份消息數據進行容錯,以Master/Slave的架構保證系統的高可用,Broker中可以部署單個或多個Master。單個Master的場景,Master掛掉後,Producer新產生的消息無法被消費,但已經發送到Broker的消息,由於Slave節點的存在,還能繼續被Consumer所消費;如果部署多個Master則系統能能正常運轉。
    • 另外,Broker中的Master和Slave不是像Zookeeper集群中用選舉機制進行確定,而是固定的配置,這也是在高可用場景需要部署多個Master的原因。
    • 生產者將消息發送到Broker中後,Broker會將消息寫到本地的CommitLog文件中,保存消息。
  • Producer
    • 生產者會和NameServer集群中某一節點建立長鏈接,定時從NamerServeri獲取Topic路由信息,並且和Broker建立心跳。
  • Consumer
    • 消費者需要給生產者一個明確的消費成功的回應,MetaQ才會認為消費成功,否則失敗。失敗後,RocketMQ會將消息重新發回Broker,在指定的延遲時間內進行重試,當重試達到一定的次數後(預設16次),MetaQ則認為此消息不能被消費,消息會被投遞到死信隊列。

 

  這個架構看其實是否很熟悉?好像接觸過的一些分散式系統的架構和這個長的都比較像是吧,甚至只要裡面框圖的角色稍微換換就能變成另外一個框架的介紹,比如Dubbo/Redis...。

並且在RocketMQ架構設計中,要解決的問題與其他分散式框架也可以觸類旁通。Master/Slave機制,天然的讀寫分離方式都是分散式高可用系統的典型解決方案。

負載均衡

  負載均衡是消息框架需要解決的又一個重要問題。當系統中生產者生產了大量消息,而消費者有多個或多台機器時,就需要平衡負載,讓消息均分地被消費者進行消費。目前RocketMQ中使用了多種負載均衡演算法。主要有以下幾種,靜態配置由於過於簡單,直接為消費者配置需要消費的隊列,因此直接忽略。

  1. 求平均數法
  2. 環形隊列法
  3. 一致Hash演算法
  4. Machine Room演算法
  5. 靜態配置


  來看一下源碼,RocketMQ內部對以上負載均衡演算法均有實現,並定義了一個介面 AllocateMessageQueueStrategy,採用策略模式,每種負載均衡演算法都依靠實現這個介面實現,在運行中,會獲取這個介面的實例,從而動態判斷到底採用的是哪種負載均衡演算法。

 1 public interface AllocateMessageQueueStrategy {
 2 
 3     /**
 4      * Allocating by consumer id
 5      *
 6      * @param consumerGroup current consumer group
 7      * @param currentCID current consumer id
 8      * @param mqAll message queue set in current topic
 9      * @param cidAll consumer set in current consumer group
10      * @return The allocate result of given strategy
11      */
12     List<MessageQueue> allocate(
13         final String consumerGroup,
14         final String currentCID,
15         final List<MessageQueue> mqAll,
16         final List<String> cidAll
17     );
18 
19     /**
20      * Algorithm name
21      *
22      * @return The strategy name
23      */
24     String getName();
25 }

 

 

1. 求平均數法

  顧名思義,就是根據消息隊列的數量和消費者的數量,求出單個消費者上應該負擔的平均消費隊列數,然後根據消費者的ID,按照取模的方式將消息隊列分配到指定的consumer上。具體代碼可以去Github上找,截取核心演算法代碼如下, mqAll就是消息隊列的結構,是一個MessageQueue的List,cidAll是消費者ID的列表,也是一個List。考慮mqAll和cidAll固定時以及變化時,當前消費者節點會從隊列中獲取到哪個隊列中的消息,比如當 averageSize 大於1時,這時每個消費者上的消息隊列就不止一個,而分配在每個消費者的上的隊列的ID是連續的。

 

 1     int index = cidAll.indexOf(currentCID);
 2         int mod = mqAll.size() % cidAll.size();
 3         int averageSize =
 4             mqAll.size() <= cidAll.size() ? 1 : (mod > 0 && index < mod ? mqAll.size() / cidAll.size()
 5                 + 1 : mqAll.size() / cidAll.size());
 6         int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod;
 7         int range = Math.min(averageSize, mqAll.size() - startIndex);
 8         for (int i = 0; i < range; i++) {
 9             result.add(mqAll.get((startIndex + i) % mqAll.size()));
10         }
11         return result;

 

2. 環形平均法

  這種演算法更為簡單。首先獲取當前消費者在整個列表中的下標index,直接用求餘方法得到當前消費者應該處理的消息隊列。註意mqAll的size和cidAll的size可以是任意的。

  • 當ciAll.size() == mqAll.size() 時,該演算法就是類似hashtable的求餘分桶。
  • 當ciAll.size() > mqAll.size() 時,那麼多出的消費者上並不能獲取到消費的隊列,只有部分消費者能夠獲取到消息隊列並執行,相當於在消費者資源充足的情況下,由於隊列數少,所以使用其中一部分消費者就能滿足需求,不用額外的開銷。
  • 當ciAll.size() < mqAll.size() 時,這樣每個消費者上需要負載的隊列數就超過了1個,並且區別於直接求平均的方式,分配在每個消費者上的消費隊列不是連續的,而是有一定步長的間隔。
1         int index = cidAll.indexOf(currentCID);
2         for (int i = index; i < mqAll.size(); i++) {
3             if (i % cidAll.size() == index) {
4                 result.add(mqAll.get(i));
5             }
6         }
7         return result;

 

3. 一致Hash演算法

  迴圈所有需要消費的隊列,根據隊列toString後的hash值計算出處理當前隊列的最近節點並分配給該節點。routeNode 中方法稍微複雜一些,有時間建議細看,這裡就只說功能。

 1      Collection<ClientNode> cidNodes = new ArrayList<ClientNode>();
 2         for (String cid : cidAll) {
 3             cidNodes.add(new ClientNode(cid));
 4         }
 5 
 6         final ConsistentHashRouter<ClientNode> router; //for building hash ring
 7         if (customHashFunction != null) {
 8             router = new ConsistentHashRouter<ClientNode>(cidNodes, virtualNodeCnt, customHashFunction);
 9         } else {
10             router = new ConsistentHashRouter<ClientNode>(cidNodes, virtualNodeCnt);
11         }
12 
13         List<MessageQueue> results = new ArrayList<MessageQueue>();
14         for (MessageQueue mq : mqAll) {
15             ClientNode clientNode = router.routeNode(mq.toString());
16             if (clientNode != null && currentCID.equals(clientNode.getKey())) {
17                 results.add(mq);
18             }
19         }
20 
21         return results;

 

 

4. Machine Room演算法

  基於機房的Hash演算法。這個命名看起來很詐唬,其實和上面的普通求餘演算法是一樣的,只不過多了個配置和過濾,為了把這個說清楚就把源碼貼全一點。可以看到在這個演算法的實現類中多了一個成員 consumeridcs,這個就是consumer id的一個集合,按照一定的約定,預先給broker命名,例如us@metaq4,然後給不同集群配置不同的consumeridcs,從而實現不同機房處理不同消息隊列的能力。

 1 /*
 2  * Licensed to the Apache Software Foundation (ASF) under one or more
 3  * contributor license agreements.  See the NOTICE file distributed with
 4  * this work for additional information regarding copyright ownership.
 5  * The ASF licenses this file to You under the Apache License, Version 2.0
 6  * (the "License"); you may not use this file except in compliance with
 7  * the License.  You may obtain a copy of the License at
 8  *
 9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */
17 package com.aliyun.openservices.shade.com.alibaba.rocketmq.client.consumer.rebalance;
18 
19 import java.util.ArrayList;
20 import java.util.List;
21 import java.util.Set;
22 import com.aliyun.openservices.shade.com.alibaba.rocketmq.client.consumer.AllocateMessageQueueStrategy;
23 import com.aliyun.openservices.shade.com.alibaba.rocketmq.common.message.MessageQueue;
24 
25 /**
26  * Computer room Hashing queue algorithm, such as Alipay logic room
27  */
28 public class AllocateMessageQueueByMachineRoom implements AllocateMessageQueueStrategy {
29     private Set<String> consumeridcs;
30 
31     @Override
32     public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,
33         List<String> cidAll) {
34         List<MessageQueue> result = new ArrayList<MessageQueue>();
35         int currentIndex = cidAll.indexOf(currentCID);
36         if (currentIndex < 0) {
37             return result;
38         }
39         List<MessageQueue> premqAll = new ArrayList<MessageQueue>();
40         for (MessageQueue mq : mqAll) {
41             String[] temp = mq.getBrokerName().split("@");
42             if (temp.length == 2 && consumeridcs.contains(temp[0])) {
43                 premqAll.add(mq);
44             }
45         }
46 
47         int mod = premqAll.size() / cidAll.size();
48         int rem = premqAll.size() % cidAll.size();
49         int startIndex = mod * currentIndex;
50         int endIndex = startIndex + mod;
51         for (int i = startIndex; i < endIndex; i++) {
52             result.add(mqAll.get(i));
53         }
54         if (rem > currentIndex) {
55             result.add(premqAll.get(currentIndex + mod * cidAll.size()));
56         }
57         return result;
58     }
59 
60     @Override
61     public String getName() {
62         return "MACHINE_ROOM";
63     }
64 
65     public Set<String> getConsumeridcs() {
66         return consumeridcs;
67     }
68 
69     public void setConsumeridcs(Set<String> consumeridcs) {
70         this.consumeridcs = consumeridcs;
71     }
72 }

 

  由於近些年阿裡海外業務的擴展和投入,RocketMQ 等中間件對常見的海外業務場景的支持也更加健全。典型的場景包括跨單元消費以及消息路由。跨單元消費是比較好實現的,就是在consumer中增加一個配置,指定接收消息的來源單元,RocketMQ內部會完成客戶端從指定單元拉取消息的工作。而全球消息路由則是需要一些公共資源,消息的發送方只能將消息發送到一個指定單元/機房,然後將消息路由到另外指定的單元,consumer部署在指定單元。區別在於一個配置在客戶端,一個配置在服務端。

 

 

總結

從RocketMQ的設計、原理以及用過的個人用過的其他分散式框架上看,典型的分散式系統在設計中無外乎要解決的就是以下幾點,RocketMQ全都用上了。

  • 服務的註冊和發現。一般會有一個統一的註冊中心進行管理維護。
  • 服務的提供方和使用方間的通信,可以是非同步也可以是同步,例如dubbo服務同步服務,而消息類型就是非同步通信。
  • HA——高可用架構。八字決 ———— “主從同步,讀寫分離”。 要再加一句的話可以是“異地多活”。
  • 負載均衡。典型的負載均衡演算法在文章內容裡面已經列出好幾種了,常用的基本也就這些。

當然消息框架設計中用到的套路遠不止這些,包括如何保證消息消費的順序性、消費者和服務端通信、以及消息持久化等問題也是難點和重點,同樣,分散式緩存系統也需要解決這些問題,先寫到這裡,要完全理解並自己設計一個這樣的框架難度還是相當大的。

 


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 前端開發 HTML標簽 HTML文檔結構 head <head> <!-- 漢字編碼 --> <meta charset="UTF-8"> <!-- 設置一個網站的搜索關鍵字 --> <meta name="Keywords" content=""/> <!-- 網站描述內容 --> <meta n ...
  • 在使用avue表單時,若想在表單中進行數據請求時,可以使用下麵的方法: { label: "補貼類型", prop: "sub_type_msg", search: true, type: "select", dicUrl: "/admin/hadoop/not_auth/subType", dic ...
  • 在做一個公眾號商城,遇到了跨域問題。 後端用了beego框架 ctx.ResponseWriter.Header().Set("Access-Control-Allow-Methods", "POST, GET, PUT, OPTIONS") ctx.ResponseWriter.Header(). ...
  • 前端開發學習路線知識點彙總,學習前端要熟練掌握前端開發HTML、CSS、JavaScript等核心技術,熟練掌握Vue、React、Angular三大流行框架;使用面向對象思想進行編程,掌握應對業務編程的能力以及常見相容性方案;前後端分工開發流程、原生Ajax請求流程與細節,掌握常見跨域技巧等相關知 ...
  • 動態組件 & 非同步組件 切換組件保持組件的原狀態 1.使用 is 進行組件的切換顯示 <component v-bind:is="currentTabComponent"></component> 這樣是重新創建了組件 如果要保持組件的狀態,比如打開的菜單欄還是保持展開的 ,就可以這樣 <!-- 失 ...
  • 1、圓角邊框 定義圓角邊框後,可以將盒子定義為圓角的 (1)長度方式 <html> <head> <meta charset="utf-8"> <title>盒子模型外邊距</title> <style> div{ width: 200px; height: 100px; background-co ...
  • 13.5 記憶體和性能 Javascript 中函數都是對象,過多被添加到事件的處理程式都會占用記憶體,記憶體中對象越多性能就越差。其次,事先指定好所有事件處理程式而導致的DOM訪問次數也會延遲整個頁面的交互就緒時間 對付“事件處理程式過多”的方案就是事件委托,事件委托利用了事件冒泡只指定一個事件處理程式 ...
  • 前面幾章蜻蜓點水的介紹了elasticsearch、apm相關的內容。本片主要介紹怎麼使用ELK Stack幫助我們打造一個支撐起日產TB級的日誌監控系統 背景 在企業級的微服務環境中,跑著成百上千個服務都算是比較小的規模了。在生產環境上,日誌扮演著很重要的角色,排查異常需要日誌,性能優化需要日誌, ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...