zookeeper源碼(03)啟動流程

来源:https://www.cnblogs.com/xugf/archive/2023/10/30/17797584.html
-Advertisement-
Play Games

本文將從啟動類開始詳細分析zookeeper的啟動流程: 載入配置的過程 集群啟動過程 單機版啟動過程 啟動類 org.apache.zookeeper.server.quorum.QuorumPeerMain類。 用於啟動zookeeper服務,第一個參數用來指定配置文件,配置文件properti ...


本文將從啟動類開始詳細分析zookeeper的啟動流程:

  1. 載入配置的過程
  2. 集群啟動過程
  3. 單機版啟動過程

啟動類

org.apache.zookeeper.server.quorum.QuorumPeerMain類。

用於啟動zookeeper服務,第一個參數用來指定配置文件,配置文件properties格式,例如以下配置參數:

  • dataDir - 數據存儲目錄
  • dataLogDir - txnlog(事務日誌)存儲目錄,預設dataDir
  • clientPort - 接收客戶端連接的埠,例如2181
  • tickTime - leader做quorum驗證的周期時長,預設3000ms
  • initLimit - leader等待follower連接、數據同步ack的最大tick數量
  • syncLimit - leader發送同步數據等待ack的最大tick數量
  • server.id - 用於quorum協議的host:port[:port]格式的server列表

載入配置

QuorumPeerConfig config = new QuorumPeerConfig();
if (args.length == 1) {
    config.parse(args[0]);
}

args[0]是配置文件名。

  1. 載入普通配置
  2. 載入quorumPeer配置
  3. 從dynamic文件載入quorumVerifier配置,zk會在processReconfig時生成dynamic文件,預設文件名zoo.cfg.dynamic.${version}格式,預設不開啟Reconfig功能
  4. 從dynamic文件載入lastQuorumPeer配置,同上,預設zoo.cfg.dynamic.next文件

載入普通配置

QuorumPeerConfig封裝以下欄位:

// 監聽客戶端連接的地址,使用clientPort和clientPortAddress配置確定,用來創建cnxnFactory
protected InetSocketAddress clientPortAddress;
// 用來創建secureCnxnFactory,使用secureClientPort和secureClientPortAddress配置確定
protected InetSocketAddress secureClientPortAddress;
// quorum使用ssl通信
protected boolean sslQuorum = false;
// portUnification配置,使用UnifiedServerSocket創建套接字
protected boolean shouldUsePortUnification = false;
// 用來創建ObserverMaster,該組件可以實現鏈式的數據複製,減小leader的負載
protected int observerMasterPort;
// 自動重新載入ssl文件
protected boolean sslQuorumReloadCertFiles = false;
// 數據目錄和事務log目錄
protected File dataDir;
protected File dataLogDir;
// dynamicConfig文件名
protected String dynamicConfigFileStr = null;
// 配置文件名,非配置參數
protected String configFileStr = null;
// leader做quorum驗證的周期時長,預設300ms
protected int tickTime = ZooKeeperServer.DEFAULT_TICK_TIME;
// 單個addr的client最大連接數
protected int maxClientCnxns = 60;
// 超時時長
protected int minSessionTimeout = -1;
protected int maxSessionTimeout = -1;
// metricsProvider.className配置,MetricsProvider實現類全名
protected String metricsProviderClassName = DefaultMetricsProvider.class.getName();
// 封裝metricsProvider.*下麵的配置
protected Properties metricsProviderConfiguration = new Properties();
// 本地session配置
protected boolean localSessionsEnabled = false;
protected boolean localSessionsUpgradingEnabled = false;
// client連接的backlog數設置
protected int clientPortListenBacklog = -1;
// leader等待follower連接、數據同步ack的最大tick數量
protected int initLimit;
// leader發送同步數據等待ack的最大tick數量
protected int syncLimit;
// 大於0時用來計算連接leader的超時時長
protected int connectToLearnerMasterLimit;
// 必須是3
protected int electionAlg = 3;
// 未使用的配置
protected int electionPort = 2182;
// 監聽所有IP地址
protected boolean quorumListenOnAllIPs = false;
// myid配置
protected long serverId = UNSET_SERVERID;

protected QuorumVerifier quorumVerifier = null, lastSeenQuorumVerifier = null;

// autopurge.snapRetainCount配置
protected int snapRetainCount = 3;
// autopurge.purgeInterval配置
protected int purgeInterval = 0;
// 開啟同步
protected boolean syncEnabled = true;

protected String initialConfig;

// PARTICIPANT|OBSERVER
protected LearnerType peerType = LearnerType.PARTICIPANT;

/**
 * Configurations for the quorumpeer-to-quorumpeer sasl authentication
 */
protected boolean quorumServerRequireSasl = false;
protected boolean quorumLearnerRequireSasl = false;
protected boolean quorumEnableSasl = false;
protected String quorumServicePrincipal = QuorumAuth.QUORUM_KERBEROS_SERVICE_PRINCIPAL_DEFAULT_VALUE;
protected String quorumLearnerLoginContext = QuorumAuth.QUORUM_LEARNER_SASL_LOGIN_CONTEXT_DFAULT_VALUE;
protected String quorumServerLoginContext = QuorumAuth.QUORUM_SERVER_SASL_LOGIN_CONTEXT_DFAULT_VALUE;
protected int quorumCnxnThreadsSize;

// multi address related configs
// multiAddress.enabled配置
private boolean multiAddressEnabled = Boolean.parseBoolean(
    System.getProperty(QuorumPeer.CONFIG_KEY_MULTI_ADDRESS_ENABLED, "false"));
// multiAddress.reachabilityCheckEnabled配置
private boolean multiAddressReachabilityCheckEnabled = Boolean.parseBoolean(
    System.getProperty(QuorumPeer.CONFIG_KEY_MULTI_ADDRESS_REACHABILITY_CHECK_ENABLED, "true"));
// multiAddress.reachabilityCheckTimeoutMs配置
private int multiAddressReachabilityCheckTimeoutMs = Integer.parseInt(
    System.getProperty(QuorumPeer.CONFIG_KEY_MULTI_ADDRESS_REACHABILITY_CHECK_TIMEOUT_MS,
                       String.valueOf(1000)));

// 創建QuorumVerifier時使用,不為null時會創建QuorumOracleMaj
protected String oraclePath;

// Minimum snapshot retain count.
private final int MIN_SNAP_RETAIN_COUNT = 3;
// JVM Pause Monitor feature switch
protected boolean jvmPauseMonitorToRun = false;
// JVM Pause Monitor warn threshold in ms
protected long jvmPauseWarnThresholdMs = JvmPauseMonitor.WARN_THRESHOLD_DEFAULT;
// JVM Pause Monitor info threshold in ms
protected long jvmPauseInfoThresholdMs = JvmPauseMonitor.INFO_THRESHOLD_DEFAULT;
// JVM Pause Monitor sleep time in ms
protected long jvmPauseSleepTimeMs = JvmPauseMonitor.SLEEP_TIME_MS_DEFAULT;

在parse方法的最後一個else分支,會將其他配置首碼zookeeper.之後設置到System環境變數中:

System.setProperty("zookeeper." + key, value);

比如一些SSL相關配置參數:

ssl.quorum.keyStore.location=/path/to/keystore.jks
ssl.quorum.keyStore.password=password
ssl.quorum.trustStore.location=/path/to/truststore.jks
ssl.quorum.trustStore.password=password

解析配置文件之後,會根據ssl相關參數做ssl配置:

if (this.secureClientPortAddress != null) {
    configureSSLAuth();
}

預設會將X509AuthenticationProvider作為ssl認證組件:

// key = "zookeeper.authProvider.x509"
System.setProperty(ProviderRegistry.AUTHPROVIDER_PROPERTY_PREFIX + "x509",
                   "org.apache.zookeeper.server.auth.X509AuthenticationProvider");

其餘的都是參數驗證代碼,不詳細說明。

載入quorumPeer配置

// backward compatibility - dynamic configuration in the same file as
// static configuration params see writeDynamicConfig()
if (dynamicConfigFileStr == null) {
    // 解析quorum配置
    setupQuorumPeerConfig(zkProp, true);
    if (isDistributed() && isReconfigEnabled()) { // 預設reconfigEnabled==false分支進不來
        // we don't backup static config for standalone mode.
        // we also don't backup if reconfig feature is disabled.
        // 備份zoo.cfg到zoo.cfg.bak
        backupOldConfig();
    }
}

解析quorum配置:

void setupQuorumPeerConfig(Properties prop,
                           boolean configBackwardCompatibilityMode) throws IOException, ConfigException {
    quorumVerifier = parseDynamicConfig(
        prop, electionAlg, true, configBackwardCompatibilityMode, oraclePath);
    // 讀取${dataDir}/myid文件,給serverId賦值
    setupMyId();
    // 對比clientPortAddress配置與quorum配置進行重新賦值
    setupClientPort();
    // 對比peerType配置與quorum配置進行重新賦值
    setupPeerType();
    checkValidity(); // 參數驗證
}

parseDynamicConfig方法需要看一下:

public static QuorumVerifier parseDynamicConfig(
        Properties dynamicConfigProp, int eAlg, boolean warnings,
        boolean configBackwardCompatibilityMode, String oraclePath) throws IOException, ConfigException {
    boolean isHierarchical = false;
    for (Entry<Object, Object> entry : dynamicConfigProp.entrySet()) {
        String key = entry.getKey().toString().trim();
        // group.*和weight.*的參數配置
        if (key.startsWith("group") || key.startsWith("weight")) {
            isHierarchical = true;
        } else if (!configBackwardCompatibilityMode &&
                   !key.startsWith("server.") && !key.equals("version")) {
            throw new ConfigException("Unrecognised parameter: " + key);
        }
    }

    QuorumVerifier qv = createQuorumVerifier(dynamicConfigProp, isHierarchical, oraclePath);

    // 驗證 略
}

private static QuorumVerifier createQuorumVerifier(
        Properties dynamicConfigProp, boolean isHierarchical, String oraclePath) throws ConfigException {
    if (oraclePath == null) {
        return createQuorumVerifier(dynamicConfigProp, isHierarchical);
    } else {
        return new QuorumOracleMaj(dynamicConfigProp, oraclePath);
    }
}

private static QuorumVerifier createQuorumVerifier(
        Properties dynamicConfigProp, boolean isHierarchical) throws ConfigException {
    if (isHierarchical) {
        return new QuorumHierarchical(dynamicConfigProp);
    } else {
        return new QuorumMaj(dynamicConfigProp);
    }
}

QuorumMaj

解析集群配置,配置形如:

server.1=server_config;client_config
server.2=server_config;client_config
server.3=server_config;client_config

# 配置兩個也能啟動,但是只能提供副本能力,無法保證高可用

# 使用分號分隔server_config和client_config

# 1. server_config格式:
#    host:quorumPort:electionPort 或 host:quorumPort:electionPort:type
#    可以配置多個,使用|分隔
#    例如:
#       127.0.0.1:2888:3888:PARTICIPANT

# 2. client_config可以沒有,格式: port或host:port

構造方法:

public QuorumMaj(Properties props) throws ConfigException {
    for (Entry<Object, Object> entry : props.entrySet()) {
        String key = entry.getKey().toString();
        String value = entry.getValue().toString();
        if (key.startsWith("server.")) {
            int dot = key.indexOf('.');
            // 獲取serverId
            long sid = Long.parseLong(key.substring(dot + 1));
            // 創建QuorumServer對象,解析value字元串
            // value格式: server_config或server_config;client_config
            // server_config格式是使用|分隔的列表,每個元素是:
            // host:quorumPort:electionPort或host:quorumPort:electionPort:type
            // client_config格式: port或host:port
            QuorumServer qs = new QuorumServer(sid, value);
            allMembers.put(Long.valueOf(sid), qs);
            if (qs.type == LearnerType.PARTICIPANT) {
                votingMembers.put(Long.valueOf(sid), qs); // 投票成員
            } else {
                observingMembers.put(Long.valueOf(sid), qs); // observer成員
            }
        } else if (key.equals("version")) {
            version = Long.parseLong(value, 16);
        }
    }
    half = votingMembers.size() / 2; // 成員半數,例如5/2=2
}

QuorumHierarchical

比QuorumMaj多了group和weight等特性。

從dynamic文件載入quorumVerifier配置

用於載入quorumVerifier信息:從dynamicConfigFileStr參數指定的文件載入quorumPeer配置,方式與上一小節一樣。

從dynamic文件載入lastQuorumPeer配置

用於載入lastSeenQuorumVerifier信息:從zoo.cfg.dynamic.next文件載入lastSeenQuorumVerifier配置,方式與上一小節一樣。

創建並啟動DatadirCleanupManager

預設配置時不啟動。

// Start and schedule the purge task
DatadirCleanupManager purgeMgr = new DatadirCleanupManager(
    config.getDataDir(),
    config.getDataLogDir(),
    config.getSnapRetainCount(), // 預設3
    config.getPurgeInterval());
purgeMgr.start();

啟動周期任務:

public void start() {
    if (PurgeTaskStatus.STARTED == purgeTaskStatus) {
        LOG.warn("Purge task is already running.");
        return;
    }
    // 預設不啟動
    if (purgeInterval <= 0) {
        LOG.info("Purge task is not scheduled.");
        return;
    }

    timer = new Timer("PurgeTask", true);
    TimerTask task = new PurgeTask(dataLogDir, snapDir, snapRetainCount);
    timer.scheduleAtFixedRate(task, 0, TimeUnit.HOURS.toMillis(purgeInterval));

    purgeTaskStatus = PurgeTaskStatus.STARTED;
}

清理邏輯在PurgeTask的run方法:

public void run() {
    try {
        PurgeTxnLog.purge(logsDir, snapsDir, snapRetainCount);
    } catch (Exception e) {}
}

啟動集群

if (args.length == 1 && config.isDistributed()) {
    runFromConfig(config);
}

isDistributed判斷:

public boolean isDistributed() {
    return quorumVerifier != null && (!standaloneEnabled || quorumVerifier.getVotingMembers().size() > 1);
}

// standaloneEnabled預設true

創建並啟動MetricsProvider

final MetricsProvider metricsProvider;
try {
    metricsProvider = MetricsProviderBootstrap.startMetricsProvider(
        config.getMetricsProviderClassName(), // DefaultMetricsProvider
        config.getMetricsProviderConfiguration());
} catch (MetricsProviderLifeCycleException error) {
    throw new IOException("Cannot boot MetricsProvider " + config.getMetricsProviderClassName(), error);
}
// 註冊到全局
ServerMetrics.metricsProviderInitialized(metricsProvider);

創建ServerCnxnFactory

ServerCnxnFactory cnxnFactory = null;
ServerCnxnFactory secureCnxnFactory = null;

if (config.getClientPortAddress() != null) {
    // 預設使用NIOServerCnxnFactory實現類
    cnxnFactory = ServerCnxnFactory.createFactory();
    cnxnFactory.configure(config.getClientPortAddress(),
                          config.getMaxClientCnxns(), config.getClientPortListenBacklog(), false);
}

if (config.getSecureClientPortAddress() != null) {
    secureCnxnFactory = ServerCnxnFactory.createFactory();
    secureCnxnFactory.configure(config.getSecureClientPortAddress(),
                                config.getMaxClientCnxns(), config.getClientPortListenBacklog(), true);
}

ServerCnxnFactory有兩個主要的子類:

  • NIOServerCnxnFactory
  • NettyServerCnxnFactory

預設使用NIOServerCnxnFactory實現類,可以使用-Dzookeeper.serverCnxnFactory=xx來修改:

-Dzookeeper.serverCnxnFactory=org.apache.zookeeper.server.NettyServerCnxnFactory

ServerCnxnFactory

用於接收客戶端連接、管理客戶端session、處理客戶端請求。

NIOServerCnxnFactory

基於NIO的非阻塞、多線程的ServerCnxnFactory實現類,多線程之間通過queue通信:

  • 1個accept線程,用來接收客戶端連接,交給selector線程處理
  • 1-N個selector線程,每個線程會select 1/N個連接,多個selector線程的原因是,由於有大量連接,select()可能會成為性能瓶頸
  • 0-M個socket IO worker線程,做socket讀寫,如果配置為0則selector線程來做IO
  • 1個清理線程,用於關閉空閑連接

線程數量分配示例:32核的機器,1accept線程,1個清理線程,4個selector線程,64個worker線程。

configure方法:

  • 不支持ssl

  • 創建ConnectionExpirerThread線程

  • 根據核數確定各個線程的數量

    int numCores = Runtime.getRuntime().availableProcessors();
    // 32 cores sweet spot seems to be 4 selector threads
    numSelectorThreads = Integer.getInteger(
        ZOOKEEPER_NIO_NUM_SELECTOR_THREADS,
        Math.max((int) Math.sqrt((float) numCores / 2), 1));
    
    // 64
    numWorkerThreads = Integer.getInteger(ZOOKEEPER_NIO_NUM_WORKER_THREADS, 2 * numCores);
    
  • 創建SelectorThread線程

  • 創建ServerSocketChannel、啟動監聽、設置非阻塞

  • 創建AcceptThread線程

start方法啟動各種線程:

  • acceptThread
  • selectorThreads
  • workerPool
  • expirerThread

NettyServerCnxnFactory

基於Netty的ServerCnxnFactory實現,使用CnxnChannelHandler作為業務處理器。

後續會有文章詳細分析。

創建並啟動QuorumPeer

管理quorum協議,伺服器可能處於以下三種狀態:

  • Leader選舉 - 每個伺服器將選出一個leader,最初都會選自己
  • Follower節點 - 將與Leader同步並複製所有事務
  • Leader節點 - 處理請求並將其轉發給Follower節點,大多數Follower節點必須同步,該請求才能被提交

創建QuorumPeer並使用QuorumPeerConfig為其設置屬性:

public QuorumPeer() throws SaslException {
    super("QuorumPeer");
    quorumStats = new QuorumStats(this);
    jmxRemotePeerBean = new HashMap<>();
    adminServer = AdminServerFactory.createAdminServer(); // http管理的服務,使用JettyAdminServer實現類
    x509Util = createX509Util();
    initialize();
    reconfigEnabled = QuorumPeerConfig.isReconfigEnabled(); // 預設false不開啟Reconfig功能
}

下麵記錄一下重要的步驟。

創建FileTxnSnapLog

quorumPeer.setTxnFactory(new FileTxnSnapLog(config.getDataLogDir(), config.getDataDir()));

FileTxnSnapLog類:操作TxnLog和SnapShot的入口類。

此步驟會創建dataDir和snapDir目錄、判斷數據目錄可寫、創建txnLog和snapLog對象訪問數據文件。

創建並初始化ZKDatabase

quorumPeer.setZKDatabase(new ZKDatabase(quorumPeer.getTxnFactory()));
quorumPeer.initConfigInZKDatabase();

維護zookeeper伺服器記憶體資料庫,包括session、dataTree和committedlog數據,從磁碟讀取日誌和快照後啟動。

內部使用DataTree存儲數據,先看一下創建和初始化階段的代碼。

構造方法:創建DataTree對象,創建/zookeeper/quota、/zookeeper/config節點,創建dataWatches和childWatches對象(使用WatchManager實現類)。

initConfigInZKDatabase方法:

public synchronized void initConfigInZKDatabase() {
    if (zkDb != null) {
        zkDb.initConfigInZKDatabase(getQuorumVerifier());
    }
}

public synchronized void initConfigInZKDatabase(QuorumVerifier qv) {
    try {
        if (this.dataTree.getNode(ZooDefs.CONFIG_NODE) == null) {
            // should only happen during upgrade
            this.dataTree.addConfigNode();
        }
        // 把當前QuorumVerifier保存到/zookeeper/config中
        // qv.toString()格式如下:
        // server.1=host1:2888:3888:participant;host1:2181\n
        // server.2=host2:2888:3888:participant;host2:2181\n
        // ...
        // version=2
        this.dataTree.setData(ZooDefs.CONFIG_NODE,
            qv.toString().getBytes(UTF_8), // data
            -1, // version
            qv.getVersion(), // txid
            Time.currentWallTime());
    } catch (NoNodeException e) {}
}

設置QuorumVerifier

quorumPeer.setQuorumVerifier(config.getQuorumVerifier(), false);
if (config.getLastSeenQuorumVerifier() != null) {
    quorumPeer.setLastSeenQuorumVerifier(config.getLastSeenQuorumVerifier(), false);
}

初始化啟動QuorumPeer

// 初始化QuorumAuthServer
quorumPeer.initialize();
// 啟動QuorumPeer
quorumPeer.start();
// 線程阻塞
quorumPeer.join();

啟動QuorumPeer方法:

public synchronized void start() {
    loadDataBase();
    startServerCnxnFactory();
    try {
        adminServer.start();
    } catch (AdminServerException e) {}
    startLeaderElection();
    startJvmPauseMonitor();
    super.start();
}

啟動QuorumPeer流程

ZKDatabase載入

從txnlog和snapshot載入dataTree數據:

long zxid = snapLog.restore(dataTree, sessionsWithTimeouts, commitProposalPlaybackListener);
  1. 倒序查找所有snapshot文件,從文件名解析snapZxid作為dataTree的lastProcessedZxid屬性,文件內容解析到dataTree中
  2. 如果從snapshot文件未找到數據,則生成snapshot.0文件,將當前dataTree(空的)保存到裡面
  3. 使用fastForwardFromEdits方法從txnlog載入數據

獲取currentEpoch和acceptedEpoch的值:

// 當前zxid
long lastProcessedZxid = zkDb.getDataTree().lastProcessedZxid;
// 當前epoch = zxid >> 32L
long epochOfZxid = ZxidUtils.getEpochFromZxid(lastProcessedZxid);

從${dataDir}/currentEpoch文件讀取currentEpoch值:

currentEpoch = readLongFromFile(CURRENT_EPOCH_FILENAME);
  1. 如果文件不存在,直接使用epochOfZxid作為currentEpoch並保存到文件
  2. 如果currentEpoch比epochOfZxid小,則繼續查找${dataDir}/currentEpoch.tmp文件作為currentEpoch保存到文件,如果文件不存在則拋數據異常

從${dataDir}/acceptedEpoch文件讀取acceptedEpoch值:

acceptedEpoch = readLongFromFile(ACCEPTED_EPOCH_FILENAME);
  1. 如果文件不存在,直接使用epochOfZxid作為acceptedEpoch並保存到文件
  2. 如果acceptedEpoch比currentEpoch小則拋數據異常

啟動serverCnxnFactory

private void startServerCnxnFactory() {
    if (cnxnFactory != null) {
        cnxnFactory.start(); // NIOServerCnxnFactory在啟動階段會啟動內部的4類線程
    }
    if (secureCnxnFactory != null) {
        secureCnxnFactory.start();
    }
}

啟動AdminServer

預設使用JettyAdminServer實現類,負責提供管理端的http介面。

啟動選舉

public synchronized void startLeaderElection() {
    try {
        if (getPeerState() == ServerState.LOOKING) {
            // 投自己一票,封裝zxid和epoch
            currentVote = new Vote(myid, getLastLoggedZxid(), getCurrentEpoch());
        }
    } catch (IOException e) {
        RuntimeException re = new RuntimeException(e.getMessage());
        re.setStackTrace(e.getStackTrace());
        throw re;
    }
    // electionType總是3
    this.electionAlg = createElectionAlgorithm(electionType);
}

protected Election createElectionAlgorithm(int electionAlgorithm) {
    Election le = null;

    // TODO: use a factory rather than a switch
    // 可以使用策略模式替換switch語句
    switch (electionAlgorithm) {
    case 1:
        throw new UnsupportedOperationException("Election Algorithm 1 is not supported.");
    case 2:
        throw new UnsupportedOperationException("Election Algorithm 2 is not supported.");
    case 3:
        QuorumCnxManager qcm = createCnxnManager();
        QuorumCnxManager oldQcm = qcmRef.getAndSet(qcm);
        // 關閉oldQcm
        if (oldQcm != null) {
            oldQcm.halt();
        }
        // 用來啟動ServerSocket監聽
        QuorumCnxManager.Listener listener = qcm.listener;
        if (listener != null) {
            listener.start();
            FastLeaderElection fle = new FastLeaderElection(this, qcm);
            fle.start();
            le = fle;
        }
        break;
    default:
        assert false;
    }
    return le;
}

創建QuorumCnxManager對象:

public QuorumCnxManager createCnxnManager() {
    // 預設tickTime * syncLimit
    // 按照zoo_sample.cfg文件配置是2000 * 5
    int timeout = quorumCnxnTimeoutMs > 0 ? quorumCnxnTimeoutMs : this.tickTime * this.syncLimit;
    return new QuorumCnxManager(
        this,
        this.getMyId(),
        this.getView(), // serverId->quorumServer
        this.authServer,
        this.authLearner,
        timeout,
        this.getQuorumListenOnAllIPs(), // 是否監聽所有IP預設false
        this.quorumCnxnThreadsSize, // 預設20
        this.isQuorumSaslAuthEnabled());
}

QuorumCnxManager類:

This class implements a connection manager for leader election using TCP.
It maintains one connection for every pair of servers. The tricky part is to guarantee that there is exactly one connection for every pair of servers that are operating correctly and that can communicate over the network. If two servers try to start a connection concurrently, then the connection manager uses a very simple tie-breaking mechanism to decide which connection to drop based on the IP addressed of the two parties.
For every peer, the manager maintains a queue of messages to send. If the connection to any particular peer drops, then the sender thread puts the message back on the list. As this implementation currently uses a queue implementation to maintain messages to send to another peer, we add the message to the tail of the queue, thus changing the order of messages. Although this is not a problem for the leader election, it could be a problem when consolidating peer communication. This is to be verified, though.
  1. 維護leader選舉時server之間的tcp連接
  2. 確保兩個server之間存在一個連接,如果兩個server同時建立連接,則始終保留id大的一方建立的連接
  3. 隊列緩存待發送的消息

FastLeaderElection類:

  • 使用TCP實現leader選舉
  • 使用QuorumCnxManager管理連接
  • 某些參數可以改變選舉行為,比如finalizeWait參數決定leader確定之前需要等待的時間

啟動線程

QuorumPeer繼承了ZooKeeperThread類,最後會使用super.start()啟動線程。run方法while迴圈,根據當前的ServerState執行不同的邏輯。

啟動單機版服務

啟動入口

在QuorumPeerMain的initializeAndRun階段:

if (args.length == 1 && config.isDistributed()) {
    runFromConfig(config);
} else {
    // 啟動單機版服務
    ZooKeeperServerMain.main(args);
}

ZooKeeperServerMain.main方法:

ZooKeeperServerMain main = new ZooKeeperServerMain();
try {
    main.initializeAndRun(args);
}
// 略

initializeAndRun方法:

protected void initializeAndRun(String[] args) throws ConfigException, IOException, AdminServerException {
    // 略

    ServerConfig config = new ServerConfig();
    if (args.length == 1) {
        config.parse(args[0]); // args[0]是配置文件
    } else {
        config.parse(args); // args = {clientPortAddress, dataDir, tickTime, maxClientCnxns}
    }

    runFromConfig(config);
}

啟動流程

  1. 創建FileTxnSnapLog對象
  2. 創建ZooKeeperServer對象
  3. 創建並啟動AdminServer組件
  4. 創建並啟動cnxnFactory和secureCnxnFactory用於接受客戶端連接、處理客戶端請求,會啟動ZooKeeperServer、ZXDatabase等核心組件
  5. 創建並啟動ContainerManager組件

您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • Lock實現線程間定製化通信 案例 要求 三個線程,AA BB CC AA線程列印5次,BB線程列印10次,CC線程列印15次 代碼實現 import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lo ...
  • 基礎代碼 只包含最簡單的代碼,不包含亂碼解決、文件上傳。 import org.apache.http.Consts; import org.apache.http.HttpEntity; import org.apache.http.client.config.RequestConfig; imp ...
  • 不論是在團隊寫作還是在個人工作中,PDF 文檔往往會經過多次修訂和更新。掌握 PDF 文檔內容的變化對於管理文檔有極大的幫助。通過對比 PDF 文檔,用戶可以快速找出文檔增加、刪除和修改的內容,更好地瞭解文檔的演變過程,輕鬆地管理文檔。本文將介紹如何在 Java 程式中通過代碼快速比較兩個 PDF ...
  • 作者:Escape 來源:https://www.escapelife.site/posts/38c81b25.html 服務日誌收集方案:Filebeat + Graylog! 當我們公司內部部署很多服務以及測試、正式環境的時候,查看日誌就變成了一個非常剛需的需求了。是多個環境的日誌統一收集,然後 ...
  • 一、SpringCloud 簡介 Spring Cloud 是一系列框架的有序集合如服務發現註冊、配置中心、消息匯流排、負載均衡、熔斷器、數據監控等。 SpringCloud 將多個服務框架組合起來,通過Spring Boot進行再封裝,屏蔽掉了複雜的配置和實現原理,最終給開發者提供了一套簡單易懂、易 ...
  • 哈嘍兄弟們,今天來實現一下建築市場公共服務平臺的數據採集,順便實現一下網站的JS解密。 話不多說,我們直接開始今天的內容。 首先我們需要準備這些 環境使用 Python 3.8 Pycharm 模塊使用 requests --> pip install requests execjs --> pip ...
  • 我們在對keycloak框架中的核心項目keycloak-services進行二次開發過程中,發現了一個問題,當時有這種需求,在keycloak-services中需要使用infinispan緩存,我們直接添加infinispan-core引用之後,在啟動keycloak進出錯了,提示我們沒有找到i ...
  • AES演算法是一種對稱加密演算法,全稱為高級加密標準(Advanced Encryption Standard)。它是一種分組密碼,以`128`比特為一個分組進行加密,其密鑰長度可以是`128`比特、`192`比特或`256`比特,因此可以提供不同等級的安全性。該演算法採用了替代、置換和混淆等技術,以及多... ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...