本地啟動 NameServer 和 Broker | 讀 RocketMQ 源碼前的準備工作

来源:https://www.cnblogs.com/shuiyj/archive/2020/06/30/13215978.html
-Advertisement-
Play Games

clone 並導入源碼 本地啟動 NameServer 本地啟動 Broker 本地運行生產者與消費者代碼 完成上述步驟之後,RocketMQ的源碼環境就搭建完畢了,之後就可以在本地啟動以及收發消息,調試和分析RocketMQ的源碼了。 clone 並導入源碼 在 github 上選擇對應的的代碼 ...


  1. clone 並導入源碼
  2. 本地啟動 NameServer
  3. 本地啟動 Broker
  4. 本地運行生產者與消費者代碼

完成上述步驟之後,RocketMQ的源碼環境就搭建完畢了,之後就可以在本地啟動以及收發消息,調試和分析RocketMQ的源碼了。

clone 並導入源碼

在 github 上選擇對應的的代碼 https://github.com/apache/rocketmq/tree/rocketmq-all-4.7.0,將其 clone 下來,再切出 4.7.0 版本的源碼。Clone 到本地之後,用 IDEA 打開項目。

clone代碼

項目結構

目錄結構

模塊 作用
broker Broker 相關代碼
client Producer、Consumer 客戶端代碼,用於生產消息、消費消息
common 公共代碼
dev 開發相關的信息
distribution 部署相關,比如配置文件
example 例子
filter 過濾器
logappender 日誌相關
logging 日誌相關
namesvr NameServer
openmessaging 開放消息標準
remoting 遠程網路通信,基於 netty 實現
srvutil 工具類
store 消息如何在 Broker 中進行存儲相關代碼
style 代碼檢查
test 測試
tools 命令行監控

本地啟動 NameServer

接下來我們要做的是在本地啟動 NameServer,包括兩個步驟:

  1. 在 IDEA 中配置啟動相關的信息,NameServer 的啟動類是org.apache.rocketmq.namesrv.NamesrvStartup
  2. 準備好啟動 NameServer 需要的配置文件和目錄

看上圖:

  1. 配置啟動類的名字 NameServerStartup
  2. 配置主類的路徑 org.apache.rocketmq.namesrv.NamesrvStartup
  3. 工作目錄,也就是當前代碼所在的目錄
  4. 運行目錄 ROCKETMQ_HOME,這個目錄裡面放的是運行時需要的配置文件、數據、日誌等。你需要創建一個目錄,在裡面創建 conflogsstore目錄

接著將源碼中 distrbution 模塊中的 logback_namesvr.xml 文件拷貝到上面的 conf 目錄下,並將這個文件中的${user.home}全部替換為前面配置的運行目錄。

然後運行配置好的啟動類,就會讀取 conf 里的配置文件,並將日誌列印在logs目錄里,數據都會寫在store目錄里。看到 IDEA 的列印出下麵這樣的信息,就說明 NameServer 啟動成功了。

本地運行 Broker

啟動 Broker 和啟動 NameServer 的過程類似。首先也是配置啟動類:

  1. Broker 的啟動類在 org.apache.rocketmq.broker.BrokerStartup
  2. 不一樣的地方是要設置一個參數 -c你的broker.conf配置文件的路徑,因為程式啟動的時候會讀-c這個參數
  3. 接著還是設置工作目錄和運行目錄,選擇 module 為 rocketmq-broker

接著把distrbution模塊中的 broker.conflogback_broker.xml 文件拷貝到 conf目錄下:

  1. 將 logback_broker.xml 的${user.home}替換為你的 RocketMQ 運行目錄
  2. broker.conf 按照下麵的配置方式進行配置
brokerClusterName = DefaultCluster
brokerName = broker-a
brokerId = 0
# nameserver的地址
namesrvAddr=127.0.0.1:9876
deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH
# 運行目錄的store目錄
storePathRootDir=/Users/shui/Desktop/rocketmq-nameserver/store
# commitLog的存儲路徑
storePathCommitLog=你的store目錄/commitlog
# consume queue文件的存儲路徑
storePathConsumeQueue=你的store目錄/consumequeue
# 消息索引文件的存儲路徑
storePathIndex=你的store目錄/store/index
# checkpoint文件的存儲路徑
storeCheckpoint=你的store目錄/checkpoint
# abort文件的存儲路徑
abortFile=你的store目錄/abort

最後運行主類,看到控制台列印如下信息就表示啟動成功:

此時 rocketmqlogs,裡面有一個broker.log,就可以看到Broker的啟動日誌了:

本地運行生產者與消費者代碼

在控制台創建一個 topic 名為 TopicTest。如果不知道如何使用 RocketMQ 的控制台,可以看我之前寫這篇文章:https://www.cnblogs.com/shuiyj/p/13200658.html。

接著去修改 example 中給出的生產者和消費者代碼 org.apache.rocketmq.example.quickstart.Consumerorg.apache.rocketmq.example.quickstart.Producer

生產者

改動兩個地方:

  1. 設置 NameServer 地址,讓生產者可以獲取到 Broker 地址
  2. 本來發送 1000 條信息,改少一點發送 3 條,便於消費的時候觀察
public class Producer {
    public static void main(String[] args) throws MQClientException, InterruptedException {

        /*
         * Instantiate with a producer group name.
         */
        DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");

        // 其他代碼不變
      	// 在這裡設置 NameServer 地址,保證  Producer 可以從 NameServer 獲取到 Broker 地址
        producer.setNamesrvAddr("127.0.0.1:9876");
        /*
         * Launch the instance.
         */
        producer.start();
	     	
      // 本來是發送 1000 條消息,改成發送 3 條
        for (int i = 0; i < 3; i++) {
            try {

                /*
                 * Create a message instance, specifying topic, tag and message body.
                 */
                Message msg = new Message("TopicTest" /* Topic */,
                    "TagA" /* Tag */,
                    ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
                );

看到控制台輸出如下所示的信息,表示消息發送成功了。

SendResult [sendStatus=SEND_OK, msgId=24098A28085A1DB0ECAD4CD655E7AF1548B818B4AAC28B1F132F0000, offsetMsgId=C0A8010800002A9F0000000000000000, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=2], queueOffset=0]
SendResult [sendStatus=SEND_OK, msgId=24098A28085A1DB0ECAD4CD655E7AF1548B818B4AAC28B1F13AB0001, offsetMsgId=C0A8010800002A9F00000000000000CA, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=3], queueOffset=0]
SendResult [sendStatus=SEND_OK, msgId=24098A28085A1DB0ECAD4CD655E7AF1548B818B4AAC28B1F13AE0002, offsetMsgId=C0A8010800002A9F0000000000000194, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=0], queueOffset=0]

消費者

消息者只改動一個地方,就是設置 NameServer 地址,也是為了獲取到 Broker 的地址。

public class Consumer {

    public static void main(String[] args) throws InterruptedException, MQClientException {

        // 省略其它代碼...

        // 設置 NameServer 地址,保證  Consumer 可以從 NameServer 獲取到 Broker 地址
        consumer.setNamesrvAddr("127.0.0.1:9876");
        /*
         *  Launch the consumer instance.
         */
        consumer.start();

        System.out.printf("Consumer Started.%n");
    }
}

可以看到消費到了 3 條數據,並列印出了消息的相關信息。

00:24:23.571 [main] DEBUG i.n.u.i.l.InternalLoggerFactory - Using SLF4J as the default logging framework
Consumer Started.
ConsumeMessageThread_1 Receive New Messages: [MessageExt [brokerName=broker-a, queueId=3, storeSize=202, queueOffset=0, sysFlag=0, bornTimestamp=1593274869675, bornHost=/192.168.1.8:54010, storeTimestamp=1593274869676, storeHost=/192.168.1.8:10911, msgId=C0A8010800002A9F00000000000000CA, commitLogOffset=202, bodyCRC=1401636825, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=1, CONSUME_START_TIME=1593275064336, UNIQ_KEY=24098A28085A1DB0ECAD4CD655E7AF1548B818B4AAC28B1F13AB0001, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 49], transactionId='null'}]] 
ConsumeMessageThread_3 Receive New Messages: [MessageExt [brokerName=broker-a, queueId=0, storeSize=202, queueOffset=0, sysFlag=0, bornTimestamp=1593274869678, bornHost=/192.168.1.8:54010, storeTimestamp=1593274869679, storeHost=/192.168.1.8:10911, msgId=C0A8010800002A9F0000000000000194, commitLogOffset=404, bodyCRC=1250039395, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=1, CONSUME_START_TIME=1593275064339, UNIQ_KEY=24098A28085A1DB0ECAD4CD655E7AF1548B818B4AAC28B1F13AE0002, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 50], transactionId='null'}]] 
ConsumeMessageThread_2 Receive New Messages: [MessageExt [brokerName=broker-a, queueId=2, storeSize=202, queueOffset=0, sysFlag=0, bornTimestamp=1593274869552, bornHost=/192.168.1.8:54010, storeTimestamp=1593274869574, storeHost=/192.168.1.8:10911, msgId=C0A8010800002A9F0000000000000000, commitLogOffset=0, bodyCRC=613185359, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=1, CONSUME_START_TIME=1593275064340, UNIQ_KEY=24098A28085A1DB0ECAD4CD655E7AF1548B818B4AAC28B1F132F0000, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 48], transactionId='null'}]] 

您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 據統計,國外的前端開發人員和後端開發人員比例約1:1,但是在國內比例卻在1:3以下, Web前端開發職位人才缺口巨大。前端工程師的發展之路十分有“錢”景。 每天,HR 群都有人在吐槽招不到前端工程師。實話說對這些需求,高級招聘人員也無能為力,因為在供不應求的前端招聘市場上,優秀的前端工程師才是有話語 ...
  • 1.隊列是遵循先進先出(FIFO)原則的一組有序的項,隊列在尾部添加元素,並從頂部移除元素,最新添加的元素必須排在隊列的末尾。生活中常見的例子如排隊等。 2.創建一個隊列類 class Queue{ constructor(){ this.count = 0;//記錄隊列的數量 this.lowes ...
  • 一、Socket 1.Socket構造方法 構造方法說明 Socket() 該創建的對象,沒有指定IP地址和埠號,意味著只創建了客戶端對象,並且沒有連接任何伺服器。通過該構造方法創建對象後還需要調用connect(SocketAddress endpoint)方法,才能完成與指定伺服器端的連接,其 ...
  • 現在距離 Python 3.9.0 的最終版本還有 3 個月,官方公佈的時間線是: 3.9.0 beta 4: Monday, 2020-06-29 3.9.0 beta 5: Monday, 2020-07-20 3.9.0 candidate 1: Monday, 2020-08-10 3.9. ...
  • 類載入運行的全過程 當用java命令運行某個main函數時,首先需要類載入器把主類載入到JVM記憶體中。 通過Java命令執行代碼的大致流程為 將編譯好的位元組碼class文件通過java命令,在win操作系統就是一個java.exe文件,這個文件底層是c++語言實現的,通過這個文件調用底層jvm.dl ...
  • mac 使用yarn brew install yarn 安裝全局vue-cli全家桶: yarn global add @vue/cli 驗證: node -v npm -v 打開界面 vue ui 本文由博客一文多發平臺 OpenWrite 發佈! ...
  • 前言 網路上的信息很多,有的時候我們需要關鍵字搜索才可以快速方便的找到我們需要的信息。今天我們實現搜索關鍵字爬取堆糖網上相關的美圖,零基礎學會通用爬蟲,當然我們還可以實現多線程爬蟲,加快爬蟲爬取速度 環境: windows pycharm python3 導入模塊 import urllib.par ...
  • 本文源碼:GitHub·點這裡 || GitEE·點這裡 一、服務間隔離 1、分散式結構 分散式系統架構的明顯特點,就是按照業務系統的功能,拆分成各種服務,每個服務下麵都有自己獨立的資料庫,以此降低業務間的耦合度,隔離不同的資料庫保證系統最大的穩定性等。 例如上圖是電商系統中經典的業務場景,訂單-倉 ...
一周排行
    -Advertisement-
    Play Games
  • 前言 在我們開發過程中基本上不可或缺的用到一些敏感機密數據,比如SQL伺服器的連接串或者是OAuth2的Secret等,這些敏感數據在代碼中是不太安全的,我們不應該在源代碼中存儲密碼和其他的敏感數據,一種推薦的方式是通過Asp.Net Core的機密管理器。 機密管理器 在 ASP.NET Core ...
  • 新改進提供的Taurus Rpc 功能,可以簡化微服務間的調用,同時可以不用再手動輸出模塊名稱,或調用路徑,包括負載均衡,這一切,由框架實現並提供了。新的Taurus Rpc 功能,將使得服務間的調用,更加輕鬆、簡約、高效。 ...
  • 順序棧的介面程式 目錄順序棧的介面程式頭文件創建順序棧入棧出棧利用棧將10進位轉16進位數驗證 頭文件 #include <stdio.h> #include <stdbool.h> #include <stdlib.h> 創建順序棧 // 指的是順序棧中的元素的數據類型,用戶可以根據需要進行修改 ...
  • 前言 整理這個官方翻譯的系列,原因是網上大部分的 tomcat 版本比較舊,此版本為 v11 最新的版本。 開源項目 從零手寫實現 tomcat minicat 別稱【嗅虎】心有猛虎,輕嗅薔薇。 系列文章 web server apache tomcat11-01-官方文檔入門介紹 web serv ...
  • C總結與剖析:關鍵字篇 -- <<C語言深度解剖>> 目錄C總結與剖析:關鍵字篇 -- <<C語言深度解剖>>程式的本質:二進位文件變數1.變數:記憶體上的某個位置開闢的空間2.變數的初始化3.為什麼要有變數4.局部變數與全局變數5.變數的大小由類型決定6.任何一個變數,記憶體賦值都是從低地址開始往高地 ...
  • 如果讓你來做一個有狀態流式應用的故障恢復,你會如何來做呢? 單機和多機會遇到什麼不同的問題? Flink Checkpoint 是做什麼用的?原理是什麼? ...
  • C++ 多級繼承 多級繼承是一種面向對象編程(OOP)特性,允許一個類從多個基類繼承屬性和方法。它使代碼更易於組織和維護,並促進代碼重用。 多級繼承的語法 在 C++ 中,使用 : 符號來指定繼承關係。多級繼承的語法如下: class DerivedClass : public BaseClass1 ...
  • 前言 什麼是SpringCloud? Spring Cloud 是一系列框架的有序集合,它利用 Spring Boot 的開發便利性簡化了分散式系統的開發,比如服務註冊、服務發現、網關、路由、鏈路追蹤等。Spring Cloud 並不是重覆造輪子,而是將市面上開發得比較好的模塊集成進去,進行封裝,從 ...
  • class_template 類模板和函數模板的定義和使用類似,我們已經進行了介紹。有時,有兩個或多個類,其功能是相同的,僅僅是數據類型不同。類模板用於實現類所需數據的類型參數化 template<class NameType, class AgeType> class Person { publi ...
  • 目錄system v IPC簡介共用記憶體需要用到的函數介面shmget函數--獲取對象IDshmat函數--獲得映射空間shmctl函數--釋放資源共用記憶體實現思路註意 system v IPC簡介 消息隊列、共用記憶體和信號量統稱為system v IPC(進程間通信機制),V是羅馬數字5,是UNI ...