原文:阿裡 RocketMQ 安裝與簡介 一、簡介 官方簡介: l RocketMQ是一款分散式、隊列模型的消息中間件,具有以下特點: l 能夠保證嚴格的消息順序 l 提供豐富的消息拉取模式 l 高效的訂閱者水平擴展能力 l 實時的消息訂閱機制 l 億級消息堆積能力 二、網路架構 三、特性 1. n ...
一、簡介
官方簡介:
l RocketMQ是一款分散式、隊列模型的消息中間件,具有以下特點:
l 能夠保證嚴格的消息順序
l 提供豐富的消息拉取模式
l 高效的訂閱者水平擴展能力
l 實時的消息訂閱機制
l 億級消息堆積能力
二、網路架構
三、特性
1. nameserver
相對來說,nameserver的穩定性非常高。原因有二:
1 、nameserver互相獨立,彼此沒有通信關係,單台nameserver掛掉,不影響其他nameserver,即使全部掛掉,也不影響業務系統使用。
2 、nameserver不會有頻繁的讀寫,所以性能開銷非常小,穩定性很高。
2. broker
與nameserver關係
l 連接
單個broker和所有nameserver保持長連接
l 心跳
心跳間隔:每隔30秒(此時間無法更改)向所有nameserver發送心跳,心跳包含了自身的topic配置信息。
心跳超時:nameserver每隔10秒鐘(此時間無法更改),掃描所有還存活的broker連接,若某個連接2分鐘內(當前時間與最後更新時間差值超過2分鐘,此時間無法更改)沒有發送心跳數據,則斷開連接。
l 斷開
時機:broker掛掉;心跳超時導致nameserver主動關閉連接
動作:一旦連接斷開,nameserver會立即感知,更新topc與隊列的對應關係,但不會通知生產者和消費者
負載均衡
l 一個topic分佈在多個broker上,一個broker可以配置多個topic,它們是多對多的關係。
l 如果某個topic消息量很大,應該給它多配置幾個隊列,並且儘量多分佈在不同broker上,減輕某個broker的壓力。
l topic消息量都比較均勻的情況下,如果某個broker上的隊列越多,則該broker壓力越大。
可用性
由於消息分佈在各個broker上,一旦某個broker宕機,則該broker上的消息讀寫都會受到影響。所以rocketmq提供了master/slave的結構,salve定時從master同步數據,如果master宕機,則slave提供消費服務,但是不能寫入消息,此過程對應用透明,由rocketmq內部解決。
這裡有兩個關鍵點:
l 一旦某個broker master宕機,生產者和消費者多久才能發現?受限於rocketmq的網路連接機制,預設情況下,最多需要30秒,但這個時間可由應用設定參數來縮短時間。這個時間段內,發往該broker的消息都是失敗的,而且該broker的消息無法消費,因為此時消費者不知道該broker已經掛掉。
l 消費者得到master宕機通知後,轉向slave消費,但是slave不能保證master的消息100%都同步過來了,因此會有少量的消息丟失。但是消息最終不會丟的,一旦master恢復,未同步過去的消息會被消費掉。
可靠性
l 所有發往broker的消息,有同步刷盤和非同步刷盤機制,總的來說,可靠性非常高
l 同步刷盤時,消息寫入物理文件才會返回成功,因此非常可靠
l 非同步刷盤時,只有機器宕機,才會產生消息丟失,broker掛掉可能會發生,但是機器宕機崩潰是很少發生的,除非突然斷電
消息清理
l 掃描間隔
預設10秒,由broker配置參數cleanResourceInterval決定
l 空間閾值
物理文件不能無限制的一直存儲在磁碟,當磁碟空間達到閾值時,不再接受消息,broker列印出日誌,消息發送失敗,閾值為固定值85%
l 清理時機
預設每天凌晨4點,由broker配置參數deleteWhen決定;或者磁碟空間達到閾值
l 文件保留時長
預設72小時,由broker配置參數fileReservedTime決定
讀寫性能
l 文件記憶體映射方式操作文件,避免read/write系統調用和實時文件讀寫,性能非常高
l 永遠一個文件在寫,其他文件在讀
l 順序寫,隨機讀
l 利用linux的sendfile機制,將消息內容直接輸出到sokect管道,避免系統調用
系統特性
l 大記憶體,記憶體越大性能越高,否則系統swap會成為性能瓶頸
l IO密集
l cpu load高,使用率低,因為cpu占用後,大部分時間在IO WAIT
l 磁碟可靠性要求高,為了兼顧安全和性能,採用RAID10陣列
l 磁碟讀取速度要求快,要求高轉速大容量磁碟
3. 消費者
與nameserver關係
l 連接
單個消費者和一臺nameserver保持長連接,定時查詢topic配置信息,如果該nameserver掛掉,消費者會自動連接下一個nameserver,直到有可用連接為止,並能自動重連。
l 心跳
與nameserver沒有心跳
l 輪詢時間
預設情況下,消費者每隔30秒從nameserver獲取所有topic的最新隊列情況,這意味著某個broker如果宕機,客戶端最多要30秒才能感知。該時間由DefaultMQPushConsumer的pollNameServerInteval參數決定,可手動配置。
與broker關係
l 連接
單個消費者和該消費者關聯的所有broker保持長連接。
l 心跳
預設情況下,消費者每隔30秒向所有broker發送心跳,該時間由DefaultMQPushConsumer的heartbeatBrokerInterval參數決定,可手動配置。broker每隔10秒鐘(此時間無法更改),掃描所有還存活的連接,若某個連接2分鐘內(當前時間與最後更新時間差值超過2分鐘,此時間無法更改)沒有發送心跳數據,則關閉連接,並向該消費者分組的所有消費者發出通知,分組內消費者重新分配隊列繼續消費
l 斷開
時機:消費者掛掉;心跳超時導致broker主動關閉連接
動作:一旦連接斷開,broker會立即感知到,並向該消費者分組的所有消費者發出通知,分組內消費者重新分配隊列繼續消費
負載均衡
集群消費模式下,一個消費者集群多台機器共同消費一個topic的多個隊列,一個隊列只會被一個消費者消費。如果某個消費者掛掉,分組內其它消費者會接替掛掉的消費者繼續消費。
消費機制
l 本地隊列
消費者不間斷的從broker拉取消息,消息拉取到本地隊列,然後本地消費線程消費本地消息隊列,只是一個非同步過程,拉取線程不會等待本地消費線程,這種模式實時性非常高。對消費者對本地隊列有一個保護,因此本地消息隊列不能無限大,否則可能會占用大量記憶體,本地隊列大小由DefaultMQPushConsumer的pullThresholdForQueue屬性控制,預設1000,可手動設置。
l 輪詢間隔
消息拉取線程每隔多久拉取一次?間隔時間由DefaultMQPushConsumer的pullInterval屬性控制,預設為0,可手動設置。
l 消息消費數量
監聽器每次接受本地隊列的消息是多少條?這個參數由DefaultMQPushConsumer的consumeMessageBatchMaxSize屬性控制,預設為1,可手動設置。
消費進度存儲
每隔一段時間將各個隊列的消費進度存儲到對應的broker上,該時間由DefaultMQPushConsumer的persistConsumerOffsetInterval屬性控制,預設為5秒,可手動設置。
如果一個topic在某broker上有3個隊列,一個消費者消費這3個隊列,那麼該消費者和這個broker有幾個連接?
一個連接,消費單位與隊列相關,消費連接只跟broker相關,事實上,消費者將所有隊列的消息拉取任務放到本地的隊列,挨個拉取,拉取完畢後,又將拉取任務放到隊尾,然後執行下一個拉取任務
4. 生產者
與nameserver關係
l 連接
單個生產者者和一臺nameserver保持長連接,定時查詢topic配置信息,如果該nameserver掛掉,生產者會自動連接下一個nameserver,直到有可用連接為止,並能自動重連。
l 輪詢時間
預設情況下,生產者每隔30秒從nameserver獲取所有topic的最新隊列情況,這意味著某個broker如果宕機,生產者最多要30秒才能感知,在此期間,發往該broker的消息發送失敗。該時間由DefaultMQProducer的pollNameServerInteval參數決定,可手動配置。
l 心跳
與nameserver沒有心跳
與broker關係
l 連接
單個生產者和該生產者關聯的所有broker保持長連接。
l 心跳
預設情況下,生產者每隔30秒向所有broker發送心跳,該時間由DefaultMQProducer的heartbeatBrokerInterval參數決定,可手動配置。broker每隔10秒鐘(此時間無法更改),掃描所有還存活的連接,若某個連接2分鐘內(當前時間與最後更新時間差值超過2分鐘,此時間無法更改)沒有發送心跳數據,則關閉連接。
l 連接斷開
移除broker上的生產者信息
負載均衡
生產者時間沒有關係,每個生產者向隊列輪流發送消息
四、Broker集群配置方式及優缺點
1. 單個 Master
這種方式風險較大,一旦Broker 重啟或者宕機時,會導致整個服務不可用,不建議線上環境使用。
2. 多 Master 模式
一個集群無 Slave,全是 Master,例如 2 個 Master 或者 3 個 Master
優點:配置簡單,單個Master 宕機或重啟維護對應用無影響,在磁碟配置為 RAID10 時,即使機器宕機不可恢復情況下,由與 RAID10 磁碟非常可靠,消息也不會丟(非同步刷盤丟失少量消息,同步刷盤一條不丟)。性能最高。
缺點:單台機器宕機期間,這台機器上未被消費的消息在機器恢復之前不可訂閱,消息實時性會受到受到影響。
### 先啟動 NameServer,例如機器 IP 為:172.16.8.106:9876
1 |
|
### 在機器 A,啟動第一個 Master
1 |
|
### 在機器 B,啟動第二個 Master
1 |
|
3. 多 Master 多 Slave 模式,非同步複製
每個 Master 配置一個 Slave,有多對Master-Slave,HA 採用非同步複製方式,主備有短暫消息延遲,毫秒級。
優點:即使磁碟損壞,消息丟失的非常少,且消息實時性不會受影響,因為 Master 宕機後,消費者仍然可以從 Slave 消費,此過程對應用透明。不需要人工干預。性能同多 Master 模式幾乎一樣。
缺點:Master 宕機,磁碟損壞情況,會丟失少量消息。
### 先啟動 NameServer,例如機器 IP 為:172.16.8.106:9876
1 |
|
### 在機器 A,啟動第一個 Master
1 |
|
### 在機器 B,啟動第二個 Master
1 |
|
### 在機器 C,啟動第一個 Slave
1 |
|
### 在機器 D,啟動第二個 Slave
1 |
|
4. 多 Master 多 Slave 模式,同步雙寫
每個 Master 配置一個 Slave,有多對Master-Slave,HA 採用同步雙寫方式,主備都寫成功,嚮應用返回成功。
優點:數據與服務都無單點,Master宕機情況下,消息無延遲,服務可用性與數據可用性都非常高
缺點:性能比非同步複製模式略低,大約低 10%左右,發送單個消息的 RT 會略高。目前主宕機後,備機不能自動切換為主機,後續會支持自動切換功能。
### 先啟動 NameServer,例如機器 IP 為:172.16.8.106:9876
1 |
|
### 在機器 A,啟動第一個 Master
1 |
|
### 在機器 B,啟動第二個 Master
1 |
|
### 在機器 C,啟動第一個 Slave
1 |
|
### 在機器 D,啟動第二個 Slave
1 |
|
以上 Broker 與 Slave 配對是通過指定相同的brokerName 參數來配對,Master 的 BrokerId 必須是 0,Slave的BrokerId 必須是大與 0 的數。另外一個 Master 下麵可以掛載多個 Slave,同一 Master 下的多個 Slave 通過指定不同的 BrokerId 來區分。
五、安裝
1. 安裝
下載RocketMQ,在每個節點,解壓到指定目錄
?1 2 |
alibaba-rocketmq-3.2.6. tar .gz
tar -zxvf alibaba-rocketmq-3.2.6. tar .gz -C /usr/local
|
解壓後的文件夾:alibaba-rocketmq
進入bin目錄
註:RocketMQ需要jdk1.7及以上版本
2. 啟動NameServer
?1 2 3 |
[root@m106 2m-2s- sync ] # nohup sh mqnamesrv &
[2] 17938
[root@m106 2m-2s- sync ] # nohup: ignoring input and appending output to `nohup.out'
|
查看nohup.out文件中:
The Name Server boot success.表示NameServer啟動成功
Jps查看NameServer進程
3. 啟動BrokerServer a, BrokerServer b
在m106上啟動master A
?1 2 |
[root@m106 bin] # nohup sh mqbroker -n 172.16.8.106:9876 -c ../conf/2m-noslave/broker-a.properties &
[1] 17206
|
在m107上啟動master B
?1 2 |
[root@m107 bin] # nohup sh mqbroker -n 172.16.8.106:9876 -c ../conf/2m-noslave/broker-b.properties &
[1] 14488
|
Jps查看服務啟動情況
?1 2 3 4 5 6 7 |
[root@m106 bin] # jps
12494 HRegionServer
12240 Kafka
16556 DataNode
18499 NamesrvStartup
13101 RunJar
17210 BrokerStartup
|
4. 創建topic
?1 2 3 4 5 6 7 8 9 10 11 12 13 14 |
[root@m106 bin] # sh mqadmin updateTopic
usage: mqadmin updateTopic [-b <arg>] [-c <arg>] [-h] [-n <arg>] [-o <arg>] [-p <arg>] [-r <arg>] [-s <arg>]
-t <arg> [-u <arg>] [-w <arg>]
-b,--brokerAddr <arg> create topic to which broker
-c,--clusterName <arg> create topic to which cluster
-h,--help Print help
-n,--namesrvAddr <arg> Name server address list, eg: 192.168.0.1:9876;192.168.0.2:9876
-o,--order <arg> set topic's order( true | false
-p,--perm <arg> set topic's permission(2|4|6), intro[2:R; 4:W; 6:RW]
-r,--readQueueNums <arg> set read queue nums
-s,--hasUnitSub <arg> has unit sub ( true | false
-t,--topic <arg> topic name
-u,--unit <arg> is unit topic ( true | false
-w,--writeQueueNums <arg> set write queue nums
|
實例:
?1 2 3 |
[root@m106 bin] # sh mqadmin updateTopic -n 172.16.8.106:9876 -c DefaultCluster -t TopicTest1
create topic to 172.16.8.107:10911 success.
TopicConfig [topicName=TopicTest1, readQueueNums=8, writeQueueNums=8, perm=RW-, topicFilterType=SINGLE_TAG, topicSysFlag=0, order= false ]
|
5. 刪除topic
?1 2 3 |
[root@m106 bin] # sh mqadmin deleteTopic -n 172.16.8.106:9876 -c DefaultCluster -t TopicTest1
delete topic [TopicTest1] from cluster [DefaultCluster] success.
delete topic [TopicTest1] from NameServer success.
|
6. 查看topic信息
?1 2 3 4 5 |
[root@m106 bin] # sh mqadmin topicList -n 172.16.8.106:9876
BenchmarkTest
TopicTest1
broker-a
DefaultCluster
|
7. 查看topic統計信息
?1 2 3 4 5 6 7 8 9 10 |
[root@m106 bin] # sh mqadmin topicStatus -n 172.16.8.106:9876 -t TopicTest1
#Broker Name #QID #Min Offset #Max Offset #Last Updated
broker-a 0 0 0
broker-a 1 0 0
broker-a 2 0 0
broker-a 3 0 0
broker-a 4 0 0
broker-a 5 0 0
broker-a 6 0 0
broker-a 7 0 0
|
8. 查看所有消費組group
?1 |
[root@m106 bin] # sh mqadmin consumerProgress -n 172.16.8.106:9876
|
9. 查看指定消費組下的所有topic數據堆積情況
?1 |
[root@m106 bin] # sh mqadmin consumerProgress -n 172.16.8.106:9876 -g ConsumerGroupName
|
六、使用指南
1. 客戶端定址方式
l 在代碼中指定NameServer地址
Producer.setNamesrvAddr(“192.168.8.106:9876”);
或
Consumer.setNamesrvAddr(“192.168.8.106:9876”);
l Java啟動參數中指定NameServer地址
-Drocketmq.namesrv.addr=192.168.8.106:9876
l 環境變數指定NameServer地址·
export NAMESRV_ADDR=192.168.8.106:9876
l http靜態伺服器定址
客戶端啟動後,會定時訪問一個靜態的HTTP伺服器,地址如下:
http://jmenv.tbsite.net:8080/rocketmq/msaddr
這個URL的返回內容如下:
192.168.8.106:9876
客戶端預設每隔2分鐘訪問一次這個HTTP伺服器,並更新本地的NameServer地址。URL已經在代碼中寫死,可通過修改/etc/hosts文件來改變要訪問的伺服器,例如在/etc/hosts增加如下配置:
10.232.22.67 jmenv.taobao.net
2. 客戶端的公共配置類:ClientConfig
參數名 |
預設值 |
說明 |
NamesrvAddr |
|
NameServer地址列表,多個nameServer地址用分號隔開 |
clientIP |
本機IP |
客戶端本機IP地址,某些機器會發生無法識別客戶端IP地址情況,需要應用在代碼中強制指定 |
instanceName |
DEFAULT |
客戶端實例名稱,客戶端創建的多個Producer,Consumer實際是共用一個內部實例(這個實例包含網路連接,線程資源等) |
clientCallbackExecutorThreads |
4 |
通信層非同步回調線程數 |
pollNameServerInteval |
30000 |
輪訓Name Server 間隔時間,單位毫秒 |
heartbeatBrokerInterval |
30000 |
向Broker發送心跳間隔時間,單位毫秒 |
persistConsumerOffsetInterval |
5000 |
持久化Consumer消費進度間隔時間,單位毫秒 |
3. Producer配置
參數名 |
預設值 |
說明 |
producerGroup |
DEFAULT_PRODUCER |
Producer組名,多個Producer如果屬於一個應用,發送同樣的消息,則應該將它們歸為同一組。 |
createTopicKey |
TBW102 |
在發送消息時,自動創建伺服器不存在的topic,需要指定key |
defaultTopicQueueNums |
4 |
在發送消息時,自動創建伺服器不存在的topic,預設創建的隊列數 |
sendMsgTimeout |
10000 |
發送消息超時時間,單位毫秒 |
compressMsgBodyOverHowmuch |
4096 |
消息Body超過多大開始壓縮(Consumer收到消息會自動解壓縮),單位位元組 |
retryAnotherBrokerWhenNotStoreOK |
FALSE |
如果發送消息返回sendResult,但是sendStatus!=SEND_OK,是否重試發送 |
maxMessageSize |
131072 |
客戶端限制的消息大小,超過報錯,同時服務端也會限制(預設128K) |
transactionCheckListener |
|
事物消息回查監聽器,如果發送事務消息,必須設置 |
checkThreadPoolMinSize |
1 |
Broker回查Producer事務狀態時,線程池大小 |
checkThreadPoolMaxSize |
1 |
Broker回查Producer事務狀態時,線程池大小 |
checkRequestHoldMax |
2000 |
Broker回查Producer事務狀態時,Producer本地緩衝請求隊列大小 |
4. PushConsumer配置
參數名 |
預設值 |
說明 |
consumerGroup |
DEFAULT_CONSUMER |
Consumer組名,多個Consumer如果屬於一個應用,訂閱同樣的消息,且消費邏輯一致,則應將它們歸為同一組 |
messageModel |
CLUSTERING |
消息模型,支持以下兩種1.集群消費2.廣播消費 |
consumeFromWhere |
CONSUME_FROM_LAST_OFFSET |
Consumer啟動後,預設從什麼位置開始消費 |
allocateMessageQueueStrategy |
AllocateMessageQueueAveragely |
Rebalance演算法實現策略 |
Subscription |
{} |
訂閱關係 |
messageListener |
|
消息監聽器 |
offsetStore |
|
消費進度存儲 |
consumeThreadMin |
10 |
消費線程池數量 |
consumeThreadMax |
20 |
消費線程池數量 |
consumeConcurrentlyMaxSpan |
2000 |
單隊列並行消費允許的最大跨度 |
pullThresholdForQueue |
1000 |
拉消息本地隊列緩存消息最大數 |
Pullinterval |
0 |
拉消息間隔,由於是長輪詢,所以為0,但是如果應用了流控,也可以設置大於0的值,單位毫秒 |
consumeMessageBatchMaxSize |
1 |
批量消費,一次消費多少條消息 |
pullBatchSize |
32 |
批量拉消息,一次最多拉多少條 |
5. PullConsumer配置
參數名 |
預設值 |
說明 |
consumerGroup |
|
Conusmer組名,多個Consumer如果屬於一個應用,訂閱同樣的消息,且消費邏輯一致,則應該將它們歸為同一組 |
brokerSuspendMaxTimeMillis |
20000 |
長輪詢,Consumer拉消息請求在Broker掛起最長時間,單位毫秒 |
consumerPullTimeoutMillis |
10000 |
非長輪詢,拉消息超時時間,單位毫秒 |
consumerTimeoutMillisWhenSuspend |
30000 |
長輪詢,Consumer拉消息請求咋broker掛起超過指定時間,客戶端認為超時,單位毫秒 |
messageModel |
BROADCASTING |
消息模型,支持以下兩種:1集群消費 2廣播模式 |
messageQueueListener |
|
監聽隊列變化 |
offsetStore |
|
消費進度存儲 |
registerTopics |
|
註冊的topic集合 |
allocateMessageQueueStrategy |
|
Rebalance演算法實現策略 |
6. Broker配置參數
查看Broker預設配置
sh mqbroker -m
參數名 |
預設值 |
說明 |
consumerGroup |
|
Conusmer組名,多個Consumer如果屬於一個應用,訂閱同樣的消息,且消費邏輯一致,則應該將它們歸為同一組 |
listenPort |
10911 |
Broker對外服務的監聽埠 |
namesrvAddr |
Null |
Name Server地址 |
brokerIP1 |
本機IP |
本機IP地址,預設系統自動識別,但是某些多網卡機器會存在識別錯誤的情況,這種情況下可以人工配置。 |
brokerName |
本機主機名 |
|
brokerClusterName |
DefaultCluster |
Broker所屬哪個集群 |
brokerId |
0 |
BrokerId,必須是大等於0的整數,0表示Master,>0表示Slave,一個Master可以掛多個Slave,Master和Slave通過BrokerName來配對 |
storePathCommitLog |
$HOME/store/commitlog |
commitLog存儲路徑 |
storePathConsumeQueue |
$HOME/store/consumequeue |
消費隊列存儲路徑 |
storePathIndex |
$HOME/store/index |
消息索引存儲隊列 |
deleteWhen |
4 |
刪除時間時間點,預設凌晨4點 |
fileReservedTime |
48 |
文件保留時間,預設48小時 |
maxTransferBytesOnMessageInMemory |
262144 |
單次pull消息(記憶體)傳輸的最大位元組數 |
maxTransferCountOnMessageInMemory |
32 |
單次pull消息(記憶體)傳輸的最大條數 |
maxTransferBytesOnMessageInMemory |
65535 |
單次pull消息(磁碟)傳輸的最大位元組數 |
maxTransferCountOnMessageInDisk |
8 |
單次pull消息(磁碟)傳輸的最大條數 |
messageIndexEnable |
TRUE |
是否開啟消息索引功能 |
messageIndexSafe |
FALSE |
是否提供安全的消息索引機制,索引保證不丟 |
brokerRole |
ASYNC_MASTER |
Broker的角色 -ASYNC_MASTER非同步複製Master -SYNC_MASTER同步雙寫Master -SLAVE |
flushDiskType |
ASYNC_FLUSH |
刷盤方式 -ASYNC_FLUSH非同步刷盤 -SYNC_FLUSH同步刷盤 |
cleanFileForciblyEnable |
TRUE |
磁碟滿,且無過期文件情況下TRUE表示強制刪除文件,優先保證服務可用 FALSE標記服務不可用,文件不刪除 |