本地啟動 NameServer 和 Broker | 讀 RocketMQ 源碼前的準備工作

来源:https://www.cnblogs.com/shuiyj/archive/2020/06/30/13215978.html
-Advertisement-
Play Games

clone 並導入源碼 本地啟動 NameServer 本地啟動 Broker 本地運行生產者與消費者代碼 完成上述步驟之後,RocketMQ的源碼環境就搭建完畢了,之後就可以在本地啟動以及收發消息,調試和分析RocketMQ的源碼了。 clone 並導入源碼 在 github 上選擇對應的的代碼 ...


  1. clone 並導入源碼
  2. 本地啟動 NameServer
  3. 本地啟動 Broker
  4. 本地運行生產者與消費者代碼

完成上述步驟之後,RocketMQ的源碼環境就搭建完畢了,之後就可以在本地啟動以及收發消息,調試和分析RocketMQ的源碼了。

clone 並導入源碼

在 github 上選擇對應的的代碼 https://github.com/apache/rocketmq/tree/rocketmq-all-4.7.0,將其 clone 下來,再切出 4.7.0 版本的源碼。Clone 到本地之後,用 IDEA 打開項目。

clone代碼

項目結構

目錄結構

模塊 作用
broker Broker 相關代碼
client Producer、Consumer 客戶端代碼,用於生產消息、消費消息
common 公共代碼
dev 開發相關的信息
distribution 部署相關,比如配置文件
example 例子
filter 過濾器
logappender 日誌相關
logging 日誌相關
namesvr NameServer
openmessaging 開放消息標準
remoting 遠程網路通信,基於 netty 實現
srvutil 工具類
store 消息如何在 Broker 中進行存儲相關代碼
style 代碼檢查
test 測試
tools 命令行監控

本地啟動 NameServer

接下來我們要做的是在本地啟動 NameServer,包括兩個步驟:

  1. 在 IDEA 中配置啟動相關的信息,NameServer 的啟動類是org.apache.rocketmq.namesrv.NamesrvStartup
  2. 準備好啟動 NameServer 需要的配置文件和目錄

看上圖:

  1. 配置啟動類的名字 NameServerStartup
  2. 配置主類的路徑 org.apache.rocketmq.namesrv.NamesrvStartup
  3. 工作目錄,也就是當前代碼所在的目錄
  4. 運行目錄 ROCKETMQ_HOME,這個目錄裡面放的是運行時需要的配置文件、數據、日誌等。你需要創建一個目錄,在裡面創建 conflogsstore目錄

接著將源碼中 distrbution 模塊中的 logback_namesvr.xml 文件拷貝到上面的 conf 目錄下,並將這個文件中的${user.home}全部替換為前面配置的運行目錄。

然後運行配置好的啟動類,就會讀取 conf 里的配置文件,並將日誌列印在logs目錄里,數據都會寫在store目錄里。看到 IDEA 的列印出下麵這樣的信息,就說明 NameServer 啟動成功了。

本地運行 Broker

啟動 Broker 和啟動 NameServer 的過程類似。首先也是配置啟動類:

  1. Broker 的啟動類在 org.apache.rocketmq.broker.BrokerStartup
  2. 不一樣的地方是要設置一個參數 -c你的broker.conf配置文件的路徑,因為程式啟動的時候會讀-c這個參數
  3. 接著還是設置工作目錄和運行目錄,選擇 module 為 rocketmq-broker

接著把distrbution模塊中的 broker.conflogback_broker.xml 文件拷貝到 conf目錄下:

  1. 將 logback_broker.xml 的${user.home}替換為你的 RocketMQ 運行目錄
  2. broker.conf 按照下麵的配置方式進行配置
brokerClusterName = DefaultCluster
brokerName = broker-a
brokerId = 0
# nameserver的地址
namesrvAddr=127.0.0.1:9876
deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH
# 運行目錄的store目錄
storePathRootDir=/Users/shui/Desktop/rocketmq-nameserver/store
# commitLog的存儲路徑
storePathCommitLog=你的store目錄/commitlog
# consume queue文件的存儲路徑
storePathConsumeQueue=你的store目錄/consumequeue
# 消息索引文件的存儲路徑
storePathIndex=你的store目錄/store/index
# checkpoint文件的存儲路徑
storeCheckpoint=你的store目錄/checkpoint
# abort文件的存儲路徑
abortFile=你的store目錄/abort

最後運行主類,看到控制台列印如下信息就表示啟動成功:

此時 rocketmqlogs,裡面有一個broker.log,就可以看到Broker的啟動日誌了:

本地運行生產者與消費者代碼

在控制台創建一個 topic 名為 TopicTest。如果不知道如何使用 RocketMQ 的控制台,可以看我之前寫這篇文章:https://www.cnblogs.com/shuiyj/p/13200658.html。

接著去修改 example 中給出的生產者和消費者代碼 org.apache.rocketmq.example.quickstart.Consumerorg.apache.rocketmq.example.quickstart.Producer

生產者

改動兩個地方:

  1. 設置 NameServer 地址,讓生產者可以獲取到 Broker 地址
  2. 本來發送 1000 條信息,改少一點發送 3 條,便於消費的時候觀察
public class Producer {
    public static void main(String[] args) throws MQClientException, InterruptedException {

        /*
         * Instantiate with a producer group name.
         */
        DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");

        // 其他代碼不變
      	// 在這裡設置 NameServer 地址,保證  Producer 可以從 NameServer 獲取到 Broker 地址
        producer.setNamesrvAddr("127.0.0.1:9876");
        /*
         * Launch the instance.
         */
        producer.start();
	     	
      // 本來是發送 1000 條消息,改成發送 3 條
        for (int i = 0; i < 3; i++) {
            try {

                /*
                 * Create a message instance, specifying topic, tag and message body.
                 */
                Message msg = new Message("TopicTest" /* Topic */,
                    "TagA" /* Tag */,
                    ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
                );

看到控制台輸出如下所示的信息,表示消息發送成功了。

SendResult [sendStatus=SEND_OK, msgId=24098A28085A1DB0ECAD4CD655E7AF1548B818B4AAC28B1F132F0000, offsetMsgId=C0A8010800002A9F0000000000000000, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=2], queueOffset=0]
SendResult [sendStatus=SEND_OK, msgId=24098A28085A1DB0ECAD4CD655E7AF1548B818B4AAC28B1F13AB0001, offsetMsgId=C0A8010800002A9F00000000000000CA, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=3], queueOffset=0]
SendResult [sendStatus=SEND_OK, msgId=24098A28085A1DB0ECAD4CD655E7AF1548B818B4AAC28B1F13AE0002, offsetMsgId=C0A8010800002A9F0000000000000194, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=0], queueOffset=0]

消費者

消息者只改動一個地方,就是設置 NameServer 地址,也是為了獲取到 Broker 的地址。

public class Consumer {

    public static void main(String[] args) throws InterruptedException, MQClientException {

        // 省略其它代碼...

        // 設置 NameServer 地址,保證  Consumer 可以從 NameServer 獲取到 Broker 地址
        consumer.setNamesrvAddr("127.0.0.1:9876");
        /*
         *  Launch the consumer instance.
         */
        consumer.start();

        System.out.printf("Consumer Started.%n");
    }
}

可以看到消費到了 3 條數據,並列印出了消息的相關信息。

00:24:23.571 [main] DEBUG i.n.u.i.l.InternalLoggerFactory - Using SLF4J as the default logging framework
Consumer Started.
ConsumeMessageThread_1 Receive New Messages: [MessageExt [brokerName=broker-a, queueId=3, storeSize=202, queueOffset=0, sysFlag=0, bornTimestamp=1593274869675, bornHost=/192.168.1.8:54010, storeTimestamp=1593274869676, storeHost=/192.168.1.8:10911, msgId=C0A8010800002A9F00000000000000CA, commitLogOffset=202, bodyCRC=1401636825, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=1, CONSUME_START_TIME=1593275064336, UNIQ_KEY=24098A28085A1DB0ECAD4CD655E7AF1548B818B4AAC28B1F13AB0001, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 49], transactionId='null'}]] 
ConsumeMessageThread_3 Receive New Messages: [MessageExt [brokerName=broker-a, queueId=0, storeSize=202, queueOffset=0, sysFlag=0, bornTimestamp=1593274869678, bornHost=/192.168.1.8:54010, storeTimestamp=1593274869679, storeHost=/192.168.1.8:10911, msgId=C0A8010800002A9F0000000000000194, commitLogOffset=404, bodyCRC=1250039395, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=1, CONSUME_START_TIME=1593275064339, UNIQ_KEY=24098A28085A1DB0ECAD4CD655E7AF1548B818B4AAC28B1F13AE0002, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 50], transactionId='null'}]] 
ConsumeMessageThread_2 Receive New Messages: [MessageExt [brokerName=broker-a, queueId=2, storeSize=202, queueOffset=0, sysFlag=0, bornTimestamp=1593274869552, bornHost=/192.168.1.8:54010, storeTimestamp=1593274869574, storeHost=/192.168.1.8:10911, msgId=C0A8010800002A9F0000000000000000, commitLogOffset=0, bodyCRC=613185359, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=1, CONSUME_START_TIME=1593275064340, UNIQ_KEY=24098A28085A1DB0ECAD4CD655E7AF1548B818B4AAC28B1F132F0000, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 48], transactionId='null'}]] 

您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 據統計,國外的前端開發人員和後端開發人員比例約1:1,但是在國內比例卻在1:3以下, Web前端開發職位人才缺口巨大。前端工程師的發展之路十分有“錢”景。 每天,HR 群都有人在吐槽招不到前端工程師。實話說對這些需求,高級招聘人員也無能為力,因為在供不應求的前端招聘市場上,優秀的前端工程師才是有話語 ...
  • 1.隊列是遵循先進先出(FIFO)原則的一組有序的項,隊列在尾部添加元素,並從頂部移除元素,最新添加的元素必須排在隊列的末尾。生活中常見的例子如排隊等。 2.創建一個隊列類 class Queue{ constructor(){ this.count = 0;//記錄隊列的數量 this.lowes ...
  • 一、Socket 1.Socket構造方法 構造方法說明 Socket() 該創建的對象,沒有指定IP地址和埠號,意味著只創建了客戶端對象,並且沒有連接任何伺服器。通過該構造方法創建對象後還需要調用connect(SocketAddress endpoint)方法,才能完成與指定伺服器端的連接,其 ...
  • 現在距離 Python 3.9.0 的最終版本還有 3 個月,官方公佈的時間線是: 3.9.0 beta 4: Monday, 2020-06-29 3.9.0 beta 5: Monday, 2020-07-20 3.9.0 candidate 1: Monday, 2020-08-10 3.9. ...
  • 類載入運行的全過程 當用java命令運行某個main函數時,首先需要類載入器把主類載入到JVM記憶體中。 通過Java命令執行代碼的大致流程為 將編譯好的位元組碼class文件通過java命令,在win操作系統就是一個java.exe文件,這個文件底層是c++語言實現的,通過這個文件調用底層jvm.dl ...
  • mac 使用yarn brew install yarn 安裝全局vue-cli全家桶: yarn global add @vue/cli 驗證: node -v npm -v 打開界面 vue ui 本文由博客一文多發平臺 OpenWrite 發佈! ...
  • 前言 網路上的信息很多,有的時候我們需要關鍵字搜索才可以快速方便的找到我們需要的信息。今天我們實現搜索關鍵字爬取堆糖網上相關的美圖,零基礎學會通用爬蟲,當然我們還可以實現多線程爬蟲,加快爬蟲爬取速度 環境: windows pycharm python3 導入模塊 import urllib.par ...
  • 本文源碼:GitHub·點這裡 || GitEE·點這裡 一、服務間隔離 1、分散式結構 分散式系統架構的明顯特點,就是按照業務系統的功能,拆分成各種服務,每個服務下麵都有自己獨立的資料庫,以此降低業務間的耦合度,隔離不同的資料庫保證系統最大的穩定性等。 例如上圖是電商系統中經典的業務場景,訂單-倉 ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...