在RocketMQ 5.0以前,有兩種集群部署模式,分別為主從模式(Master-Slave模式)和Dledger模式。 主從模式 主從模式中分為Master和Slave兩個角色,集群中可以有多個Master節點,一個Master節點可以有多個Slave節點。Master節點負責接收生產者發送的寫入 ...
在RocketMQ 5.0以前,有兩種集群部署模式,分別為主從模式(Master-Slave模式)和Dledger模式。
主從模式
主從模式中分為Master和Slave兩個角色,集群中可以有多個Master節點,一個Master節點可以有多個Slave節點。Master節點負責接收生產者發送的寫入請求,將消息寫入CommitLog文件,Slave節點會與Master節點建立連接,從Master節點同步消息數據(有同步複製和非同步複製兩種方式)。
消費者可以從Master節點拉取消息,也可以從Slave節點拉取消息。
在RocketMQ 4.5版 本之前,如果Master宕機,不支持自動將Slave切換為Master,需要人工介入。
Dledger模式
為瞭解決主從架構下Slave不能自動切換為Master的問題,4.5版本之後提供了DLedger模式,使用Raft演算法,如果Master節點出現故障,可以自動從Slave節點中選舉出新的Master進行切換。
存在問題
(1)根據Raft演算法的多數原則,集群至少有三個節點以上,在消息寫入時,也需要大多數的Follower節點響應成功才能認為消息寫入成功;
(2)Dledger模式下,進行消息寫入的時候,使用的是openmessaging包中提供的介面,無法利用RocketMQ原生的存儲和複製能力(比如非Dledger模式下使用暫存池方式寫入);
(3)存在兩套日誌複製流程(主從模式下一套、Dledger模式下一套),不統一;
主從同步實現原理
Controller模式
為瞭解決如上問題,RocketMQ 5.0以後推出了Controller模式,它的特點如下:
(1)在主從部署模式下就具有自動切換Master的能力,5.0之前需要使用DLedger才可以;
(2)可以利用RocketMQ原生存儲複製能力,並統一RocketMQ的存儲和複製能力;
RocketMQ 5.0對Broker選主相關的功能進行了抽離,放在Controller中,實現了在主從部署模式下就可以自動切換Master,Controller可以獨立部署也可以嵌入在NameServer中部署。
獨立部署下的Controller:
嵌入NameServer中的部署圖如下:
Controller
也稱為Controller控制器,一般集群中部署多個Controller,使用Raft演算法選舉出一個Active DLedger Controller作為主控制器,它主要用來管理一個SyncStateSet集合,
這個集合中存儲的是一組跟上Master進度的Broker節點集合,如果Controller發現某個Master Broker下線時,會從集合中選出新的Master Broker並切換,Controller可以單獨部署可以嵌在NameServer中部署。
SyncStateSet
SyncStateSet中維護了一個Broker副本組集合,包含當前Master Broker和它的Slave Broker,需要註意在集合內的節點都是跟上Master進度的節點,在節更變動時,由Master Broker向Controller控制器發起變更請求,更新Controller中的SyncStateSet數據,在選舉Master的時候,Controller只需從這個列表中選出一個節點成為新的Master即可。
節點變更分為Shrink操作和Expand操作,需要Master Broker發起,它會通過定時任務以及在數據同步過程中判斷是否需要進行Shrink或Expand。
Shrink
Shrink指的是將SyncStateSet副本集合中與Master節點差距過大的副本移除,差距的判斷條件如下:
- 節點是否與Master Broker的連接已斷,如果斷開需要將該節點從SyncStateSet移除;
- 節點的複製進度是否過大,新增了haMaxTimeSlaveNotCatchup參數,Master Broker會通過定時任務掃描每一個Slave節點的複製信息,裡面有每個節點上一次跟上Master進度的時間戳lastCaughtUpTimeMs,如果當前時間減去這個lastCaughtUpTimeMs超過了haMaxTimeSlaveNotCatchup的值,會認為該Slave節點的複製進度過後;
haMaxTimeSlaveNotCatchup:表示Slave沒有跟上 Master 的最大時間間隔,若在 SyncStateSet 中的 slave 超過該時間間隔會將其從 SyncStateSet 移除。預設為 15000(15s)。
Expand
如果Master Broker發現某個Slave節點趕上了Master節點的進度,需要將其重新加入到SyncStateSet。
需要註意以上兩個操作,都需要Master Broker向Controller節點發送通知,請求更新SyncStateSet中的數據。
選舉Master
不管是Controller獨立部署,還是嵌入到NameServer中部署,Controller都會監聽每個Broker的連接,Broker會定期向Controller發送心跳包,Controller會定時掃描,如果某個Broker心跳包發送超時,會認為這個Broker已經失效,此時會判斷Broker是否是Master角色,如果是Master角色就需要從該組的SyncStateSet中重新選出一個節點作為Master。
選舉Master的方式比較簡單,從該組的SyncStateSet中,挑選一個心跳包發送正常的Slave成為新的Master節點即可,並將結果通知到該組所有的Broker,每個Broker也會定時向Controller發送請求獲取主備信息。
Broker端設計
主從架構部署模式下,需要配置brokerRole和brokerId,也就是手動分配Master和Slave,在Controller模式下,這兩個參數會失效,不需要再進行配置,角色和ID由Controller來分配。
Controller模式下增加了controllerAddr參數,Broker在啟動時,需要配置這個參數,設置每個controller的地址:
controllerAddr:controller的地址,多個controller中間用分號隔開。例如controllerAddr = 127.0.0.1:9877;127.0.0.1:9878;127.0.0.1:9879
Broker上線
Broker配置了每個Controller的地址,Broker啟動時,會先向Controller註冊,並獲取角色關係和brokerId,通過角色關係可以知道自己是Master還是Slave,之後再向NameServer註冊。
Broker可以通過任意一個Controller獲取Active Controller節點的IP,後臺也會有一個定時任務,定時更新Active Controller節點的IP。
主備關係確定
初始化時,第一個Broker在向Controller註冊的時候,此時並沒有該Broker組的SyncStateSet,所以Active Controller會將第一個向其發送請求共識的Broker設置為Master,之後該組的其他節點會設置為Slave,Master節點的brokerId為0,
Slave節點從1開始編號,往後遞增。
由於Controller控制每個節點的角色,所以每個Broker也會定時向Controller發送請求獲取主備信息,以便在角色發生變化的時候可以及時更新。
日誌複製
- MasterEpoch(Epoch):Master的任期號,與Term類似,每一任Master都會有一個對應的MasterEpoch任期號,這個任期號的值由Controller控制,單獨遞增;
- StartOffset:每一任Master除了有一個任期號之外,還會取當選時對應CommitLog文件中最大的偏移量(MaxPhyOffset),作為本任期期間日誌的起始偏移量,記作StartOffset;
- EpochFile:用於存放每一任Master對應的日誌起始偏移量(<MasterEpoch, StartOffset> 序列),存儲在 ~/store文件夾下;
當Broker成為Master時,會進行如下操作:
- 獲取當前CommitLog文件中最後一條消息的偏移量,也就是MaxPhyOffset的值,作為StartOffset;
- 將當前任期號MasterEpoch和起始偏移量StartOffset的值持久化到EpochFile文件中;
- 監聽Slave節點的連接;
日誌複製整體流程
Broker在接收Controller指令之後,會根據Controller的選舉結果,轉變對應的角色,分別為Master和Slave。
連接階段
連接階段用於Master節點與Slave節點間建立連接:
- Master節點開始監聽連接;
- Slave節點請求與Master節點建立連接;
HandShake階段
Master節點與Slave節點連接建立成功之後,進入HandShake階段:
-
Slave節點向Master節點發送HandShake包,裡面包含一些狀態信息及Slave的地址,數據格式如下:
- Current State:表示當前狀態,當前是HandShake階段,所以表示HandShake;
- Flags:一些標誌位;
- SlaveAddressLength:Salve節點的地址長度;
- SlaveAddress:Slave節點的地址,發送給Master節點後,在下個階段Master節點會判斷是否需要將Slave節點加入到SyncStateSet中;
-
Master節點向Slave節點回覆HandShake包,Slave節點收到Master節點回覆的包後,會使用本地的Epoch+StartOffset與Master傳輸的對比,找到截斷點進行日誌截斷,與Master的日誌保持一致,Master節點回覆的HandShake包數據格式如下:
- Current State:表示當前狀態,當前是HandShake階段;
- Body Size:存儲Body的長度;
- Offset:表示當前Master節點的CommitLog最大偏移量;
- Epoch:表示當前Master節點任期號;
- Body:Master端記錄的所有任期信息,是一個集合,所以總大小為EpochEntry大小 * EpochEntry條數;
日誌截斷
- endOffset:下一任期的StartOffset,如果沒有下一任期,那麼取當前CommitLog的最大偏移量作為endOffset;
Slave中將每一任Epoch對應的<Startoffset,Endoffset>序列存儲在一個TreeMap中(從大到小排序):
TreeMap<Epoch, Pair<startOffset,endOffset>> epochMap;
Slave節點會遍歷所有的任期(從大到小),然後根據任期號Epoch獲取Master節點對應的<startOffset,endOffset>序列進行對比,如果Slave的Epoch與Master一致,並且StartOffset相等,取兩者中較小的那個endOffset作為截斷位點,之後Slave節點修正自己的<epoch,startoffset>信息,然後進入Transfer階段進行日誌傳輸。如果未找到截斷位點,會一直向後遍歷直到找到。
Slave保證在截斷位點位置之前的日誌與Master一致,之後從截斷位點位置開始從Master複製日誌。
// Slave從大到小遍歷所有的任期
while (iterator.hasNext()) {
// 任期信息及對應的<startOffset,endOffset>
Map.Entry<Epoch, Pair<startOffset,endOffset>> curEntry = iterator.next();
// 根據Epoch任期號獲取Master節點對應的<startOffset,endOffset>
Pair<startOffset,endOffset> masterOffset=findMasterOffsetByEpoch(curEntry.getKey());
// 如果獲取不為空,並且startOffset相等
if(masterOffset != null &&
curEntry.getKey().getObejct1() == masterOffset.getObejct1()) {
// 返回較小的那個endOffset
truncateOffset = Math.min(curEntry.getKey().getObejct2(), masterOffset.getObejct2());
break;
}
}
Transfer階段
在Transfer階段,Master節點會不斷向Slave發送日誌包,開始進行日誌複製:
- Master節點向Slave節點發送日誌包;
- Slave節點收到日誌包之後,會檢測Epoch是否發生變化,然後更新本地的EpochFile,之後向Master節點回覆ACK;
- Master節點處理Slave節點回覆的ACK響應;
參考
RIP-44 Support DLedger Controller