RocketMQ 是大家耳熟能詳的消息隊列,開源項目 rocketmq-spring 可以幫助開發者在 Spring Boot 項目中快速整合 RocketMQ。 這篇文章會介紹 Spring Boot 項目使用 rocketmq-spring SDK 實現消息收發的操作流程,同時筆者會從開發者的角 ...
RocketMQ 是大家耳熟能詳的消息隊列,開源項目 rocketmq-spring 可以幫助開發者在 Spring Boot 項目中快速整合 RocketMQ。
這篇文章會介紹 Spring Boot 項目使用 rocketmq-spring SDK 實現消息收發的操作流程,同時筆者會從開發者的角度解讀 SDK 的設計邏輯。
1 SDK 簡介
項目地址:
rocketmq-spring 的本質是一個 Spring Boot starter 。
Spring Boot 基於“約定大於配置”(Convention over configuration)這一理念來快速地開發、測試、運行和部署 Spring 應用,並能通過簡單地與各種啟動器(如 spring-boot-web-starter)結合,讓應用直接以命令行的方式運行,不需再部署到獨立容器中。
Spring Boot starter 構造的啟動器使用起來非常方便,開發者只需要在 pom.xml 引入 starter 的依賴定義,在配置文件中編寫約定的配置即可。
下麵我們看下 rocketmq-spring-boot-starter 的配置:
1、引入依賴
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.2.3</version>
</dependency>
2、約定配置
接下來,我們分別按照生產者和消費者的順序,詳細的講解消息收發的操作過程。
2 生產者
首先我們添加依賴後,進行如下三個步驟:
1、配置文件中配置如下
rocketmq:
name-server: 127.0.0.1:9876
producer:
group: platform-sms-server-group
# access-key: myaccesskey
# secret-key: mysecretkey
topic: sms-common-topic
生產者配置非常簡單,主要配置名字服務地址和生產者組。
2、需要發送消息的類中註入 RcoketMQTemplate
@Autowired
private RocketMQTemplate rocketMQTemplate;
@Value("${rocketmq.topic}")
private String smsTopic;
3、發送消息,消息體可以是自定義對象,也可以是 Message 對象
rocketMQTemplate 類包含多鐘發送消息的方法:
- 同步發送 syncSend
- 非同步發送 asyncSend
- 順序發送 syncSendOrderly
- oneway發送 sendOneWay
下麵的代碼展示如何同步發送消息。
String destination = StringUtils.isBlank(tags) ? topic : topic + ":" + tags;
SendResult sendResult =
rocketMQTemplate.syncSend(
destination,
MessageBuilder.withPayload(messageContent).
setHeader(MessageConst.PROPERTY_KEYS, uniqueId).
build()
);
if (sendResult != null) {
if (sendResult.getSendStatus() == SendStatus.SEND_OK) {
// send message success ,do something
}
}
syncSend 方法的第一個參數是發送的目標,格式是:topic + ":" + tags ,
第二個參數是:spring-message 規範的 message 對象 ,而 MessageBuilder 是一個工具類,方法鏈式調用創建消息對象。
3 消費者
1、配置文件中配置如下
rocketmq:
name-server: 127.0.0.1:9876
consumer1:
group: platform-sms-worker-common-group
topic: sms-common-topic
2、實現消息監聽器
@Component
@RocketMQMessageListener(
consumerGroup = "${rocketmq.consumer1.group}", //消費組
topic = "${rocketmq.consumer1.topic}" //主題
)
public class SmsMessageCommonConsumer implements RocketMQListener<String> {
public void onMessage(String message) {
System.out.println("普通簡訊:" + message);
}
}
消費者實現類也可以實現 RocketMQListener<MessageExt>, 在 onMessage 方法里通過 RocketMQ 原生消息對象 MessageExt 獲取更詳細的消息數據 。
public void onMessage(MessageExt message) {
try {
String body = new String(message.getBody(), "UTF-8");
logger.info("普通簡訊:" + message);
} catch (Exception e) {
logger.error("common onMessage error:", e);
}
}
4 源碼概覽
最新源碼中,我們可以看到源碼中包含四個模塊:
1、rocketmq-spring-boot-parent
該模塊是父模塊,定義項目所有依賴的 jar 包。
2、rocketmq-spring-boot
核心模塊,實現了 starter 的核心邏輯。
3、rocketmq-spring-boot-starter
SDK 模塊,簡單封裝,外部項目引用。
4、rocketmq-spring-boot-samples
示例代碼模塊。這個模塊非常重要,當用戶使用 SDK 時,可以參考示例快速開發。
5 starter 實現
我們重點分析下 rocketmq-spring-boot 模塊的核心源碼:
spring-boot-starter 實現需要包含如下三個部分:
1、定義 Spring 自身的依賴包和 RocketMQ 的依賴包 ;
2、定義spring.factories 文件
在 resources 包下創建 META-INF 目錄後,新建 spring.factories 文件,併在文件中定義自動載入類,文件內容是:
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
org.apache.rocketmq.spring.autoconfigure.RocketMQAutoConfiguration
spring boot 會根據文件中配置的自動化配置類來自動初始化相關的 Bean、Component 或 Service。
3、實現自動載入類
在 RocketMQAutoConfiguration 類的具體實現中,我們重點分析下生產者和消費者是如何分別啟動的。
▍生產者發送模板類:RocketMQTemplate
RocketMQAutoConfiguration 類定義了兩個預設的 Bean :
首先SpringBoot項目中配置文件中的配置值會根據屬性條件綁定到 RocketMQProperties 對象 中,然後使用 RocketMQ 的原生 API 分別創建生產者 Bean 和拉取消費者 Bean , 分別將兩個 bean 設置到 RocketMQTemplate 對象中。
兩個重點需要強調:
-
發送消息時,將 spring-message 規範下的消息對象封裝成 RocketMQ 消息對象
-
預設拉取消費者 litePullConsumer 。拉取消費者一般用於大數據批量處理場景 。
RocketMQTemplate 類封裝了拉取消費者的receive方法,以方便開發者使用。
▍自定義消費者類
下圖是併發消費者的例子:
那麼 rocketmq-spring 是如何自動啟動消費者呢 ?
spring 容器首先註冊了消息監聽器後置處理器,然後調用 ListenerContainerConfiguration 類的 registerContainer 方法 。
對比併發消費者的例子,我們可以看到: DefaultRocketMQListenerContainer 是對 DefaultMQPushConsumer 消費邏輯的封裝。
封裝消費消息的邏輯,同時滿足 RocketMQListener 泛化介面支持不同參數,比如 String 、MessageExt 、自定義對象 。
首先DefaultRocketMQListenerContainer初始化之後, 獲取 onMessage 方法的參數類型 。
然後消費者調用 consumeMessage 處理消息時,封裝了一個 handleMessage 方法 ,將原生 RocketMQ 消息對象 MessageExt 轉換成 onMessage 方法定義的參數對象,然後調用 rocketMQListener 的 onMessage 方法。
上圖右側標紅的代碼也就是該方法的精髓:
rocketMQListener.onMessage(doConvertMessage(messageExt));
6 寫到最後
開源項目 rocketmq-spring 有很多值得學習的地方 ,我們可以從如下四個層面逐層進階:
1、學會如何使用 :參考 rocketmq-spring-boot-samples 模塊的示例代碼,學會如何發送和接收消息,快速編碼;
2、模塊設計:學習項目的模塊分層 (父模塊、SDK 模塊、核心實現模塊、示例代碼模塊);
3、starter 設計思路 :定義自動配置文件 spring.factories 、設計配置屬性類 、在 RocketMQ client 的基礎上實現優雅的封裝、深入理解 RocketMQ 源碼等;
4、舉一反三:當我們理解了 rocketmq-spring 的源碼,我們可以嘗試模仿該項目寫一個簡單的 spring boot starter。
如果我的文章對你有所幫助,還請幫忙點贊、在看、轉發一下,你的支持會激勵我輸出更高質量的文章,非常感謝!