clone 並導入源碼 本地啟動 NameServer 本地啟動 Broker 本地運行生產者與消費者代碼 完成上述步驟之後,RocketMQ的源碼環境就搭建完畢了,之後就可以在本地啟動以及收發消息,調試和分析RocketMQ的源碼了。 clone 並導入源碼 在 github 上選擇對應的的代碼 ...
- clone 並導入源碼
- 本地啟動 NameServer
- 本地啟動 Broker
- 本地運行生產者與消費者代碼
完成上述步驟之後,RocketMQ的源碼環境就搭建完畢了,之後就可以在本地啟動以及收發消息,調試和分析RocketMQ的源碼了。
clone 並導入源碼
在 github 上選擇對應的的代碼 https://github.com/apache/rocketmq/tree/rocketmq-all-4.7.0,將其 clone 下來,再切出 4.7.0 版本的源碼。Clone 到本地之後,用 IDEA 打開項目。
項目結構
模塊 | 作用 |
---|---|
broker | Broker 相關代碼 |
client | Producer、Consumer 客戶端代碼,用於生產消息、消費消息 |
common | 公共代碼 |
dev | 開發相關的信息 |
distribution | 部署相關,比如配置文件 |
example | 例子 |
filter | 過濾器 |
logappender | 日誌相關 |
logging | 日誌相關 |
namesvr | NameServer |
openmessaging | 開放消息標準 |
remoting | 遠程網路通信,基於 netty 實現 |
srvutil | 工具類 |
store | 消息如何在 Broker 中進行存儲相關代碼 |
style | 代碼檢查 |
test | 測試 |
tools | 命令行監控 |
本地啟動 NameServer
接下來我們要做的是在本地啟動 NameServer,包括兩個步驟:
- 在 IDEA 中配置啟動相關的信息,NameServer 的啟動類是
org.apache.rocketmq.namesrv.NamesrvStartup
- 準備好啟動 NameServer 需要的配置文件和目錄
看上圖:
- 配置啟動類的名字
NameServerStartup
- 配置主類的路徑
org.apache.rocketmq.namesrv.NamesrvStartup
- 工作目錄,也就是當前代碼所在的目錄
- 運行目錄
ROCKETMQ_HOME
,這個目錄裡面放的是運行時需要的配置文件、數據、日誌等。你需要創建一個目錄,在裡面創建conf
、logs
、store
目錄
接著將源碼中 distrbution
模塊中的 logback_namesvr.xml
文件拷貝到上面的 conf 目錄下,並將這個文件中的${user.home}
全部替換為前面配置的運行目錄。
然後運行配置好的啟動類,就會讀取 conf 里的配置文件,並將日誌列印在logs目錄里,數據都會寫在store目錄里。看到 IDEA 的列印出下麵這樣的信息,就說明 NameServer 啟動成功了。
本地運行 Broker
啟動 Broker 和啟動 NameServer 的過程類似。首先也是配置啟動類:
- Broker 的啟動類在
org.apache.rocketmq.broker.BrokerStartup
- 不一樣的地方是要設置一個參數
-c你的broker.conf配置文件的路徑
,因為程式啟動的時候會讀-c
這個參數 - 接著還是設置工作目錄和運行目錄,選擇 module 為 rocketmq-broker
接著把distrbution
模塊中的 broker.conf
和 logback_broker.xml
文件拷貝到 conf
目錄下:
- 將 logback_broker.xml 的
${user.home}
替換為你的 RocketMQ 運行目錄 - broker.conf 按照下麵的配置方式進行配置
brokerClusterName = DefaultCluster
brokerName = broker-a
brokerId = 0
# nameserver的地址
namesrvAddr=127.0.0.1:9876
deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH
# 運行目錄的store目錄
storePathRootDir=/Users/shui/Desktop/rocketmq-nameserver/store
# commitLog的存儲路徑
storePathCommitLog=你的store目錄/commitlog
# consume queue文件的存儲路徑
storePathConsumeQueue=你的store目錄/consumequeue
# 消息索引文件的存儲路徑
storePathIndex=你的store目錄/store/index
# checkpoint文件的存儲路徑
storeCheckpoint=你的store目錄/checkpoint
# abort文件的存儲路徑
abortFile=你的store目錄/abort
最後運行主類,看到控制台列印如下信息就表示啟動成功:
此時 rocketmqlogs,裡面有一個broker.log,就可以看到Broker的啟動日誌了:
本地運行生產者與消費者代碼
在控制台創建一個 topic 名為 TopicTest
。如果不知道如何使用 RocketMQ 的控制台,可以看我之前寫這篇文章:https://www.cnblogs.com/shuiyj/p/13200658.html。
接著去修改 example 中給出的生產者和消費者代碼 org.apache.rocketmq.example.quickstart.Consumer
和 org.apache.rocketmq.example.quickstart.Producer
。
生產者
改動兩個地方:
- 設置 NameServer 地址,讓生產者可以獲取到 Broker 地址
- 本來發送 1000 條信息,改少一點發送 3 條,便於消費的時候觀察
public class Producer {
public static void main(String[] args) throws MQClientException, InterruptedException {
/*
* Instantiate with a producer group name.
*/
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
// 其他代碼不變
// 在這裡設置 NameServer 地址,保證 Producer 可以從 NameServer 獲取到 Broker 地址
producer.setNamesrvAddr("127.0.0.1:9876");
/*
* Launch the instance.
*/
producer.start();
// 本來是發送 1000 條消息,改成發送 3 條
for (int i = 0; i < 3; i++) {
try {
/*
* Create a message instance, specifying topic, tag and message body.
*/
Message msg = new Message("TopicTest" /* Topic */,
"TagA" /* Tag */,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
);
看到控制台輸出如下所示的信息,表示消息發送成功了。
SendResult [sendStatus=SEND_OK, msgId=24098A28085A1DB0ECAD4CD655E7AF1548B818B4AAC28B1F132F0000, offsetMsgId=C0A8010800002A9F0000000000000000, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=2], queueOffset=0]
SendResult [sendStatus=SEND_OK, msgId=24098A28085A1DB0ECAD4CD655E7AF1548B818B4AAC28B1F13AB0001, offsetMsgId=C0A8010800002A9F00000000000000CA, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=3], queueOffset=0]
SendResult [sendStatus=SEND_OK, msgId=24098A28085A1DB0ECAD4CD655E7AF1548B818B4AAC28B1F13AE0002, offsetMsgId=C0A8010800002A9F0000000000000194, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=0], queueOffset=0]
消費者
消息者只改動一個地方,就是設置 NameServer 地址,也是為了獲取到 Broker 的地址。
public class Consumer {
public static void main(String[] args) throws InterruptedException, MQClientException {
// 省略其它代碼...
// 設置 NameServer 地址,保證 Consumer 可以從 NameServer 獲取到 Broker 地址
consumer.setNamesrvAddr("127.0.0.1:9876");
/*
* Launch the consumer instance.
*/
consumer.start();
System.out.printf("Consumer Started.%n");
}
}
可以看到消費到了 3 條數據,並列印出了消息的相關信息。
00:24:23.571 [main] DEBUG i.n.u.i.l.InternalLoggerFactory - Using SLF4J as the default logging framework
Consumer Started.
ConsumeMessageThread_1 Receive New Messages: [MessageExt [brokerName=broker-a, queueId=3, storeSize=202, queueOffset=0, sysFlag=0, bornTimestamp=1593274869675, bornHost=/192.168.1.8:54010, storeTimestamp=1593274869676, storeHost=/192.168.1.8:10911, msgId=C0A8010800002A9F00000000000000CA, commitLogOffset=202, bodyCRC=1401636825, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=1, CONSUME_START_TIME=1593275064336, UNIQ_KEY=24098A28085A1DB0ECAD4CD655E7AF1548B818B4AAC28B1F13AB0001, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 49], transactionId='null'}]]
ConsumeMessageThread_3 Receive New Messages: [MessageExt [brokerName=broker-a, queueId=0, storeSize=202, queueOffset=0, sysFlag=0, bornTimestamp=1593274869678, bornHost=/192.168.1.8:54010, storeTimestamp=1593274869679, storeHost=/192.168.1.8:10911, msgId=C0A8010800002A9F0000000000000194, commitLogOffset=404, bodyCRC=1250039395, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=1, CONSUME_START_TIME=1593275064339, UNIQ_KEY=24098A28085A1DB0ECAD4CD655E7AF1548B818B4AAC28B1F13AE0002, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 50], transactionId='null'}]]
ConsumeMessageThread_2 Receive New Messages: [MessageExt [brokerName=broker-a, queueId=2, storeSize=202, queueOffset=0, sysFlag=0, bornTimestamp=1593274869552, bornHost=/192.168.1.8:54010, storeTimestamp=1593274869574, storeHost=/192.168.1.8:10911, msgId=C0A8010800002A9F0000000000000000, commitLogOffset=0, bodyCRC=613185359, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=1, CONSUME_START_TIME=1593275064340, UNIQ_KEY=24098A28085A1DB0ECAD4CD655E7AF1548B818B4AAC28B1F132F0000, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 48], transactionId='null'}]]