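The previous article, "ZooKeeper Source Code (03): Cluster Startup Flow", introduced the entry point of leader election. This article analyzes the leader election components and flow in detail.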
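Leader election flow (important)
- In its start phase, QuorumPeer calls startLeaderElection() to start the election
- While in the LOOKING state, the peer casts a vote for itself
- createElectionAlgorithm - creates the core election components: QuorumCnxManager (manages connections), FastLeaderElection (runs the election), and so on
- The QuorumPeer main loop runs a different flow depending on the current state
States and flows:
- LOOKING - elect a leader with fastLeaderElection.lookForLeader
  - Increment the election epoch to start a new round
  - Initialize the proposal with the peer's own serverId, zxid, and currentEpoch
  - Send the vote out
  - Loop, receiving votes from the other servers:
    - LOOKING vote: compare the election epoch, currentEpoch, zxid, and serverId to decide which server to vote for; once more than half of the nodes agree on the proposal, that server becomes the leader
    - FOLLOWING vote: after comparing the election epoch, vote for the current leader
    - LEADING vote: after comparing the election epoch, vote for the current leader
- LEADING - create a Leader object and run the lead logic
  - zkServer loads its data
  - Start the quorum listener
  - Determine the new epoch and zxid from each follower's current epoch
  - Sync data to the followers
  - Start zkServer
  - On every tick, verify that a majority of followers are still in sync
- FOLLOWING - create a Follower object and run the followLeader logic
  - connectToLeader - connect to the leader server
  - registerWithLeader - send the current epoch to the leader and wait for the leader to send the new epoch
  - syncWithLeader - receive the data synced by the leader: txnlog, committedlog, snapshot
  - Stay connected and process packets from the leader
- OBSERVING - create an Observer object and run the observeLeader logic, which is largely the same as FOLLOWING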
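The LOOKING steps above can be sketched as a toy, single-process simulation. This is not ZooKeeper code: `ToyElection`, `ToyVote`, and the synchronous "broadcast" rounds are assumptions made for illustration. The sketch ignores the election epoch, networking, and timeouts, and keeps only the comparison order (currentEpoch, then zxid, then serverId) and the majority rule.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of the LOOKING phase. Every name here is hypothetical, not a ZooKeeper class. */
final class ToyElection {

    /** A vote: the proposed leader plus that leader's last zxid and current epoch. */
    static final class ToyVote {
        final long leader;
        final long zxid;
        final long epoch;

        ToyVote(long leader, long zxid, long epoch) {
            this.leader = leader;
            this.zxid = zxid;
            this.epoch = epoch;
        }

        /** True if this vote beats other: epoch first, then zxid, then server id. */
        boolean beats(ToyVote other) {
            if (epoch != other.epoch) {
                return epoch > other.epoch;
            }
            if (zxid != other.zxid) {
                return zxid > other.zxid;
            }
            return leader > other.leader;
        }
    }

    /**
     * Every server starts with a vote for itself and keeps adopting the best vote it has
     * seen until nothing changes. Returns the agreed leader id, or -1 without a majority.
     */
    static long elect(List<ToyVote> initialVotes) {
        List<ToyVote> current = new ArrayList<>(initialVotes);
        boolean changed = true;
        while (changed) {
            changed = false;
            // Synchronous "broadcast": each server sees all votes from the previous round.
            List<ToyVote> snapshot = new ArrayList<>(current);
            for (int i = 0; i < current.size(); i++) {
                for (ToyVote received : snapshot) {
                    if (received.beats(current.get(i))) {
                        current.set(i, received);
                        changed = true;
                    }
                }
            }
        }
        // With full connectivity all servers converge, so this check always passes here;
        // it is kept only to mirror the "more than half" rule.
        long candidate = current.get(0).leader;
        long agree = current.stream().filter(v -> v.leader == candidate).count();
        return agree > current.size() / 2 ? candidate : -1;
    }

    public static void main(String[] args) {
        // Servers 1..3 share the same epoch; server 2 has the most recent zxid.
        List<ToyVote> votes = List.of(
                new ToyVote(1, 0x100, 1), new ToyVote(2, 0x105, 1), new ToyVote(3, 0x101, 1));
        System.out.println("elected leader = " + elect(votes)); // elected leader = 2
    }
}
```

Server 2 wins even though server 3 has the larger id, because zxid is compared before serverId.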
Starting leader election
The startLeaderElection method of QuorumPeer is the entry point for starting the election:
public synchronized void startLeaderElection() {
    try {
        if (getPeerState() == ServerState.LOOKING) {
            // Vote for ourselves, wrapping our zxid and epoch
            currentVote = new Vote(myid, getLastLoggedZxid(), getCurrentEpoch());
        }
    } catch (IOException e) {
        RuntimeException re = new RuntimeException(e.getMessage());
        re.setStackTrace(e.getStackTrace());
        throw re;
    }
    // electionType is always 3 (the only value createElectionAlgorithm accepts)
    this.electionAlg = createElectionAlgorithm(electionType);
}
protected Election createElectionAlgorithm(int electionAlgorithm) {
    Election le = null;
    // TODO: use a factory rather than a switch
    // The switch statement could be replaced with the strategy pattern
    switch (electionAlgorithm) {
    case 1:
        throw new UnsupportedOperationException("Election Algorithm 1 is not supported.");
    case 2:
        throw new UnsupportedOperationException("Election Algorithm 2 is not supported.");
    case 3:
        QuorumCnxManager qcm = createCnxnManager();
        QuorumCnxManager oldQcm = qcmRef.getAndSet(qcm);
        // Shut down the old QuorumCnxManager, if there is one
        if (oldQcm != null) {
            oldQcm.halt();
        }
        // The listener starts the ServerSocket that accepts election connections
        QuorumCnxManager.Listener listener = qcm.listener;
        if (listener != null) {
            listener.start();
            FastLeaderElection fle = new FastLeaderElection(this, qcm);
            fle.start();
            le = fle;
        }
        break;
    default:
        assert false;
    }
    return le;
}
public QuorumCnxManager createCnxnManager() {
    // Used as the socket timeout; defaults to tickTime * syncLimit
    // With the values in zoo_sample.cfg this is 2000 * 5 = 10000 ms
    int timeout = quorumCnxnTimeoutMs > 0 ? quorumCnxnTimeoutMs : this.tickTime * this.syncLimit;
    return new QuorumCnxManager(
        this,
        this.getMyId(),
        this.getView(), // serverId -> quorumServer
        this.authServer,
        this.authLearner,
        timeout,
        this.getQuorumListenOnAllIPs(), // whether to listen on all IPs, false by default
        this.quorumCnxnThreadsSize, // 20 by default
        this.isQuorumSaslAuthEnabled());
}
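The timeout fallback in createCnxnManager can be checked in isolation. The helper below is a hypothetical stand-alone copy of that one expression, not a ZooKeeper method; the sample values are the tickTime and syncLimit from zoo_sample.cfg mentioned in the comment above.

```java
/** Hypothetical helper that mirrors the timeout fallback in createCnxnManager. */
final class CnxnTimeoutSketch {

    /** Uses the explicit timeout when it is positive, otherwise tickTime * syncLimit. */
    static int effectiveTimeoutMs(int quorumCnxnTimeoutMs, int tickTime, int syncLimit) {
        return quorumCnxnTimeoutMs > 0 ? quorumCnxnTimeoutMs : tickTime * syncLimit;
    }

    public static void main(String[] args) {
        System.out.println(effectiveTimeoutMs(-1, 2000, 5));   // 10000
        System.out.println(effectiveTimeoutMs(3000, 2000, 5)); // 3000
    }
}
```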
QuorumCnxManager class
Overview:
This class implements a connection manager for leader election using TCP.
It maintains one connection for every pair of servers. The tricky part is to guarantee that there is exactly one connection for every pair of servers that are operating correctly and that can communicate over the network. If two servers try to start a connection concurrently, then the connection manager uses a very simple tie-breaking mechanism to decide which connection to drop based on the IP addressed of the two parties.
For every peer, the manager maintains a queue of messages to send. If the connection to any particular peer drops, then the sender thread puts the message back on the list. As this implementation currently uses a queue implementation to maintain messages to send to another peer, we add the message to the tail of the queue, thus changing the order of messages. Although this is not a problem for the leader election, it could be a problem when consolidating peer communication. This is to be verified, though.
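- Maintains the TCP connections between servers during leader election
- Ensures exactly one connection per pair of servers; if both servers connect at the same time, the connection opened by the server with the larger id is the one that is kept (the code shown later compares server ids, not the IP addresses mentioned in the Javadoc)
- Buffers outgoing messages in queues
Main fields
// Executes QuorumConnectionReqThread and QuorumConnectionReceiverThread
private ThreadPoolExecutor connectionExecutor;
// Maps sid -> SendWorker / BlockingQueue / ByteBuffer
final ConcurrentHashMap<Long, SendWorker> senderWorkerMap;
final ConcurrentHashMap<Long, BlockingQueue<ByteBuffer>> queueSendMap;
final ConcurrentHashMap<Long, ByteBuffer> lastMessageSent;
// Receive queue
public final BlockingQueue<Message> recvQueue;
Main methods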
public void initiateConnection(final MultipleAddresses electionAddr, final Long sid);
// Wraps initiateConnection in a QuorumConnectionReqThread and submits it to connectionExecutor for asynchronous execution
public boolean initiateConnectionAsync(final MultipleAddresses electionAddr, final Long sid);
private boolean startConnection(Socket sock, Long sid) throws IOException;
public void receiveConnection(final Socket sock);
// Wraps receiveConnection in a QuorumConnectionReceiverThread and submits it to connectionExecutor for asynchronous execution
public void receiveConnectionAsync(final Socket sock);
public void toSend(Long sid, ByteBuffer b);
boolean connectOne(long sid, MultipleAddresses electionAddr);
void connectOne(long sid);
public void connectAll();
The remaining utility methods are not covered here.
initiateConnection method
Creates a Socket, performs the SSL handshake and authentication if required, and sends the initial packet. If the local id is smaller than the remote id, the connection is closed so that only one connection exists between the two servers.
public void initiateConnection(final MultipleAddresses electionAddr, final Long sid) {
    Socket sock = null;
    try {
        // Create the socket
        if (self.isSslQuorum()) {
            sock = self.getX509Util().createSSLSocket();
        } else {
            sock = SOCKET_FACTORY.get();
        }
        setSockOpts(sock); // socket options such as the timeout
        // Connect to the target peer
        sock.connect(electionAddr.getReachableOrOne(), cnxTO);
        // SSL handshake
        if (sock instanceof SSLSocket) {
            SSLSocket sslSock = (SSLSocket) sock;
            sslSock.startHandshake();
        }
    } catch (X509Exception e) {
        closeSocket(sock);
        return;
    } catch (UnresolvedAddressException | IOException e) {
        closeSocket(sock);
        return;
    }
    try {
        // Send the initial packet and perform SASL authentication
        // If our id is smaller than the remote id, close the connection
        // Create and start the SendWorker and RecvWorker
        // Create the send queue for this sid
        startConnection(sock, sid);
    } catch (IOException e) {
        closeSocket(sock);
    }
}
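startConnection method
- Send the initial packet and perform SASL authentication
- If the local id is smaller than the remote id, close the connection
- Create and start the SendWorker and RecvWorker
- Create the send queue for the given sid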
private boolean startConnection(Socket sock, Long sid) throws IOException {
    DataOutputStream dout = null;
    DataInputStream din = null;
    try {
        // Output stream
        BufferedOutputStream buf = new BufferedOutputStream(sock.getOutputStream());
        dout = new DataOutputStream(buf);
        // Send the initial packet: protocol version, myid, election address(es)
        long protocolVersion = self.isMultiAddressEnabled() ? PROTOCOL_VERSION_V2 : PROTOCOL_VERSION_V1;
        dout.writeLong(protocolVersion);
        dout.writeLong(self.getMyId());
        // now we send our election address. For the new protocol version, we can send multiple addresses.
        Collection<InetSocketAddress> addressesToSend = protocolVersion == PROTOCOL_VERSION_V2
            ? self.getElectionAddress().getAllAddresses()
            : Arrays.asList(self.getElectionAddress().getOne());
        String addr = addressesToSend.stream()
            .map(NetUtils::formatInetAddr).collect(Collectors.joining("|"));
        byte[] addr_bytes = addr.getBytes();
        dout.writeInt(addr_bytes.length);
        dout.write(addr_bytes);
        dout.flush();
        din = new DataInputStream(new BufferedInputStream(sock.getInputStream()));
    } catch (IOException e) {
        closeSocket(sock);
        return false;
    }
    // authenticate learner
    QuorumPeer.QuorumServer qps = self.getVotingView().get(sid);
    if (qps != null) {
        authLearner.authenticate(sock, qps.hostname);
    }
    if (sid > self.getMyId()) { // If lost the challenge, then drop the new connection
        closeSocket(sock);
    } else {
        // Create the SendWorker and RecvWorker
        SendWorker sw = new SendWorker(sock, sid);
        RecvWorker rw = new RecvWorker(sock, din, sid, sw);
        sw.setRecv(rw);
        SendWorker vsw = senderWorkerMap.get(sid);
        if (vsw != null) {
            vsw.finish();
        }
        senderWorkerMap.put(sid, sw);
        // Create the send queue
        queueSendMap.putIfAbsent(sid, new CircularBlockingQueue<>(SEND_CAPACITY));
        sw.start();
        rw.start();
        return true;
    }
    return false;
}
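The sid-based tie-break that startConnection applies on the dialing side, and that handleConnection applies on the accepting side, can be summarized as two small decision functions. This is a minimal sketch for illustration: `TieBreakSketch` and its `Action` enum are hypothetical names, and only the id comparisons are taken from the listings in this article.

```java
/** Hypothetical model of the sid-based tie-break. Not part of ZooKeeper. */
final class TieBreakSketch {

    enum Action { KEEP, CLOSE, CLOSE_AND_DIAL_BACK, IGNORE }

    /** Decision of the side that opened the socket (compare with startConnection). */
    static Action onOutgoing(long myId, long remoteSid) {
        // The dialer loses the challenge when the remote id is larger.
        return remoteSid > myId ? Action.CLOSE : Action.KEEP;
    }

    /** Decision of the side that accepted the socket (compare with handleConnection). */
    static Action onIncoming(long myId, long remoteSid) {
        if (remoteSid < myId) {
            // The acceptor wins: drop this socket and connect to the remote peer itself.
            return Action.CLOSE_AND_DIAL_BACK;
        }
        if (remoteSid == myId) {
            return Action.IGNORE;
        }
        return Action.KEEP;
    }

    public static void main(String[] args) {
        // Server 1 dials server 2: server 1 drops its own attempt.
        System.out.println(onOutgoing(1, 2)); // CLOSE
        // Server 2 dials server 1: both sides keep the connection.
        System.out.println(onOutgoing(2, 1)); // KEEP
        System.out.println(onIncoming(1, 2)); // KEEP
    }
}
```

Either way, the surviving connection is the one opened by the server with the larger id.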
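receiveConnection method
When a server receives a connection request and wins the challenge (its own id is larger than the remote id), it closes that connection and connects to the remote server itself.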
public void receiveConnection(final Socket sock) {
    DataInputStream din = null;
    try {
        // Input stream
        din = new DataInputStream(new BufferedInputStream(sock.getInputStream()));
        handleConnection(sock, din);
    } catch (IOException e) {
        closeSocket(sock);
    }
}
private void handleConnection(Socket sock, DataInputStream din) throws IOException {
    Long sid = null, protocolVersion = null;
    MultipleAddresses electionAddr = null;
    try {
        protocolVersion = din.readLong();
        if (protocolVersion >= 0) { // this is a server id and not a protocol version
            sid = protocolVersion;
        } else {
            try {
                InitialMessage init = InitialMessage.parse(protocolVersion, din);
                sid = init.sid;
                if (!init.electionAddr.isEmpty()) {
                    electionAddr = new MultipleAddresses(init.electionAddr,
                        Duration.ofMillis(self.getMultiAddressReachabilityCheckTimeoutMs()));
                }
            } catch (InitialMessage.InitialMessageException ex) {
                closeSocket(sock);
                return;
            }
        }
        if (sid == QuorumPeer.OBSERVER_ID) {
            // Choose identifier at random. We need a value to identify the connection.
            sid = observerCounter.getAndDecrement();
        }
    } catch (IOException e) {
        closeSocket(sock);
        return;
    }
    // do authenticating learner
    authServer.authenticate(sock, din);
    // If wins the challenge, then close the new connection.
    if (sid < self.getMyId()) { // the remote id is smaller: close this connection and connect to the remote peer ourselves
        SendWorker sw = senderWorkerMap.get(sid);
        if (sw != null) {
            sw.finish();
        }
        // Close the connection
        closeSocket(sock);
        if (electionAddr != null) {
            connectOne(sid, electionAddr); // connect to the remote peer
        } else {
            connectOne(sid);
        }
    } else if (sid == self.getMyId()) {
        // Same id as ours: nothing to set up
    } else { // create the SendWorker, RecvWorker, and send queue
        SendWorker sw = new SendWorker(sock, sid);
        RecvWorker rw = new RecvWorker(sock, din, sid, sw);
        sw.setRecv(rw);
        SendWorker vsw = senderWorkerMap.get(sid);
        if (vsw != null) {
            vsw.finish();
        }
        senderWorkerMap.put(sid, sw);
        queueSendMap.putIfAbsent(sid, new CircularBlockingQueue<>(SEND_CAPACITY));
        sw.start();
        rw.start();
    }
}
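The `protocolVersion >= 0` check works because the first long on the wire is either a negative protocol version (new format) or a non-negative server id (legacy format). The round trip below is a minimal sketch of that framing under stated assumptions: `InitialMessageSketch`, `ASSUMED_PROTOCOL_VERSION`, and `ASSUMED_MAX_ADDR_BYTES` are hypothetical, and the parsing is a simplification rather than ZooKeeper's InitialMessage.parse.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

/** Hypothetical encoder/decoder for the connection's initial packet. */
final class InitialMessageSketch {
    // Assumption: any negative value works here; only the sign matters to the decoder.
    static final long ASSUMED_PROTOCOL_VERSION = -65536L;
    // Assumption: an arbitrary sanity limit for the address string.
    static final int ASSUMED_MAX_ADDR_BYTES = 2048;

    static final class Parsed {
        final long sid;
        final String addresses; // "host:port|host:port", empty for the legacy format

        Parsed(long sid, String addresses) {
            this.sid = sid;
            this.addresses = addresses;
        }
    }

    /** Writes version, sid, address length, and address bytes, as startConnection does. */
    static byte[] encode(long sid, String addresses) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            out.writeLong(ASSUMED_PROTOCOL_VERSION);
            out.writeLong(sid);
            byte[] addr = addresses.getBytes(StandardCharsets.UTF_8);
            out.writeInt(addr.length);
            out.write(addr);
            out.flush();
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** A non-negative first long is a legacy sid; a negative one is a protocol version. */
    static Parsed decode(byte[] data) {
        try {
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
            long first = in.readLong();
            if (first >= 0) {
                return new Parsed(first, "");
            }
            long sid = in.readLong();
            int length = in.readInt();
            if (length <= 0 || length > ASSUMED_MAX_ADDR_BYTES) {
                throw new IOException("Unreasonable address length: " + length);
            }
            byte[] addr = new byte[length];
            in.readFully(addr);
            return new Parsed(sid, new String(addr, StandardCharsets.UTF_8));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        Parsed p = decode(encode(3L, "host1:3888|host2:3888"));
        System.out.println(p.sid + " " + p.addresses); // 3 host1:3888|host2:3888
    }
}
```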
toSend method
Sends a message to the given sid.
public void toSend(Long sid, ByteBuffer b) {
    // A message addressed to ourselves goes straight to recvQueue
    if (this.mySid == sid) {
        b.position(0);
        addToRecvQueue(new Message(b.duplicate(), sid));
    } else {
        // Put the message on the send queue for this sid
        BlockingQueue<ByteBuffer> bq =
            queueSendMap.computeIfAbsent(sid, serverId -> new CircularBlockingQueue<>(SEND_CAPACITY));
        addToSendQueue(bq, b);
        // Make sure a connection to the peer exists
        connectOne(sid);
    }
}
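The routing decision in toSend (loop back to the receive queue for our own sid, otherwise a lazily created per-peer queue) can be sketched without any networking. `SendRoutingSketch` is a hypothetical class; it uses plain strings and unbounded ArrayDeque queues as stand-ins, so the bounded behavior of the real send queue is not modeled.

```java
import java.util.ArrayDeque;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical, network-free model of the routing done by toSend. */
final class SendRoutingSketch {
    private final long mySid;
    final Queue<String> recvQueue = new ArrayDeque<>();
    final Map<Long, Queue<String>> queueSendMap = new ConcurrentHashMap<>();

    SendRoutingSketch(long mySid) {
        this.mySid = mySid;
    }

    void toSend(long sid, String message) {
        if (sid == mySid) {
            // Messages to ourselves never touch a socket.
            recvQueue.add(message);
        } else {
            // Create the per-peer queue on first use, then enqueue.
            queueSendMap.computeIfAbsent(sid, id -> new ArrayDeque<>()).add(message);
            // The real method would now call connectOne(sid).
        }
    }

    public static void main(String[] args) {
        SendRoutingSketch router = new SendRoutingSketch(1L);
        router.toSend(1L, "vote-for-1");
        router.toSend(2L, "vote-for-1");
        System.out.println(router.recvQueue.size());         // 1
        System.out.println(router.queueSendMap.get(2L).size()); // 1
    }
}
```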
connectOne method
synchronized boolean connectOne(long sid, MultipleAddresses electionAddr) {
    // A connection already exists
    if (senderWorkerMap.get(sid) != null) {
        if (self.isMultiAddressEnabled() && electionAddr.size() > 1 &&
            self.isMultiAddressReachabilityCheckEnabled()) {
            // Check that the socket is still reachable
            senderWorkerMap.get(sid).asyncValidateIfSocketIsStillReachable();
        }
        return true;
    }
    // Establish a new connection asynchronously
    return initiateConnectionAsync(electionAddr, sid);
}
synchronized void connectOne(long sid) {
    if (senderWorkerMap.get(sid) != null) {
        if (self.isMultiAddressEnabled() && self.isMultiAddressReachabilityCheckEnabled()) {
            senderWorkerMap.get(sid).asyncValidateIfSocketIsStillReachable();
        }
        return;
    }
    // Resolve the address for sid from lastCommittedView or lastProposedView, then connect
    synchronized (self.QV_LOCK) {
        boolean knownId = false;
        // Resolve hostname for the remote server before attempting to
        // connect in case the underlying ip address has changed.
        self.recreateSocketAddresses(sid);
        Map<Long, QuorumPeer.QuorumServer> lastCommittedView = self.getView();
        QuorumVerifier lastSeenQV = self.getLastSeenQuorumVerifier();
        Map<Long, QuorumPeer.QuorumServer> lastProposedView = lastSeenQV.getAllMembers();
        if (lastCommittedView.containsKey(sid)) {
            knownId = true;
            if (connectOne(sid, lastCommittedView.get(sid).electionAddr)) {
                return;
            }
        }
        if (lastSeenQV != null
            && lastProposedView.containsKey(sid)
            && (!knownId ||
                !lastProposedView.get(sid).electionAddr.equals(lastCommittedView.get(sid).electionAddr))) {
            knownId = true;
            if (connectOne(sid, lastProposedView.get(sid).electionAddr)) {
                return;
            }
        }
    }
}
connectAll method
Try to establish a connection with each server if one doesn't exist.
public void connectAll() {
    long sid;
    for (Enumeration<Long> en = queueSendMap.keys(); en.hasMoreElements(); ) {
        sid = en.nextElement();
        connectOne(sid);
    }
}
Listener class
A thread class that starts the ServerSocket listeners; listening begins in its run method:
public void run() {
    if (!shutdown) {
        Set<InetSocketAddress> addresses;
        // Determine the addresses to listen on
        if (self.getQuorumListenOnAllIPs()) {
            addresses = self.getElectionAddress().getWildcardAddresses();
        } else {
            addresses = self.getElectionAddress().getAllAddresses();
        }
        // Used to block until every handler has finished
        CountDownLatch latch = new CountDownLatch(addresses.size());
        // Create one ListenerHandler per listening address
        listenerHandlers = addresses.stream().map(address ->
            new ListenerHandler(address, self.shouldUsePortUnification(),
                self.isSslQuorum(), latch))
            .collect(Collectors.toList());
        final ExecutorService executor = Executors.newFixedThreadPool(addresses.size());
        try {
            // Start the ListenerHandlers
            listenerHandlers.forEach(executor::submit);
        } finally {
            executor.shutdown();
        }
        try {
            // Block here; each ListenerHandler counts down when it finishes
            latch.await();
        } catch (InterruptedException ie) {
        } finally {
            // Clean up for shutdown (omitted)
        }
    }
    // omitted
}
The run method of ListenerHandler:
public void run() {
    try {
        // Accept connections
        acceptConnections();
        try {
            close();
        } catch (IOException e) {}
    } catch (Exception e) {
    } finally {
        latch.countDown();
    }
}
private void acceptConnections() {
    int numRetries = 0;
    Socket client = null;
    while ((!shutdown) && (portBindMaxRetry == 0 || numRetries < portBindMaxRetry)) {
        try {
            // Create the ServerSocket and bind the port
            serverSocket = createNewServerSocket();
            while (!shutdown) {
                try {
                    // Accept a client socket
                    client = serverSocket.accept();
                    setSockOpts(client); // socket options such as the timeout
                    // Hand the new connection to receiveConnection
                    if (quorumSaslAuthEnabled) {
                        receiveConnectionAsync(client);
                    } else {
                        receiveConnection(client);
                    }
                    numRetries = 0;
                } catch (SocketTimeoutException e) {}
            }
        } catch (IOException e) {
            // omitted
        }
    }
    // omitted
}
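QuorumConnectionReqThread class
Connects to other peers asynchronously; its run method calls initiateConnection to establish the connection.
QuorumConnectionReceiverThread class
Accepts connections asynchronously; its run method calls receiveConnection to handle the newly established connection.
SendWorker class
Thread to send messages. Instance waits on a queue, and send a message as soon as there is one available. If connection breaks, then opens a new one.
A thread that sends messages:
- Holds the sid, the socket, and the connection's output stream
- Takes messages from the send queue and writes them to the output stream
RecvWorker class
Thread to receive messages. Instance waits on a socket read. If the channel breaks, then removes itself from the pool of receivers.
A thread that reads messages: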
public void run() {
    threadCnt.incrementAndGet();
    try {
        while (running && !shutdown && sock != null) {
            // Read the message length
            int length = din.readInt();
            if (length <= 0 || length > PACKETMAXSIZE) {
                throw new IOException("Received packet with invalid packet: " + length);
            }
            // Read the payload
            final byte[] msgArray = new byte[length];
            din.readFully(msgArray, 0, length);
            // Put it on the receive queue
            addToRecvQueue(new Message(ByteBuffer.wrap(msgArray), sid));
        }
    } catch (Exception e) {
    } finally {
        sw.finish();
        closeSocket(sock);
    }
}
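The loop in RecvWorker.run relies on length-prefixed framing: a 4-byte length followed by exactly that many payload bytes. The sketch below shows both directions over in-memory streams. `FramingSketch` and `ASSUMED_MAX_PACKET` are hypothetical; the limit only stands in for PACKETMAXSIZE, whose real value is not shown in this article.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical length-prefixed framing over in-memory streams. */
final class FramingSketch {
    // Assumption: placeholder for PACKETMAXSIZE.
    static final int ASSUMED_MAX_PACKET = 512 * 1024;

    /** Writes each message as a 4-byte big-endian length followed by the payload. */
    static byte[] writeFrames(List<byte[]> messages) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            for (byte[] message : messages) {
                out.writeInt(message.length);
                out.write(message);
            }
            out.flush();
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Reads frames until the input is exhausted, validating each length first. */
    static List<byte[]> readFrames(byte[] data) {
        try {
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
            List<byte[]> messages = new ArrayList<>();
            while (in.available() > 0) {
                int length = in.readInt();
                if (length <= 0 || length > ASSUMED_MAX_PACKET) {
                    throw new IOException("Invalid packet length: " + length);
                }
                byte[] payload = new byte[length];
                in.readFully(payload, 0, length); // blocks until all bytes arrive
                messages.add(payload);
            }
            return messages;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        byte[] wire = writeFrames(List.of("a".getBytes(), "bcd".getBytes()));
        System.out.println(wire.length);              // 12
        System.out.println(readFrames(wire).size());  // 2
    }
}
```

Rejecting non-positive or oversized lengths before allocating the buffer is what keeps a corrupted or hostile length field from triggering a huge allocation.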
FastLeaderElection class
Documentation:
Implementation of leader election using TCP. It uses an object of the class QuorumCnxManager to manage connections. Otherwise, the algorithm is push-based as with the other UDP implementations. There are a few parameters that can be tuned to change its behavior. First, finalizeWait determines the amount of time to wait until deciding upon a leader. This is part of the leader election algorithm.
- Implements leader election over TCP, using a push-based model
- Uses a QuorumCnxManager object to manage connections
Constructor
public FastLeaderElection(QuorumPeer self, QuorumCnxManager manager) {
    this.stop = false;
    this.manager = manager;
    starter(self, manager);
}
private void starter(QuorumPeer self, QuorumCnxManager manager) {
    this.self = self;
    proposedLeader = -1;
    proposedZxid = -1;
    sendqueue = new LinkedBlockingQueue<>();
    recvqueue = new LinkedBlockingQueue<>();
    // Starts the WorkerSender and WorkerReceiver
    this.messenger = new Messenger(manager);
}
Main fields
// How long (in ms) to keep polling for changed votes before the leader is finalized
static final int finalizeWait = 200;
// Ballot box: holds and tallies the result of an election round
private SyncedLearnerTracker leadingVoteSet;
// Send queue
LinkedBlockingQueue<ToSend> sendqueue;
// Receive queue
LinkedBlockingQueue<Notification> recvqueue;
// Starts the WorkerSender and WorkerReceiver
Messenger messenger;
// Proposed leader id
long proposedLeader;
// Proposed zxid
long proposedZxid;
// Proposed epoch
long proposedEpoch;
The start method starts the election threads
public void start() {
    this.messenger.start(); // starts the WorkerSender and WorkerReceiver threads
}
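Messenger class
WorkerSender thread
- Takes ToSend messages from sendqueue
- Sends them through QuorumCnxManager's toSend method
WorkerReceiver thread
- Fetches received messages through QuorumCnxManager's pollRecvQueue
- Wraps each one in a Notification object and pushes it onto recvqueue
Main methods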
// Builds the message to send
static ByteBuffer buildMsg(
    int state, long leader, long zxid, long electionEpoch, long epoch, byte[] configData);
// Sends a vote Notification to every node
private void sendNotifications();
// Compares currentEpoch, zxid, and serverId to decide which server gets the vote
protected boolean totalOrderPredicate(
    long newId, long newZxid, long newEpoch, long curId, long curZxid, long curEpoch);
// Given a set of votes, returns a SyncedLearnerTracker used to decide whether there are enough votes to end the election
protected SyncedLearnerTracker getVoteTracker(Map<Long, Vote> votes, Vote vote);
// If a leader has been elected and has enough votes, check that the leader itself has voted and confirmed that it is leading
// This check keeps peers from electing, over and over, a peer that has crashed and is no longer leading
protected boolean checkLeader(Map<Long, Vote> votes, long leader, long electionEpoch);
// Updates proposedLeader, proposedZxid, and proposedEpoch
// Either settles the leader or prepares for the next round of voting
synchronized void updateProposal(long leader, long zxid, long epoch);
// Creates a Vote from the current proposedLeader, proposedZxid, and proposedEpoch
public synchronized Vote getVote();
// Reads lastLoggedZxid through zkDb
private long getInitLastLoggedZxid();
// Returns currentEpoch
private long getPeerEpoch();
// Updates the peer state according to proposedLeader
// If this peer is the leader, leadingVoteSet is updated from voteSet
private void setPeerState(long proposedLeader, SyncedLearnerTracker voteSet);
// Starts a round of leader election
// Called whenever the state becomes LOOKING; sends vote notifications to the other peers
public Vote lookForLeader() throws InterruptedException;
// Handles a notification from a peer in the FOLLOWING state
private Vote receivedFollowingNotification(
    Map<Long, Vote> recvset, Map<Long, Vote> outofelection,
    SyncedLearnerTracker voteSet, Notification n);
// Handles a notification from a peer in the LEADING state
private Vote receivedLeadingNotification(
    Map<Long, Vote> recvset, Map<Long, Vote> outofelection,
    SyncedLearnerTracker voteSet, Notification n);
buildMsg method
static ByteBuffer buildMsg(int state, long leader, long zxid,
        long electionEpoch, long epoch, byte[] configData) {
    byte[] requestBytes = new byte[44 + configData.length];
    ByteBuffer requestBuffer = ByteBuffer.wrap(requestBytes);
    requestBuffer.clear();
    requestBuffer.putInt(state); // current state
    requestBuffer.putLong(leader); // id of the leader being voted for
    requestBuffer.putLong(zxid); // zxid
    requestBuffer.putLong(electionEpoch); // election epoch
    requestBuffer.putLong(epoch); // data epoch
    requestBuffer.putInt(Notification.CURRENTVERSION); // 0x2
    requestBuffer.putInt(configData.length); // config data length
    requestBuffer.put(configData); // quorumVerifier data
    return requestBuffer;
}
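The 44 in buildMsg is the fixed header size: one int, four longs, and two more ints (4 + 4 * 8 + 4 + 4). The sketch below encodes the same field order and decodes it again. `NotificationLayoutSketch` and `Decoded` are hypothetical names, and the version is passed in as a plain parameter instead of reading Notification.CURRENTVERSION.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

/** Hypothetical round trip over the field layout written by buildMsg. */
final class NotificationLayoutSketch {
    static final int HEADER_BYTES = 4 + 8 + 8 + 8 + 8 + 4 + 4; // = 44

    static final class Decoded {
        final int state;
        final long leader;
        final long zxid;
        final long electionEpoch;
        final long peerEpoch;
        final int version;
        final byte[] config;

        Decoded(int state, long leader, long zxid, long electionEpoch,
                long peerEpoch, int version, byte[] config) {
            this.state = state;
            this.leader = leader;
            this.zxid = zxid;
            this.electionEpoch = electionEpoch;
            this.peerEpoch = peerEpoch;
            this.version = version;
            this.config = config;
        }
    }

    static ByteBuffer encode(int state, long leader, long zxid, long electionEpoch,
                             long peerEpoch, int version, byte[] config) {
        ByteBuffer buffer = ByteBuffer.allocate(HEADER_BYTES + config.length);
        buffer.putInt(state);
        buffer.putLong(leader);
        buffer.putLong(zxid);
        buffer.putLong(electionEpoch);
        buffer.putLong(peerEpoch);
        buffer.putInt(version);
        buffer.putInt(config.length);
        buffer.put(config);
        buffer.flip(); // switch the buffer from writing to reading
        return buffer;
    }

    /** Reads the fields back in exactly the order they were written. */
    static Decoded decode(ByteBuffer buffer) {
        int state = buffer.getInt();
        long leader = buffer.getLong();
        long zxid = buffer.getLong();
        long electionEpoch = buffer.getLong();
        long peerEpoch = buffer.getLong();
        int version = buffer.getInt();
        byte[] config = new byte[buffer.getInt()];
        buffer.get(config);
        return new Decoded(state, leader, zxid, electionEpoch, peerEpoch, version, config);
    }

    public static void main(String[] args) {
        byte[] config = "cfg".getBytes(StandardCharsets.UTF_8);
        ByteBuffer wire = encode(0, 3L, 0x100000005L, 7L, 1L, 2, config);
        System.out.println(wire.remaining()); // 47
        System.out.println(decode(wire).leader); // 3
    }
}
```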
totalOrderPredicate method
Compares currentEpoch, then zxid, then serverId to decide which server gets the vote:
protected boolean totalOrderPredicate(
        long newId, long newZxid, long newEpoch, long curId, long curZxid, long curEpoch) {
    if (self.getQuorumVerifier().getWeight(newId) == 0) {
        return false;
    }
    /*
     * Return true if one of the following three cases hold:
     * 1- New epoch is higher
     * 2- New epoch is the same as current epoch, but new zxid is higher
     * 3- New epoch is the same as current epoch, new zxid is the same
     *  as current zxid, but server id is higher.
     */
    return ((newEpoch > curEpoch)
        || ((newEpoch == curEpoch)
            && ((newZxid > curZxid)
                || ((newZxid == curZxid)
                    && (newId > curId)))));
}
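A few concrete comparisons make the three cases easier to see. The sketch below copies only the boolean expression; `TotalOrderSketch.wins` is a hypothetical stand-alone function and deliberately leaves out the quorum-weight check at the top of the real method.

```java
/** Hypothetical stand-alone copy of the ordering rule, without the weight check. */
final class TotalOrderSketch {

    static boolean wins(long newId, long newZxid, long newEpoch,
                        long curId, long curZxid, long curEpoch) {
        return (newEpoch > curEpoch)
            || ((newEpoch == curEpoch)
                && ((newZxid > curZxid)
                    || ((newZxid == curZxid) && (newId > curId))));
    }

    public static void main(String[] args) {
        // Case 1: a higher epoch wins even against a larger zxid and a larger id.
        System.out.println(wins(1, 0, 2, 3, 100, 1));   // true
        // Case 2: same epoch, higher zxid wins against a larger id.
        System.out.println(wins(1, 101, 1, 3, 100, 1)); // true
        // Case 3: same epoch and zxid, the larger id wins.
        System.out.println(wins(3, 100, 1, 1, 100, 1)); // true
        // An identical vote does not win, so the current proposal is kept.
        System.out.println(wins(1, 100, 1, 1, 100, 1)); // false
    }
}
```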
getVoteTracker method
Given a set of votes, returns a SyncedLearnerTracker used to decide whether there are enough votes to declare the election over:
protected SyncedLearnerTracker getVoteTracker(Map<Long, Vote> votes, Vote vote) {
    SyncedLearnerTracker voteSet = new SyncedLearnerTracker();
    voteSet.addQuorumVerifier(self.getQuorumVerifier());
    if (self.getLastSeenQuorumVerifier() != null
        && self.getLastSeenQuorumVerifier().getVersion() > self.getQuorumVerifier().getVersion()) {
        voteSet.addQuorumVerifier(self.getLastSeenQuorumVerifier());
    }
    // Compare each server's vote with the local vote; matching sids are added to the ack set
    for (Map.Entry<Long, Vote> entry : votes.entrySet()) {
        if (vote.equals(entry.getValue())) {
            voteSet.addAck(entry.getKey()); // the key is the sid
        }
    }
    return voteSet;
}
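The tally that getVoteTracker prepares can be sketched for the simplest case, a plain majority quorum. `MajorityTallySketch` is a hypothetical helper: it models a vote as just the proposed leader id and assumes every server has equal weight, so it does not cover weighted or hierarchical quorum verifiers or the second verifier added during reconfiguration.

```java
import java.util.Map;

/** Hypothetical majority tally; a vote is modeled as the proposed leader id only. */
final class MajorityTallySketch {

    /**
     * Counts the servers whose vote equals the proposal and reports whether they form
     * a strict majority of the ensemble.
     */
    static boolean hasMajority(Map<Long, Long> votesBySid, long proposedLeader, int ensembleSize) {
        long acks = votesBySid.values().stream().filter(v -> v == proposedLeader).count();
        return acks > ensembleSize / 2;
    }

    public static void main(String[] args) {
        // Three-server ensemble: servers 1 and 2 both vote for server 2.
        System.out.println(hasMajority(Map.of(1L, 2L, 2L, 2L), 2L, 3)); // true
        // Split votes: only one ack for server 2, which is not more than 3 / 2.
        System.out.println(hasMajority(Map.of(1L, 1L, 2L, 2L), 2L, 3)); // false
    }
}
```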
checkLeader method
protected boolean checkLeader(Map<Long, Vote> votes, long leader, long electionEpoch) {
    boolean predicate = true;
    if (leader != self.getMyId()) {
        if (votes.get(leader) == null) { // the leader itself must have voted, otherwise this round is invalid
            predicate = false;
        } else if (votes.get(leader).getState() != ServerState.LEADING) {
            // the leader's state must be LEADING, otherwise this round is invalid
            predicate = false;
        }
    } else if (logicalclock.get() != electionEpoch) { // if we are the proposed leader, the election epoch must match ours
        predicate = false;
    }
    return predicate;
}
lookForLeader method
Starts a round of leader election. It is called whenever the state becomes LOOKING and sends vote notifications to the other peers:
public Vote lookForLeader() throws InterruptedException {
    // omitted
    try {
        // sid -> vote for the current election round
        Map<Long, Vote> recvset = new HashMap<>();
        // sid -> vote from peers that are already FOLLOWING or LEADING
        Map<Long, Vote> outofelection = new HashMap<>();
        int notTimeout = minNotificationInterval;
        synchronized (this) {
            logicalclock.incrementAndGet(); // increment the election epoch to start a new round
            // Initialize the proposal; every peer starts by voting for itself
            updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
        }
        // Notify all nodes
        sendNotifications();
        // Ballot box
        SyncedLearnerTracker voteSet = null;
        // Normally the loop exits only once a leader has been elected
        while ((self.getPeerState() == ServerState.LOOKING) && (!stop)) {
            Notification n = recvqueue.poll(notTimeout, TimeUnit.MILLISECONDS);
            if (n == null) {
                // Resend the notifications or reconnect
                if (manager.haveDelivered()) {
                    sendNotifications();
                } else {
                    manager.connectAll();
                }
                notTimeout = Math.min(notTimeout << 1, maxNotificationInterval);
                // omitted
            } else if (validVoter(n.sid) && validVoter(n.leader)) {
                switch (n.state) {
                case LOOKING:
                    // omitted
                    // The sender's election epoch is larger than ours
                    if (n.electionEpoch > logicalclock.get()) {
                        logicalclock.set(n.electionEpoch); // adopt the newer epoch
                        recvset.clear(); // clear the received votes
                        // Compare votes; if the sender's vote wins, adopt it as the local proposal
                        if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
                                getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) {
                            updateProposal(n.leader, n.zxid, n.peerEpoch);
                        } else {
                            updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
                        }
                        // Send out the updated vote
                        sendNotifications();
                    } else if (n.electionEpoch < logicalclock.get()) {
                        // The sender's election epoch is smaller than ours: ignore the vote
                        break;
                    } else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
                            proposedLeader, proposedZxid, proposedEpoch)) {
                        updateProposal(n.leader, n.zxid, n.peerEpoch);
                        sendNotifications();
                    }
                    // Record the vote
                    recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));
                    // Build the ballot box
                    voteSet = getVoteTracker(
                        recvset, new Vote(proposedLeader, proposedZxid, logicalclock.get(), proposedEpoch));
                    // More than half of the acks means a leader has been elected
                    if (voteSet.hasAllQuorums()) {
                        // Wait briefly for any vote that would change the proposal
                        while ((n = recvqueue.poll(finalizeWait, TimeUnit.MILLISECONDS)) != null) {
                            if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
                                    proposedLeader, proposedZxid, proposedEpoch)) {
                                recvqueue.put(n);
                                break;
                            }
                        }
                        // Set the peer state
                        if (n == null) {
                            setPeerState(proposedLeader, voteSet);
                            Vote endVote = new Vote(
                                proposedLeader, proposedZxid, logicalclock.get(), proposedEpoch);
                            leaveInstance(endVote);
                            return endVote;
                        }
                    }
                    break;
                case OBSERVING:
                    break;
                case FOLLOWING:
                    // Received a FOLLOWING notification
                    Vote resultFN = receivedFollowingNotification(recvset, outofelection, voteSet, n);
                    if (resultFN == null) {
                        break;
                    } else {
                        return resultFN;
                    }
                case LEADING:
                    // Received a LEADING notification
                    Vote resultLN = receivedLeadingNotification(recvset, outofelection, voteSet, n);
                    if (resultLN == null) {
                        break;
                    } else {
                        return resultLN;
                    }
                default:
                    break;
                }
            } else {
                // omitted
            }
        }
        return null;
    } finally {
        // omitted
    }
}
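When no notification arrives, lookForLeader doubles its poll timeout with `Math.min(notTimeout << 1, maxNotificationInterval)`. The sketch below prints the resulting sequence. `NotificationBackoffSketch` is a hypothetical helper, and the bounds 200 ms and 60000 ms are assumptions chosen for illustration, since the values of minNotificationInterval and maxNotificationInterval are not shown in the listing.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper that lists the poll timeouts produced by the doubling rule. */
final class NotificationBackoffSketch {

    static List<Integer> timeouts(int minInterval, int maxInterval, int polls) {
        List<Integer> result = new ArrayList<>();
        int notTimeout = minInterval;
        for (int i = 0; i < polls; i++) {
            result.add(notTimeout);
            // Same update as in lookForLeader: double, but never exceed the cap.
            notTimeout = Math.min(notTimeout << 1, maxInterval);
        }
        return result;
    }

    public static void main(String[] args) {
        // Assumed bounds: 200 ms minimum, 60000 ms maximum.
        System.out.println(timeouts(200, 60000, 10));
        // [200, 400, 800, 1600, 3200, 6400, 12800, 25600, 51200, 60000]
    }
}
```

With these assumed bounds, an isolated peer reaches the cap after nine empty polls and then retries once a minute.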
receivedFollowingNotification method
Handles a notification from a peer in the FOLLOWING state.
private Vote receivedFollowingNotification(
        Map<Long, Vote> recvset, Map<Long, Vote> outofelection, SyncedLearnerTracker voteSet, Notification n) {
    // This peer also votes for the current leader,
    // subject to the quorum check and the checkLeader check below
    if (n.electionEpoch == logicalclock.get()) {
        // Build the ballot box
        recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch, n.state));
        voteSet = getVoteTracker(
            recvset, new Vote(n.version, n.leader, n.zxid, n.electionEpoch, n.peerEpoch, n.state));
        // acks > half and checkLeader
        if (voteSet.hasAllQuorums() && checkLeader(recvset, n.leader, n.electionEpoch)) {
            // Update the peer state
            setPeerState(n.leader, voteSet);
            Vote endVote = new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch);
            leaveInstance(endVote);
            return endVote;
        }
    }
    // Reached when this node joins late and the ensemble already has a leader
    // The logic is essentially the same as above
    outofelection.put(n.sid, new Vote(n.version, n.leader, n.zxid, n.electionEpoch, n.peerEpoch, n.state));
    voteSet = getVoteTracker(
        outofelection, new Vote(n.version, n.leader, n.zxid, n.electionEpoch, n.peerEpoch, n.state));
    if (voteSet.hasAllQuorums() && checkLeader(outofelection, n.leader, n.electionEpoch)) {
        synchronized (this) {
            logicalclock.set(n.electionEpoch);
            setPeerState(n.leader, voteSet);
        }
        Vote endVote = new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch);
        leaveInstance(endVote);
        return endVote;
    }
    return null;
}
receivedLeadingNotification method
Handles a notification from a peer in the LEADING state.
private Vote receivedLeadingNotification(Map<Long, Vote> recvset, Map<Long, Vote> outofelection,
        SyncedLearnerTracker voteSet, Notification n) {
    Vote result = receivedFollowingNotification(recvset, outofelection, voteSet, n);
    if (result == null) {
        if (self.getQuorumVerifier().getNeedOracle() && !self.getQuorumVerifier().askOracle()) {
            // omitted
        } else {
            return null;
        }
    } else {
        return result;
    }
}
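QuorumPeer class
Manages the quorum protocol. A server can be in one of the following three states:
- Leader election - each server elects a leader, initially proposing itself
- Follower - syncs with the leader and replicates all transactions
- Leader - processes requests and forwards them to the followers; a majority of followers must have synced a request before it can be committed
The main loop in run
The main loop in run checks the current peer state and runs the election, lead, follow, or observe logic accordingly: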
public void run() {
    // omitted
    try {
        // Main loop
        while (running) {
            switch (getPeerState()) {
            case LOOKING:
                ServerMetrics.getMetrics().LOOKING_COUNT.add(1);
                if (Boolean.getBoolean("readonlymode.enabled")) {
                    // omitted
                } else {
                    try {
                        reconfigFlagClear();
                        if (shuttingDownLE) {
                            shuttingDownLE = false;
                            startLeaderElection();
                        }
                        setCurrentVote(makeLEStrategy().lookForLeader());
                    } catch (Exception e) {
                        setPeerState(ServerState.LOOKING);
                    }
                }
                break;
            case OBSERVING:
                try {
                    setObserver(makeObserver(logFactory));
                    observer.observeLeader();
                } catch (Exception e) {
                } finally {
                    observer.shutdown();
                    setObserver(null);
                    updateServerState();
                    // Add delay jitter before we switch to LOOKING
                    // state to reduce the load of ObserverMaster
                    if (isRunning()) {
                        Observer.waitForObserverElectionDelay();
                    }
                }
                break;
            case FOLLOWING:
                try {
                    setFollower(makeFollower(logFactory));
                    follower.followLeader();
                } catch (Exception e) {
                } finally {
                    follower.shutdown();
                    setFollower(null);
                    updateServerState();
                }
                break;
            case LEADING:
                try {
                    setLeader(makeLeader(logFactory));
                    leader.lead();
                    setLeader(null);
                } catch (Exception e) {
                } finally {
                    if (leader != null) {
                        leader.shutdown("Forcing shutdown");
                        setLeader(null);
                    }
                    updateServerState();
                }
                break;
            }
        }
    } finally {
        // omitted
    }
}
LOOKING branch
try {
    reconfigFlagClear();
    if (shuttingDownLE) {
        shuttingDownLE = false;
        startLeaderElection();
    }
    // Run the election with FastLeaderElection
    setCurrentVote(makeLEStrategy().lookForLeader());
} catch (Exception e) {
    setPeerState(ServerState.LOOKING); // reset to the LOOKING state
}
FOLLOWING branch
try {
    setFollower(makeFollower(logFactory));
    follower.followLeader(); // run the follower
} catch (Exception e) {
} finally {
    follower.shutdown();
    setFollower(null);
    updateServerState(); // update the server state
}
Creating the Follower object:
protected Follower makeFollower(FileTxnSnapLog logFactory) throws IOException {
    return new Follower(this, new FollowerZooKeeperServer(logFactory, this, this.zkDb));
}
LEADING branch
try {
    setLeader(makeLeader(logFactory));
    leader.lead(); // run the leader
    setLeader(null);
} catch (Exception e) {
} finally {
    if (leader != null) {
        leader.shutdown("Forcing shutdown");
        setLeader(null);
    }
    updateServerState(); // update the server state
}
Creating the Leader object:
protected Leader makeLeader(FileTxnSnapLog logFactory) throws IOException, X509Exception {
    return new Leader(this, new LeaderZooKeeperServer(logFactory, this, this.zkDb));
}
OBSERVING branch
try {
    setObserver(makeObserver(logFactory));
    observer.observeLeader();
} catch (Exception e) {
} finally {
    observer.shutdown();
    setObserver(null);
    updateServerState();
    // Add delay jitter before we switch to LOOKING
    // state to reduce the load of ObserverMaster
    if (isRunning()) {
        Observer.waitForObserverElectionDelay();
    }
}
Creating the Observer object:
protected Observer makeObserver(FileTxnSnapLog logFactory) throws IOException {
    return new Observer(this, new ObserverZooKeeperServer(logFactory, this, this.zkDb));
}