zookeeper source code (04): the leader election process

Source: https://www.cnblogs.com/xugf/archive/2023/11/07/17814786.html

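"zookeeper source code (03): the cluster startup process" introduced the entry point of leader election; this article analyzes the leader election components and process in detail.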

Leader election process (important)

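  1. In its start phase, quorumPeer calls startLeaderElection() to start the election
  2. In the LOOKING state, the server votes for itself
  3. createElectionAlgorithm - creates the core election components: QuorumCnxManager (manages connections), FastLeaderElection (runs the election), etc.
  4. quorumPeer's main loop runs a different flow depending on its current state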

States and flows:

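  • LOOKING - runs the election via fastLeaderElection.lookForLeader

    1. Increment the election epoch to start a new round
    2. Initialize the proposal with its own serverId, zxid and currentEpoch
    3. Send the vote out
    4. Loop, receiving votes from the other servers:
      • LOOKING vote: compare election epoch, currentEpoch, zxid and serverId to decide which server to vote for; if more than half of the nodes agree on the proposal, that server is confirmed as leader
      • FOLLOWING vote: after checking the election epoch, vote for the current leader (once a quorum agrees and checkLeader passes)
      • LEADING vote: after checking the election epoch, vote for the current leader (same checks as FOLLOWING)
  • LEADING - creates a Leader object and runs the lead logic

    1. zkServer loads its data
    2. Start the quorum listener
    3. Determine the new epoch and zxid from the followers' current epochs
    4. Sync data to the followers
    5. Start zkServer
    6. Every tick, verify that a majority of followers are still in sync
  • FOLLOWING - creates a Follower object and runs the followLeader logic

    1. connectToLeader - connect to the leader server
    2. registerWithLeader - send its current epoch to the leader and wait for the leader to send the new epoch
    3. syncWithLeader - receive the data synced by the leader: txnlog, committedlog, snapshot
    4. Keep communicating and process the packets coming from the leader
  • OBSERVING - creates an Observer object and runs the observeLeader logic, essentially the same as FOLLOWING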

Starting leader election

QuorumPeer's startLeaderElection method is the entry point for starting the election:

public synchronized void startLeaderElection() {
    try {
        if (getPeerState() == ServerState.LOOKING) {
            // vote for itself, wrapping myid, zxid and epoch
            currentVote = new Vote(myid, getLastLoggedZxid(), getCurrentEpoch());
        }
    } catch (IOException e) {
        RuntimeException re = new RuntimeException(e.getMessage());
        re.setStackTrace(e.getStackTrace());
        throw re;
    }
    // electionType is always 3
    this.electionAlg = createElectionAlgorithm(electionType);
}

protected Election createElectionAlgorithm(int electionAlgorithm) {
    Election le = null;

    // TODO: use a factory rather than a switch
    // the switch could be replaced with the strategy pattern
    switch (electionAlgorithm) {
    case 1:
        throw new UnsupportedOperationException("Election Algorithm 1 is not supported.");
    case 2:
        throw new UnsupportedOperationException("Election Algorithm 2 is not supported.");
    case 3:
        QuorumCnxManager qcm = createCnxnManager();
        QuorumCnxManager oldQcm = qcmRef.getAndSet(qcm);
        // shut down the old QuorumCnxManager
        if (oldQcm != null) {
            oldQcm.halt();
        }
        // used to start the serverSocket listener
        QuorumCnxManager.Listener listener = qcm.listener;
        if (listener != null) {
            listener.start();
            FastLeaderElection fle = new FastLeaderElection(this, qcm);
            fle.start();
            le = fle;
        }
        break;
    default:
        assert false;
    }
    return le;
}

public QuorumCnxManager createCnxnManager() {
    // socket timeout; defaults to tickTime * syncLimit
    // with the zoo_sample.cfg settings that is 2000 * 5
    int timeout = quorumCnxnTimeoutMs > 0 ? quorumCnxnTimeoutMs : this.tickTime * this.syncLimit;
    return new QuorumCnxManager(
        this,
        this.getMyId(),
        this.getView(), // serverId -> QuorumServer
        this.authServer,
        this.authLearner,
        timeout,
        this.getQuorumListenOnAllIPs(), // whether to listen on all IPs, default false
        this.quorumCnxnThreadsSize, // default 20
        this.isQuorumSaslAuthEnabled());
}
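
The timeout rule in createCnxnManager is easy to check in isolation. A minimal sketch; CnxnTimeout is a hypothetical helper, not a ZooKeeper class:

```java
// Hypothetical helper mirroring the timeout expression in createCnxnManager().
final class CnxnTimeout {
    private CnxnTimeout() {}

    /** An explicit quorumCnxnTimeoutMs wins; otherwise fall back to tickTime * syncLimit. */
    static int effectiveTimeout(int quorumCnxnTimeoutMs, int tickTime, int syncLimit) {
        return quorumCnxnTimeoutMs > 0 ? quorumCnxnTimeoutMs : tickTime * syncLimit;
    }
}
```

With the zoo_sample.cfg values (tickTime=2000, syncLimit=5) and no explicit quorumCnxnTimeoutMs, effectiveTimeout(0, 2000, 5) yields 10000 ms.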

QuorumCnxManager class

Overview:

This class implements a connection manager for leader election using TCP.
It maintains one connection for every pair of servers. The tricky part is to guarantee that there is exactly one connection for every pair of servers that are operating correctly and that can communicate over the network. If two servers try to start a connection concurrently, then the connection manager uses a very simple tie-breaking mechanism to decide which connection to drop based on the IP addressed of the two parties.
For every peer, the manager maintains a queue of messages to send. If the connection to any particular peer drops, then the sender thread puts the message back on the list. As this implementation currently uses a queue implementation to maintain messages to send to another peer, we add the message to the tail of the queue, thus changing the order of messages. Although this is not a problem for the leader election, it could be a problem when consolidating peer communication. This is to be verified, though.
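
In short:

  1. Maintains the TCP connections between servers during leader election
  2. Ensures exactly one connection exists between any two servers; if two servers connect to each other concurrently, the connection initiated by the server with the larger id is always kept
  3. Uses queues to buffer messages waiting to be sent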

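Point 2 can be stated as two predicates, one for each side of the handshake. This is a sketch of the rule as implemented by startConnection and handleConnection (class and method names are hypothetical); note that it compares server ids rather than the IP addresses mentioned in the Javadoc:

```java
// Hypothetical predicates for QuorumCnxManager's tie-break (not ZooKeeper code).
final class TieBreak {
    private TieBreak() {}

    /** Dialing side (startConnection): keep the socket unless the remote sid is larger. */
    static boolean initiatorKeeps(long mySid, long remoteSid) {
        return remoteSid <= mySid;
    }

    /** Accepting side (handleConnection): keep the socket only if the remote sid is larger. */
    static boolean receiverKeeps(long mySid, long remoteSid) {
        return remoteSid > mySid;
    }
}
```

For servers 1 and 3, only the connection 3 -> 1 survives: server 1 drops its own dial to 3 (initiatorKeeps(1, 3) is false), and server 3 hangs up on 1 and dials back (receiverKeeps(3, 1) is false).
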
Main fields

// runs QuorumConnectionReqThread and QuorumConnectionReceiverThread tasks
private ThreadPoolExecutor connectionExecutor;

// per-sid SendWorker, send queue and last message sent
final ConcurrentHashMap<Long, SendWorker> senderWorkerMap;
final ConcurrentHashMap<Long, BlockingQueue<ByteBuffer>> queueSendMap;
final ConcurrentHashMap<Long, ByteBuffer> lastMessageSent;

// receive queue
public final BlockingQueue<Message> recvQueue;

Main methods

public void initiateConnection(final MultipleAddresses electionAddr, final Long sid);
// wraps initiateConnection in a QuorumConnectionReqThread and submits it to connectionExecutor for asynchronous execution
public boolean initiateConnectionAsync(final MultipleAddresses electionAddr, final Long sid);

private boolean startConnection(Socket sock, Long sid) throws IOException;

public void receiveConnection(final Socket sock);
// wraps receiveConnection in a QuorumConnectionReceiverThread and submits it to connectionExecutor for asynchronous execution
public void receiveConnectionAsync(final Socket sock);

public void toSend(Long sid, ByteBuffer b);

boolean connectOne(long sid, MultipleAddresses electionAddr);
void connectOne(long sid);
public void connectAll();

The remaining utility methods are not analyzed here.

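initiateConnection method

Creates the Socket, performs the SSL handshake and authentication if needed, and sends the initial packet. If its own id is smaller than the peer's, it closes the connection, which guarantees that only one connection exists between the two servers.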

public void initiateConnection(final MultipleAddresses electionAddr, final Long sid) {
    Socket sock = null;
    try {
        // create the Socket
        if (self.isSslQuorum()) {
            sock = self.getX509Util().createSSLSocket();
        } else {
            sock = SOCKET_FACTORY.get();
        }
        setSockOpts(sock); // socket options such as timeout
        // connect to the target peer
        sock.connect(electionAddr.getReachableOrOne(), cnxTO);
        // SSL handshake
        if (sock instanceof SSLSocket) {
            SSLSocket sslSock = (SSLSocket) sock;
            sslSock.startHandshake();
        }
    } catch (X509Exception e) {
        closeSocket(sock);
        return;
    } catch (UnresolvedAddressException | IOException e) {
        closeSocket(sock);
        return;
    }

    try {
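        // send the initial connection packet and perform SASL authentication
        // if selfId is smaller than the peer's, close the connection
        // otherwise create and start SendWorker and RecvWorker
        // and create the send queue for this sid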
        startConnection(sock, sid);
    } catch (IOException e) {
        closeSocket(sock);
    }
}

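startConnection method

  1. Send the initial connection packet and perform SASL authentication
  2. If selfId is smaller than the peer's, close the connection
  3. Otherwise create and start SendWorker and RecvWorker
  4. Create the send queue for this sid
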
private boolean startConnection(Socket sock, Long sid) throws IOException {
    DataOutputStream dout = null;
    DataInputStream din = null;
    try {
        // output stream
        BufferedOutputStream buf = new BufferedOutputStream(sock.getOutputStream());
        dout = new DataOutputStream(buf);

        // send the initial packet: protocol version, myid, election address(es)
        long protocolVersion = self.isMultiAddressEnabled() ? PROTOCOL_VERSION_V2 : PROTOCOL_VERSION_V1;
        dout.writeLong(protocolVersion);
        dout.writeLong(self.getMyId());

        // now we send our election address. For the new protocol version, we can send multiple addresses.
        Collection<InetSocketAddress> addressesToSend = protocolVersion == PROTOCOL_VERSION_V2
                ? self.getElectionAddress().getAllAddresses()
                : Arrays.asList(self.getElectionAddress().getOne());

        String addr = addressesToSend.stream()
                .map(NetUtils::formatInetAddr).collect(Collectors.joining("|"));
        byte[] addr_bytes = addr.getBytes();
        dout.writeInt(addr_bytes.length);
        dout.write(addr_bytes);
        dout.flush();

        din = new DataInputStream(new BufferedInputStream(sock.getInputStream()));
    } catch (IOException e) {
        closeSocket(sock);
        return false;
    }

    // authenticate learner
    QuorumPeer.QuorumServer qps = self.getVotingView().get(sid);
    if (qps != null) {
        authLearner.authenticate(sock, qps.hostname);
    }

    if (sid > self.getMyId()) { // If lost the challenge, then drop the new connection
        closeSocket(sock);
    } else {
        // create SendWorker and RecvWorker
        SendWorker sw = new SendWorker(sock, sid);
        RecvWorker rw = new RecvWorker(sock, din, sid, sw);
        sw.setRecv(rw);

        SendWorker vsw = senderWorkerMap.get(sid);

        if (vsw != null) {
            vsw.finish();
        }

        senderWorkerMap.put(sid, sw);

        // create the send queue
        queueSendMap.putIfAbsent(sid, new CircularBlockingQueue<>(SEND_CAPACITY));

        sw.start();
        rw.start();

        return true;
    }
    return false;
}
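
The initial packet written by startConnection is a simple binary layout: a long protocol version, a long sid, then an int-prefixed address string. The sketch below encodes it and decodes it the way handleConnection tells legacy peers apart (a non-negative first long is a bare sid). The PROTOCOL_VERSION value and the InitialPacket class are assumptions for illustration; the real InitialMessage.parse also validates the version and length limits, and UTF-8 is used here where the original calls getBytes() with the platform charset.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

// Hypothetical model of the initial packet: [long version][long sid][int len][len bytes].
final class InitialPacket {
    // Assumed value for illustration; check QuorumCnxManager for the real constants.
    static final long PROTOCOL_VERSION = -65536L;

    final long sid;
    final String addresses; // "host:port" entries joined with "|"

    InitialPacket(long sid, String addresses) {
        this.sid = sid;
        this.addresses = addresses;
    }

    byte[] encode() throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeLong(PROTOCOL_VERSION); // negative: a versioned message follows
            out.writeLong(sid);
            byte[] addr = addresses.getBytes(StandardCharsets.UTF_8);
            out.writeInt(addr.length);
            out.write(addr);
        }
        return bytes.toByteArray();
    }

    static InitialPacket decode(byte[] data) throws IOException {
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
        long first = in.readLong();
        if (first >= 0) {
            return new InitialPacket(first, ""); // legacy peer: the first long is its sid
        }
        long sid = in.readLong();
        int len = in.readInt();
        if (len < 0) {
            throw new IOException("invalid address length: " + len);
        }
        byte[] addr = new byte[len];
        in.readFully(addr);
        return new InitialPacket(sid, new String(addr, StandardCharsets.UTF_8));
    }
}
```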

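receiveConnection method

When a server receives a connection request and wins the challenge (its selfId is larger than the peer's), it closes that connection and connects to the peer itself.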

public void receiveConnection(final Socket sock) {
    DataInputStream din = null;
    try {
        // input stream
        din = new DataInputStream(new BufferedInputStream(sock.getInputStream()));
        handleConnection(sock, din);
    } catch (IOException e) {
        closeSocket(sock);
    }
}

private void handleConnection(Socket sock, DataInputStream din) throws IOException {
    Long sid = null, protocolVersion = null;
    MultipleAddresses electionAddr = null;

    try {
        protocolVersion = din.readLong();
        if (protocolVersion >= 0) { // this is a server id and not a protocol version
            sid = protocolVersion;
        } else {
            try {
                InitialMessage init = InitialMessage.parse(protocolVersion, din);
                sid = init.sid;
                if (!init.electionAddr.isEmpty()) {
                    electionAddr = new MultipleAddresses(init.electionAddr,
                            Duration.ofMillis(self.getMultiAddressReachabilityCheckTimeoutMs()));
                }
            } catch (InitialMessage.InitialMessageException ex) {
                closeSocket(sock);
                return;
            }
        }

        if (sid == QuorumPeer.OBSERVER_ID) {
            // Choose identifier at random. We need a value to identify the connection.
            sid = observerCounter.getAndDecrement();
        }
    } catch (IOException e) {
        closeSocket(sock);
        return;
    }

    // do authenticating learner
    authServer.authenticate(sock, din);
    // If wins the challenge, then close the new connection.
    if (sid < self.getMyId()) { // the peer's id is smaller: close this connection and connect to the peer ourselves
        SendWorker sw = senderWorkerMap.get(sid);
        if (sw != null) {
            sw.finish();
        }

        // close the connection
        closeSocket(sock);

        if (electionAddr != null) {
            connectOne(sid, electionAddr); // connect to the peer
        } else {
            connectOne(sid);
        }
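    } else if (sid == self.getMyId()) {
        // a connection carrying our own id: the original only logs a warning here (omitted)
    } else { // create SendWorker, RecvWorker and the send queue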
        SendWorker sw = new SendWorker(sock, sid);
        RecvWorker rw = new RecvWorker(sock, din, sid, sw);
        sw.setRecv(rw);

        SendWorker vsw = senderWorkerMap.get(sid);

        if (vsw != null) {
            vsw.finish();
        }

        senderWorkerMap.put(sid, sw);

        queueSendMap.putIfAbsent(sid, new CircularBlockingQueue<>(SEND_CAPACITY));

        sw.start();
        rw.start();
    }
}
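
The branch structure of handleConnection reduces to a three-way decision on the remote sid. A hypothetical sketch; the OBSERVER_ID value and the counter's starting point are assumptions about ZooKeeper's constants:

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch of handleConnection's decision once the remote sid is known.
final class IncomingDecision {
    enum Action { CLOSE_AND_CONNECT_BACK, IGNORE_SELF, ACCEPT }

    // Assumed values: OBSERVER_ID as Long.MAX_VALUE and a counter starting at -1.
    static final long OBSERVER_ID = Long.MAX_VALUE;
    private final AtomicLong observerCounter = new AtomicLong(-1);
    private final long myId;

    IncomingDecision(long myId) {
        this.myId = myId;
    }

    /** Peers announcing OBSERVER_ID get a unique negative placeholder sid. */
    long normalize(long sid) {
        return sid == OBSERVER_ID ? observerCounter.getAndDecrement() : sid;
    }

    Action decide(long rawSid) {
        long sid = normalize(rawSid);
        if (sid < myId) {
            return Action.CLOSE_AND_CONNECT_BACK; // we win: hang up and dial the peer ourselves
        }
        if (sid == myId) {
            return Action.IGNORE_SELF;            // a connection with our own id: nothing to set up
        }
        return Action.ACCEPT;                     // peer wins: start SendWorker/RecvWorker
    }
}
```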

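toSend method

Sends a message: a message addressed to itself goes straight to recvQueue; any other message is put on the send queue for its sid, and a connection to that sid is then ensured.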

public void toSend(Long sid, ByteBuffer b) {
    // a message to itself goes straight to recvQueue
    if (this.mySid == sid) {
        b.position(0);
        addToRecvQueue(new Message(b.duplicate(), sid));
    } else {
        // put the message on the send queue for this sid
        BlockingQueue<ByteBuffer> bq =
            queueSendMap.computeIfAbsent(sid, serverId -> new CircularBlockingQueue<>(SEND_CAPACITY));
        addToSendQueue(bq, b);
        // make sure a connection is established
        connectOne(sid);
    }
}
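
A sketch of the queueing half of toSend, assuming a send capacity of 1 and approximating CircularBlockingQueue's behaviour (drop the oldest entry when full) with an ArrayBlockingQueue; SendQueues is a hypothetical name and connection setup is left out:

```java
import java.nio.ByteBuffer;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical sketch of toSend's queueing; connectOne(sid) is left out.
final class SendQueues {
    static final int SEND_CAPACITY = 1; // assumed capacity

    private final long mySid;
    final ConcurrentHashMap<Long, BlockingQueue<ByteBuffer>> queueSendMap = new ConcurrentHashMap<>();
    final BlockingQueue<ByteBuffer> recvQueue = new LinkedBlockingQueue<>();

    SendQueues(long mySid) {
        this.mySid = mySid;
    }

    void toSend(long sid, ByteBuffer b) {
        if (sid == mySid) {
            b.position(0);
            recvQueue.add(b.duplicate());   // loop back locally, no socket involved
            return;
        }
        BlockingQueue<ByteBuffer> bq =
            queueSendMap.computeIfAbsent(sid, id -> new ArrayBlockingQueue<>(SEND_CAPACITY));
        while (!bq.offer(b)) {
            bq.poll();                      // full: drop the oldest pending message, then retry
        }
        // the real method now calls connectOne(sid) to make sure a connection exists
    }
}
```

With a capacity of 1, only the most recent message for a peer is kept, which seems acceptable for leader election because a newer notification supersedes an older one.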

connectOne method

synchronized boolean connectOne(long sid, MultipleAddresses electionAddr) {
    // a connection already exists
    if (senderWorkerMap.get(sid) != null) {
        if (self.isMultiAddressEnabled() && electionAddr.size() > 1 &&
            self.isMultiAddressReachabilityCheckEnabled()) {
            // check whether the socket is still reachable
            senderWorkerMap.get(sid).asyncValidateIfSocketIsStillReachable();
        }
        return true;
    }
    // establish a new connection asynchronously
    return initiateConnectionAsync(electionAddr, sid);
}

synchronized void connectOne(long sid) {
    if (senderWorkerMap.get(sid) != null) {
        if (self.isMultiAddressEnabled() && self.isMultiAddressReachabilityCheckEnabled()) {
            senderWorkerMap.get(sid).asyncValidateIfSocketIsStillReachable();
        }
        return;
    }
    // resolve the address for sid from lastCommittedView / lastProposedView, then connect
    synchronized (self.QV_LOCK) {
        boolean knownId = false;
        // Resolve hostname for the remote server before attempting to
        // connect in case the underlying ip address has changed.
        self.recreateSocketAddresses(sid);
        Map<Long, QuorumPeer.QuorumServer> lastCommittedView = self.getView();
        QuorumVerifier lastSeenQV = self.getLastSeenQuorumVerifier();
        Map<Long, QuorumPeer.QuorumServer> lastProposedView = lastSeenQV.getAllMembers();
        if (lastCommittedView.containsKey(sid)) {
            knownId = true;
            if (connectOne(sid, lastCommittedView.get(sid).electionAddr)) {
                return;
            }
        }
        if (lastSeenQV != null
            && lastProposedView.containsKey(sid)
            && (!knownId ||
                !lastProposedView.get(sid).electionAddr.equals(lastCommittedView.get(sid).electionAddr))) {
            knownId = true;
            if (connectOne(sid, lastProposedView.get(sid).electionAddr)) {
                return;
            }
        }
    }
}
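
The order in which connectOne(sid) tries addresses can be isolated as below. This is a simplified sketch using plain string addresses instead of MultipleAddresses (AddressResolution is hypothetical); the real method stops at the first connectOne call that succeeds:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical model of the address order in connectOne(sid); addresses are plain strings.
final class AddressResolution {
    private AddressResolution() {}

    /** Candidate election addresses in the order connectOne(sid) would try them. */
    static List<String> candidates(long sid, Map<Long, String> committed, Map<Long, String> proposed) {
        List<String> result = new ArrayList<>();
        boolean knownId = false;
        if (committed.containsKey(sid)) {          // lastCommittedView first
            knownId = true;
            result.add(committed.get(sid));
        }
        if (proposed != null && proposed.containsKey(sid)
                && (!knownId || !proposed.get(sid).equals(committed.get(sid)))) {
            result.add(proposed.get(sid));         // lastProposedView only if it adds a new address
        }
        return result;                             // empty: the sid is in neither view
    }
}
```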

connectAll method

Try to establish a connection with each server if one doesn't exist.

public void connectAll() {
    long sid;
    for (Enumeration<Long> en = queueSendMap.keys(); en.hasMoreElements(); ) {
        sid = en.nextElement();
        connectOne(sid);
    }
}

Listener class

A thread class that starts the serverSocket listeners in its run method:

public void run() {
    if (!shutdown) {
        Set<InetSocketAddress> addresses;

        // get the addresses to listen on
        if (self.getQuorumListenOnAllIPs()) {
            addresses = self.getElectionAddress().getWildcardAddresses();
        } else {
            addresses = self.getElectionAddress().getAllAddresses();
        }
        // used to block until all handlers finish
        CountDownLatch latch = new CountDownLatch(addresses.size());
        // create a ListenerHandler for each listen address
        listenerHandlers = addresses.stream().map(address ->
                        new ListenerHandler(address,self.shouldUsePortUnification(),
                                            self.isSslQuorum(), latch))
                .collect(Collectors.toList());

        final ExecutorService executor = Executors.newFixedThreadPool(addresses.size());
        try {
            // start the ListenerHandlers
            listenerHandlers.forEach(executor::submit);
        } finally {
            executor.shutdown();
        }

        try {
            // block; each ListenerHandler counts down when it finishes
            latch.await();
        } catch (InterruptedException ie) {
        } finally {
            // Clean up for shutdown (omitted)
        }
    }
    // omitted
}

ListenerHandler's run method:

public void run() {
    try {
        // accept connections
        acceptConnections();
        try {
            close();
        } catch (IOException e) {}
    } catch (Exception e) {
    } finally {
        latch.countDown();
    }
}
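
Listener and ListenerHandler follow a common pattern: one task per address on a fixed pool, plus a CountDownLatch so that run() returns only when every handler has exited. A minimal sketch with a stand-in task body (ListenerSketch is hypothetical):

```java
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical sketch of the Listener pattern: one task per address, joined by a CountDownLatch.
final class ListenerSketch {
    private ListenerSketch() {}

    /** Runs one stand-in handler per address and returns how many finished. */
    static int runHandlers(List<String> addresses) throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(addresses.size());
        ConcurrentLinkedQueue<String> finished = new ConcurrentLinkedQueue<>();
        ExecutorService executor = Executors.newFixedThreadPool(addresses.size());
        try {
            for (String address : addresses) {
                executor.submit(() -> {
                    try {
                        finished.add(address);   // stands in for acceptConnections()
                    } finally {
                        latch.countDown();       // mirrors ListenerHandler's finally block
                    }
                });
            }
        } finally {
            executor.shutdown();                 // accept no new tasks; running ones continue
        }
        latch.await();                           // like Listener.run(): wait for every handler
        return finished.size();
    }
}
```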

private void acceptConnections() {
    int numRetries = 0;
    Socket client = null;

    while ((!shutdown) && (portBindMaxRetry == 0 || numRetries < portBindMaxRetry)) {
        try {
            // create the ServerSocket and bind the port
            serverSocket = createNewServerSocket();
            while (!shutdown) {
                try {
                    // accept a client Socket
                    client = serverSocket.accept();
                    setSockOpts(client); // socket options such as timeout
                    // handle the new connection with receiveConnection
                    if (quorumSaslAuthEnabled) {
                        receiveConnectionAsync(client);
                    } else {
                        receiveConnection(client);
                    }
                    numRetries = 0;
                } catch (SocketTimeoutException e) {}
            }
        } catch (IOException e) {
            // omitted
        }
    }
    // omitted
}

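QuorumConnectionReqThread class

Connects to other peers asynchronously; its run method calls initiateConnection to establish the connection.

QuorumConnectionReceiverThread class

Accepts connections asynchronously; its run method calls receiveConnection to handle the newly accepted connection.

SendWorker class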

Thread to send messages. Instance waits on a queue, and send a message as soon as there is one available. If connection breaks, then opens a new one.

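The thread that sends messages:

  • Wraps the sid, the socket and the connection's output stream
  • Takes messages from the send queue and writes them to the output stream

RecvWorker class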

Thread to receive messages. Instance waits on a socket read. If the channel breaks, then removes itself from the pool of receivers.

The thread that reads messages:

public void run() {
    threadCnt.incrementAndGet();
    try {
        while (running && !shutdown && sock != null) {
            // read the message length
            int length = din.readInt();
            if (length <= 0 || length > PACKETMAXSIZE) {
                throw new IOException("Received packet with invalid packet: " + length);
            }
            // read the payload
            final byte[] msgArray = new byte[length];
            din.readFully(msgArray, 0, length);
            // put it on the receive queue
            addToRecvQueue(new Message(ByteBuffer.wrap(msgArray), sid));
        }
    } catch (Exception e) {
    } finally {
        sw.finish();
        closeSocket(sock);
    }
}
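
RecvWorker reads length-prefixed frames. A standalone sketch of that framing loop over any DataInputStream, assuming PACKETMAXSIZE is 512 KiB (FrameReader is hypothetical; the real worker pushes each message onto recvQueue instead of returning a list):

```java
import java.io.DataInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of RecvWorker's framing: [int length][length bytes], repeated.
final class FrameReader {
    // Assumed limit for the sketch; check QuorumCnxManager.PACKETMAXSIZE for the real value.
    static final int PACKETMAXSIZE = 512 * 1024;

    private FrameReader() {}

    /** Reads frames until the stream ends. */
    static List<ByteBuffer> readAll(DataInputStream din) throws IOException {
        List<ByteBuffer> messages = new ArrayList<>();
        while (true) {
            int length;
            try {
                length = din.readInt();
            } catch (EOFException eof) {
                return messages;  // stream ended where the next length prefix was expected
            }
            if (length <= 0 || length > PACKETMAXSIZE) {
                throw new IOException("Received packet with invalid packet: " + length);
            }
            byte[] msg = new byte[length];
            din.readFully(msg, 0, length); // blocks until the whole frame has arrived
            messages.add(ByteBuffer.wrap(msg));
        }
    }
}
```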

FastLeaderElection class

Documentation:

Implementation of leader election using TCP. It uses an object of the class QuorumCnxManager to manage connections. Otherwise, the algorithm is push-based as with the other UDP implementations. There are a few parameters that can be tuned to change its behavior. First, finalizeWait determines the amount of time to wait until deciding upon a leader. This is part of the leader election algorithm.

In short:

  1. Implements leader election over TCP, using a push-based model
  2. Uses a QuorumCnxManager object to manage connections

Constructor

public FastLeaderElection(QuorumPeer self, QuorumCnxManager manager) {
    this.stop = false;
    this.manager = manager;
    starter(self, manager);
}

private void starter(QuorumPeer self, QuorumCnxManager manager) {
    this.self = self;
    proposedLeader = -1;
    proposedZxid = -1;

    sendqueue = new LinkedBlockingQueue<>();
    recvqueue = new LinkedBlockingQueue<>();
    // used to start WorkerSender and WorkerReceiver
    this.messenger = new Messenger(manager);
}

Main fields

// how long to keep polling for changed votes before the leader is finalized
static final int finalizeWait = 200;

// ballot box: stores and tallies the result of one election round
private SyncedLearnerTracker leadingVoteSet;

// send queue
LinkedBlockingQueue<ToSend> sendqueue;
// receive queue
LinkedBlockingQueue<Notification> recvqueue;

// used to start WorkerSender and WorkerReceiver
Messenger messenger;

// proposed leader id
long proposedLeader;
// proposed zxid
long proposedZxid;
// proposed epoch
long proposedEpoch;

start method: launching the election

public void start() {
    this.messenger.start(); // starts the WorkerSender and WorkerReceiver threads
}

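Messenger class

WorkerSender thread

  1. Takes ToSend messages from sendqueue
  2. Sends them through QuorumCnxManager's toSend method

WorkerReceiver thread

  1. Takes received messages through QuorumCnxManager's pollRecvQueue
  2. Wraps each one in a Notification object and pushes it onto recvqueue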

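Both Messenger workers are poll loops over a blocking queue. A sketch of the WorkerSender side, assuming a short poll timeout so the loop can observe its stop flag; the Consumer stands in for QuorumCnxManager.toSend and WorkerSenderSketch is hypothetical:

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical sketch of Messenger.WorkerSender: poll sendqueue with a timeout so the
// stop flag is noticed, and hand each message to a transport (stand-in for manager.toSend).
final class WorkerSenderSketch implements Runnable {
    final LinkedBlockingQueue<String> sendqueue = new LinkedBlockingQueue<>();
    private final Consumer<String> transport;
    volatile boolean stop = false;

    WorkerSenderSketch(Consumer<String> transport) {
        this.transport = transport;
    }

    @Override
    public void run() {
        while (!stop) {
            try {
                String m = sendqueue.poll(50, TimeUnit.MILLISECONDS); // short timeout chosen for the sketch
                if (m == null) {
                    continue;           // nothing to send yet; re-check the stop flag
                }
                transport.accept(m);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }
}
```
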
Main methods

// build an outgoing message
static ByteBuffer buildMsg(
    int state, long leader, long zxid, long electionEpoch, long epoch, byte[] configData);

// send a Notification vote to every node
private void sendNotifications();

// compare currentEpoch, zxid and serverId to decide which server to vote for
protected boolean totalOrderPredicate(
    long newId, long newZxid, long newEpoch, long curId, long curZxid, long curEpoch);

// given a set of votes, return a SyncedLearnerTracker used to decide whether there are enough votes to end the election
protected SyncedLearnerTracker getVoteTracker(Map<Long, Vote> votes, Vote vote);

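// once a leader has been elected with enough votes, check that the leader has voted and confirmed that it is leading
// this check keeps peers from electing, again and again, a peer that has crashed and is no longer leading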
protected boolean checkLeader(Map<Long, Vote> votes, long leader, long electionEpoch);

// update proposedLeader, proposedZxid and proposedEpoch
// to settle on the leader or to prepare the next round of voting
synchronized void updateProposal(long leader, long zxid, long epoch);

// create a Vote from the current proposedLeader, proposedZxid and proposedEpoch
public synchronized Vote getVote();

// get lastLoggedZxid from zkDb
private long getInitLastLoggedZxid();

// get currentEpoch
private long getPeerEpoch();

// update the peer state according to proposedLeader
// if this peer becomes leader, leadingVoteSet is updated from voteSet
private void setPeerState(long proposedLeader, SyncedLearnerTracker voteSet);

// start a round of leader election
// called whenever the state changes to LOOKING; sends vote notifications to the other peers
public Vote lookForLeader() throws InterruptedException;

// handle a notification from a peer in the FOLLOWING state
private Vote receivedFollowingNotification(
    Map<Long, Vote> recvset, Map<Long, Vote> outofelection,
    SyncedLearnerTracker voteSet, Notification n);

// handle a notification from a peer in the LEADING state
private Vote receivedLeadingNotification(
    Map<Long, Vote> recvset, Map<Long, Vote> outofelection,
    SyncedLearnerTracker voteSet, Notification n);

buildMsg method

static ByteBuffer buildMsg(int state, long leader, long zxid,
                           long electionEpoch, long epoch, byte[] configData) {
    byte[] requestBytes = new byte[44 + configData.length];
    ByteBuffer requestBuffer = ByteBuffer.wrap(requestBytes);

    requestBuffer.clear();
    requestBuffer.putInt(state); // current state
    requestBuffer.putLong(leader); // proposed leader id
    requestBuffer.putLong(zxid); // zxid
    requestBuffer.putLong(electionEpoch); // election epoch
    requestBuffer.putLong(epoch); // peer epoch (currentEpoch)
    requestBuffer.putInt(Notification.CURRENTVERSION); // 0x2
    requestBuffer.putInt(configData.length); // config data length
    requestBuffer.put(configData); // quorumVerifier config data

    return requestBuffer;
}
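
The 44-byte header is 4 (state) + 8 (leader) + 8 (zxid) + 8 (electionEpoch) + 8 (peer epoch) + 4 (version) + 4 (config length). The sketch below reproduces that layout and reads it back; NotificationCodec.parseHeader is hypothetical, and the real WorkerReceiver also has to handle older message versions:

```java
import java.nio.ByteBuffer;

// Sketch mirroring buildMsg's header layout, plus a hypothetical reader for it.
final class NotificationCodec {
    static final int CURRENTVERSION = 0x2;

    private NotificationCodec() {}

    static ByteBuffer buildMsg(int state, long leader, long zxid,
                               long electionEpoch, long epoch, byte[] configData) {
        ByteBuffer buf = ByteBuffer.allocate(44 + configData.length); // 4+8+8+8+8+4+4 header bytes
        buf.putInt(state).putLong(leader).putLong(zxid)
           .putLong(electionEpoch).putLong(epoch)
           .putInt(CURRENTVERSION).putInt(configData.length).put(configData);
        buf.flip(); // unlike buildMsg, rewind so the sketch can read the buffer back
        return buf;
    }

    /** Returns {state, leader, zxid, electionEpoch, peerEpoch, version, configLength}. */
    static long[] parseHeader(ByteBuffer msg) {
        ByteBuffer b = msg.duplicate(); // leave the caller's position untouched
        return new long[] {
            b.getInt(), b.getLong(), b.getLong(), b.getLong(), b.getLong(), b.getInt(), b.getInt()
        };
    }
}
```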

totalOrderPredicate method

Decides which server to vote for by comparing currentEpoch first, then zxid, then serverId; a candidate whose weight is 0 never wins:

protected boolean totalOrderPredicate(
    long newId, long newZxid, long newEpoch, long curId, long curZxid, long curEpoch) {

    if (self.getQuorumVerifier().getWeight(newId) == 0) {
        return false;
    }

    /*
     * Return true if one of the following three cases hold:
     * 1- New epoch is higher
     * 2- New epoch is the same as current epoch, but new zxid is higher
     * 3- New epoch is the same as current epoch, new zxid is the same
     *  as current zxid, but server id is higher.
     */
    return ((newEpoch > curEpoch)
            || ((newEpoch == curEpoch)
                && ((newZxid > curZxid)
                    || ((newZxid == curZxid)
                        && (newId > curId)))));
}
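
The nested condition reads more easily as a lexicographic comparison on (peerEpoch, zxid, sid). An equivalent standalone sketch, where a plain weight map replaces QuorumVerifier.getWeight (an assumption of the sketch, with unknown ids treated as weight 0):

```java
import java.util.Map;

// Standalone restatement of totalOrderPredicate's ordering (VoteOrder is hypothetical).
final class VoteOrder {
    private VoteOrder() {}

    static boolean totalOrderPredicate(Map<Long, Long> weights,
                                       long newId, long newZxid, long newEpoch,
                                       long curId, long curZxid, long curEpoch) {
        if (weights.getOrDefault(newId, 0L) == 0L) {
            return false;                                         // a zero-weight candidate never wins
        }
        if (newEpoch != curEpoch) return newEpoch > curEpoch;     // 1. higher epoch
        if (newZxid != curZxid) return newZxid > curZxid;         // 2. same epoch, higher zxid
        return newId > curId;                                     // 3. same epoch and zxid, higher sid
    }
}
```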

getVoteTracker method

Given a set of votes, returns a SyncedLearnerTracker object used to decide whether there are enough votes to declare the election over:

protected SyncedLearnerTracker getVoteTracker(Map<Long, Vote> votes, Vote vote) {
    SyncedLearnerTracker voteSet = new SyncedLearnerTracker();
    voteSet.addQuorumVerifier(self.getQuorumVerifier());
    if (self.getLastSeenQuorumVerifier() != null
        && self.getLastSeenQuorumVerifier().getVersion() > self.getQuorumVerifier().getVersion()) {
        voteSet.addQuorumVerifier(self.getLastSeenQuorumVerifier());
    }

    // compare each vote received from other servers with our vote; matching sids go into the ack set
    for (Map.Entry<Long, Vote> entry : votes.entrySet()) {
        if (vote.equals(entry.getValue())) {
            voteSet.addAck(entry.getKey()); // the key is the sid
        }
    }

    return voteSet;
}
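
For plain majority quorums, hasAllQuorums boils down to this: among the sids that acked our vote, more than half of each verifier's voting members must be present. A sketch under that assumption (hierarchical quorums use weights instead); VoteTally is hypothetical:

```java
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of getVoteTracker + hasAllQuorums for simple-majority verifiers.
final class VoteTally {
    private VoteTally() {}

    /** Sids whose vote equals ours (the ack set). */
    static <V> Set<Long> acks(Map<Long, V> votes, V myVote) {
        Set<Long> ackSet = new HashSet<>();
        for (Map.Entry<Long, V> e : votes.entrySet()) {
            if (myVote.equals(e.getValue())) {
                ackSet.add(e.getKey());
            }
        }
        return ackSet;
    }

    /** Every verifier (current, and the last seen one during a reconfig) needs a strict majority. */
    static boolean hasAllQuorums(Set<Long> ackSet, List<Set<Long>> votingMemberSets) {
        for (Set<Long> members : votingMemberSets) {
            long inConfig = ackSet.stream().filter(members::contains).count(); // acks only count for members
            if (inConfig <= members.size() / 2) {
                return false;
            }
        }
        return true;
    }
}
```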

checkLeader method

protected boolean checkLeader(Map<Long, Vote> votes, long leader, long electionEpoch) {

    boolean predicate = true;

    if (leader != self.getMyId()) {
        if (votes.get(leader) == null) { // the leader must have voted, otherwise this round's result is invalid
            predicate = false;
        } else if (votes.get(leader).getState() != ServerState.LEADING) {
            // the leader's state must be LEADING, otherwise this round's result is invalid
            predicate = false;
        }
    } else if (logicalclock.get() != electionEpoch) { // the election epochs must match
        predicate = false;
    }

    return predicate;
}
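
checkLeader in isolation, with a hypothetical VoteInfo type standing in for Vote and the peer's myId and logicalclock passed in explicitly:

```java
import java.util.Map;

// Standalone sketch of checkLeader (LeaderCheck and VoteInfo are hypothetical).
final class LeaderCheck {
    enum State { LOOKING, FOLLOWING, LEADING, OBSERVING }

    /** Stand-in for Vote: only the sender's state matters here. */
    static final class VoteInfo {
        final State state;

        VoteInfo(State state) {
            this.state = state;
        }
    }

    private LeaderCheck() {}

    static boolean checkLeader(Map<Long, VoteInfo> votes, long leader, long electionEpoch,
                               long myId, long logicalclock) {
        if (leader != myId) {
            VoteInfo leaderVote = votes.get(leader);
            // the proposed leader must have voted and must report LEADING
            return leaderVote != null && leaderVote.state == State.LEADING;
        }
        // we are the proposed leader: the votes must come from our current election round
        return logicalclock == electionEpoch;
    }
}
```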

lookForLeader method

Starts a round of leader election. It is called whenever the state changes to LOOKING and sends vote notifications to the other peers:

public Vote lookForLeader() throws InterruptedException {
    // omitted
    try {
        // sid -> vote for the current election round
        Map<Long, Vote> recvset = new HashMap<>();

        // sid -> vote for FOLLOWING/LEADING notifications, including those from earlier election rounds
        Map<Long, Vote> outofelection = new HashMap<>();

        int notTimeout = minNotificationInterval;

        synchronized (this) {
            logicalclock.incrementAndGet(); // increment the election epoch to start a new round
            // initialize the election proposal: every server starts by voting for itself
            updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
        }
        // notify all nodes
        sendNotifications();
        // ballot box
        SyncedLearnerTracker voteSet = null;

        // normally the loop only exits once a leader has been elected
        while ((self.getPeerState() == ServerState.LOOKING) && (!stop)) {
            Notification n = recvqueue.poll(notTimeout, TimeUnit.MILLISECONDS);

            if (n == null) {
                // resend or reconnect
                if (manager.haveDelivered()) {
                    sendNotifications();
                } else {
                    manager.connectAll();
                }

                notTimeout = Math.min(notTimeout << 1, maxNotificationInterval);

                // omitted

            } else if (validVoter(n.sid) && validVoter(n.leader)) {
                switch (n.state) {
                case LOOKING:
                    // omitted
                    // the peer's election epoch is larger than ours
                    if (n.electionEpoch > logicalclock.get()) {
                        logicalclock.set(n.electionEpoch); // adopt the newer epoch
                        recvset.clear(); // clear the votes collected so far
                        // compare votes; if the peer's vote wins, adopt it locally
                        if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
                                                getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) {
                            updateProposal(n.leader, n.zxid, n.peerEpoch);
                        } else {
                            updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
                        }
                        // send out the latest vote
                        sendNotifications();
                    } else if (n.electionEpoch < logicalclock.get()) {
                        // the peer's election epoch is smaller than ours: ignore it
                        break;
                    } else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
                                                   proposedLeader, proposedZxid, proposedEpoch)) {
                        updateProposal(n.leader, n.zxid, n.peerEpoch);
                        sendNotifications();
                    }

                    // store it in the vote set
                    recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));
                    // build the ballot box
                    voteSet = getVoteTracker(
                        recvset, new Vote(proposedLeader, proposedZxid, logicalclock.get(), proposedEpoch));
                    // acks from more than half of the voters mean a leader has been elected
                    if (voteSet.hasAllQuorums()) {

                        // wait briefly for any vote that might still change the result
                        while ((n = recvqueue.poll(finalizeWait, TimeUnit.MILLISECONDS)) != null) {
                            if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, 
                                                    proposedLeader, proposedZxid, proposedEpoch)) {
                                recvqueue.put(n);
                                break;
                            }
                        }

                        // set the peer state
                        if (n == null) {
                            setPeerState(proposedLeader, voteSet);
                            Vote endVote = new Vote(
                                proposedLeader, proposedZxid, logicalclock.get(), proposedEpoch);
                            leaveInstance(endVote);
                            return endVote;
                        }
                    }
                    break;
                case OBSERVING:
                    break;
                case FOLLOWING:
                    // received a FOLLOWING notification
                    Vote resultFN = receivedFollowingNotification(recvset, outofelection, voteSet, n);
                    if (resultFN == null) {
                        break;
                    } else {
                        return resultFN;
                    }
                case LEADING:
                    // received a LEADING notification
                    Vote resultLN = receivedLeadingNotification(recvset, outofelection, voteSet, n);
                    if (resultLN == null) {
                        break;
                    } else {
                        return resultLN;
                    }
                default:
                    break;
                }
            } else {
                // omitted
            }
        }
        return null;
    } finally {
        // omitted
    }
}
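
Putting totalOrderPredicate, recvset and the quorum test together, the LOOKING branch can be simulated in a single thread. This is a simplified sketch, not the real algorithm: it assumes equal weights and a simple majority, skips the finalizeWait re-check, and replaces networking with direct onLooking calls (including one for the peer's own notification, which toSend loops back to recvQueue). LookingPeer is hypothetical.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Simplified single-threaded model of the LOOKING branch of lookForLeader (not ZooKeeper code).
final class LookingPeer {
    static final class Vote {
        final long leader, zxid, peerEpoch;

        Vote(long leader, long zxid, long peerEpoch) {
            this.leader = leader;
            this.zxid = zxid;
            this.peerEpoch = peerEpoch;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof Vote)) return false;
            Vote v = (Vote) o;
            return leader == v.leader && zxid == v.zxid && peerEpoch == v.peerEpoch;
        }

        @Override
        public int hashCode() {
            return Objects.hash(leader, zxid, peerEpoch);
        }
    }

    final long myId, myZxid, myEpoch;
    final int clusterSize;
    long logicalclock = 1;                  // already incremented for this round
    Vote proposal;
    final Map<Long, Vote> recvset = new HashMap<>();

    LookingPeer(long myId, long myZxid, long myEpoch, int clusterSize) {
        this.myId = myId;
        this.myZxid = myZxid;
        this.myEpoch = myEpoch;
        this.clusterSize = clusterSize;
        this.proposal = new Vote(myId, myZxid, myEpoch); // every server starts by voting for itself
    }

    /** Same ordering as totalOrderPredicate: peerEpoch, then zxid, then sid. */
    static boolean beats(Vote n, Vote cur) {
        if (n.peerEpoch != cur.peerEpoch) return n.peerEpoch > cur.peerEpoch;
        if (n.zxid != cur.zxid) return n.zxid > cur.zxid;
        return n.leader > cur.leader;
    }

    /** Handles one LOOKING notification; true once a majority of the cluster matches our proposal. */
    boolean onLooking(long sid, long electionEpoch, Vote n) {
        if (electionEpoch > logicalclock) {          // newer round: catch up, restart from our own vote
            logicalclock = electionEpoch;
            recvset.clear();
            Vote self = new Vote(myId, myZxid, myEpoch);
            proposal = beats(n, self) ? n : self;
        } else if (electionEpoch < logicalclock) {   // stale round: ignore the notification
            return false;
        } else if (beats(n, proposal)) {
            proposal = n;                            // the real code also calls sendNotifications() here
        }
        recvset.put(sid, n);
        long acks = recvset.values().stream().filter(proposal::equals).count();
        return acks > clusterSize / 2;
    }
}
```

In a three-node cluster where servers 1 and 3 have the same zxid, server 1 switches its proposal to 3 on seeing 3's vote; once its own re-broadcast vote for 3 comes back, two of three votes match and the election can finish.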

receivedFollowingNotification method

Handles a notification from a peer in the FOLLOWING state.

private Vote receivedFollowingNotification(
    Map<Long, Vote> recvset, Map<Long, Vote> outofelection, SyncedLearnerTracker voteSet, Notification n) {
    // also votes for the current leader
    // followed by the quorum check and the checkLeader check
    if (n.electionEpoch == logicalclock.get()) {
        // record the vote and build the ballot box
        recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch, n.state));
        voteSet = getVoteTracker(
            recvset, new Vote(n.version, n.leader, n.zxid, n.electionEpoch, n.peerEpoch, n.state));
        // both the quorum check and checkLeader must pass
        if (voteSet.hasAllQuorums() && checkLeader(recvset, n.leader, n.electionEpoch)) {
            // update the node state
            setPeerState(n.leader, voteSet);
            Vote endVote = new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch);
            leaveInstance(endVote);
            return endVote;
        }
    }

    // reached when this node joins late and the cluster already has a leader
    // essentially the same as the code above
    outofelection.put(n.sid, new Vote(n.version, n.leader, n.zxid, n.electionEpoch, n.peerEpoch, n.state));
    voteSet = getVoteTracker(
        outofelection, new Vote(n.version, n.leader, n.zxid, n.electionEpoch, n.peerEpoch, n.state));

    if (voteSet.hasAllQuorums() && checkLeader(outofelection, n.leader, n.electionEpoch)) {
        synchronized (this) {
            logicalclock.set(n.electionEpoch);
            setPeerState(n.leader, voteSet);
        }
        Vote endVote = new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch);
        leaveInstance(endVote);
        return endVote;
    }

    return null;
}

receivedLeadingNotification method

Handles a notification from a peer in the LEADING state.

private Vote receivedLeadingNotification(Map<Long, Vote> recvset, Map<Long, Vote> outofelection,
                                         SyncedLearnerTracker voteSet, Notification n) {
    Vote result = receivedFollowingNotification(recvset, outofelection, voteSet, n);
    if (result == null) {
        if (self.getQuorumVerifier().getNeedOracle() && !self.getQuorumVerifier().askOracle()) {
            // omitted
        } else {
            return null;
        }
    } else {
        return result;
    }
}

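QuorumPeer class

Manages the quorum protocol. A server can be in one of three states:

  • Leader election - each server elects a leader, initially proposing itself
  • Follower - syncs with the leader and replicates all transactions
  • Leader - processes requests and forwards them to the followers; a majority of followers must sync before a request can be committed

run method: the main loop

The main loop in run checks the current peer state and runs the election, lead, follow (and so on) logic accordingly: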

public void run() {
    // omitted

    try {
        // Main loop
        while (running) {
            switch (getPeerState()) {
            case LOOKING:
                ServerMetrics.getMetrics().LOOKING_COUNT.add(1);
                if (Boolean.getBoolean("readonlymode.enabled")) {
                    // omitted
                } else {
                    try {
                        reconfigFlagClear();
                        if (shuttingDownLE) {
                            shuttingDownLE = false;
                            startLeaderElection();
                        }
                        setCurrentVote(makeLEStrategy().lookForLeader());
                    } catch (Exception e) {
                        setPeerState(ServerState.LOOKING);
                    }
                }
                break;
            case OBSERVING:
                try {
                    setObserver(makeObserver(logFactory));
                    observer.observeLeader();
                } catch (Exception e) {
                    // exception logging omitted
                } finally {
                    observer.shutdown();
                    setObserver(null);
                    updateServerState();

                    // Add delay jitter before we switch to LOOKING
                    // state to reduce the load of ObserverMaster
                    if (isRunning()) {
                        Observer.waitForObserverElectionDelay();
                    }
                }
                break;
            case FOLLOWING:
                try {
                    setFollower(makeFollower(logFactory));
                    follower.followLeader();
                } catch (Exception e) {
                    // exception logging omitted
                } finally {
                    follower.shutdown();
                    setFollower(null);
                    updateServerState();
                }
                break;
            case LEADING:
                try {
                    setLeader(makeLeader(logFactory));
                    leader.lead();
                    setLeader(null);
                } catch (Exception e) {
                    // exception logging omitted
                } finally {
                    if (leader != null) {
                        leader.shutdown("Forcing shutdown");
                        setLeader(null);
                    }
                    updateServerState();
                }
                break;
            }
        }
    } finally {
        // omitted
    }
}
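The loop is a state machine: each iteration runs the role for the current state until that role returns, and the next state is then decided. The sketch below is a hypothetical, heavily simplified model (`MainLoopSketch` is not a ZooKeeper class): the election result is fed in as test input instead of calling `lookForLeader()`, each role "ends" immediately instead of blocking, and the fallback to LOOKING stands in for `updateServerState()` without a pending reconfiguration.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of QuorumPeer.run(): switch on the peer state,
// run the matching role until it returns, then go back to LOOKING.
class MainLoopSketch {
    enum ServerState { LOOKING, FOLLOWING, LEADING, OBSERVING }

    private final long myId;
    private final boolean observer;       // stands in for the learner type
    private final long[] electionResults; // leader chosen by each successive election (test input)
    private int round = 0;
    private ServerState state = ServerState.LOOKING;
    final List<String> trace = new ArrayList<>();

    MainLoopSketch(long myId, boolean observer, long... electionResults) {
        this.myId = myId;
        this.observer = observer;
        this.electionResults = electionResults;
    }

    void run(int iterations) {
        for (int i = 0; i < iterations; i++) {
            switch (state) {
                case LOOKING: {
                    // stands in for makeLEStrategy().lookForLeader()
                    long leader = electionResults[round++];
                    trace.add("elected " + leader);
                    if (observer) {
                        state = ServerState.OBSERVING;
                    } else {
                        state = (leader == myId) ? ServerState.LEADING : ServerState.FOLLOWING;
                    }
                    break;
                }
                default:
                    // lead()/followLeader()/observeLeader() would block here until the role ends
                    trace.add(state + " ended");
                    // like updateServerState() without a pending reconfig
                    state = ServerState.LOOKING;
                    break;
            }
        }
    }

    public static void main(String[] args) {
        MainLoopSketch peer = new MainLoopSketch(1, false, 2, 1);
        peer.run(4);
        System.out.println(peer.trace);
        // [elected 2, FOLLOWING ended, elected 1, LEADING ended]
    }
}
```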

LOOKING branch

try {
    reconfigFlagClear();
    if (shuttingDownLE) {
        shuttingDownLE = false;
        startLeaderElection();
    }
    // Run the election with FastLeaderElection and record the resulting vote
    setCurrentVote(makeLEStrategy().lookForLeader());
} catch (Exception e) {
    // exception logging omitted
    setPeerState(ServerState.LOOKING); // reset to LOOKING
}
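Inside `lookForLeader()`, once the election epochs match, a LOOKING peer switches its vote only if the received vote ranks higher. The sketch below is a simplified version of the ordering used by `FastLeaderElection.totalOrderPredicate` (peer epoch first, then zxid, then server id); it leaves out the check that ignores zero-weight servers, and `VoteOrderSketch` / `newVoteWins` are hypothetical names.

```java
// Simplified sketch of the vote ordering in FastLeaderElection.totalOrderPredicate
// (zero-weight check omitted): a received vote wins if it has a higher peer epoch,
// then a higher zxid, then a higher server id.
class VoteOrderSketch {
    static boolean newVoteWins(long newId, long newZxid, long newEpoch,
                               long curId, long curZxid, long curEpoch) {
        if (newEpoch != curEpoch) {
            return newEpoch > curEpoch;
        }
        if (newZxid != curZxid) {
            return newZxid > curZxid;
        }
        return newId > curId;
    }

    public static void main(String[] args) {
        // same epoch, server 2 has seen more transactions than server 3: 2 wins
        System.out.println(newVoteWins(2, 0x100000005L, 1, 3, 0x100000003L, 1)); // true
        // same epoch and zxid: the larger server id wins, so 1 does not beat 3
        System.out.println(newVoteWins(1, 5, 1, 3, 5, 1)); // false
    }
}
```

Because every peer applies the same total order, votes converge on the candidate with the most up-to-date history, with the server id only breaking ties.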

FOLLOWING branch

try {
    setFollower(makeFollower(logFactory));
    follower.followLeader(); // start following the leader
} catch (Exception e) {
    // exception logging omitted
} finally {
    follower.shutdown();
    setFollower(null);
    updateServerState(); // update the server state
}
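`updateServerState()` picks the state for the next loop iteration. The following is a simplified sketch of that decision, with logging and synchronization left out and the peer's fields passed in as parameters (`UpdateStateSketch` and `nextState` are hypothetical names; check the exact logic against your ZooKeeper version). Without a pending reconfiguration the peer always goes back to LOOKING; after a reconfiguration that already fixed the leader, it can move straight into its role.

```java
// Simplified sketch of the decision made by QuorumPeer.updateServerState().
class UpdateStateSketch {
    enum ServerState { LOOKING, FOLLOWING, LEADING, OBSERVING }
    enum LearnerType { PARTICIPANT, OBSERVER }

    static ServerState nextState(boolean reconfigFlag, long myId, long currentVoteLeaderId, LearnerType type) {
        if (!reconfigFlag) {
            // normal case: the role ended (e.g. the leader was lost), so start a new election
            return ServerState.LOOKING;
        }
        // a reconfiguration already settled the leader: take the matching role directly
        if (myId == currentVoteLeaderId) {
            return ServerState.LEADING;
        }
        return type == LearnerType.PARTICIPANT ? ServerState.FOLLOWING : ServerState.OBSERVING;
    }
}
```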

Create the Follower object:

protected Follower makeFollower(FileTxnSnapLog logFactory) throws IOException {
    return new Follower(this, new FollowerZooKeeperServer(logFactory, this, this.zkDb));
}
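`followLeader()` goes through the steps listed at the start of this article: connect to the leader, register and receive the new epoch, sync data, then keep processing packets. The sketch below only shows that ordering plus the rule that a follower rejects a leader whose epoch is older than the one it already accepted. `LeaderLink`, `FollowerFlowSketch`, and `RecordingLink` are hypothetical stand-ins, not the real `Learner` API, whose methods take different arguments and throw checked exceptions.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the order of steps in Follower.followLeader().
class FollowerFlowSketch {
    // Hypothetical abstraction over the connection to the leader (not a ZooKeeper API).
    interface LeaderLink {
        void connect();                    // ~ connectToLeader
        long register(long acceptedEpoch); // ~ registerWithLeader: returns the leader's new epoch
        void sync(long newEpoch);          // ~ syncWithLeader: catch up via diff, truncation or snapshot
        boolean readAndProcessPacket();    // false once the leader connection is lost
    }

    static void followLeader(LeaderLink link, long acceptedEpoch) {
        link.connect();
        long newEpoch = link.register(acceptedEpoch);
        if (newEpoch < acceptedEpoch) {
            // never follow a leader whose epoch is older than one we already accepted
            throw new IllegalStateException("leader epoch " + newEpoch + " < accepted epoch " + acceptedEpoch);
        }
        link.sync(newEpoch);
        while (link.readAndProcessPacket()) {
            // keep handling proposals, commits and pings from the leader
        }
    }
}

// Test double that records the calls made on it.
class RecordingLink implements FollowerFlowSketch.LeaderLink {
    final List<String> calls = new ArrayList<>();
    private final long leaderEpoch;
    private int packetsLeft;

    RecordingLink(long leaderEpoch, int packets) {
        this.leaderEpoch = leaderEpoch;
        this.packetsLeft = packets;
    }

    public void connect() { calls.add("connect"); }
    public long register(long acceptedEpoch) { calls.add("register"); return leaderEpoch; }
    public void sync(long newEpoch) { calls.add("sync " + newEpoch); }
    public boolean readAndProcessPacket() {
        if (packetsLeft == 0) {
            return false;
        }
        packetsLeft--;
        calls.add("packet");
        return true;
    }
}
```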

LEADING branch

try {
    setLeader(makeLeader(logFactory));
    leader.lead(); // start leading
    setLeader(null);
} catch (Exception e) {
    // exception logging omitted
} finally {
    if (leader != null) {
        leader.shutdown("Forcing shutdown");
        setLeader(null);
    }
    updateServerState(); // update the server state
}
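Note the `setLeader(null)` right after `lead()`: on a normal return the field is already cleared, so the `finally` block only forces a shutdown when `lead()` threw. The sketch below reproduces just that control flow; `LeadingBranchSketch` and `FakeLeader` are hypothetical, and it assumes, as the branch suggests, that a normally returning `lead()` has already cleaned up after itself.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the LEADING branch's cleanup pattern (FakeLeader stands in for Leader).
class LeadingBranchSketch {
    final List<String> log = new ArrayList<>();
    private FakeLeader leader;

    class FakeLeader {
        private final boolean fail;

        FakeLeader(boolean fail) { this.fail = fail; }

        void lead() {
            if (fail) {
                throw new IllegalStateException("lead failed");
            }
            log.add("lead returned"); // assumed to have shut itself down already
        }

        void shutdown(String reason) { log.add("shutdown: " + reason); }
    }

    void runLeadingBranch(boolean fail) {
        try {
            leader = new FakeLeader(fail);
            leader.lead();
            leader = null; // normal return: nothing left for finally to force
        } catch (Exception e) {
            log.add("caught: " + e.getMessage());
        } finally {
            if (leader != null) { // only true when lead() threw
                leader.shutdown("Forcing shutdown");
                leader = null;
            }
            log.add("updateServerState");
        }
    }
}
```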

Create the Leader object:

protected Leader makeLeader(FileTxnSnapLog logFactory) throws IOException, X509Exception {
    return new Leader(this, new LeaderZooKeeperServer(logFactory, this, this.zkDb));
}
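Once `lead()` has finished syncing followers, it periodically checks (roughly once per tick) that it is still in sync with a majority, and steps down if not. The sketch below models only that check under simplifying assumptions: a plain majority quorum and a precomputed list of "followers still synced" per tick. `LeaderTickSketch` and `firstTickWithoutQuorum` are hypothetical names.

```java
import java.util.List;
import java.util.Set;

// Hypothetical sketch of the leader's periodic quorum check: on each tick, the leader counts
// itself plus the followers it is still in sync with; without a majority it steps down.
class LeaderTickSketch {
    /** Returns the index of the first tick without a synced majority, or -1 if the quorum held. */
    static int firstTickWithoutQuorum(long leaderId, int ensembleSize, List<Set<Long>> syncedFollowersPerTick) {
        for (int tick = 0; tick < syncedFollowersPerTick.size(); tick++) {
            Set<Long> synced = syncedFollowersPerTick.get(tick);
            // the leader always counts itself
            int acks = synced.contains(leaderId) ? synced.size() : synced.size() + 1;
            if (acks <= ensembleSize / 2) {
                return tick; // here the real leader shuts down and the peer goes back to LOOKING
            }
        }
        return -1;
    }
}
```

For a five-server ensemble, the leader needs at least two synced followers (three acks in total); losing one more triggers the step-down.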

OBSERVING branch

try {
    setObserver(makeObserver(logFactory));
    observer.observeLeader();
} catch (Exception e) {
    // exception logging omitted
} finally {
    observer.shutdown();
    setObserver(null);
    updateServerState();

    // Add delay jitter before we switch to LOOKING
    // state to reduce the load of ObserverMaster
    if (isRunning()) {
        Observer.waitForObserverElectionDelay();
    }
}
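The jitter mentioned in the comment is a random pause before the observer re-enters LOOKING, so observers that lost the same ObserverMaster do not all come back at the same moment. The minimal sketch below assumes a uniform random delay in `[0, maxDelayMs)`; the bound, `ElectionJitterSketch`, and its methods are hypothetical and not ZooKeeper's configuration or API.

```java
import java.util.concurrent.ThreadLocalRandom;

// Hypothetical sketch of a randomized delay before rejoining the election.
class ElectionJitterSketch {
    /** Picks a delay uniformly in [0, maxDelayMs). maxDelayMs must be positive. */
    static long pickDelayMs(long maxDelayMs) {
        return ThreadLocalRandom.current().nextLong(maxDelayMs);
    }

    /** Sleeps for a random delay so that peers do not all restart the election at once. */
    static void waitWithJitter(long maxDelayMs) throws InterruptedException {
        Thread.sleep(pickDelayMs(maxDelayMs));
    }
}
```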

Create the Observer object:

protected Observer makeObserver(FileTxnSnapLog logFactory) throws IOException {
    return new Observer(this, new ObserverZooKeeperServer(logFactory, this, this.zkDb));
}
