SpringBoot如何優雅的使用RocketMQ

来源:https://www.cnblogs.com/SimpleWu/archive/2019/12/28/12112351.html
-Advertisement-
Play Games

[TOC] SpringBoot如何優雅的使用RocketMQ MQ,是一種跨進程的通信機制,用於上下游傳遞消息。在傳統的互聯網架構中通常使用MQ來對上下游來做解耦合。 舉例:當A系統對B系統進行消息通訊,如A系統發佈一條系統公告,B系統可以訂閱該頻道進行系統公告同步,整個過程中A系統並不關係B系統 ...


目錄

SpringBoot如何優雅的使用RocketMQ

MQ,是一種跨進程的通信機制,用於上下游傳遞消息。在傳統的互聯網架構中通常使用MQ來對上下游來做解耦合。

舉例:當A系統對B系統進行消息通訊,如A系統發佈一條系統公告,B系統可以訂閱該頻道進行系統公告同步,整個過程中A系統並不關係B系統會不會同步,由訂閱該頻道的系統自行處理。

什麼是RocketMQ?

官方說明:

隨著使用越來越多的隊列和虛擬主題,ActiveMQ IO模塊遇到了瓶頸。我們儘力通過節流,斷路器或降級來解決此問題,但效果不佳。因此,我們那時開始關註流行的消息傳遞解決方案Kafka。不幸的是,Kafka不能滿足我們的要求,特別是在低延遲和高可靠性方面。

看到這裡可以很清楚的知道RcoketMQ 是一款低延遲、高可靠、可伸縮、易於使用的消息中間件。

具有以下特性:

  • 支持發佈/訂閱(Pub/Sub)和點對點(P2P)消息模型
  • 能夠保證嚴格的消息順序,在一個隊列中可靠的先進先出(FIFO)和嚴格的順序傳遞
  • 提供豐富的消息拉取模式,支持拉(pull)和推(push)兩種消息模式
  • 單一隊列百萬消息的堆積能力,億級消息堆積能力
  • 支持多種消息協議,如 JMS、MQTT 等
  • 分散式高可用的部署架構,滿足至少一次消息傳遞語義

RocketMQ環境安裝

下載地址:https://rocketmq.apache.org/dowloading/releases/

從官方下載二進位或者源碼來進行使用。源碼編譯需要Maven3.2x,JDK8

在根目錄進行打包:

mvn -Prelease-all -DskipTests clean packager -U

distribution/target/apache-rocketmq文件夾中會存在一個文件夾版,zip,tar三個可運行的完整程式。

使用rocketmq-4.6.0.zip:

  1. 啟動名稱服務 mqnamesrv.cmd
  2. 啟動數據中心 mqbroker.cmd -n localhost:9876

SpringBoot環境中使用RocketMQ

SpringBoot 入門:https://www.cnblogs.com/SimpleWu/p/10027237.html
SpringBoot 常用start:https://www.cnblogs.com/SimpleWu/p/9798146.html
當前環境版本為:

  • SpringBoot 2.0.6.RELEASE
  • SpringCloud Finchley.RELEASE
  • SpringCldod Alibaba 0.2.1.RELEASE
  • RocketMQ 4.3.0
    在項目工程中導入:
<!-- MQ Begin -->
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>${rocketmq.version}</version>
</dependency>
<!-- MQ End -->

由於我們這邊已經有工程了所以就不在進行創建這種過程了。主要是看看如何使用RocketMQ。
創建RocketMQProperties配置屬性類,類中內容如下:

@ConfigurationProperties(prefix = "rocketmq")
public class RocketMQProperties {
    private boolean isEnable = false;
    private String namesrvAddr = "localhost:9876";
    private String groupName = "default";
    private int producerMaxMessageSize = 1024;
    private int producerSendMsgTimeout = 2000;
    private int producerRetryTimesWhenSendFailed = 2;
    private int consumerConsumeThreadMin = 5;
    private int consumerConsumeThreadMax = 30;
    private int consumerConsumeMessageBatchMaxSize = 1;
    //省略get set
}

現在我們所有子系統中的生產者,消費者對應:
isEnable 是否開啟mq
namesrvAddr 集群地址
groupName 分組名稱
設置為統一已方便系統對接,如有其它需求在進行擴展,類中我們已經給了預設值也可以在配置文件或配置中心中獲取配置,配置如下:

#發送同一類消息的設置為同一個group,保證唯一,預設不需要設置,rocketmq會使用ip@pid(pid代表jvm名字)作為唯一標示
rocketmq.groupName=please_rename_unique_group_name
#是否開啟自動配置
rocketmq.isEnable=true
#mq的nameserver地址
rocketmq.namesrvAddr=127.0.0.1:9876
#消息最大長度 預設1024*4(4M)
rocketmq.producer.maxMessageSize=4096
#發送消息超時時間,預設3000
rocketmq.producer.sendMsgTimeout=3000
#發送消息失敗重試次數,預設2
rocketmq.producer.retryTimesWhenSendFailed=2
#消費者線程數量
rocketmq.consumer.consumeThreadMin=5
rocketmq.consumer.consumeThreadMax=32
#設置一次消費消息的條數,預設為1條
rocketmq.consumer.consumeMessageBatchMaxSize=1

創建消費者介面 RocketConsumer.java 該介面用戶約束消費者需要的核心步驟:

/**
 * 消費者介面
 * 
 * @author SimpleWu
 *
 */
public interface RocketConsumer {

/**
     * 初始化消費者
     */
    public abstract void init();

    /**
     * 註冊監聽
     * 
     * @param messageListener
     */
    public void registerMessageListener(MessageListener messageListener);

}

創建抽象消費者 AbstractRocketConsumer.java:

/**
 * 消費者基本信息
 * 
 * @author SimpelWu
 */
public abstract class AbstractRocketConsumer implements RocketConsumer {

    protected String topics;
    protected String tags;
    protected MessageListener messageListener;
    protected String consumerTitel;
    protected MQPushConsumer mqPushConsumer;

    /**
     * 必要的信息
     * 
     * @param topics
     * @param tags
     * @param consumerTitel
     */
    public void necessary(String topics, String tags, String consumerTitel) {
        this.topics = topics;
        this.tags = tags;
        this.consumerTitel = consumerTitel;
    }

    public abstract void init();

    @Override
    public void registerMessageListener(MessageListener messageListener) {
        this.messageListener = messageListener;
    }
    
}

在類中我們必須指定這個topics,tags與消息監聽邏輯
public abstract void init();該方法是用於初始化消費者,由子類實現。
接下來我們編寫自動配置類RocketMQConfiguation.java,該類用戶初始化一個預設的生產者連接,以及載入所有的消費者。
@EnableConfigurationProperties({ RocketMQProperties.class }) 使用該配置文件
@Configuration 標註為配置類
@ConditionalOnProperty(prefix = "rocketmq", value = "isEnable", havingValue = "true") 只有當配置中指定rocketmq.isEnable = true的時候才會生效
核心內容如下:

/**
 * mq配置
 * 
 * @author SimpleWu
 */
@Configuration
@EnableConfigurationProperties({ RocketMQProperties.class })
@ConditionalOnProperty(prefix = "rocketmq", value = "isEnable", havingValue = "true")
public class RocketMQConfiguation {

    private RocketMQProperties properties;

    private ApplicationContext applicationContext;

    private Logger log = LoggerFactory.getLogger(RocketMQConfiguation.class);

    public RocketMQConfiguation(RocketMQProperties properties, ApplicationContext applicationContext) {
        this.properties = properties;
        this.applicationContext = applicationContext;
    }

    /**
     * 註入一個預設的消費者
     * @return
     * @throws MQClientException
     */
    @Bean
    public DefaultMQProducer getRocketMQProducer() throws MQClientException {
        if (StringUtils.isEmpty(properties.getGroupName())) {
            throw new MQClientException(-1, "groupName is blank");
        }

        if (StringUtils.isEmpty(properties.getNamesrvAddr())) {
            throw new MQClientException(-1, "nameServerAddr is blank");
        }
        DefaultMQProducer producer;
        producer = new DefaultMQProducer(properties.getGroupName());

        producer.setNamesrvAddr(properties.getNamesrvAddr());
        // producer.setCreateTopicKey("AUTO_CREATE_TOPIC_KEY");

        // 如果需要同一個jvm中不同的producer往不同的mq集群發送消息,需要設置不同的instanceName
        // producer.setInstanceName(instanceName);
        producer.setMaxMessageSize(properties.getProducerMaxMessageSize());
        producer.setSendMsgTimeout(properties.getProducerSendMsgTimeout());
        // 如果發送消息失敗,設置重試次數,預設為2次
        producer.setRetryTimesWhenSendFailed(properties.getProducerRetryTimesWhenSendFailed());

        try {
            producer.start();
            log.info("producer is start ! groupName:{},namesrvAddr:{}", properties.getGroupName(),
                    properties.getNamesrvAddr());
        } catch (MQClientException e) {
            log.error(String.format("producer is error {}", e.getMessage(), e));
            throw e;
        }
        return producer;

    }

    /**
     * SpringBoot啟動時載入所有消費者
     */
    @PostConstruct
    public void initConsumer() {
        Map<String, AbstractRocketConsumer> consumers = applicationContext.getBeansOfType(AbstractRocketConsumer.class);
        if (consumers == null || consumers.size() == 0) {
            log.info("init rocket consumer 0");
        }
        Iterator<String> beans = consumers.keySet().iterator();
        while (beans.hasNext()) {
            String beanName = (String) beans.next();
            AbstractRocketConsumer consumer = consumers.get(beanName);
            consumer.init();
            createConsumer(consumer);
            log.info("init success consumer title {} , toips {} , tags {}", consumer.consumerTitel, consumer.tags,
                    consumer.topics);
        }
    }

    /**
     * 通過消費者信心創建消費者
     * 
     * @param consumerPojo
     */
    public void createConsumer(AbstractRocketConsumer arc) {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(this.properties.getGroupName());
        consumer.setNamesrvAddr(this.properties.getNamesrvAddr());
        consumer.setConsumeThreadMin(this.properties.getConsumerConsumeThreadMin());
        consumer.setConsumeThreadMax(this.properties.getConsumerConsumeThreadMax());
        consumer.registerMessageListener(arc.messageListenerConcurrently);
        /**
         * 設置Consumer第一次啟動是從隊列頭部開始消費還是隊列尾部開始消費 如果非第一次啟動,那麼按照上次消費的位置繼續消費
         */
        // consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        /**
         * 設置消費模型,集群還是廣播,預設為集群
         */
        // consumer.setMessageModel(MessageModel.CLUSTERING);

        /**
         * 設置一次消費消息的條數,預設為1條
         */
        consumer.setConsumeMessageBatchMaxSize(this.properties.getConsumerConsumeMessageBatchMaxSize());
        try {
            consumer.subscribe(arc.topics, arc.tags);
            consumer.start();
            arc.mqPushConsumer=consumer;
        } catch (MQClientException e) {
            log.error("info consumer title {}", arc.consumerTitel, e);
        }

    }

}

然後在src/main/resources文件夾中創建目錄與文件META-INF/spring.factories裡面添加自動配置類即可開啟啟動配置,我們只需要導入依賴即可:

org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
com.xcloud.config.rocketmq.RocketMQConfiguation

接下來在服務中導入依賴,然後通過我們的抽象類獲取所有必要信息對消費者進行創建,該步驟會在所有消費者初始化完成後進行,且只會管理是Spring Bean的消費者。
下麵我們看看如何創建一個消費者,創建消費者的步驟非常簡單,只需要繼承AbstractRocketConsumer然後再加上Spring的@Component就能夠完成消費者的創建,我們可以在類中自定義消費的主題與標簽。
在項目可以根據需求當消費者創建失敗的時候是否繼續啟動工程。
創建一個預設的消費者 DefaultConsumerMQ.java

@Component
public class DefaultConsumerMQ extends AbstractRocketConsumer {
    /**
     * 初始化消費者
     */
    @Override
    public void init() {
        // 設置主題,標簽與消費者標題
        super.necessary("TopicTest", "*", "這是標題");
        //消費者具體執行邏輯
        registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                msgs.forEach(msg -> {
                    System.out.printf("consumer message boyd %s %n", new String(msg.getBody()));
                });
                // 標記該消息已經被成功消費
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
    }
}

super.necessary("TopicTest", "*", "這是標題"); 是必須要設置的,代表該消費者監聽TopicTest主題下所有tags,標題那個欄位是我自己定義的,所以對於該配置來說沒什麼意義。
我們可以在這裡註入Spring的Bean來進行任意邏輯處理。
創建一個消息發送類進行測試

@Override
public String qmtest(@PathVariable("name")String name) throws MQClientException, RemotingException, MQBrokerException, InterruptedException, UnsupportedEncodingException {
    Message msg = new Message("TopicTest", "tags1", name.getBytes(RemotingHelper.DEFAULT_CHARSET));
    // 發送消息到一個Broker
    SendResult sendResult = defaultMQProducer.send(msg);
    // 通過sendResult返回消息是否成功送達
    System.out.printf("%s%n", sendResult);
    return null;
}

我們來通過Http請求測試:

http://localhost:10001/demo/base/mq/hello  consumer message boyd hello 
http://localhost:10001/demo/base/mq/嘿嘿嘿嘿嘿  consumer message boyd 嘿嘿嘿嘿嘿 

好了到這裡簡單的start算是設計完成了,後面還有一些:順序消息生產,順序消費消息,非同步消息生產等一系列功能,官人可參照官方去自行處理。

  • ActiveMQ 沒經過大規模吞吐量場景的驗證,社區不高不活躍。
  • RabbitMQ 集群動態擴展麻煩,且與當前程式語言不至於難以定製化。
  • kafka 支持主要的MQ功能,功能無法達到程式需求的要求,所以不使用,且與當前程式語言不至於難以定製化。
  • rocketMQ 經過全世界的女人的洗禮,已經很強大;MQ功能較為完善,還是分散式的,擴展性好;支持複雜MQ業務場景。(業務複雜可做首選)

您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 素描作為一種近乎完美的表現手法有其獨特的魅力,隨著數字技術的發展,素描早已不再是專業繪畫師的專利,今天這篇文章就來講一講如何使用python批量獲取小姐姐素描畫像。文章共分兩部分: 第一部分介紹兩種使用python生成素描畫的思路 第二部分介紹如何批量獲取素描畫 一、獲取素描圖的兩個思路 本部分介紹 ...
  • Struts2整合AJAX有2種方式: 使用type="stream"類型的<result> 使用JSON插件 使用type="stream"類型的<result> 獲取text 前端 <body> <form> 學號:<input type="text" id="no"><br /> 姓名:<in ...
  • 將pip的下載源換成國內的,速度會有很大的提升。 下麵是一些國內常用鏡像: 阿裡雲 http://mirrors.aliyun.com/pypi/simple/ 中國科技大學 https://pypi.mirrors.ustc.edu.cn/simple/ 豆瓣(douban) http://pyp ...
  • ArrayList和LinkedList的區別 步驟 1 : ArrayList和LinkedList的區別 ArrayList , 插入,刪除數據慢 LinkedList, 插入,刪除數據快 ArrayList是順序結構,所以 定位很快 ,指哪找哪。 就像電影院位置一樣,有了電影票,一下就找到位置 ...
  • Cookie Cookie 是一種伺服器發送給瀏覽器以鍵值對形式存儲小量信息的技術。 當瀏覽器首次請求伺服器時,伺服器會將一條信息封裝成一個Cookie發送給瀏覽器,瀏覽器收到Cookie,會將它保存在記憶體中(註意這裡的記憶體是本機記憶體,而不是伺服器記憶體)或者本地文件,那之後每次向伺服器發送請求,瀏覽 ...
  • Java ArrayList和HashSet的區別 示例 1 : 是否有順序 ArrayList: 有順序 HashSet: 無順序 HashSet的具體順序,既不是按照插入順序,也不是按照hashcode的順序。 以下是 HasetSet源代碼 中的部分註釋 / It makes no guara ...
  • 運行環境:centos 7,jdk 1.8 問題一: 原因:無法創建本地文件問題,用戶最大可創建文件數太小 解決方案:切換到root用戶,編輯limits.conf配置文件, 添加類似如下內容: vim /etc/security/limits.conf 添加如下內容:* soft nofile 6 ...
  • 最近又學到了很多新知識,感謝優銳課老師細緻地講解,這篇博客記錄下自己所學所想。 想更多地瞭解Spring Boot項目中的功能測試嗎?這篇文章帶你瞭解有關在測試中使用Docker容器的更多信息。 本文重點介紹在Spring Boot應用程式的功能測試期間應用一些最佳實踐。我們將演示一種高級方法,該方 ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...