1、 RocketMQ安裝測試 1.1 下載解壓 下載地址:https://rocketmq.apache.org/release-notes/ rocketmq-all-5.0.0-bin-release.zip 下載後上傳到伺服器; 解壓命令# unzip rocketmq-all-5.0.0- ...
1、 RocketMQ安裝測試
1.1 下載解壓
下載地址:https://rocketmq.apache.org/release-notes/
rocketmq-all-5.0.0-bin-release.zip
下載後上傳到伺服器;
解壓命令# unzip rocketmq-all-5.0.0-bin-release.zip
1.2 啟動 測試
RocketMQ預設配置是比較好的,這樣可以直接應用於生產環境,所以如果機器記憶體較小,啟動會因為記憶體不足失敗,為了避免後面啟動失敗,選擇先修改其記憶體大小,一般阿裡雲伺服器是滿足不了預設記憶體。
手動調整JVM的配置,單位從g改為m
1.2.1 啟動nameserver
1.2.1.1 修改runbroker.sh和runserver.sh
1.2.1.2 runbroker.sh
-server -Xms256m -Xmx256m -Xmn128m
1.2.1.3 runserver.sh
-server -Xms256m -Xmx256m -Xmn128m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m
1.2.1.4 啟動nameserver
解壓目錄執行# nohup ./bin/mqnamesrv -n 1.117.75.57(自己的ip):9876 &
1.2.2 啟動broker
1.2.2.1 修改broker.conf
添加namesrvAddr 和 brokerIP1:
1.2.2.3 啟動 borker
解壓目錄執行# nohup ./bin/mqbroker -n 1.117.75.57:9876 -c ./conf/broker.conf &
1.2.2.4 查看啟動情況
jps
1.2.3 測試
由於伺服器記憶體可能比較小,建議先關閉其他應用,比如rabbitmq,docker的容器等;
還需要開啟幾個埠:9876,10909,10910,10911;
1.2.3.1 生產者
導出環境變數# export NAMESRV_ADDR=1.117.75.57:9876
發送消息# bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
1.2.3.2 消費者
導出環境變數# export NAMESRV_ADDR=1.117.75.57:9876
消費消息# bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
1.2.4 關閉命令
關閉nameserver# bin/mqshutdown namesrv
關閉broker# bin/mqshutdown broker
1.2.5 RocketMQ控制台
1.2.5.1 下載,解壓,修改配置信息
1.2.5.2 訪問控制台
localhost:9696
2、RocketMQ框架原理
2.1 框架
2.2 概念
整體可以分成4個角色,分別是:NameServer,Broker,Producer,Consumer:
- Broker(郵遞員):Broker是RocketMQ的核心,負責消息的接收,存儲,投遞等功能;
- NameServer(郵局):消息隊列的協調者,Broker向它註冊路由信息,同時Producer和Consumer向其獲取路由信息Producer(寄件人)消息的生產者,需要從NameServer獲取Broker信息,然後與Broker建立連接,向Broker發送消息;
- Consumer(收件人) :消息的消費者,需要從NameServer獲取Broker信息,然後與Broker建立連接,從Broker獲取消息;
- Topic(地區):用來區分不同類型的消息,發送和接收消息前都需要先創建Topic,針對Topic來發送和接收消息Message Queue(郵件)為了提高性能和吞吐量,引入了Message Queue,一個Topic可以設置一個或多個Message Queue,這樣消息就可以並行往各個Message Queue發送消息,消費者也可以並行的從多個Message Queue讀取消息;
- Message:Message 是消息的載體;
- Producer Group:生產者組,簡單來說就是多個發送同一類消息的生產者稱之為一個生產者組。
- Consumer Group:消費者組,消費同一類消息的多個 consumer 實例組成一個消費者組。
3、RocketMQ整合
3.1 rocketmq模塊 發送消息
3.2.1.1 依賴
<!-- rocket -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.0.2</version>
</dependency>
3.2.1.2 配置
# rocketmq配置
rocketmq:
#rocketMQ服務的地址
name-server: 1.117.75.57:9876
# 生產者組
producer:
group: kh96-sendsms-group
3.2.1.3 請求
@Autowired(required = false)
private RocketMQTemplate rocketMQTemplate;
/**
* @param : [phoneNo]
* @return : java.lang.String
* @author : huayu
* @date : 30/11/2022
* @description : 測試發送消息到用戶中心,用戶中心給手機號發信息
*/
@RequestMapping("/testRocketMQSendMsg")
public String testRocketMQSendMsg(@RequestParam String phoneNo) {
log.info("------ 使用RocketMQ,測試給手機:{},發送消息 -------", phoneNo);
//使用RocketMQ發送消息
rocketMQTemplate.convertAndSend("rocketmq-send-sms-kh96", phoneNo);
return "send msg success";
}
3.2 user模塊 消費消息
1.添加加rocketmq的依賴;
2.用戶服務,監聽發送簡訊的請求發送消息:
/**
* Created On : 30/11/2022.
* <p>
* Author : huayu
* <p>
* Description: 用戶服務,監聽發送簡訊的請求發送消息
*/
@Slf4j
@Component
@RocketMQMessageListener(consumerGroup = "rocketmq-user-sms-group", //組 隨便寫
topic = "rocketmq-send-sms-kh96" //消息隊列,發送的時候指定的
)
public class SendSmsListener implements RocketMQListener<String> {
@Override
public void onMessage(String message) {
log.info("***** 接收發送信息的請求,給手機:{},發送消息 ******", message);
}
}
3.3 測試
3.3.1 發送請求
3.3.2 發送消息模塊日誌
3.3.3 接收消息模塊日誌
3.3.4 控制台查看消息詳情
4、發送不同類型的消息
4.1 發送可靠同步消息
4.1.1 請求
/**
* @param : [phoneNo]
* @return : java.lang.String
* @author : huayu
* @date : 30/11/2022
* @description : 測試roketmq-發送可靠同步消息
*/
@RequestMapping("/testRocketMQSendMsgSync")
public String testRocketMQSendMsgSync(@RequestParam String syncMsg) {
log.info("------ 使用RocketMQ,發送可靠同步消息{} -------", syncMsg);
//使用RocketMQ發送消息,拿到同步結果
SendResult sendResult = rocketMQTemplate.syncSend("rocketmq-sync-msg-kh96", syncMsg);
log.info("------ 使用RocketMQ,發送可靠同步消息結果:{} -------", sendResult);
return "send sync msg success";
}
4.1.2 發送請求
4.1.3 同步結果
4.2 發送可靠非同步消息
4.2.1 請求
/**
* @param : [phoneNo]
* @return : java.lang.String
* @author : huayu
* @date : 30/11/2022
* @description : 測試roketmq-發送可靠非同步消息
*/
@RequestMapping("/testRocketMQSendMsgAsync")
public String testRocketMQSendMsgAsync(@RequestParam String asyncMsg) {
log.info("------ 使用RocketMQ,發送可靠非同步消息:{} -------", asyncMsg);
//使用RocketMQ發送消息
rocketMQTemplate.asyncSend("rocketmq-sync-msg-kh96",
asyncMsg,
new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
log.info("------ 可靠非同步發送成功回調 ------");
}
@Override
public void onException(Throwable throwable) {
log.info("------ 可靠非同步發送失敗回調 ------");
}
});
return "send async msg success";
}
4.2.2 發送請求
4.2.3 回調結果
4.3 發送單項消息,只發不收結果
4.3.1 請求
/**
* @param : [phoneNo]
* @return : java.lang.String
* @author : huayu
* @date : 30/11/2022
* @description : 測試roketmq-發送單向消息,只發不收結果
*/
@RequestMapping("/testRocketMQSendMsgOneWay")
public String testRocketMQSendMsgOneWay(@RequestParam String oneWayMsg) {
log.info("------ 使用RocketMQ,發送單向消息給:{} -------", oneWayMsg);
//使用RocketMQ發送消息
rocketMQTemplate.sendOneWay("rocketmq-oneWay-msg-kh96", oneWayMsg);
return "send oneWay msg success";
}
4.3.2 發送請求
4.3.3 日誌查看
4.4 發送順序消息
4.4.1 請求
/**
* @param : [phoneNo]
* @return : java.lang.String
* @author : huayu
* @date : 30/11/2022
* @description : 測試roketmq-發送順序消息
*/
@RequestMapping("/testRocketMQSendMsgOrderly")
public String testRocketMQSendMsgOrderly(@RequestParam String orderlyMsgs) {
log.info("------ 使用RocketMQ,發送順序消息:{} -------", orderlyMsgs);
//使用RocketMQ發送順序消息,必須要提供一個唯一的標識di,比如用戶編號等
String userId = UUID.randomUUID().toString().replace("-", "");
//發送多條順序消息,模擬iang消息分割成多個符號發送
Arrays.asList(orderlyMsgs.split("")).
forEach(orderlyMsg -> rocketMQTemplate.syncSendOrderly("rocketmq-orderly-msg-kh96", orderlyMsg, userId));
return "send orderly msg success";
}
4.4.2 監聽器
/**
* Created On : 30/11/2022.
* <p>
* Author : huayu
* <p>
* Description: RocketMQ 監聽順序消息
*/
@Slf4j
@Component
@RocketMQMessageListener(consumerGroup = "rocketmq-orderly-sms-group",
topic = "rocketmq-orderly-msg-kh96",
consumeMode = ConsumeMode.ORDERLY)
public class RocketMQOrderlyMsgListener implements RocketMQListener<String> {
@Override
public void onMessage(String orderlyMsg) {
log.info("------接收順序消息 :{} ------", orderlyMsg);
}
}
4.2.3 發送請求
4.2.4 消費消息
4.5 發送事務消息(重點)
4.5.1 發送事務消息流程
4.5.1.1 流程圖
4.5.1.2 流程解析
- 正常事務發送與提交階段
- 生產者發送一個半消息給broker(半消息是指的暫時不能消費的消息);
- 服務端響應;
- 開始執行本地事務;
- 根據本地事務的執行情況執行Commit或者Rollback
- 事務信息的補償流程
- 如果broker長時間沒有收到本地事務的執行狀態,會向生產者發起一個確認會查的操作請求;
- 生產者收到確認會查請求後,檢查本地事務的執行狀態;
- 根據檢查後的結果執行Commit或者Rollback操作 補償階段主要是用於解決生產者在發送Commit或者Rollbacke操作時發生超時或失敗的情況;
4.5.2 RocketMQ事務流程關鍵
-
事務消息在一階段對用戶不可見、
事務消息相對普通消息最大的特點就是一階段發送的消息對用戶是不可見的,也就是說消費者不能直接消費.這裡RocketMQ實現方法是原消息的主題與消息消費隊列,然後把主題改成RMQ_SYS_TRANS_HALF_TOPIC.這樣由於消費者沒有訂閱這個主題,所以不會消費;
-
如何處理第二階段的發送消息?
在本地事務執行完成後迴向Broker發送Commit或者Rollback操作,此時如果在發送消息的時候生產者出故障了,要保證這條消息最終被消費,broker就會向服務端發送回查請求,確認本地事務的執行狀態.當然RocketMQ並不會無休止的發送事務狀態回查請求,預設是15次,如果15次回查還是無法得知事務的狀態,RocketMQ預設回滾消息(broker就會將這條半消息刪除);
4.5.3 RocketMQ事務消息原理
-
設計思想
在RocketMQ事務消息的主要流程中,一階段的消息如何對用戶不可見。其中,事務消息相對普通消息最大的特點就是一階段發送的消息對用戶是不可見的。那麼,如何做到寫入消息但是對用戶不可見呢?RocketMQ事務消息的做法是:如果消息是half消息,將備份原消息的主題與消息消費隊列,然後改變主題為RMQ_SYS_TRANS_HALF_TOPIC。由於消費組未訂閱該主題,故消費端無法消費half類型的消息。
-
如何實現事務回查?
Broker會啟動一個消息回查的定時任務,定時從事務消息queue中讀取所有待反查的消息。針對每個需要反查的半消息,Broker會給對應的Producer發一個要求執行事務狀態反查的RPC請求。然後根據RPC返迴響應中的反查結果,來決定這個半消息是需要提交還是回滾,或者後續繼續來反查。最後,提交或者回滾事務,將半消息標記為已處理狀態【將消息存儲在主題為:RMQ_SYS_TRANS_OP_HALF_TOPIC的主題中,代表這些消息已經被處理(提交或回滾)】。 如果是提交事務,就把半消息從半消息隊列中複製到該消息真正的topic和queue中; 如果是回滾事務,則什麼都不做。
參考博客1:https://blog.csdn.net/Weixiaohuai/article/details/123733518
參考博客2:https://blog.csdn.net/qq_42877546/article/details/125404307
4.5.4 實現代碼
4.5.4.1 業務層
4.5.4.1.1 介面
/**
* Created On : 30/11/2022.
* <p>
* Author : huayu
* <p>
* Description: 事務消息的業務 介面
*/
public interface RocketMQTxService {
/**
* @param : [kgcMallOrder]
* @return : void
* @author : huayu
* @date : 30/11/2022
* @description : 發送生成訂單的半事務消息
*/
void sendCreateOrderHalfTx(KgcMallOrder kgcMallOrder);
/**
* @param : [txId, kgcMallOrder]
* @return : void
* @author : huayu
* @date : 30/11/2022
* @description : 執行本地生成訂單的事務操作
*/
void executeCreateOrderLocalTx(String txId, KgcMallOrder kgcMallOrder);
}
4.5.4.1.2 實現類
/**
* Created On : 30/11/2022.
* <p>
* Author : huayu
* <p>
* Description: 事務消息的業務 處理類
*/
@Service
@Slf4j
public class RocketMQTxServiceImpl implements RocketMQTxService {
@Autowired(required = false)
private RocketMQTemplate rocketMQTemplate;
@Autowired
private KgcMallOrderRepository kgcMallOrderRepository;
@Autowired
private KgcMallTxlogrepisitory kgcMallTxlogrepisitory;
@Override
@Transactional
public void sendCreateOrderHalfTx(KgcMallOrder kgcMallOrder) {
log.info("###### 1. 開始發送生成訂單的半事務消息 到 rocketmq服務端 ######");
//自定義事務編號
String txId = UUID.randomUUID().toString().substring(0, 8);
//發送半事務消息,返回發送結果
TransactionSendResult transactionSendResult =
rocketMQTemplate.sendMessageInTransaction("rocketmq-tx-msg-group", //組
"rocketmq-tx-msg-kh96", //隊列
MessageBuilder.withPayload(kgcMallOrder).setHeader("txId", txId).build(), // 消息體
kgcMallOrder); //發送內容
log.info("###### 2. 開始發送生成訂單的半事務消息rocketmq服務端成功,響應:{} ######", transactionSendResult);
}
@Override
@Transactional
public void executeCreateOrderLocalTx(String txId, KgcMallOrder kgcMallOrder) {
log.info("###### 3.1 本地開始執行生成訂單的事務操作 ######");
//開始插入訂單
kgcMallOrderRepository.save(kgcMallOrder);
log.info("###### 3.2 本地執行生成訂單的事務操作 成功 ######");
// 模擬本地事務處理失敗
// int a = 10 / 0;
log.info("###### 3.3開始生成用於事務回查的本地事務日誌 ######");
//創建事務對象
KgcMallTxlog kgcMallTxlog = KgcMallTxlog.builder()
.id(txId)
.txDetail("本地事務日誌")
.txTime(new Date())
.build();
//事務日誌入庫
kgcMallTxlogrepisitory.save(kgcMallTxlog);
log.info("###### 3.4 生成用於事務回查的本地事務日誌成功 ######");
}
}
4.5.4.2 監聽器
4.5.4.2.1 RocketMQExecuteLocalTxListener
/**
* Created On : 30/11/2022.
* <p>
* Author : huayu
* <p>
* Description: 事務消息,本地執行事務監聽,半事務消息發送成功後,此監聽會收到本地事務處理的通知
*/
@Slf4j
@Component
@RocketMQTransactionListener(txProducerGroup = "rocketmq-tx-msg-group")
public class RocketMQExecuteLocalTxListener implements RocketMQLocalTransactionListener {
@Autowired
private RocketMQTxService rocketMQTxService;
@Autowired
private KgcMallTxlogrepisitory kgcMallTxlogrepisitory;
//執行本地事務
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
try {
//調用本地事務執行的業務處理介面
log.info("###### 3 半事務消息發送成功, 執行本地事務 ######");
rocketMQTxService.executeCreateOrderLocalTx((String) msg.getHeaders().get("txId"), (KgcMallOrder) arg);
//響應本地事務執行成功結果給服務端,服務端接收到此提交結果,會投遞消息
log.info("###### 4 本地事務處理成功,響應事務處理結果給服務端 ######");
return RocketMQLocalTransactionState.COMMIT;
} catch (Exception e) {
log.error("###### 本地事務執行異常:{} ######", e.getMessage());
}
//響應本地事務執行失敗結果給服務端,服務端接收到此回滾結果,不會投遞消息(緩存,並定期刪除)
log.info("###### 4 本地事務處理失敗,響應事務處理結果給服務端 #######");
return RocketMQLocalTransactionState.ROLLBACK;
}
//回查本地事務
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
log.info("###### 5 未收到第4步本地事務處理結果,回查事務狀態 ######");
//在網路閃斷或者服務重啟,沒有及時通知服務斷事務處理結果,進行會查操作
//如果回查本地事務執行成功(看事物日誌是否存在,如果存在就是處理成功如果不存在就是處理失敗),通知服務端投遞消息,否則不能投遞
log.info("###### 6 檢查本地事務處理結果,響應事務處理結果給服務端 ######");
if (kgcMallTxlogrepisitory.findById((String) msg.getHeaders().get("txId")).orElse(null) == null) {
//本地事務入庫失敗,代表本地事務沒有處理成功,步投遞消息(緩存,並定期刪除)
log.info("###### 7 檢查本地事務處理結果失敗 ######");
return RocketMQLocalTransactionState.ROLLBACK;
}
//本地事務入庫成功,代表本地事務處理成功,投遞消息
log.info("###### 7 檢查本地事務處理結果成功 ######");
return RocketMQLocalTransactionState.COMMIT;
}
}
4.5.4.2.2 RocketMQConsumerTxMsgListener
/**
* Created On : 30/11/2022.
* <p>
* Author : huayu
* <p>
* Description: 事務消息,消費監聽,如果本地事務處理成功,會收到投遞的消息,如果失敗,收不到消息
*/
@Slf4j
@Component
@RocketMQMessageListener(consumerGroup = "rocket-tx-msg-consumer-group",
topic = "rocket-tx-msg-kh96"
)
public class RocketMQConsumerTxMsgListener implements RocketMQListener<Object> {
@Override
public void onMessage(Object message) {
log.info("###### 8 消費端,收到生成訂單成功的事務消息:{} ###### ", message);
}
}
4.5.4.3 控制器
/**
* Created On : 30/11/2022.
* <p>
* Author : huayu
* <p>
* Description: RocketMQ 消息隊列 測試消息入口
*/
@Slf4j
@RestController
public class RocketMQController {
@Autowired(required = false)
private RocketMQTemplate rocketMQTemplate;
@Autowired
private RocketMQTxService rocketMQTxService;
/**
* @param : [phoneNo]
* @return : java.lang.String
* @author : huayu
* @date : 30/11/2022
* @description : 測試發送消息到用戶中心,用戶中心給手機號發信息
*/
@RequestMapping("/testRocketMQSendMsg")
public String testRocketMQSendMsg(@RequestParam String phoneNo) {
log.info("------ 使用RocketMQ,測試給手機:{},發送消息 -------", phoneNo);
//使用RocketMQ發送消息
rocketMQTemplate.convertAndSend("rocketmq-send-sms-kh96", phoneNo);
return "send msg success";
}
/**
* @param : [phoneNo]
* @return : java.lang.String
* @author : huayu
* @date : 30/11/2022
* @description : 測試roketmq-發送可靠同步消息
*/
@RequestMapping("/testRocketMQSendMsgSync")
public String testRocketMQSendMsgSync(@RequestParam String syncMsg) {
log.info("------ 使用RocketMQ,發送可靠同步消息{} -------", syncMsg);
//使用RocketMQ發送消息,拿到同步結果
SendResult sendResult = rocketMQTemplate.syncSend("rocketmq-sync-msg-kh96", syncMsg);
log.info("------ 使用RocketMQ,發送可靠同步消息結果:{} -------", sendResult);
return "send sync msg success";
}
/**
* @param : [phoneNo]
* @return : java.lang.String
* @author : huayu
* @date : 30/11/2022
* @description : 測試roketmq-發送可靠非同步消息
*/
@RequestMapping("/testRocketMQSendMsgAsync")
public String testRocketMQSendMsgAsync(@RequestParam String asyncMsg) {
log.info("------ 使用RocketMQ,發送可靠非同步消息:{} -------", asyncMsg);
//使用RocketMQ發送消息
rocketMQTemplate.asyncSend("rocketmq-sync-msg-kh96",
asyncMsg,
new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
log.info("------ 可靠非同步發送成功回調 ------");
}
@Override
public void onException(Throwable throwable) {
log.info("------ 可靠非同步發送失敗回調 ------");
}
});
return "send async msg success";
}
/**
* @param : [phoneNo]
* @return : java.lang.String
* @author : huayu
* @date : 30/11/2022
* @description : 測試roketmq-發送單向消息,只發不收結果
*/
@RequestMapping("/testRocketMQSendMsgOneWay")
public String testRocketMQSendMsgOneWay(@RequestParam String oneWayMsg) {
log.info("------ 使用RocketMQ,發送單向消息:{} -------", oneWayMsg);
//使用RocketMQ發送消息
rocketMQTemplate.sendOneWay("rocketmq-oneWay-msg-kh96", oneWayMsg);
return "send oneWay msg success";
}
/**
* @param : [phoneNo]
* @return : java.lang.String
* @author : huayu
* @date : 30/11/2022
* @description : 測試roketmq-發送順序消息
*/
@RequestMapping("/testRocketMQSendMsgOrderly")
public String testRocketMQSendMsgOrderly(@RequestParam String orderlyMsgs) {
log.info("------ 使用RocketMQ,發送順序消息:{} -------", orderlyMsgs);
//使用RocketMQ發送順序消息,必須要提供一個唯一的標識di,比如用戶編號等
String userId = UUID.randomUUID().toString().replace("-", "");
//發送多條順序消息,模擬iang消息分割成多個符號發送
Arrays.asList(orderlyMsgs.split("")).
forEach(orderlyMsg -> rocketMQTemplate.syncSendOrderly("rocketmq-orderly-msg-kh96", orderlyMsg, userId));
return "send orderly msg success";
}
/**
* @param : [phoneNo]
* @return : java.lang.String
* @author : huayu
* @date : 30/11/2022
* @description : 測試roketmq-發送事務消息
*/
@RequestMapping("/testRocketMQSendMsgTx")
public String testRocketMQSendMsgTx(@RequestParam String txmsg) {
log.info("------ 使用RocketMQ,發送事務消息:{} -------", txmsg);
//使用RocketMQ發送事務消息,模擬生成一筆訂單
KgcMallOrder kgcMallOrder = KgcMallOrder.builder()
.userId(2)
.userName("RocketMQ_tx")
.prodId(2)
.prodName(txmsg)
.totalPrice(96.0)
.build();
//發送事務消息
rocketMQTxService.sendCreateOrderHalfTx(kgcMallOrder);
return "send tx msg success";
}
}