rocketmq-spring : 實戰與源碼解析一網打盡

来源:https://www.cnblogs.com/makemylife/archive/2023/04/02/17280455.html
-Advertisement-
Play Games

RocketMQ 是大家耳熟能詳的消息隊列,開源項目 rocketmq-spring 可以幫助開發者在 Spring Boot 項目中快速整合 RocketMQ。 這篇文章會介紹 Spring Boot 項目使用 rocketmq-spring SDK 實現消息收發的操作流程,同時筆者會從開發者的角 ...


RocketMQ 是大家耳熟能詳的消息隊列,開源項目 rocketmq-spring 可以幫助開發者在 Spring Boot 項目中快速整合 RocketMQ。

這篇文章會介紹 Spring Boot 項目使用 rocketmq-spring SDK 實現消息收發的操作流程,同時筆者會從開發者的角度解讀 SDK 的設計邏輯

1 SDK 簡介


項目地址:

https://github.com/apache/rocketmq-spring

rocketmq-spring 的本質是一個 Spring Boot starter

Spring Boot 基於“約定大於配置”(Convention over configuration)這一理念來快速地開發、測試、運行和部署 Spring 應用,並能通過簡單地與各種啟動器(如 spring-boot-web-starter)結合,讓應用直接以命令行的方式運行,不需再部署到獨立容器中。

Spring Boot starter 構造的啟動器使用起來非常方便,開發者只需要在 pom.xml 引入 starter 的依賴定義,在配置文件中編寫約定的配置即可。

下麵我們看下 rocketmq-spring-boot-starter 的配置:

1、引入依賴

<dependency>
  <groupId>org.apache.rocketmq</groupId>
  <artifactId>rocketmq-spring-boot-starter</artifactId>
  <version>2.2.3</version>
</dependency>

2、約定配置

接下來,我們分別按照生產者和消費者的順序,詳細的講解消息收發的操作過程。

2 生產者

首先我們添加依賴後,進行如下三個步驟:

1、配置文件中配置如下

rocketmq:
  name-server: 127.0.0.1:9876
  producer:
      group: platform-sms-server-group
    # access-key: myaccesskey
    # secret-key: mysecretkey
  topic: sms-common-topic

生產者配置非常簡單,主要配置名字服務地址生產者組

2、需要發送消息的類中註入 RcoketMQTemplate

@Autowired
private RocketMQTemplate rocketMQTemplate;

@Value("${rocketmq.topic}")
private String smsTopic;

3、發送消息,消息體可以是自定義對象,也可以是 Message 對象

rocketMQTemplate 類包含多鐘發送消息的方法:

  1. 同步發送 syncSend
  2. 非同步發送 asyncSend
  3. 順序發送 syncSendOrderly
  4. oneway發送 sendOneWay

下麵的代碼展示如何同步發送消息。

String destination = StringUtils.isBlank(tags) ? topic : topic + ":" + tags;
SendResult sendResult =
         rocketMQTemplate.syncSend(
            destination, 
            MessageBuilder.withPayload(messageContent).
            setHeader(MessageConst.PROPERTY_KEYS, uniqueId).
            build()
          );
if (sendResult != null) {
    if (sendResult.getSendStatus() == SendStatus.SEND_OK) {
       // send message success ,do something 
    }
}

syncSend 方法的第一個參數是發送的目標,格式是:topic + ":" + tags

第二個參數是:spring-message 規範的 message 對象 ,而 MessageBuilder 是一個工具類,方法鏈式調用創建消息對象。

3 消費者

1、配置文件中配置如下

rocketmq:
  name-server: 127.0.0.1:9876
  consumer1:
    group: platform-sms-worker-common-group
    topic: sms-common-topic

2、實現消息監聽器

@Component
@RocketMQMessageListener(
    consumerGroup = "${rocketmq.consumer1.group}",  //消費組
    topic = "${rocketmq.consumer1.topic}"  					//主題
)
public class SmsMessageCommonConsumer implements RocketMQListener<String> {
    public void onMessage(String message) {
       System.out.println("普通簡訊:" + message);
    }
}

消費者實現類也可以實現 RocketMQListener<MessageExt>, 在 onMessage 方法里通過 RocketMQ 原生消息對象 MessageExt 獲取更詳細的消息數據

public void onMessage(MessageExt message) {
    try {
        String body = new String(message.getBody(), "UTF-8");
        logger.info("普通簡訊:" + message);
    } catch (Exception e) {
        logger.error("common onMessage error:", e);
    }
}

4 源碼概覽

最新源碼中,我們可以看到源碼中包含四個模塊:

1、rocketmq-spring-boot-parent

該模塊是父模塊,定義項目所有依賴的 jar 包。

2、rocketmq-spring-boot

核心模塊,實現了 starter 的核心邏輯。

3、rocketmq-spring-boot-starter

SDK 模塊,簡單封裝,外部項目引用。

4、rocketmq-spring-boot-samples

示例代碼模塊。這個模塊非常重要,當用戶使用 SDK 時,可以參考示例快速開發。

5 starter 實現

我們重點分析下 rocketmq-spring-boot 模塊的核心源碼:


spring-boot-starter 實現需要包含如下三個部分:

1、定義 Spring 自身的依賴包和 RocketMQ 的依賴包 ;

2、定義spring.factories 文件

在 resources 包下創建 META-INF 目錄後,新建 spring.factories 文件,併在文件中定義自動載入類,文件內容是:

org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
org.apache.rocketmq.spring.autoconfigure.RocketMQAutoConfiguration

spring boot 會根據文件中配置的自動化配置類來自動初始化相關的 Bean、Component 或 Service。

3、實現自動載入類

在 RocketMQAutoConfiguration 類的具體實現中,我們重點分析下生產者和消費者是如何分別啟動的。

▍生產者發送模板類:RocketMQTemplate

RocketMQAutoConfiguration 類定義了兩個預設的 Bean :

首先SpringBoot項目中配置文件中的配置值會根據屬性條件綁定到 RocketMQProperties 對象 中,然後使用 RocketMQ 的原生 API 分別創建生產者 Bean 和拉取消費者 Bean , 分別將兩個 bean 設置到 RocketMQTemplate 對象中。

兩個重點需要強調:

  • 發送消息時,將 spring-message 規範下的消息對象封裝成 RocketMQ 消息對象

  • 預設拉取消費者 litePullConsumer 。拉取消費者一般用於大數據批量處理場景 。

    原生使用方式

​ RocketMQTemplate 類封裝了拉取消費者的receive方法,以方便開發者使用。

▍自定義消費者類

下圖是併發消費者的例子:

消費者示例代碼

那麼 rocketmq-spring 是如何自動啟動消費者呢 ?

spring 容器首先註冊了消息監聽器後置處理器,然後調用 ListenerContainerConfiguration 類的 registerContainer 方法 。

對比併發消費者的例子,我們可以看到: DefaultRocketMQListenerContainer 是對 DefaultMQPushConsumer 消費邏輯的封裝。

封裝消費消息的邏輯,同時滿足 RocketMQListener 泛化介面支持不同參數,比如 String 、MessageExt 、自定義對象 。

首先DefaultRocketMQListenerContainer初始化之後, 獲取 onMessage 方法的參數類型 。

然後消費者調用 consumeMessage 處理消息時,封裝了一個 handleMessage 方法 ,將原生 RocketMQ 消息對象 MessageExt 轉換成 onMessage 方法定義的參數對象,然後調用 rocketMQListener 的 onMessage 方法。

mnjh9

上圖右側標紅的代碼也就是該方法的精髓:

rocketMQListener.onMessage(doConvertMessage(messageExt));

6 寫到最後

開源項目 rocketmq-spring 有很多值得學習的地方 ,我們可以從如下四個層面逐層進階:

1、學會如何使用 :參考 rocketmq-spring-boot-samples 模塊的示例代碼,學會如何發送和接收消息,快速編碼;

2、模塊設計:學習項目的模塊分層 (父模塊、SDK 模塊、核心實現模塊、示例代碼模塊);

3、starter 設計思路 :定義自動配置文件 spring.factories 、設計配置屬性類 、在 RocketMQ client 的基礎上實現優雅的封裝、深入理解 RocketMQ 源碼等;

4、舉一反三:當我們理解了 rocketmq-spring 的源碼,我們可以嘗試模仿該項目寫一個簡單的 spring boot starter。


如果我的文章對你有所幫助,還請幫忙點贊、在看、轉發一下,你的支持會激勵我輸出更高質量的文章,非常感謝!


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 1. SQL的弱點 1.1. SQL語句的執行結果轉換為想要的格式 1.1.1. 格式轉換 1.1.2. SQL語言本來就不是為了這個目的而出現的 1.1.3. SQL終究也只是主要用於查詢數據的語言而已 1.2. 生成報表的功能 1.2.1. 視窗函數 1.3. SQL不是用來生成報表的語言,所以 ...
  • 在docker中配置Oracle11g docker鏡像拉取及相關配置 1.在docker打開的情況下,使用下方命令拉去鏡像,大概需要下載3個G的image文件 docker pull registry.cn-hangzhou.aliyuncs.com/helowin/oracle_11g 2.啟動 ...
  • 1. 關聯子查詢 1.1. 關聯子查詢和自連接在很多時候都是等價的 1.2. 使用SQL進行行間比較時,發揮主要作用的技術是關聯子查詢,特別是與自連接相結合的“自關聯子查詢” 1.3. 缺點 1.3.1. 代碼的可讀性不好 1.3.1.1. 特別是在計算累計值和移動平均值的例題里,與聚合一起使用後, ...
  • 在IOS的界面開發中,佈局方式存在分裂的兩種局面。 早入行的iOS開發者,基本上都是手動計算各種高度。但是從web或者Android轉過去的學習iOS的開發者,基本上都很難適應這種手動計算的方式,更加偏向使用autolayout去做佈局。 實際上 iPhone6之後,iOS設備出現多種解析度之後,蘋 ...
  • 前言 在開發過程中,取消需求是很常見的,但很容易被忽略。然而,取消需求的好處也很大。例如,在頁面中會發送很多請求。如果頁面被切走並處於不可見狀態,就需要取消未完成的請求任務。如果未及時取消,則可能會導致以下負面影響: 消耗用戶額外數據流量。 任務回調持有全局上下文變數,未及時釋放存在記憶體泄漏風險 異 ...
  • 前言 唯傑地圖VJMAP 為CAD圖或自定義地圖格式WebGIS可視化顯示開發提供的一站式解決方案,支持的格式如常用的AutoCAD的DWG格式文件、GeoJSON等常用GIS文件格式,它使用 WebGL矢量圖塊和自定義樣式呈現互動式地圖, 提供了全新的大數據可視化可視化功能。 唯傑地圖可視化平臺旨 ...
  • 目標:用三種方式實現簡易的計算器(計算屬性,監聽器,方法) 1.創建html,導入vue,實例化vue對象。 <!DOCTYPE html> <html> <head> <meta charset="utf-8" /> <title></title> <script type="text/javas ...
  • 本博文介紹CSS三大特性之一:優先順序。 1 規則 (1)若選擇器相同,則執行層疊性(層疊性:給相同的選擇器設置相同的樣式,則“後來居上”,後面設置的樣式會覆蓋前面設置的樣式); (2)若選擇器不同,則有優先順序。 2 選擇器權重 常見的選擇器權重如下表: 選擇器 權重 繼承或者*(通配符) 0,0,0 ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...