This article analyzes ZooKeeper's startup flow in detail, starting from the launcher class:
- loading the configuration
- starting in cluster (quorum) mode
- starting in standalone mode
The launcher class
The org.apache.zookeeper.server.quorum.QuorumPeerMain class.
It starts the ZooKeeper service. The first argument specifies the configuration file, which is in properties format and supports parameters such as the following (a sample configuration is shown after the list):
- dataDir - directory where the data is stored
- dataLogDir - directory where the txnlog (transaction log) is stored, defaults to dataDir
- clientPort - port that accepts client connections, e.g. 2181
- tickTime - length of the tick the leader uses for quorum checks, default 3000ms
- initLimit - maximum number of ticks the leader waits for followers to connect and ack the initial data sync
- syncLimit - maximum number of ticks the leader waits for an ack after sending sync data
- server.id - list of servers participating in the quorum protocol, in host:port[:port] format
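For reference, a minimal hypothetical zoo.cfg combining these parameters could look like this (host names and paths are placeholders):
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/var/lib/zookeeper
clientPort=2181
# quorum members in host:quorumPort:electionPort format
server.1=zk1.example.com:2888:3888
server.2=zk2.example.com:2888:3888
server.3=zk3.example.com:2888:3888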
Loading the configuration
QuorumPeerConfig config = new QuorumPeerConfig();
if (args.length == 1) {
config.parse(args[0]);
}
args[0] is the configuration file name.
- load the general configuration
- load the quorumPeer configuration
- load the quorumVerifier configuration from the dynamic file; ZooKeeper generates dynamic files during processReconfig, the default file name follows the zoo.cfg.dynamic.${version} pattern, and the reconfig feature is disabled by default
- load the lastQuorumPeer configuration from the dynamic file, same as above, default file zoo.cfg.dynamic.next
Loading the general configuration
QuorumPeerConfig encapsulates the following fields:
// address that listens for client connections, determined by the clientPort and clientPortAddress configs, used to create cnxnFactory
protected InetSocketAddress clientPortAddress;
// used to create secureCnxnFactory, determined by the secureClientPort and secureClientPortAddress configs
protected InetSocketAddress secureClientPortAddress;
// use SSL for quorum communication
protected boolean sslQuorum = false;
// portUnification config, use UnifiedServerSocket to create sockets
protected boolean shouldUsePortUnification = false;
// used to create the ObserverMaster, which enables chained data replication to reduce the leader's load
protected int observerMasterPort;
// automatically reload SSL files
protected boolean sslQuorumReloadCertFiles = false;
// data directory and transaction log directory
protected File dataDir;
protected File dataLogDir;
// dynamicConfig file name
protected String dynamicConfigFileStr = null;
// configuration file name, not a config parameter
protected String configFileStr = null;
// tick length the leader uses for quorum checks, default 3000ms
protected int tickTime = ZooKeeperServer.DEFAULT_TICK_TIME;
// maximum number of client connections from a single address
protected int maxClientCnxns = 60;
// session timeout bounds
protected int minSessionTimeout = -1;
protected int maxSessionTimeout = -1;
// metricsProvider.className config, fully qualified name of the MetricsProvider implementation
protected String metricsProviderClassName = DefaultMetricsProvider.class.getName();
// wraps the configs under the metricsProvider.* prefix
protected Properties metricsProviderConfiguration = new Properties();
// local session configs
protected boolean localSessionsEnabled = false;
protected boolean localSessionsUpgradingEnabled = false;
// backlog setting for client connections
protected int clientPortListenBacklog = -1;
// maximum number of ticks the leader waits for followers to connect and ack the initial data sync
protected int initLimit;
// maximum number of ticks the leader waits for an ack after sending sync data
protected int syncLimit;
// when greater than 0, used to compute the timeout for connecting to the learner master
protected int connectToLearnerMasterLimit;
// must be 3
protected int electionAlg = 3;
// unused config
protected int electionPort = 2182;
// listen on all IP addresses
protected boolean quorumListenOnAllIPs = false;
// myid config
protected long serverId = UNSET_SERVERID;
protected QuorumVerifier quorumVerifier = null, lastSeenQuorumVerifier = null;
// autopurge.snapRetainCount config
protected int snapRetainCount = 3;
// autopurge.purgeInterval config
protected int purgeInterval = 0;
// enable sync
protected boolean syncEnabled = true;
protected String initialConfig;
// PARTICIPANT|OBSERVER
protected LearnerType peerType = LearnerType.PARTICIPANT;
/**
 * Configurations for the quorumpeer-to-quorumpeer sasl authentication
 */
protected boolean quorumServerRequireSasl = false;
protected boolean quorumLearnerRequireSasl = false;
protected boolean quorumEnableSasl = false;
protected String quorumServicePrincipal = QuorumAuth.QUORUM_KERBEROS_SERVICE_PRINCIPAL_DEFAULT_VALUE;
protected String quorumLearnerLoginContext = QuorumAuth.QUORUM_LEARNER_SASL_LOGIN_CONTEXT_DFAULT_VALUE;
protected String quorumServerLoginContext = QuorumAuth.QUORUM_SERVER_SASL_LOGIN_CONTEXT_DFAULT_VALUE;
protected int quorumCnxnThreadsSize;
// multi address related configs
// multiAddress.enabled config
private boolean multiAddressEnabled = Boolean.parseBoolean(
    System.getProperty(QuorumPeer.CONFIG_KEY_MULTI_ADDRESS_ENABLED, "false"));
// multiAddress.reachabilityCheckEnabled config
private boolean multiAddressReachabilityCheckEnabled = Boolean.parseBoolean(
    System.getProperty(QuorumPeer.CONFIG_KEY_MULTI_ADDRESS_REACHABILITY_CHECK_ENABLED, "true"));
// multiAddress.reachabilityCheckTimeoutMs config
private int multiAddressReachabilityCheckTimeoutMs = Integer.parseInt(
    System.getProperty(QuorumPeer.CONFIG_KEY_MULTI_ADDRESS_REACHABILITY_CHECK_TIMEOUT_MS,
        String.valueOf(1000)));
// used when creating the QuorumVerifier; when not null a QuorumOracleMaj is created
protected String oraclePath;
// Minimum snapshot retain count.
private final int MIN_SNAP_RETAIN_COUNT = 3;
// JVM Pause Monitor feature switch
protected boolean jvmPauseMonitorToRun = false;
// JVM Pause Monitor warn threshold in ms
protected long jvmPauseWarnThresholdMs = JvmPauseMonitor.WARN_THRESHOLD_DEFAULT;
// JVM Pause Monitor info threshold in ms
protected long jvmPauseInfoThresholdMs = JvmPauseMonitor.INFO_THRESHOLD_DEFAULT;
// JVM Pause Monitor sleep time in ms
protected long jvmPauseSleepTimeMs = JvmPauseMonitor.SLEEP_TIME_MS_DEFAULT;
In the final else branch of the parse method, any remaining config entries are prefixed with zookeeper. and set as system properties:
System.setProperty("zookeeper." + key, value);
For example, some SSL-related config parameters:
ssl.quorum.keyStore.location=/path/to/keystore.jks
ssl.quorum.keyStore.password=password
ssl.quorum.trustStore.location=/path/to/truststore.jks
ssl.quorum.trustStore.password=password
After parsing the configuration file, SSL is configured according to the SSL-related parameters:
if (this.secureClientPortAddress != null) {
configureSSLAuth();
}
By default, X509AuthenticationProvider is registered as the SSL authentication provider:
// key = "zookeeper.authProvider.x509"
System.setProperty(ProviderRegistry.AUTHPROVIDER_PROPERTY_PREFIX + "x509",
"org.apache.zookeeper.server.auth.X509AuthenticationProvider");
The rest is parameter-validation code and is not covered in detail.
Loading the quorumPeer configuration
// backward compatibility - dynamic configuration in the same file as
// static configuration params see writeDynamicConfig()
if (dynamicConfigFileStr == null) {
// parse the quorum configuration
setupQuorumPeerConfig(zkProp, true);
if (isDistributed() && isReconfigEnabled()) { // reconfigEnabled is false by default, so this branch is not taken
// we don't backup static config for standalone mode.
// we also don't backup if reconfig feature is disabled.
// back up zoo.cfg to zoo.cfg.bak
backupOldConfig();
}
}
Parsing the quorum configuration:
void setupQuorumPeerConfig(Properties prop,
        boolean configBackwardCompatibilityMode) throws IOException, ConfigException {
    quorumVerifier = parseDynamicConfig(
        prop, electionAlg, true, configBackwardCompatibilityMode, oraclePath);
    // read the ${dataDir}/myid file and assign serverId
    setupMyId();
    // reconcile the clientPortAddress config with the quorum config and reassign it
    setupClientPort();
    // reconcile the peerType config with the quorum config and reassign it
    setupPeerType();
    checkValidity(); // parameter validation
}
The parseDynamicConfig method is worth a look:
public static QuorumVerifier parseDynamicConfig(
Properties dynamicConfigProp, int eAlg, boolean warnings,
boolean configBackwardCompatibilityMode, String oraclePath) throws IOException, ConfigException {
boolean isHierarchical = false;
for (Entry<Object, Object> entry : dynamicConfigProp.entrySet()) {
String key = entry.getKey().toString().trim();
// group.* and weight.* parameter configs
if (key.startsWith("group") || key.startsWith("weight")) {
isHierarchical = true;
} else if (!configBackwardCompatibilityMode &&
!key.startsWith("server.") && !key.equals("version")) {
throw new ConfigException("Unrecognised parameter: " + key);
}
}
QuorumVerifier qv = createQuorumVerifier(dynamicConfigProp, isHierarchical, oraclePath);
// validation omitted
}
private static QuorumVerifier createQuorumVerifier(
Properties dynamicConfigProp, boolean isHierarchical, String oraclePath) throws ConfigException {
if (oraclePath == null) {
return createQuorumVerifier(dynamicConfigProp, isHierarchical);
} else {
return new QuorumOracleMaj(dynamicConfigProp, oraclePath);
}
}
private static QuorumVerifier createQuorumVerifier(
Properties dynamicConfigProp, boolean isHierarchical) throws ConfigException {
if (isHierarchical) {
return new QuorumHierarchical(dynamicConfigProp);
} else {
return new QuorumMaj(dynamicConfigProp);
}
}
QuorumMaj
Parses the cluster configuration, which looks like:
server.1=server_config;client_config
server.2=server_config;client_config
server.3=server_config;client_config
# two servers are enough to start, but that only provides replication, not high availability
# a semicolon separates server_config and client_config
# 1. server_config format:
#    host:quorumPort:electionPort or host:quorumPort:electionPort:type
#    multiple entries can be configured, separated by |
#    for example:
#    127.0.0.1:2888:3888:PARTICIPANT
# 2. client_config is optional, format: port or host:port
Constructor:
public QuorumMaj(Properties props) throws ConfigException {
    for (Entry<Object, Object> entry : props.entrySet()) {
        String key = entry.getKey().toString();
        String value = entry.getValue().toString();
        if (key.startsWith("server.")) {
            int dot = key.indexOf('.');
            // extract the serverId
            long sid = Long.parseLong(key.substring(dot + 1));
            // create a QuorumServer object by parsing the value string
            // value format: server_config or server_config;client_config
            // server_config is a |-separated list, each element being
            // host:quorumPort:electionPort or host:quorumPort:electionPort:type
            // client_config format: port or host:port
            QuorumServer qs = new QuorumServer(sid, value);
            allMembers.put(Long.valueOf(sid), qs);
            if (qs.type == LearnerType.PARTICIPANT) {
                votingMembers.put(Long.valueOf(sid), qs); // voting members
            } else {
                observingMembers.put(Long.valueOf(sid), qs); // observer members
            }
        } else if (key.equals("version")) {
            version = Long.parseLong(value, 16);
        }
    }
    half = votingMembers.size() / 2; // half of the voting members, e.g. 5/2=2
}
QuorumHierarchical
Adds group and weight features on top of QuorumMaj.
Loading the quorumVerifier configuration from the dynamic file
Used to load the quorumVerifier information: the quorumPeer configuration is loaded from the file specified by the dynamicConfigFileStr parameter, in the same way as in the previous subsection.
Loading the lastQuorumPeer configuration from the dynamic file
Used to load the lastSeenQuorumVerifier information: the configuration is loaded from the zoo.cfg.dynamic.next file, in the same way as in the previous subsection.
Creating and starting the DatadirCleanupManager
With the default configuration it does not start.
// Start and schedule the purge task
DatadirCleanupManager purgeMgr = new DatadirCleanupManager(
config.getDataDir(),
config.getDataLogDir(),
config.getSnapRetainCount(), // default 3
config.getPurgeInterval());
purgeMgr.start();
Starting the periodic task:
public void start() {
if (PurgeTaskStatus.STARTED == purgeTaskStatus) {
LOG.warn("Purge task is already running.");
return;
}
// not started by default
if (purgeInterval <= 0) {
LOG.info("Purge task is not scheduled.");
return;
}
timer = new Timer("PurgeTask", true);
TimerTask task = new PurgeTask(dataLogDir, snapDir, snapRetainCount);
timer.scheduleAtFixedRate(task, 0, TimeUnit.HOURS.toMillis(purgeInterval));
purgeTaskStatus = PurgeTaskStatus.STARTED;
}
The cleanup logic is in PurgeTask's run method:
public void run() {
try {
PurgeTxnLog.purge(logsDir, snapsDir, snapRetainCount);
} catch (Exception e) {}
}
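Since purgeInterval defaults to 0, the task is disabled unless the autopurge parameters are set explicitly in zoo.cfg, for example:
autopurge.snapRetainCount=3
# purge interval in hours; 0 (the default) disables the task
autopurge.purgeInterval=1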
Starting the cluster
if (args.length == 1 && config.isDistributed()) {
runFromConfig(config);
}
The isDistributed check:
public boolean isDistributed() {
return quorumVerifier != null && (!standaloneEnabled || quorumVerifier.getVotingMembers().size() > 1);
}
// standaloneEnabled is true by default
Creating and starting the MetricsProvider
final MetricsProvider metricsProvider;
try {
metricsProvider = MetricsProviderBootstrap.startMetricsProvider(
config.getMetricsProviderClassName(), // DefaultMetricsProvider
config.getMetricsProviderConfiguration());
} catch (MetricsProviderLifeCycleException error) {
throw new IOException("Cannot boot MetricsProvider " + config.getMetricsProviderClassName(), error);
}
// register globally
ServerMetrics.metricsProviderInitialized(metricsProvider);
Creating the ServerCnxnFactory
ServerCnxnFactory cnxnFactory = null;
ServerCnxnFactory secureCnxnFactory = null;
if (config.getClientPortAddress() != null) {
// the NIOServerCnxnFactory implementation is used by default
cnxnFactory = ServerCnxnFactory.createFactory();
cnxnFactory.configure(config.getClientPortAddress(),
config.getMaxClientCnxns(), config.getClientPortListenBacklog(), false);
}
if (config.getSecureClientPortAddress() != null) {
secureCnxnFactory = ServerCnxnFactory.createFactory();
secureCnxnFactory.configure(config.getSecureClientPortAddress(),
config.getMaxClientCnxns(), config.getClientPortListenBacklog(), true);
}
ServerCnxnFactory has two main subclasses:
- NIOServerCnxnFactory
- NettyServerCnxnFactory
The NIOServerCnxnFactory implementation is used by default; it can be changed with -Dzookeeper.serverCnxnFactory=xx:
-Dzookeeper.serverCnxnFactory=org.apache.zookeeper.server.NettyServerCnxnFactory
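createFactory resolves the implementation class from that system property. Roughly speaking (a simplified sketch, not the verbatim method), it does the following:
// Simplified sketch of ServerCnxnFactory.createFactory(): read the
// zookeeper.serverCnxnFactory system property, fall back to
// NIOServerCnxnFactory, then instantiate the class reflectively.
public static ServerCnxnFactory createFactory() throws IOException {
    String name = System.getProperty("zookeeper.serverCnxnFactory");
    if (name == null) {
        name = NIOServerCnxnFactory.class.getName();
    }
    try {
        return (ServerCnxnFactory) Class.forName(name).getDeclaredConstructor().newInstance();
    } catch (Exception e) {
        throw new IOException("Couldn't instantiate " + name, e);
    }
}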
ServerCnxnFactory
Accepts client connections, manages client sessions, and handles client requests.
NIOServerCnxnFactory
A non-blocking, multi-threaded ServerCnxnFactory implementation based on NIO; the threads communicate with each other through queues:
- 1 accept thread, which accepts client connections and hands them to the selector threads
- 1-N selector threads, each of which selects on 1/N of the connections; multiple selector threads are needed because, with a large number of connections, select() itself can become a performance bottleneck
- 0-M socket IO worker threads that perform socket reads and writes; if configured as 0, the selector threads do the IO themselves
- 1 connection-expiration thread, which closes idle connections
Example thread allocation on a 32-core machine: 1 accept thread, 1 expiration thread, 4 selector threads, and 64 worker threads.
The configure method:
- SSL is not supported
- creates the ConnectionExpirerThread
- determines the number of each kind of thread from the core count:
int numCores = Runtime.getRuntime().availableProcessors();
// 32 cores sweet spot seems to be 4 selector threads
numSelectorThreads = Integer.getInteger(
    ZOOKEEPER_NIO_NUM_SELECTOR_THREADS,
    Math.max((int) Math.sqrt((float) numCores / 2), 1));
// 64
numWorkerThreads = Integer.getInteger(ZOOKEEPER_NIO_NUM_WORKER_THREADS, 2 * numCores);
- creates the SelectorThread threads
- creates the ServerSocketChannel, starts listening, and sets it to non-blocking mode
- creates the AcceptThread
The start method starts the various threads:
- acceptThread
- selectorThreads
- workerPool
- expirerThread
NettyServerCnxnFactory
A Netty-based ServerCnxnFactory implementation that uses CnxnChannelHandler as its business handler.
A later article will analyze it in detail.
Creating and starting the QuorumPeer
Manages the quorum protocol. A server can be in one of three states:
- Leader election - each server elects a leader, initially voting for itself
- Follower - synchronizes with the leader and replicates all transactions
- Leader - handles requests and forwards them to followers; a majority of followers must have synced a request before it can be committed
Create the QuorumPeer and set its properties from the QuorumPeerConfig:
public QuorumPeer() throws SaslException {
super("QuorumPeer");
quorumStats = new QuorumStats(this);
jmxRemotePeerBean = new HashMap<>();
adminServer = AdminServerFactory.createAdminServer(); // HTTP admin service, implemented by JettyAdminServer
x509Util = createX509Util();
initialize();
reconfigEnabled = QuorumPeerConfig.isReconfigEnabled(); // false by default, the reconfig feature is disabled
}
The important steps are noted below.
Creating the FileTxnSnapLog
quorumPeer.setTxnFactory(new FileTxnSnapLog(config.getDataLogDir(), config.getDataDir()));
The FileTxnSnapLog class is the entry point for operating on the TxnLog and SnapShot.
This step creates the dataDir and snapDir directories, checks that the data directories are writable, and creates the txnLog and snapLog objects used to access the data files.
Creating and initializing the ZKDatabase
quorumPeer.setZKDatabase(new ZKDatabase(quorumPeer.getTxnFactory()));
quorumPeer.initConfigInZKDatabase();
Maintains ZooKeeper's in-memory database, including sessions, the dataTree, and the committed log; it is populated by reading logs and snapshots from disk at startup.
Internally it uses a DataTree to store the data. Let's first look at the creation and initialization code.
Constructor: creates the DataTree object, creates the /zookeeper/quota and /zookeeper/config nodes, and creates the dataWatches and childWatches objects (using the WatchManager implementation).
The initConfigInZKDatabase method:
public synchronized void initConfigInZKDatabase() {
if (zkDb != null) {
zkDb.initConfigInZKDatabase(getQuorumVerifier());
}
}
public synchronized void initConfigInZKDatabase(QuorumVerifier qv) {
try {
if (this.dataTree.getNode(ZooDefs.CONFIG_NODE) == null) {
// should only happen during upgrade
this.dataTree.addConfigNode();
}
// save the current QuorumVerifier into /zookeeper/config
// qv.toString() looks like:
// server.1=host1:2888:3888:participant;host1:2181\n
// server.2=host2:2888:3888:participant;host2:2181\n
// ...
// version=2
this.dataTree.setData(ZooDefs.CONFIG_NODE,
qv.toString().getBytes(UTF_8), // data
-1, // version
qv.getVersion(), // txid
Time.currentWallTime());
} catch (NoNodeException e) {}
}
Setting the QuorumVerifier
quorumPeer.setQuorumVerifier(config.getQuorumVerifier(), false);
if (config.getLastSeenQuorumVerifier() != null) {
quorumPeer.setLastSeenQuorumVerifier(config.getLastSeenQuorumVerifier(), false);
}
Initializing and starting the QuorumPeer
// initialize the QuorumAuthServer
quorumPeer.initialize();
// start the QuorumPeer
quorumPeer.start();
// block the calling thread
quorumPeer.join();
The QuorumPeer start method:
public synchronized void start() {
loadDataBase();
startServerCnxnFactory();
try {
adminServer.start();
} catch (AdminServerException e) {}
startLeaderElection();
startJvmPauseMonitor();
super.start();
}
QuorumPeer startup flow
Loading the ZKDatabase
Load the dataTree data from the txnlog and snapshots:
long zxid = snapLog.restore(dataTree, sessionsWithTimeouts, commitProposalPlaybackListener);
- search the snapshot files in reverse order, parse snapZxid from the file name as the dataTree's lastProcessedZxid, and parse the file contents into the dataTree
- if no data is found in any snapshot file, generate a snapshot.0 file and save the current (empty) dataTree into it
- use the fastForwardFromEdits method to load data from the txnlog
Obtaining the currentEpoch and acceptedEpoch values:
// current zxid
long lastProcessedZxid = zkDb.getDataTree().lastProcessedZxid;
// current epoch = zxid >> 32L
long epochOfZxid = ZxidUtils.getEpochFromZxid(lastProcessedZxid);
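A zxid is a 64-bit value whose high 32 bits are the epoch and whose low 32 bits are a per-epoch counter, which is why the epoch can be recovered with a right shift. A small illustration (the literal value is made up):
long zxid = 0x0000000500000007L;     // hypothetical zxid
long epoch = zxid >> 32L;            // 5, same as ZxidUtils.getEpochFromZxid(zxid)
long counter = zxid & 0xffffffffL;   // 7, the transaction counter within epoch 5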
Read the currentEpoch value from the ${dataDir}/currentEpoch file:
currentEpoch = readLongFromFile(CURRENT_EPOCH_FILENAME);
- if the file does not exist, use epochOfZxid as currentEpoch and save it to the file
- if currentEpoch is smaller than epochOfZxid, fall back to the ${dataDir}/currentEpoch.tmp file and save its value as currentEpoch; if that file does not exist, throw a data-inconsistency exception
Read the acceptedEpoch value from the ${dataDir}/acceptedEpoch file:
acceptedEpoch = readLongFromFile(ACCEPTED_EPOCH_FILENAME);
- if the file does not exist, use epochOfZxid as acceptedEpoch and save it to the file
- if acceptedEpoch is smaller than currentEpoch, throw a data-inconsistency exception
Starting the serverCnxnFactory
private void startServerCnxnFactory() {
if (cnxnFactory != null) {
cnxnFactory.start(); // NIOServerCnxnFactory starts its four kinds of internal threads here
}
if (secureCnxnFactory != null) {
secureCnxnFactory.start();
}
}
Starting the AdminServer
The JettyAdminServer implementation is used by default; it provides the HTTP admin interface.
Starting leader election
public synchronized void startLeaderElection() {
try {
if (getPeerState() == ServerState.LOOKING) {
// vote for itself, wrapping the zxid and epoch
currentVote = new Vote(myid, getLastLoggedZxid(), getCurrentEpoch());
}
} catch (IOException e) {
RuntimeException re = new RuntimeException(e.getMessage());
re.setStackTrace(e.getStackTrace());
throw re;
}
// electionType is always 3
this.electionAlg = createElectionAlgorithm(electionType);
}
protected Election createElectionAlgorithm(int electionAlgorithm) {
Election le = null;
// TODO: use a factory rather than a switch
// the switch statement could be replaced with a strategy pattern
switch (electionAlgorithm) {
case 1:
throw new UnsupportedOperationException("Election Algorithm 1 is not supported.");
case 2:
throw new UnsupportedOperationException("Election Algorithm 2 is not supported.");
case 3:
QuorumCnxManager qcm = createCnxnManager();
QuorumCnxManager oldQcm = qcmRef.getAndSet(qcm);
// shut down the old qcm
if (oldQcm != null) {
oldQcm.halt();
}
// used to start the ServerSocket listener
QuorumCnxManager.Listener listener = qcm.listener;
if (listener != null) {
listener.start();
FastLeaderElection fle = new FastLeaderElection(this, qcm);
fle.start();
le = fle;
}
break;
default:
assert false;
}
return le;
}
Creating the QuorumCnxManager object:
public QuorumCnxManager createCnxnManager() {
// default is tickTime * syncLimit
// with the zoo_sample.cfg settings that is 2000 * 5
int timeout = quorumCnxnTimeoutMs > 0 ? quorumCnxnTimeoutMs : this.tickTime * this.syncLimit;
return new QuorumCnxManager(
this,
this.getMyId(),
this.getView(), // map of serverId -> quorumServer
this.authServer,
this.authLearner,
timeout,
this.getQuorumListenOnAllIPs(), // whether to listen on all IPs, false by default
this.quorumCnxnThreadsSize, // default 20
this.isQuorumSaslAuthEnabled());
}
The QuorumCnxManager class:
This class implements a connection manager for leader election using TCP.
It maintains one connection for every pair of servers. The tricky part is to guarantee that there is exactly one connection for every pair of servers that are operating correctly and that can communicate over the network. If two servers try to start a connection concurrently, then the connection manager uses a very simple tie-breaking mechanism to decide which connection to drop based on the IP addressed of the two parties.
For every peer, the manager maintains a queue of messages to send. If the connection to any particular peer drops, then the sender thread puts the message back on the list. As this implementation currently uses a queue implementation to maintain messages to send to another peer, we add the message to the tail of the queue, thus changing the order of messages. Although this is not a problem for the leader election, it could be a problem when consolidating peer communication. This is to be verified, though.
- maintains the TCP connections between servers during leader election
- ensures there is exactly one connection between any pair of servers; if two servers open connections to each other at the same time, only the connection initiated by the server with the larger id is kept (see the sketch below)
- queues buffer the messages waiting to be sent to each peer
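That tie-breaking rule can be sketched as follows (a simplified sketch of QuorumCnxManager's connection handling, not the literal source; the helper name keepConnection is hypothetical):
// When a connection arrives from a peer whose server id is smaller than ours,
// drop it and dial back, so only the connection opened by the larger id survives.
void receiveConnection(long remoteSid, Socket sock) throws IOException {
    if (remoteSid < self.getMyId()) {
        sock.close();            // reject the incoming connection
        connectOne(remoteSid);   // initiate our own connection to that peer instead
    } else {
        keepConnection(remoteSid, sock); // hypothetical: start the sender/receiver workers
    }
}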
The FastLeaderElection class:
- implements leader election over TCP
- uses QuorumCnxManager to manage connections
- several parameters can change the election behaviour; for example, the finalizeWait parameter determines how long to wait before finalizing the leader
Starting the thread
QuorumPeer extends the ZooKeeperThread class, so super.start() finally starts the thread. The run method loops, executing different logic depending on the current ServerState; a sketch follows.
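A heavily condensed sketch of that loop (exception handling, reconfig, and shutdown logic omitted):
// Simplified view of QuorumPeer.run(): a state machine driven by ServerState.
while (running) {
    switch (getPeerState()) {
    case LOOKING:                                         // no leader yet: run election
        setCurrentVote(makeLEStrategy().lookForLeader());
        break;
    case FOLLOWING:                                       // sync with and replicate from the leader
        setFollower(makeFollower(logFactory));
        follower.followLeader();
        break;
    case LEADING:                                         // serve as leader until quorum is lost
        setLeader(makeLeader(logFactory));
        leader.lead();
        break;
    case OBSERVING:                                       // observers replicate without voting
        setObserver(makeObserver(logFactory));
        observer.observeLeader();
        break;
    }
}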
Starting the standalone server
Entry point
In QuorumPeerMain's initializeAndRun phase:
if (args.length == 1 && config.isDistributed()) {
runFromConfig(config);
} else {
// start the standalone server
ZooKeeperServerMain.main(args);
}
The ZooKeeperServerMain.main method:
ZooKeeperServerMain main = new ZooKeeperServerMain();
try {
main.initializeAndRun(args);
}
// omitted
The initializeAndRun method:
protected void initializeAndRun(String[] args) throws ConfigException, IOException, AdminServerException {
// omitted
ServerConfig config = new ServerConfig();
if (args.length == 1) {
config.parse(args[0]); // args[0] is the configuration file
} else {
config.parse(args); // args = {clientPortAddress, dataDir, tickTime, maxClientCnxns}
}
runFromConfig(config);
}
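For illustration, the standalone entry point can therefore be driven either with a configuration file or with positional arguments (paths and values below are placeholders):
// with a configuration file
ZooKeeperServerMain.main(new String[] {"/path/to/zoo.cfg"});
// or with positional arguments: clientPort, dataDir, tickTime, maxClientCnxns
ZooKeeperServerMain.main(new String[] {"2181", "/var/lib/zookeeper", "2000", "60"});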
Startup flow
- create the FileTxnSnapLog object
- create the ZooKeeperServer object
- create and start the AdminServer component
- create and start cnxnFactory and secureCnxnFactory to accept client connections and handle client requests; this also starts core components such as ZooKeeperServer and the ZKDatabase
- create and start the ContainerManager component