使用RocketMQ消費消息

来源:https://www.cnblogs.com/interviewClever/archive/2022/04/18/16163325.html
-Advertisement-
Play Games

RocketMQ消費端 今天要來跟大家學習怎樣使用RocketMQ來進行消息的消費 先簡單創建個Maven項目使用 添加依賴 <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client</artifa ...


RocketMQ消費端

今天要來跟大家學習怎樣使用RocketMQ來進行消息的消費

先簡單創建個Maven項目使用

  • 添加依賴
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>4.9.2</version>
</dependency>
  • 啟動消費者

    package mq.consumer;
    
    import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
    import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.common.message.MessageExt;
    import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
    
    import java.util.List;
    
    public class BroadcastConsumer {
        public static void main(String[] args) throws MQClientException {
            //創建一個push模式的消費組
            DefaultMQPushConsumer pushConsumer = new DefaultMQPushConsumer("pushConsumer");
            pushConsumer.setNamesrvAddr("localhost:9876");
            //集群模式
            pushConsumer.setMessageModel(MessageModel.CLUSTERING);
            //  訂閱的topic tag
            pushConsumer.subscribe("topic_test01","Tag1 || Tag2");
    
            pushConsumer.registerMessageListener(new MessageListenerConcurrently() {
                @Override
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                    System.out.printf(Thread.currentThread().getName() + " Receive New Messages: " + msgs + "%n");
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
            });
            pushConsumer.start();
            System.out.printf("Broadcast Consumer Started.%n");
    
        }
    
    
  • 啟動生產者

    package mq.producer;
    
    import org.apache.rocketmq.client.exception.MQBrokerException;
    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.client.producer.DefaultMQProducer;
    import org.apache.rocketmq.client.producer.SendResult;
    import org.apache.rocketmq.common.message.Message;
    import org.apache.rocketmq.remoting.common.RemotingHelper;
    import org.apache.rocketmq.remoting.exception.RemotingException;
    
    import java.io.UnsupportedEncodingException;
    
    public class SyncProducerV2 {
        /**
         * 同步消息發送
         *
         * @param args
         * @throws MQClientException
         * @throws MQBrokerException
         * @throws RemotingException
         * @throws InterruptedException
         * @throws UnsupportedEncodingException
         */
        public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException, UnsupportedEncodingException {
            System.out.println("SyncProducer start......");
            DefaultMQProducer defaultMQProducer = new DefaultMQProducer("pg_sync_01");
            defaultMQProducer.setNamesrvAddr("localhost:9876");
            defaultMQProducer.start();
            for (int i = 0; i < 10; i++) {
                send(defaultMQProducer, i, i % 3);
            }
            defaultMQProducer.shutdown();
            System.out.println("SyncProducer end......");
    
        }
    
        private static void send(DefaultMQProducer defaultMQProducer, Integer i, int tag) throws MQClientException, RemotingException, MQBrokerException, InterruptedException, UnsupportedEncodingException {
            SendResult sendResult = defaultMQProducer.send(new Message("topic_test01", "Tag" + tag, ("hello this is sync message_" + i + "!").getBytes(RemotingHelper.DEFAULT_CHARSET)));
            System.out.println(sendResult);
        }
    }
    
  • 消費者消費

可以看到消費了Tag為Tag1、Tag2的消息

image

其它Tag會被過濾掉

image

消費分類

RocketMQ的消費模式分為兩種:BROADCASTING(廣播)和CLUSTERING(集群)

那這兩種模式有什麼區別呢?

  • 廣播:相同消費組下的實例會重覆消費同一個Topic的消息,可以理解為大家做同樣的工作,消費進度存儲在客戶端,有可能會導致部分消息沒有被消費
  • 集群:相同消費組下的實例會負載均衡地消費同一個Topic的消息,可以理解為分工合作,消費進度存儲在Broker端

所以大部分系統都會使用集群模式去消費信息,畢竟可以水平拓展消費者來承受更大的消費壓力

廣播模式相對來說使用比較少,一般都是一些消息通知同步的場景,想同步刷新緩存等

本文由博客一文多發平臺 OpenWrite 發佈!


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 一、什麼是協商緩存 協商緩存是伺服器端的一種緩存策略,服務端提供一種記號,用來判斷客戶端資源和服務端是否一樣。 一致返回304,否則返回200和新資源。 二、如何實現 主要是通過在response header中攜帶相關標識 一種通過last-modified資源的最後修改時間 第一次請求時,伺服器 ...
  • 本文將介紹利用 CSS 實現滾動視差效果的一個小技巧,並且,利用這個技巧來製作一些有意思的交互特效。 關於使用 CSS 實現滾動視差效果,在之前有一篇文章詳細描述過具體方案 - CSS 實現視差效果,感興趣的同學可以先看看這篇文章。 這裡,會運用上這樣一種純 CSS 的視差技巧: 使用 transf ...
  • 德國科技管理專家斯坦門茨早年移居美國,他以非凡的才能成為美國企業界的佼佼者。一次,美國著名的福特公司的一組電機發生故障,在束手無策之時,公司請斯坦門茨出馬解決問題。 斯坦門茨在電機旁仔細觀察,經過計算,用粉筆在電機外殼划了一條線,說:“從這裡打開,把裡面的線圈減少16圈。”工人們照他說的一試,電機果 ...
  • 策略模式是什麼 策略模式是一種行為設計模式, 它能讓你定義一系列演算法, 並將每種演算法分別放入獨立的類中, 以使演算法的對象能夠相互替換。 為什麼用策略模式 當你想使用對象中各種不同的演算法變體,並希望能在運行時切換演算法時,可使用策略模式。策略模式讓你能將不同行為抽取到一個獨立類層次結構中, 並將原始類組 ...
  • 博客推行版本更新,成果積累制度,已經寫過的博客還會再次更新,不斷地琢磨,高質量高數量都是要追求的,工匠精神是學習必不可少的精神。因此,大家有何建議歡迎在評論區踴躍發言,你們的支持是我最大的動力,你們敢投,我就敢肝 ...
  • 一、chiner介紹 在chiner出現之前進行資料庫建模設計的時候,大部分時間使用的是PowerDesigner。說實話基本上是偷偷的用,因為大家都知道PD是收費軟體,到處和國內廠商打官司。不僅如此,PowerDesigner古老守舊的的設計界面已經快讓讓我忍不了了,一直想尋找一款PD的替代品。 ...
  • Apache Flink是一個在無界和有界數據流上進行有狀態計算的框架。Flink提供了不同抽象級別的多個API,併為常見用例提供了專用庫。 在這裡,我們介紹Flink易於使用且富有表現力的API和庫。 流媒體應用的構建塊 流處理框架可以構建和執行的應用程式類型取決於該框架對流、狀態和時間的控製程度 ...
  • 虛擬環境搭建 我們進行開發的時候虛擬環境搭建尤為重要,我們如果需要的python解釋器模塊版本不一樣可以採用這個辦法 pycharm中搭建 命令創建虛擬環境 比如centos沒有圖形化界面的話,沒法裝pycharm,沒法點點點創建,只能使用命令 步驟 第一步:安裝 pip3 install virt ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...