rocketMQ消息隊列簡介及其實例

来源:https://www.cnblogs.com/sunyonggao/archive/2023/06/14/17478404.html
-Advertisement-
Play Games

一、RocketMQ 核心的四大組件: Producer:就是消息生產者,可以集群部署。它會先和 NameServer 集群中的隨機一臺建立長連接,得知當前要發送的 Topic 存在哪台 Broker Master上,然後再與其建立長連接,支持多種負載平衡模式發送消息。 Consumer:消息消費者 ...


一、RocketMQ 核心的四大組件:

Producer:就是消息生產者,可以集群部署。它會先和 NameServer 集群中的隨機一臺建立長連接,得知當前要發送的 Topic 存在哪台 Broker Master上,然後再與其建立長連接,支持多種負載平衡模式發送消息。

Consumer:消息消費者,也可以集群部署。它也會先和 NameServer 集群中的隨機一臺建立長連接,得知當前要消息的 Topic 存在哪台 Broker Master、Slave上,然後它們建立長連接,支持集群消費和廣播消費消息。

Broker:主要負責消息的存儲、查詢消費,支持主從部署,一個 Master 可以對應多個 Slave,Master 支持讀寫,Slave 只支持讀。Broker 會向集群中的每一臺 NameServer 註冊自己的路由信息。

NameServer:類似Zookeeper,是一個很簡單的 Topic 路由註冊中心,支持 Broker 的動態註冊和發現,保存 Topic 和 Borker 之間的關係。通常也是集群部署,但是各 NameServer 之間不會互相通信, 各 NameServer 都有完整的路由信息,即無狀態。

二、rocketmq基本工作流程:

1、先啟動 NameServer 集群,各 NameServer 之間無任何數據交互,Broker在啟動的時候會註冊自己配置的Topic信息到NameServer集群的每一臺機器中。即每一個NameServer均有該broker的Topic路由配置信息,並向所有 NameServer 定期(每 30s)發送心跳包,包括:IP、Port、TopicInfo;NameServer 也會定期掃描 Broker 存活列表,如果超過 120s 沒有心跳則移除此 Broker 相關信息,代表下線。

2、這樣每個 NameServer 就知道集群所有 Broker 的相關信息,此時 Producer 上線會根據配置文件中的NameServer 地址自動連接一個NameServer ;每 30s 會從連接的 NameServer 獲取 Topic 和 Broker 的映射關係存在本地記憶體中,從 NameServer 就可以得知它要發送的某 Topic 消息在哪個 Broker 上,和對應的 Broker (Master 角色的)建立長連接,發送消息。

3、Consumer 上線也可以從 NameServer 得知它所要接收的 Topic 是哪個 Broker ,和對應的 Master、Slave 建立連接,接收消息。

可以理解為如下:

name server:註冊中心

broker:消息處理

procucer:生成消息

consumer:消費消息

每個組件都可以部署成集群模式進行水平擴展。
消息由topic區分消息類型(一級分類):如訂單消息,物流消息等
tag為二級分類
message queue為消息類型下的消息隊列。
用於並行發送和接受消息。

四、基礎
分散式事務:
對於分散式事務,通俗地說就是,一次操作由若幹分支操作組成,這些分支操作分屬不同應用,分佈在不同伺服器上。分散式事務需要保證這些分支操作要麼全部成功,要麼全部失敗。分散式事務與普通事務一樣,就是為了保證操作結果的一致性。

事務消息:
RocketMQ提供了類似X/Open XA的分散式事務功能,通過事務消息能達到分散式事務的最終一致。XA是一種分散式事務解決方案,一種分散式事務處理模式。

半事務消息:
暫不能投遞的消息,發送方已經成功地將消息發送到了Broker,但Broker未收到最終確認指令,此時該消息被標記成“暫不能投遞”狀態,即不能被消費者看到。處於該種狀態下的消息即半事務消息。

本地事務狀態:
Producer回調操作執行的結果為本地事務狀態,其會發送給TC,而TC會再發送給TM。TM會根據TC發送來的本地事務狀態來決定全局事務確認令。

// 描述本地事務執行狀態 public enum LocalTransactionState {
COMMIT_MESSAGE, // 本地事務執行成功
ROLLBACK_MESSAGE, // 本地事務執行失敗
UNKNOW, // 不確定,表示需要進行回查以確定本地事務的執行結果
}

RocketMQ中的消息回查設置:
關於消息回查,有三個常見的屬性設置。它們都在broker載入的配置文件中設置,例如:

transactionTimeout=20,指定TM在 20 秒內應將最終確認狀態發送給TC,否則引發消息回查。預設為 60 秒
transactionCheckMax=5,指定最多回查 5 次,超過後將丟棄消息並記錄錯誤日誌。預設 15 次。
transactionCheckInterval=10,指定設置的多次消息回查的時間間隔為 10 秒。預設為 60 秒。
五、Topic與Broker的關係:

  • Borker中有一個或多個Topic
  • Topic中有一個或多個MessageQueue

Topic可以自動創建和手動創建;

1、手動創建也叫預先創建,就是在使用Topic之前就創建,可以通過命令行或者通過RocketMQ的管理界面(可視化控制台)創建Topic。

方法:DefaultMQProducer producer = rocketMQTemplate.getProducer();

producer.createTopic(String key, String newTopic, int queueNum, int topicSysFlag)

key:這個參數是系統已經存在的一個topic的名稱,新建的topic會跟它在相同的broker上創建
newTopic:新建的topic的名稱
queueNum:指定topic中queue的數量
topicSysFlag:topic的標記位設置,沒有特殊要求就填0就可以了。可選值在TopicSysFlag中定義

根據源碼可以分析出大致創建分為如下幾步:

第1步,根據提供的key代表的topic去獲取該topic所在的broker的路由,如果想在所有broker創建,一般使用DefaultTopic,因為這個topic是在所有broker上都存在的。
第2步,輪詢所有的broker,在master上創建topic,中間有一個broker失敗,則中止創建,返回失敗。因為master和slave的配置數據也會自動同步,所以只需要在master上創建。
第3,4步,設置參數
第5步,調用MQClientAPIImpl介面創建,失敗會重試4次。

2、自動創建就是設置了autoCreateTopicEnable =true;

TBW102 是啥用的?

TBW102是Broker啟動時,當autoCreateTopicEnable的配置為true時,會自動創建該預設(TBW102)topic。

就是一個接受自動創建topic的 Broker上的topic, 啟動會把這個預設Topic(主題)的Broker登記到 NameServer,這樣當 Producer 發送新 Topic 的消息時候會根據"TBW102 "這個topic得知哪個 Broker 可以自動創建主題,然後發往那個 Broker。

而 Broker 接受到這個消息的時候發現沒找到對應的主題,但是它接受創建新主題,這樣就會創建對應的 Topic 路由信息。

假設此時發送方還在連續快速的發送消息,那 NameServer 上其實還沒有關於這個 Topic 的路由信息,所以有機會讓別的允許自動創建的 Broker 也創建對應的 Topic 路由信息,這樣集群里的 Broker 就能接受這個 Topic 的信息,達到負載均衡的目的,但也有個別 Broker 可能,沒收到。

如果發送方這一次發了之後 30s 內一個都不發,之前的那個 Broker 隨著心跳把這個路由信息更新到 NameServer 了,那麼之後發送該 Topic 消息的 Producer 從 NameServer 只能得知該 Topic 消息只能發往之前的那台 Broker ,這就不均衡了,如果這個新主題消息很多,那台 Broker 負載就很高了。

所以不建議線上開啟允許自動創建主題,即 autoCreateTopicEnable 參數。

Tags的使用

tag(標簽): 標簽可以被認為是對topic的進一步細化。一般在相同業務模塊中通過引入標簽來標記不同用途的消息。區分相同topic下不同種類的消息。生產到哪個topic的哪個tag下,消費者也是從topic的哪個tag進行消費,即實現消息的過濾。

建議一個應用一個 Topic,利用 tages 來標記不同業務,因為 tages 設置比較靈活,且一個應用一個 Topic 很清晰,能直觀的辨別。

Keys的使用

如果有消息業務上的唯一標識,請填寫到 keys 欄位中,方便日後的定位查找。

queue(隊列): queue是消息的物理管理單位,而topic是邏輯管理單位。一個topic下可以有多個queue,預設自動創建是4個,手動創建是8個

 

六、下麵以windows伺服器為例演示使用rocketmq如下:

1、下載rocketmq的安裝包:https://rocketmq.apache.org/zh/download

2、下載rocketmq儀錶盤(也就是可視化操作界面,是一個完整的java項目可以用idea運行)

3、修改conf/broker.conf配置在末尾添加如下配置(IP使用自己的),並保存。

brokerIP1=192.168.31.199

namesrvAddr=192.168.31.199:9876

4、配置ROCKET_HOME環境變數,路徑使用下載路徑;path中配置%ROCKET_HOME%\bin即可

5、啟動Namesrv

在rocketmq文件的bin目錄下,進入cmd使用如下命令:start mqnamesrv.cmd

6、啟動Broker:start mqbroker.cmd -n 127.0.0.1:9876  autoCreateTopicEnable=true  (也就是說,producer使用RocketMQTemplate發送的消息,就算Booker上的topic之前不存在,rocket也會幫我們創建好)

7、將儀錶盤項目導入idea,然後打開application.properties文件修改rocket.config.namesrvAddr=localhost:9876;

8、啟動儀錶盤項目:瀏覽器輸入http://localhost:8080/#/即可看到可視化界面;

9、java代碼創建生產者和消費者:

創建普通springboot項目,添加依賴

<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.0.1</version>
</dependency>

10、修改配置文件

# 應用名稱
spring:
application:
name: rocket-producer
# 應用服務 WEB 訪問埠
server:
port: 8002
rocketmq:
name-server: localhost:9876
producer:
group: my-group

11、創建測試代碼

import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;

@Component
public class SendMessage {
@Resource
private RocketMQTemplate rocketMQTemplate;


@Scheduled(fixedRate = 5000)
public void run(){
//發送消息
rocketMQTemplate.convertAndSend("test-topic-1", "Hello, World!");

}
}

12、創建消費者項目(同上)

消費端測試代碼:
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;

@Service
@RocketMQMessageListener(topic = "test-topic-1", consumerGroup = "my-consumer_test-topic-1")
class MyConsumer1 implements RocketMQListener<String> {

/**
*需要註意的是,onMessage()封裝了ACK機制,消費者往外拋異常時,RocketMQ認為消費失敗,重新發送該條消息,否則預設消費成功
*/

@Override
public void onMessage(String s) {
System.out.println(s);
}
}


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 原文: [Kotlin 集合對象的單條件和多條件排序 - Stars-One的雜貨小窩](https://stars-one.site/2023/06/14/kotlin-list-sort) 本文不是太難的東西,因為`sortedWith`之前沒怎麼用過,所以就記錄下 平常開發經常使用到List, ...
  • 618是每年重要的電商大促活動,熱度高流量大,是電商App吸引新用戶,提高用戶轉化率(購買率)的最好時機。對電商App運營來說,消息推送是不可忽略的流量來源之一,適當的消息推送可以召回用戶,提高用戶復購率。如何利用消息推送功能在618電商節幫助App獲取流量並提高轉化率是運營需要關註的問題。 在回答 ...
  • 這裡給大家分享我在網上總結出來的一些知識,希望對大家有所幫助 使用ElementPlus的Table啥都好,就是沒有可編輯表格!!!😭 既然UI庫不支持,那我們實現一個可編輯表格是很難的事麽?😒難麽?😢不難麽?... 個人覺得如果是業務固定的可編輯表格,使用ElementPlus實現都不難。但 ...
  • 起因 為了降低併發時的API請求量, 這兩天寫了個LRU Cache. 其中用到了Set做AllowList, 來判斷API是否應該被緩存. 在MDN查API時, 發現Set被歸類在Keyed Collection中. 一直以來, 下意識覺得Set只是value唯一的Array. 應該屬於Index ...
  • 本文將對測試驅動開發(TDD)進行探討,主要內容有:TDD基本理解、TDD常見誤區、TDD技術選型,以及案例實戰。希望通過本文,讀者能夠理解掌握TDD並將其應用於實際開發中。 ...
  • 某日二師兄參加XXX科技公司的C++工程師開發崗位第14面: > 面試官:在C++中,有哪些可執行體? > > 二師兄:可執行體? > > 面試官:也就是可調用對象。 > > 二師兄:讓我想一想。函數、函數指針、類的靜態方法、類的成員方法、仿函數、lambda表達式。 > > 面試官:能說一說他們之 ...
  • Set 介面是 Collection 介面的一個子介面。Set 介面的實現類不會包含重覆的元素,並且最多只能有一個 null 元素。當嘗試添加重覆元素時,添加操作將被忽略。Set 介面取出元素的順序和添加元素的順序不一致(但是每次取出的順序是固定的),即無法通過索引訪問 Set 中的元素。 ...
  • 函數是帶名字的代碼塊,用於完成具體的工作,無需反覆編寫完成該工作的代碼。之前我們接觸過print函數,數據類型轉換中的int函數、str函數,還有列表中的append函數、pop函數、remove函數,以及字典中的keys函數、values函數等等,其實在正式學習函數之前,我們已經接觸了函數,只不過 ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...