開始分析 【1】分析入口類做了什麼 //org.apache.zookeeper.server.quorum包下QuorumPeerMain類 public static void main(String[] args) { QuorumPeerMain main = new QuorumPeerM ...
開始分析
【1】分析入口類做了什麼
//org.apache.zookeeper.server.quorum包下QuorumPeerMain類 public static void main(String[] args) { QuorumPeerMain main = new QuorumPeerMain(); try { main.initializeAndRun(args); } catch (IllegalArgumentException e) {..} catch (ConfigException e) {..} catch (DatadirException e) {..} catch (AdminServerException e) {..} catch (Exception e) {..} ServiceUtils.requestSystemExit(ExitCode.EXECUTION_FINISHED.getValue()); } protected void initializeAndRun(String[] args) throws ConfigException, IOException, AdminServerException { QuorumPeerConfig config = new QuorumPeerConfig(); if (args.length == 1) { //解析配置文件載入到記憶體 //主要是調用了QuorumPeerConfig類#parse方法,解析邏輯在parseProperties方法 config.parse(args[0]); } //啟動延時的定期清理快照數據文件 DatadirCleanupManager purgeMgr = new DatadirCleanupManager( config.getDataDir(), config.getDataLogDir(), config.getSnapRetainCount(), config.getPurgeInterval()); purgeMgr.start(); if (args.length == 1 && config.isDistributed()) { //集群的入口 runFromConfig(config); } else { //單機的入口 ZooKeeperServerMain.main(args); } }
【2】runFromConfig方法做了什麼
/**
 * Starts this server in quorum (cluster) mode from a parsed configuration.
 *
 * <p>Steps: register log4j MBeans (best effort), boot the metrics provider, create the plain
 * and/or secure client connection factories, build the local QuorumPeer and copy the
 * configuration into it, start the peer, and block on quorumPeer.join(). Once the metrics
 * provider has started, it is stopped in the finally block however the method exits.
 *
 * @param config parsed quorum configuration
 * @throws IOException if the MetricsProvider cannot be started (wraps
 *         MetricsProviderLifeCycleException); other IOExceptions may come from the calls below
 * @throws AdminServerException as declared by the signature
 */
public void runFromConfig(QuorumPeerConfig config) throws IOException, AdminServerException {
    try {
        ManagedUtil.registerLog4jMBeans();
    } catch (JMException e) {
        LOG.warn("Unable to register log4j JMX control", e);
    }

    LOG.info("Starting quorum peer, myid=" + config.getServerId());
    final MetricsProvider metricsProvider;
    try {
        metricsProvider = MetricsProviderBootstrap.startMetricsProvider(
            config.getMetricsProviderClassName(),
            config.getMetricsProviderConfiguration());
    } catch (MetricsProviderLifeCycleException error) {
        throw new IOException("Cannot boot MetricsProvider " + config.getMetricsProviderClassName(), error);
    }
    try {
        ServerMetrics.metricsProviderInitialized(metricsProvider);
        ProviderRegistry.initialize();
        ServerCnxnFactory cnxnFactory = null;
        ServerCnxnFactory secureCnxnFactory = null;

        if (config.getClientPortAddress() != null) {
            // Create the client connection factory.
            cnxnFactory = ServerCnxnFactory.createFactory();
            // Configure the listen address/port, max client connections and listen backlog from
            // the config; the last argument is false here vs. true for the secure factory below.
            cnxnFactory.configure(config.getClientPortAddress(), config.getMaxClientCnxns(), config.getClientPortListenBacklog(), false);
        }

        if (config.getSecureClientPortAddress() != null) {
            secureCnxnFactory = ServerCnxnFactory.createFactory();
            secureCnxnFactory.configure(config.getSecureClientPortAddress(), config.getMaxClientCnxns(), config.getClientPortListenBacklog(), true);
        }

        // Create the local quorum peer and copy the configuration values into it.
        quorumPeer = getQuorumPeer();
        quorumPeer.setTxnFactory(new FileTxnSnapLog(config.getDataLogDir(), config.getDataDir()));
        quorumPeer.enableLocalSessions(config.areLocalSessionsEnabled());
        quorumPeer.enableLocalSessionsUpgrading(config.isLocalSessionsUpgradingEnabled());
        // (left commented out in the source)
        //quorumPeer.setQuorumPeers(config.getAllMembers());
        quorumPeer.setElectionType(config.getElectionAlg());
        quorumPeer.setMyid(config.getServerId());
        quorumPeer.setTickTime(config.getTickTime());
        quorumPeer.setMinSessionTimeout(config.getMinSessionTimeout());
        quorumPeer.setMaxSessionTimeout(config.getMaxSessionTimeout());
        quorumPeer.setInitLimit(config.getInitLimit());
        quorumPeer.setSyncLimit(config.getSyncLimit());
        quorumPeer.setConnectToLearnerMasterLimit(config.getConnectToLearnerMasterLimit());
        quorumPeer.setObserverMasterPort(config.getObserverMasterPort());
        quorumPeer.setConfigFileName(config.getConfigFilename());
        quorumPeer.setClientPortListenBacklog(config.getClientPortListenBacklog());
        // Must come after setTxnFactory: the ZKDatabase is built from quorumPeer.getTxnFactory().
        quorumPeer.setZKDatabase(new ZKDatabase(quorumPeer.getTxnFactory()));
        quorumPeer.setQuorumVerifier(config.getQuorumVerifier(), false);
        if (config.getLastSeenQuorumVerifier() != null) {
            quorumPeer.setLastSeenQuorumVerifier(config.getLastSeenQuorumVerifier(), false);
        }
        quorumPeer.initConfigInZKDatabase();
        // Hand the connection factories to the peer as well.
        quorumPeer.setCnxnFactory(cnxnFactory);
        quorumPeer.setSecureCnxnFactory(secureCnxnFactory);
        quorumPeer.setSslQuorum(config.isSslQuorum());
        quorumPeer.setUsePortUnification(config.shouldUsePortUnification());
        quorumPeer.setLearnerType(config.getPeerType());
        quorumPeer.setSyncEnabled(config.getSyncEnabled());
        quorumPeer.setQuorumListenOnAllIPs(config.getQuorumListenOnAllIPs());
        if (config.sslQuorumReloadCertFiles) {
            quorumPeer.getX509Util().enableCertFileReloading();
        }
        quorumPeer.setMultiAddressEnabled(config.isMultiAddressEnabled());
        quorumPeer.setMultiAddressReachabilityCheckEnabled(config.isMultiAddressReachabilityCheckEnabled());
        quorumPeer.setMultiAddressReachabilityCheckTimeoutMs(config.getMultiAddressReachabilityCheckTimeoutMs());

        // sets quorum sasl authentication configurations
        quorumPeer.setQuorumSaslEnabled(config.quorumEnableSasl);
        if (quorumPeer.isQuorumSaslAuthEnabled()) {
            quorumPeer.setQuorumServerSaslRequired(config.quorumServerRequireSasl);
            quorumPeer.setQuorumLearnerSaslRequired(config.quorumLearnerRequireSasl);
            quorumPeer.setQuorumServicePrincipal(config.quorumServicePrincipal);
            quorumPeer.setQuorumServerLoginContext(config.quorumServerLoginContext);
            quorumPeer.setQuorumLearnerLoginContext(config.quorumLearnerLoginContext);
        }
        quorumPeer.setQuorumCnxnThreadsSize(config.quorumCnxnThreadsSize);
        quorumPeer.initialize();

        if (config.jvmPauseMonitorToRun) {
            quorumPeer.setJvmPauseMonitor(new JvmPauseMonitor(config));
        }

        // Start the peer, then wait for it (join() is presumably Thread#join on the peer thread).
        quorumPeer.start();
        ZKAuditProvider.addZKStartStopAuditLog();
        quorumPeer.join();
    } catch (InterruptedException e) {
        // warn, but generally this is ok
        // NOTE(review): the thread's interrupt flag is not restored here; consider
        // Thread.currentThread().interrupt() if callers need to observe it.
        LOG.warn("Quorum Peer interrupted", e);
    } finally {
        try {
            metricsProvider.stop();
        } catch (Throwable error) {
            LOG.warn("Error while stopping metrics", error);
        }
    }
}
【3】通信對象的選擇
//ServerCnxnFactory類#createFactory方法 //初始化通信對象 public static ServerCnxnFactory createFactory() throws IOException { //屬性值展示:String ZOOKEEPER_SERVER_CNXN_FACTORY = "zookeeper.serverCnxnFactory" //官方推薦netty:則應該是ServerCnxnFactory類的子類NettyServerCnxnFactory String serverCnxnFactoryName = System.getProperty(ZOOKEEPER_SERVER_CNXN_FACTORY); if (serverCnxnFactoryName == null) { //但是預設是子類NIOServerCnxnFactory serverCnxnFactoryName = NIOServerCnxnFactory.class.getName(); } try { //利用反射進行初始化 ServerCnxnFactory serverCnxnFactory = (ServerCnxnFactory) Class.forName(serverCnxnFactoryName).getDeclaredConstructor().newInstance(); LOG.info("Using {} as server connection factory", serverCnxnFactoryName); return serverCnxnFactory; } catch (Exception e) { IOException ioe = new IOException("Couldn't instantiate " + serverCnxnFactoryName, e); throw ioe; } }
【4】記憶體資料庫的設計
// DataTree class in package org.apache.zookeeper.server (field declarations only in this
// excerpt). Node data is held in the `final NodeHashMap nodes` field.
public class DataTree {

    private static final Logger LOG = LoggerFactory.getLogger(DataTree.class);

    // Rate-limited logger; 15 * 60 * 1000 is presumably a 15-minute interval in ms — verify RateLogger.
    private final RateLogger RATE_LOGGER = new RateLogger(LOG, 15 * 60 * 1000);

    // This map provides fast lookup of DataNodes.
    private final NodeHashMap nodes;

    private IWatchManager dataWatches;

    private IWatchManager childWatches;

    // Caches the total size of the paths and data of all DataNodes.
    private final AtomicLong nodeDataSize = new AtomicLong(0);

    // Path of the root node.
    private static final String rootZookeeper = "/";

    private static final String procZookeeper = Quotas.procZookeeper;

    // procZookeeper without its first character (substring(1)).
    // Static initializers run in textual order, so procZookeeper must stay declared above the
    // fields that derive from it.
    private static final String procChildZookeeper = procZookeeper.substring(1);

    private static final String quotaZookeeper = Quotas.quotaZookeeper;

    // The part of quotaZookeeper after procZookeeper's length plus one character
    // (assumes quotaZookeeper starts with procZookeeper followed by a separator — verify).
    private static final String quotaChildZookeeper = quotaZookeeper.substring(procZookeeper.length() + 1);

    private static final String configZookeeper = ZooDefs.CONFIG_NODE;

    // Same derivation as quotaChildZookeeper, applied to configZookeeper.
    private static final String configChildZookeeper = configZookeeper.substring(procZookeeper.length() + 1);

    private final PathTrie pTrie = new PathTrie();

    // (6 * 8) + (5 * 4) bytes; presumably six 8-byte and five 4-byte stat fields — verify
    // against StatPersisted.
    public static final int STAT_OVERHEAD_BYTES = (6 * 8) + (5 * 4);

    // Presumably session id -> paths of that session's ephemeral nodes — verify.
    private final Map<Long, HashSet<String>> ephemerals = new ConcurrentHashMap<Long, HashSet<String>>();

    // Concurrent string sets backed by ConcurrentHashMap (Collections.newSetFromMap).
    private final Set<String> containers = Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());

    private final Set<String> ttls = Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());

    private final ReferenceCountedACLCache aclCache = new ReferenceCountedACLCache();

    public static final int DIGEST_LOG_LIMIT = 1024;

    public static final int DIGEST_LOG_INTERVAL = 128;

    private ZxidDigest digestFromLoadedSnapshot;

    private volatile ZxidDigest lastProcessedZxidDigest;

    private boolean firstMismatchTxn = true;

    private final List<DigestWatcher> digestWatchers = new ArrayList<>();

    private LinkedList<ZxidDigest> digestLog = new LinkedList<>();

    private final DigestCalculator digestCalculator;
}

// A single node of the DataTree (field declarations only in this excerpt).
public class DataNode implements Record {

    private volatile long digest;

    // Indicates whether this node's digest is up to date.
    volatile boolean digestCached;

    byte[] data;

    // NOTE(review): a Long rather than a List of ACLs — presumably a key into the ACL cache; verify.
    Long acl;

    public StatPersisted stat;

    private Set<String> children = null;

    private static final Set<String> EMPTY_SET = Collections.emptySet();
}
【5】quorumPeer.start()節點啟動方法又做了什麼
@Override public synchronized void start() { if (!getView().containsKey(myid)) { throw new RuntimeException("My id " + myid + " not in the peer list"); } //載入快照文件數據到記憶體 loadDataBase(); //啟動通信對象 startServerCnxnFactory(); try { //JettyAdminServer,啟動內嵌Jetty服務,預設8080埠 adminServer.start(); } catch (AdminServerException e) { LOG.warn("Problem starting AdminServer", e); } //初始化選舉數據 startLeaderElection(); startJvmPauseMonitor(); super.start(); } private void startServerCnxnFactory() { if (cnxnFactory != null) { //如果有配置netty通信,則NettyServerCnxnFactory類#start方法 cnxnFactory.start(); } if (secureCnxnFactory != null) { secureCnxnFactory.start(); } } //NettyServerCnxnFactory類#start方法 @Override public void start() { if (listenBacklog != -1) { bootstrap.option(ChannelOption.SO_BACKLOG, listenBacklog); } LOG.info("binding to port {}", localAddress); parentChannel = bootstrap.bind(localAddress).syncUninterruptibly().channel(); // Port changes after bind() if the original port was 0, update // localAddress to get the real port. localAddress = (InetSocketAddress) parentChannel.localAddress(); LOG.info("bound to port {}", getLocalPort()); } //選舉數據構建 public synchronized void startLeaderElection() { try { if (getPeerState() == ServerState.LOOKING) { //構建選票,myid伺服器id標記,最大的事務id,當前伺服器的選舉輪次 currentVote = new Vote(myid, getLastLoggedZxid(), getCurrentEpoch()); } } catch (IOException e) { RuntimeException re = new RuntimeException(e.getMessage()); re.setStackTrace(e.getStackTrace()); throw re; } //確定選舉演算法,預設傳的是3 this.electionAlg = createElectionAlgorithm(electionType); } //節點狀態 public enum ServerState { LOOKING, //等待狀態 FOLLOWING, //從節點 LEADING, //主節點 OBSERVING //觀察狀態 }
【6】選舉演算法分析(內含多層隊列架構)
//選舉演算法分析(3.8版本已經將過時的演算法去除了) protected Election createElectionAlgorithm(int electionAlgorithm) { Election le = null; //TODO: use a factory rather than a switch switch (electionAlgorithm) { case 1: throw new UnsupportedOperationException("Election Algorithm 1 is not supported."); case 2: throw new UnsupportedOperationException("Election Algorithm 2 is not supported."); case 3: QuorumCnxManager qcm = createCnxnManager(); QuorumCnxManager oldQcm = qcmRef.getAndSet(qcm); if (oldQcm != null) { LOG.warn("Clobbering already-set QuorumCnxManager (restarting leader election?)"); oldQcm.halt(); } QuorumCnxManager.Listener listener = qcm.listener; if (listener != null) { //啟動監聽線程 listener.start(); //構建收發消息線程 FastLeaderElection fle = new FastLeaderElection(this, qcm); fle.start(); le = fle; } else { LOG.error("Null listener when initializing cnx manager"); } break; default: assert false; } return le; }
【7】翻閱監聽線程 listener 做了什麼
// Listener thread: the interesting part is run().

/**
 * Listener thread body. Starts one ListenerHandler per election address (all wildcard
 * addresses when quorumListenOnAllIPs is set, otherwise all configured addresses). Each handler
 * runs on its own thread of a fixed-size pool, and run() waits on a CountDownLatch until every
 * handler has finished.
 *
 * <p>If the listener exits while not shut down and a SocketException was recorded, it runs
 * socketBindErrorHandler, because the host can no longer join the quorum.
 */
@Override
public void run() {
    if (!shutdown) {
        LOG.debug("Listener thread started, myId: {}", self.getId());
        Set<InetSocketAddress> addresses;

        if (self.getQuorumListenOnAllIPs()) {
            addresses = self.getElectionAddress().getWildcardAddresses();
        } else {
            addresses = self.getElectionAddress().getAllAddresses();
        }

        // One latch count per address; each ListenerHandler counts down when its run() ends.
        CountDownLatch latch = new CountDownLatch(addresses.size());
        // Build one ListenerHandler per address.
        listenerHandlers = addresses.stream().map(address -> new ListenerHandler(address, self.shouldUsePortUnification(), self.isSslQuorum(), latch))
            .collect(Collectors.toList());

        // One pool thread per ListenerHandler.
        // NOTE(review): Executors.newFixedThreadPool throws IllegalArgumentException when the
        // address set is empty (nThreads <= 0) — confirm addresses can never be empty here.
        final ExecutorService executor = Executors.newFixedThreadPool(addresses.size());
        try {
            listenerHandlers.forEach(executor::submit);
        } finally {
            // Prevent the executor's threads from leaking after the ListenerHandler tasks complete;
            // shutdown() lets already-submitted tasks keep running.
            executor.shutdown();
        }

        try {
            latch.await();
        } catch (InterruptedException ie) {..} finally {
            // Clean up for shutdown.
            for (ListenerHandler handler : listenerHandlers) {
                try {
                    handler.close();
                } catch (IOException ie) {...}
            }
        }
    }

    LOG.info("Leaving listener");
    if (!shutdown) {
        if (socketException.get()) {
            // After leaving the listener thread the host cannot join the quorum anymore;
            // this is a severe error that we cannot recover from, so we need to exit.
            socketBindErrorHandler.run();
        }
    }
}

/**
 * Accepts election connections on a single address. Runs on a pool thread started by the
 * listener's run() and counts down the shared latch when it finishes.
 */
class ListenerHandler implements Runnable, Closeable {
    private ServerSocket serverSocket;
    private InetSocketAddress address;
    private boolean portUnification;
    private boolean sslQuorum;
    private CountDownLatch latch;

    ListenerHandler(InetSocketAddress address, boolean portUnification, boolean sslQuorum, CountDownLatch latch) {
        this.address = address;
        this.portUnification = portUnification;
        this.sslQuorum = sslQuorum;
        this.latch = latch;
    }

    /**
     * Sleeps on acceptConnections().
     * Names the thread after the address, accepts connections until acceptConnections()
     * returns, then closes the server socket. The latch is always counted down.
     */
    @Override
    public void run() {
        try {
            Thread.currentThread().setName("ListenerHandler-" + address);
            // Accept connections; returns on shutdown or once the bind retries are exhausted.
            acceptConnections();
            try {
                close();
            } catch (IOException e) {...}
        } catch (Exception e) {...} finally {
            latch.countDown();
        }
    }

    /** Closes the server socket if it exists and is still open. */
    @Override
    public synchronized void close() throws IOException {
        if (serverSocket != null && !serverSocket.isClosed()) {
            LOG.debug("Trying to close listeners: {}", serverSocket);
            serverSocket.close();
        }
    }

    /**
     * Sleeps on accept().
     * Creates and binds the server socket, then accepts connections until shutdown. On an
     * IOException it records whether it was a SocketException, closes the socket, sleeps
     * 1 second and retries, at most portBindMaxRetry times (0 means no limit). The retry
     * counter is reset after every successfully handled connection.
     */
    private void acceptConnections() {
        int numRetries = 0;
        Socket client = null;

        while ((!shutdown) && (portBindMaxRetry == 0 || numRetries < portBindMaxRetry)) {
            try {
                // Create and bind the server socket.
                serverSocket = createNewServerSocket();
                LOG.info("{} is accepting connections now, my election bind port: {}", QuorumCnxManager.this.mySid, address.toString());
                while (!shutdown) {
                    try {
                        client = serverSocket.accept();
                        setSockOpts(client);
                        // Handle the new connection: asynchronously when quorum SASL auth is
                        // enabled, otherwise inline on this thread.
                        if (quorumSaslAuthEnabled) {
                            receiveConnectionAsync(client);
                        } else {
                            receiveConnection(client);
                        }
                        numRetries = 0;
                    } catch (SocketTimeoutException e) {...}
                }
            } catch (IOException e) {
                if (shutdown) {
                    break;
                }

                if (e instanceof SocketException) {
                    socketException.set(true);
                }

                numRetries++;
                try {
                    close();
                    Thread.sleep(1000);
                } catch (IOException ie) {...} catch (InterruptedException ie) {...}
                closeSocket(client);
            }
        }
        if (!shutdown) {...}
    }

    /**
     * Creates and binds the server socket for {@code address}. It is a UnifiedServerSocket with
     * port unification enabled (logged as "TLS-enabled") when portUnification is set, a
     * "TLS-only" UnifiedServerSocket when sslQuorum is set, and a plain ServerSocket otherwise.
     * SO_REUSEADDR is enabled before binding.
     */
    private ServerSocket createNewServerSocket() throws IOException {
        ServerSocket socket;

        if (portUnification) {
            LOG.info("Creating TLS-enabled quorum server socket");
            socket = new UnifiedServerSocket(self.getX509Util(), true);
        } else if (sslQuorum) {
            LOG.info("Creating TLS-only quorum server socket");
            socket = new UnifiedServerSocket(self.getX509Util(), false);
        } else {
            socket = new ServerSocket();
        }

        socket.setReuseAddress(true);
        // Rebuild the address from host string and port. The InetSocketAddress(String, int)
        // constructor attempts hostname resolution again (presumably to pick up address changes
        // between bind retries — verify).
        address = new InetSocketAddress(address.getHostString(), address.getPort());
        // Bind to the address and port.
        socket.bind(address);

        return socket;
    }
}
【7.1】receiveConnection方法怎麼處理接收到的消息
/**
 * Handles an accepted election connection: wraps the socket's input stream and delegates to
 * handleConnection. The socket is closed on IOException.
 *
 * @param sock the accepted socket
 */
public void receiveConnection(final Socket sock) {
    DataInputStream din = null;
    try {
        din = new DataInputStream(new BufferedInputStream(sock.getInputStream()));

        handleConnection(sock, din);
    } catch (IOException e) {
        closeSocket(sock);
    }
}

/**
 * Reads the peer's handshake and decides which connection between this pair of servers to keep.
 *
 * <p>A socket is full-duplex, so one connection per pair of servers is enough. Both sides may
 * still dial each other (one connection initiated by us, one by them), which leaves one channel
 * redundant. The tie-break implemented below:
 * <ul>
 *   <li>If the peer's sid is smaller than ours, its connection is closed and we connect out to it.</li>
 *   <li>If the peer's sid is larger, its connection is kept and gets a SendWorker/RecvWorker pair.</li>
 * </ul>
 *
 * @throws IOException as declared; the authServer.authenticate call sits outside the local
 *         IOException handler
 */
private void handleConnection(Socket sock, DataInputStream din) throws IOException {
    Long sid = null, protocolVersion = null;
    MultipleAddresses electionAddr = null;

    try {
        // The first long is either a server id (>= 0) or a negative protocol version that is
        // followed by an InitialMessage.
        protocolVersion = din.readLong();
        if (protocolVersion >= 0) { // this is a server id and not a protocol version
            sid = protocolVersion;
        } else {
            try {
                InitialMessage init = InitialMessage.parse(protocolVersion, din);
                sid = init.sid;
                if (!init.electionAddr.isEmpty()) {
                    electionAddr = new MultipleAddresses(init.electionAddr,
                        Duration.ofMillis(self.getMultiAddressReachabilityCheckTimeoutMs()));
                }
            } catch (InitialMessage.InitialMessageException ex) {
                closeSocket(sock);
                return;
            }
        }

        // Replace the shared OBSERVER_ID with a value from observerCounter (decremented on each
        // use), presumably so that different observers get distinct ids.
        if (sid == QuorumPeer.OBSERVER_ID) {
            sid = observerCounter.getAndDecrement();
        }
    } catch (IOException e) {
        closeSocket(sock);
        return;
    }

    // do authenticating learner
    authServer.authenticate(sock, din);

    if (sid < self.getId()) {
        // The peer's id is smaller than ours: finish any existing SendWorker for it, close this
        // incoming connection, and open our own connection to the peer instead.
        SendWorker sw = senderWorkerMap.get(sid);
        if (sw != null) {
            sw.finish();
        }

        // Close the current (incoming) connection.
        closeSocket(sock);

        // Create a connection from this node to the peer.
        if (electionAddr != null) {
            connectOne(sid, electionAddr);
        } else {
            connectOne(sid);
        }

    } else if (sid == self.getId()) {
        // The connection claims our own id (body elided in this excerpt).
        ...
    } else {
        // The peer's id is larger than ours: keep the peer-initiated connection.
        SendWorker sw = new SendWorker(sock, sid);
        RecvWorker rw = new RecvWorker(sock, din, sid, sw);
        sw.setRecv(rw);

        SendWorker vsw = senderWorkerMap.get(sid);

        if (vsw != null) {
            vsw.finish();
        }

        // Register the new SendWorker and make sure a send queue exists for this sid.
        senderWorkerMap.put(sid, sw);

        queueSendMap.putIfAbsent(sid, new CircularBlockingQueue<>(SEND_CAPACITY));

        sw.start();
        rw.start();
    }
}

/**
 * Ensures there is a connection from this node to peer {@code sid}.
 *
 * <p>If a SendWorker already exists, an asynchronous reachability check is triggered when
 * multi-address mode is enabled, the peer has more than one address, and reachability checks
 * are enabled. In that case the method returns true. Otherwise it schedules an asynchronous
 * connection attempt.
 *
 * @return true if a connection already exists; otherwise the result of initiateConnectionAsync
 */
synchronized boolean connectOne(long sid, MultipleAddresses electionAddr) {
    // Check whether a connection to this peer already exists.
    if (senderWorkerMap.get(sid) != null) {
        if (self.isMultiAddressEnabled() && electionAddr.size() > 1 && self.isMultiAddressReachabilityCheckEnabled()) {
            senderWorkerMap.get(sid).asyncValidateIfSocketIsStillReachable();
        }
        return true;
    }

    // Start a new connection attempt.
    return initiateConnectionAsync(electionAddr, sid);
}

/**
 * Schedules a connection attempt to {@code sid} on connectionExecutor.
 *
 * @return true if an attempt for this sid is already in progress or was scheduled; false if
 *         scheduling threw, in which case the in-progress marker is removed
 */
public boolean initiateConnectionAsync(final MultipleAddresses electionAddr, final Long sid) {
    if (!inprogressConnections.add(sid)) {
        return true;
    }
    try {
        connectionExecutor.execute(new QuorumConnectionReqThread(electionAddr, sid));
        connectionThreadCnt.incrementAndGet();
    } catch (Throwable e) {
        inprogressConnections.remove(sid);
        return false;
    }
    return true;
}

// QuorumConnectionReqThread#run: performs the connection attempt, then always clears the
// in-progress marker so that a later attempt to the same sid can be scheduled.
@Override
public void run() {
    try {
        initiateConnection(electionAddr, sid);
    } finally {
        inprogressConnections.remove(sid);
    }
}

/**
 * Opens the actual socket connection to the peer.
 *
 * <p>Uses an SSL socket when sslQuorum is enabled, otherwise a socket from SOCKET_FACTORY.
 * Connects with timeout {@code cnxTO}, performs the TLS handshake for SSL sockets, and then
 * hands off to startConnection. On any of the caught failures the socket is closed; errors are
 * not propagated to the caller.
 */
public void initiateConnection(final MultipleAddresses electionAddr, final Long sid) {
    Socket sock = null;
    try {
        if (self.isSslQuorum()) {
            sock = self.getX509Util().createSSLSocket();
        } else {
            sock = SOCKET_FACTORY.get();
        }
        setSockOpts(sock);
        sock.connect(electionAddr.getReachableOrOne(), cnxTO);
        if (sock instanceof SSLSocket) {
            SSLSocket sslSock = (SSLSocket) sock;
            sslSock.startHandshake();
        }
    } catch (X509Exception e) {
        closeSocket(sock);
        return;
    } catch (UnresolvedAddressException | IOException e) {
        closeSocket(sock);
        return;
    }

    try {
        startConnection(sock, sid);
    } catch (IOException e) {
        closeSocket(sock);
    }
}

// Sends the connection header: protocol version, this server's id, then its election
// address(es). (The remainder of this method lies outside this excerpt.)
private boolean startConnection(Socket sock, Long sid) throws IOException {
    DataOutputStream dout = null;
    DataInputStream din = null;
    try {
        BufferedOutputStream buf = new BufferedOutputStream(sock.getOutputStream());
        dout = new DataOutputStream(buf);

        // V2 header when multi-address support is enabled, otherwise V1.
        long protocolVersion = self.isMultiAddressEnabled() ? PROTOCOL_VERSION_V2 : PROTOCOL_VERSION_V1;
        dout.writeLong(protocolVersion);
        dout.writeLong(self.getId());

        // now we send our election address. For the new protocol version, we can send multiple addresses.
        Collection<InetSocketAddress> addressesToSend = protocolVersion == PROTOCOL_VERSION_V2
            ? self.getElectionAddress().getAllAddresses()
            : Arrays.asList(self.getElectionAddress().getOne());

        // The addresses are sent as one '|'-joined string, prefixed with its byte length.
        String addr = addressesToSend.stream()
            .map(NetUtils::formatInetAddr).collect(Collectors.joining("|"));
        // NOTE(review): getBytes() uses the platform default charset (before JDK 18), so the
        // receiver must decode with the same charset — consider StandardCharsets.UTF_8.
        byte[] addr_bytes = addr.getBytes();
        dout.writeInt(addr_bytes.length);
        dout.write(addr_bytes);
        dout.flush();