一、RocketMQ 核心的四大組件: Producer:就是消息生產者,可以集群部署。它會先和 NameServer 集群中的隨機一臺建立長連接,得知當前要發送的 Topic 存在哪台 Broker Master上,然後再與其建立長連接,支持多種負載平衡模式發送消息。 Consumer:消息消費者 ...
一、RocketMQ 核心的四大組件:
Producer:就是消息生產者,可以集群部署。它會先和 NameServer 集群中的隨機一臺建立長連接,得知當前要發送的 Topic 存在哪台 Broker Master上,然後再與其建立長連接,支持多種負載平衡模式發送消息。
Consumer:消息消費者,也可以集群部署。它也會先和 NameServer 集群中的隨機一臺建立長連接,得知當前要消息的 Topic 存在哪台 Broker Master、Slave上,然後它們建立長連接,支持集群消費和廣播消費消息。
Broker:主要負責消息的存儲、查詢消費,支持主從部署,一個 Master 可以對應多個 Slave,Master 支持讀寫,Slave 只支持讀。Broker 會向集群中的每一臺 NameServer 註冊自己的路由信息。
NameServer:類似Zookeeper,是一個很簡單的 Topic 路由註冊中心,支持 Broker 的動態註冊和發現,保存 Topic 和 Borker 之間的關係。通常也是集群部署,但是各 NameServer 之間不會互相通信, 各 NameServer 都有完整的路由信息,即無狀態。
二、rocketmq基本工作流程:
1、先啟動 NameServer 集群,各 NameServer 之間無任何數據交互,Broker在啟動的時候會註冊自己配置的Topic信息到NameServer集群的每一臺機器中。即每一個NameServer均有該broker的Topic路由配置信息,並向所有 NameServer 定期(每 30s)發送心跳包,包括:IP、Port、TopicInfo;NameServer 也會定期掃描 Broker 存活列表,如果超過 120s 沒有心跳則移除此 Broker 相關信息,代表下線。
2、這樣每個 NameServer 就知道集群所有 Broker 的相關信息,此時 Producer 上線會根據配置文件中的NameServer 地址自動連接一個NameServer ;每 30s 會從連接的 NameServer 獲取 Topic 和 Broker 的映射關係存在本地記憶體中,從 NameServer 就可以得知它要發送的某 Topic 消息在哪個 Broker 上,和對應的 Broker (Master 角色的)建立長連接,發送消息。
3、Consumer 上線也可以從 NameServer 得知它所要接收的 Topic 是哪個 Broker ,和對應的 Master、Slave 建立連接,接收消息。
可以理解為如下:
name server:註冊中心broker:消息處理
procucer:生成消息
consumer:消費消息
每個組件都可以部署成集群模式進行水平擴展。
消息由topic區分消息類型(一級分類):如訂單消息,物流消息等
tag為二級分類
message queue為消息類型下的消息隊列。
用於並行發送和接受消息。
四、基礎
分散式事務:
對於分散式事務,通俗地說就是,一次操作由若幹分支操作組成,這些分支操作分屬不同應用,分佈在不同伺服器上。分散式事務需要保證這些分支操作要麼全部成功,要麼全部失敗。分散式事務與普通事務一樣,就是為了保證操作結果的一致性。
事務消息:
RocketMQ提供了類似X/Open XA的分散式事務功能,通過事務消息能達到分散式事務的最終一致。XA是一種分散式事務解決方案,一種分散式事務處理模式。
半事務消息:
暫不能投遞的消息,發送方已經成功地將消息發送到了Broker,但Broker未收到最終確認指令,此時該消息被標記成“暫不能投遞”狀態,即不能被消費者看到。處於該種狀態下的消息即半事務消息。
本地事務狀態:
Producer回調操作執行的結果為本地事務狀態,其會發送給TC,而TC會再發送給TM。TM會根據TC發送來的本地事務狀態來決定全局事務確認令。
// 描述本地事務執行狀態 public enum LocalTransactionState {
COMMIT_MESSAGE, // 本地事務執行成功
ROLLBACK_MESSAGE, // 本地事務執行失敗
UNKNOW, // 不確定,表示需要進行回查以確定本地事務的執行結果
}
RocketMQ中的消息回查設置:
關於消息回查,有三個常見的屬性設置。它們都在broker載入的配置文件中設置,例如:
transactionTimeout=20,指定TM在 20 秒內應將最終確認狀態發送給TC,否則引發消息回查。預設為 60 秒
transactionCheckMax=5,指定最多回查 5 次,超過後將丟棄消息並記錄錯誤日誌。預設 15 次。
transactionCheckInterval=10,指定設置的多次消息回查的時間間隔為 10 秒。預設為 60 秒。
五、Topic與Broker的關係:
- Borker中有一個或多個Topic
- Topic中有一個或多個MessageQueue
Topic可以自動創建和手動創建;
1、手動創建也叫預先創建,就是在使用Topic之前就創建,可以通過命令行或者通過RocketMQ的管理界面(可視化控制台)創建Topic。
方法:DefaultMQProducer producer = rocketMQTemplate.getProducer();
producer.createTopic(String key, String newTopic, int queueNum, int topicSysFlag)
key:這個參數是系統已經存在的一個topic的名稱,新建的topic會跟它在相同的broker上創建
newTopic:新建的topic的名稱
queueNum:指定topic中queue的數量
topicSysFlag:topic的標記位設置,沒有特殊要求就填0就可以了。可選值在TopicSysFlag中定義
根據源碼可以分析出大致創建分為如下幾步:
第1步,根據提供的key代表的topic去獲取該topic所在的broker的路由,如果想在所有broker創建,一般使用DefaultTopic,因為這個topic是在所有broker上都存在的。
第2步,輪詢所有的broker,在master上創建topic,中間有一個broker失敗,則中止創建,返回失敗。因為master和slave的配置數據也會自動同步,所以只需要在master上創建。
第3,4步,設置參數
第5步,調用MQClientAPIImpl介面創建,失敗會重試4次。
2、自動創建就是設置了autoCreateTopicEnable =true;
TBW102 是啥用的?
TBW102是Broker啟動時,當autoCreateTopicEnable的配置為true時,會自動創建該預設(TBW102)topic。
就是一個接受自動創建topic的 Broker上的topic, 啟動會把這個預設Topic(主題)的Broker登記到 NameServer,這樣當 Producer 發送新 Topic 的消息時候會根據"TBW102 "這個topic得知哪個 Broker 可以自動創建主題,然後發往那個 Broker。
而 Broker 接受到這個消息的時候發現沒找到對應的主題,但是它接受創建新主題,這樣就會創建對應的 Topic 路由信息。
假設此時發送方還在連續快速的發送消息,那 NameServer 上其實還沒有關於這個 Topic 的路由信息,所以有機會讓別的允許自動創建的 Broker 也創建對應的 Topic 路由信息,這樣集群里的 Broker 就能接受這個 Topic 的信息,達到負載均衡的目的,但也有個別 Broker 可能,沒收到。
如果發送方這一次發了之後 30s 內一個都不發,之前的那個 Broker 隨著心跳把這個路由信息更新到 NameServer 了,那麼之後發送該 Topic 消息的 Producer 從 NameServer 只能得知該 Topic 消息只能發往之前的那台 Broker ,這就不均衡了,如果這個新主題消息很多,那台 Broker 負載就很高了。
所以不建議線上開啟允許自動創建主題,即 autoCreateTopicEnable 參數。
Tags的使用
tag(標簽): 標簽可以被認為是對topic的進一步細化。一般在相同業務模塊中通過引入標簽來標記不同用途的消息。區分相同topic下不同種類的消息。生產到哪個topic的哪個tag下,消費者也是從topic的哪個tag進行消費,即實現消息的過濾。
建議一個應用一個 Topic,利用 tages 來標記不同業務,因為 tages 設置比較靈活,且一個應用一個 Topic 很清晰,能直觀的辨別。
Keys的使用
如果有消息業務上的唯一標識,請填寫到 keys 欄位中,方便日後的定位查找。
queue(隊列): queue是消息的物理管理單位,而topic是邏輯管理單位。一個topic下可以有多個queue,預設自動創建是4個,手動創建是8個
六、下麵以windows伺服器為例演示使用rocketmq如下:
1、下載rocketmq的安裝包:https://rocketmq.apache.org/zh/download
2、下載rocketmq儀錶盤(也就是可視化操作界面,是一個完整的java項目可以用idea運行)
3、修改conf/broker.conf配置在末尾添加如下配置(IP使用自己的),並保存。
brokerIP1=192.168.31.199
namesrvAddr=192.168.31.199:9876
4、配置ROCKET_HOME環境變數,路徑使用下載路徑;path中配置%ROCKET_HOME%\bin即可
5、啟動Namesrv
在rocketmq文件的bin目錄下,進入cmd使用如下命令:start mqnamesrv.cmd
6、啟動Broker:start mqbroker.cmd -n 127.0.0.1:9876 autoCreateTopicEnable=true (也就是說,producer使用RocketMQTemplate
發送的消息,就算Booker上的topic之前不存在,rocket也會幫我們創建好)
7、將儀錶盤項目導入idea,然後打開application.properties文件修改rocket.config.namesrvAddr=localhost:9876;
8、啟動儀錶盤項目:瀏覽器輸入http://localhost:8080/#/即可看到可視化界面;
9、java代碼創建生產者和消費者:
創建普通springboot項目,添加依賴
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.0.1</version>
</dependency>
10、修改配置文件
# 應用名稱
spring:
application:
name: rocket-producer
# 應用服務 WEB 訪問埠
server:
port: 8002
rocketmq:
name-server: localhost:9876
producer:
group: my-group
11、創建測試代碼
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
@Component
public class SendMessage {
@Resource
private RocketMQTemplate rocketMQTemplate;
@Scheduled(fixedRate = 5000)
public void run(){
//發送消息
rocketMQTemplate.convertAndSend("test-topic-1", "Hello, World!");
}
}
12、創建消費者項目(同上)
消費端測試代碼:
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;
@Service
@RocketMQMessageListener(topic = "test-topic-1", consumerGroup = "my-consumer_test-topic-1")
class MyConsumer1 implements RocketMQListener<String> {
/**
*需要註意的是,onMessage()封裝了ACK機制,消費者往外拋異常時,RocketMQ認為消費失敗,重新發送該條消息,否則預設消費成功
*/
@Override
public void onMessage(String s) {
System.out.println(s);
}
}