zookeeper集群 配置多個實例共同構成一個集群對外提供服務以達到水平擴展的目的,每個伺服器上的數據是相同的,每一個伺服器均可以對外提供讀和寫的服務,這點和redis是相同的,即對客戶端來講每個伺服器都是平等的。 這篇主要分析leader的選擇機制,zookeeper提供了三種方式: Leade ...
zookeeper集群
配置多個實例共同構成一個集群對外提供服務以達到水平擴展的目的,每個伺服器上的數據是相同的,每一個伺服器均可以對外提供讀和寫的服務,這點和redis是相同的,即對客戶端來講每個伺服器都是平等的。
這篇主要分析leader的選擇機制,zookeeper提供了三種方式:
- LeaderElection
- AuthFastLeaderElection
- FastLeaderElection
預設的演算法是FastLeaderElection,所以這篇主要分析它的選舉機制。
選擇機制中的概念
伺服器ID
比如有三台伺服器,編號分別是1,2,3。
編號越大在選擇演算法中的權重越大。
數據ID
伺服器中存放的最大數據ID.
值越大說明數據越新,在選舉演算法中數據越新權重越大。
邏輯時鐘
或者叫投票的次數,同一輪投票過程中的邏輯時鐘值是相同的。每投完一次票這個數據就會增加,然後與接收到的其它伺服器返回的投票信息中的數值相比,根據不同的值做出不同的判斷。
選舉狀態
- LOOKING,競選狀態。
- FOLLOWING,隨從狀態,同步leader狀態,參與投票。
- OBSERVING,觀察狀態,同步leader狀態,不參與投票。
- LEADING,領導者狀態。
選舉消息內容
在投票完成後,需要將投票信息發送給集群中的所有伺服器,它包含如下內容。
- 伺服器ID
- 數據ID
- 邏輯時鐘
- 選舉狀態
選舉流程圖
因為每個伺服器都是獨立的,在啟動時均從初始狀態開始參與選舉,下麵是簡易流程圖。
選舉狀態圖
描述Leader選擇過程中的狀態變化,這是假設全部實例中均沒有數據,假設伺服器啟動順序分別為:A,B,C。
源碼分析
QuorumPeer
主要看這個類,只有LOOKING狀態才會去執行選舉演算法。每個伺服器在啟動時都會選擇自己做為領導,然後將投票信息發送出去,迴圈一直到選舉出領導為止。
public void run() { //....... try { while (running) { switch (getPeerState()) { case LOOKING: if (Boolean.getBoolean("readonlymode.enabled")) { //... try { //投票給自己... setCurrentVote(makeLEStrategy().lookForLeader()); } catch (Exception e) { //... } finally { //... } } else { try { //... setCurrentVote(makeLEStrategy().lookForLeader()); } catch (Exception e) { //... } } break; case OBSERVING: //... break; case FOLLOWING: //... break; case LEADING: //... break; } } } finally { //... } }
FastLeaderElection
它是zookeeper預設提供的選舉演算法,核心方法如下:具體的可以與本文上面的流程圖對照。
public Vote lookForLeader() throws InterruptedException { //... try { HashMap<Long, Vote> recvset = new HashMap<Long, Vote>(); HashMap<Long, Vote> outofelection = new HashMap<Long, Vote>(); int notTimeout = finalizeWait; synchronized(this){ //給自己投票 logicalclock.incrementAndGet(); updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch()); } //將投票信息發送給集群中的每個伺服器 sendNotifications(); //迴圈,如果是競選狀態一直到選舉出結果 while ((self.getPeerState() == ServerState.LOOKING) && (!stop)){ Notification n = recvqueue.poll(notTimeout, TimeUnit.MILLISECONDS); //沒有收到投票信息 if(n == null){ if(manager.haveDelivered()){ sendNotifications(); } else { manager.connectAll(); } //... } //收到投票信息 else if (self.getCurrentAndNextConfigVoters().contains(n.sid)) { switch (n.state) { case LOOKING: // 判斷投票是否過時,如果過時就清除之前已經接收到的信息 if (n.electionEpoch > logicalclock.get()) { logicalclock.set(n.electionEpoch); recvset.clear(); //更新投票信息 if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) { updateProposal(n.leader, n.zxid, n.peerEpoch); } else { updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch()); } //發送投票信息 sendNotifications(); } else if (n.electionEpoch < logicalclock.get()) { //忽略 break; } else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, proposedLeader, proposedZxid, proposedEpoch)) { //更新投票信息 updateProposal(n.leader, n.zxid, n.peerEpoch); sendNotifications(); } recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch)); //判斷是否投票結束 if (termPredicate(recvset, new Vote(proposedLeader, proposedZxid, logicalclock.get(), proposedEpoch))) { // Verify if there is any change in the proposed leader while((n = recvqueue.poll(finalizeWait, TimeUnit.MILLISECONDS)) != null){ if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, proposedLeader, proposedZxid, proposedEpoch)){ recvqueue.put(n); break; } } if (n == null) { self.setPeerState((proposedLeader == self.getId()) ? ServerState.LEADING: learningState()); Vote endVote = new Vote(proposedLeader, proposedZxid, proposedEpoch); leaveInstance(endVote); return endVote; } } break; case OBSERVING: //忽略 break; case FOLLOWING: case LEADING: //如果是同一輪投票 if(n.electionEpoch == logicalclock.get()){ recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch)); //判斷是否投票結束 if(termPredicate(recvset, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch, n.state)) && checkLeader(outofelection, n.leader, n.electionEpoch)) { self.setPeerState((n.leader == self.getId()) ? ServerState.LEADING: learningState()); Vote endVote = new Vote(n.leader, n.zxid, n.peerEpoch); leaveInstance(endVote); return endVote; } } //記錄投票已經完成 outofelection.put(n.sid, new Vote(n.leader, IGNOREVALUE, IGNOREVALUE, n.peerEpoch, n.state)); if (termPredicate(outofelection, new Vote(n.leader, IGNOREVALUE, IGNOREVALUE, n.peerEpoch, n.state)) && checkLeader(outofelection, n.leader, IGNOREVALUE)) { synchronized(this){ logicalclock.set(n.electionEpoch); self.setPeerState((n.leader == self.getId()) ? ServerState.LEADING: learningState()); } Vote endVote = new Vote(n.leader, n.zxid, n.peerEpoch); leaveInstance(endVote); return endVote; } break; default: //忽略 break; } } else { LOG.warn("Ignoring notification from non-cluster member " + n.sid); } } return null; } finally { //... } }
判斷是否已經勝出
預設是採用投票數大於半數則勝出的邏輯。
選舉流程簡述
目前有5台伺服器,每台伺服器均沒有數據,它們的編號分別是1,2,3,4,5,按編號依次啟動,它們的選擇舉過程如下:
- 伺服器1啟動,給自己投票,然後發投票信息,由於其它機器還沒有啟動所以它收不到反饋信息,伺服器1的狀態一直屬於Looking。
- 伺服器2啟動,給自己投票,同時與之前啟動的伺服器1交換結果,由於伺服器2的編號大所以伺服器2勝出,但此時投票數沒有大於半數,所以兩個伺服器的狀態依然是LOOKING。
- 伺服器3啟動,給自己投票,同時與之前啟動的伺服器1,2交換信息,由於伺服器3的編號最大所以伺服器3勝出,此時投票數正好大於半數,所以伺服器3成為領導者,伺服器1,2成為小弟。
- 伺服器4啟動,給自己投票,同時與之前啟動的伺服器1,2,3交換信息,儘管伺服器4的編號大,但之前伺服器3已經勝出,所以伺服器4只能成為小弟。
- 伺服器5啟動,後面的邏輯同伺服器4成為小弟。