ServerCnxnFactory
Accepts client connections, manages client sessions, and handles client requests.
ServerCnxn abstract class
Represents a single client connection:
- reads and writes data on the network
- encodes and decodes data
- forwards requests to the upper layer and receives responses from it
- manages connection state, e.g. enableRecv, sessionTimeout, stale, invalid
- tracks the current packetsReceived, packetsSent, lastCxid, lastZxid, etc.
- implements the Watcher interface, so it can also act as a watcher
Two implementations:
- NIOServerCnxn - based on NIO
- NettyServerCnxn - based on Netty
NIOServerCnxnFactory
An NIO-based, non-blocking, multi-threaded ServerCnxnFactory implementation; its threads communicate through queues:
- 1 accept thread, which accepts client connections and hands them to the selector threads
- 1-N selector threads, each selecting on 1/N of the connections; multiple selector threads are used because with a large number of connections select() can become a performance bottleneck
- 0-M socket IO worker threads, which do the socket reads and writes; if configured as 0, the selector threads do the IO themselves
- 1 cleanup thread, which closes idle connections
Example thread allocation on a 32-core machine: 1 accept thread, 1 cleanup thread, 4 selector threads, 64 worker threads.
configure method
- Does not support SSL
- Creates the ConnectionExpirerThread
- Determines how many of each kind of thread to create from the CPU core count
int numCores = Runtime.getRuntime().availableProcessors();
// 32 cores sweet spot seems to be 4 selector threads
numSelectorThreads = Integer.getInteger(
    ZOOKEEPER_NIO_NUM_SELECTOR_THREADS,
    Math.max((int) Math.sqrt((float) numCores / 2), 1));
// 64
numWorkerThreads = Integer.getInteger(ZOOKEEPER_NIO_NUM_WORKER_THREADS, 2 * numCores);
- Creates the SelectorThread threads
- Creates the ServerSocketChannel, starts listening, and sets it to non-blocking
- Creates the AcceptThread
start method
Starts the acceptThread, selectorThreads, workerPool, and expirerThread.
acceptThread
A single accept thread that accepts client connections and hands them to the selector threads:
- select looks for acceptable keys
- doAccept accepts the connection
if (key.isAcceptable()) {
    if (!doAccept()) {
        pauseAccept(10);
    }
}
- Sets the SocketChannel (sc) to non-blocking, checks that the connection count for the remote IP does not exceed maxClientCnxns (60), then picks a SelectorThread to start selecting read/write events
// Round-robin assign this connection to a selector thread
if (!selectorIterator.hasNext()) {
    selectorIterator = selectorThreads.iterator();
}
SelectorThread selectorThread = selectorIterator.next();
// the SocketChannel is buffered in a queue
if (!selectorThread.addAcceptedConnection(sc)) {
    throw new IOException("Unable to add connection to selector queue");
}
selectorThread
The run method selects read/write events, accepts client connections, and registers the interest set on each key:
- run method
public void run() {
    try {
        while (!stopped) {
            try {
                select();                           // select read/write events
                processAcceptedConnections();       // accept client connections
                processInterestOpsUpdateRequests();
            } catch (RuntimeException e) {
            } catch (Exception e) {
            }
        }
    }
    // ...
}
- Accepting a client connection registers OP_READ, creates a NIOServerCnxn, and attaches it to the key
private void processAcceptedConnections() {
    SocketChannel accepted;
    while (!stopped && (accepted = acceptedQueue.poll()) != null) {
        SelectionKey key = null;
        try {
            key = accepted.register(selector, SelectionKey.OP_READ);
            NIOServerCnxn cnxn = createConnection(accepted, key, this);
            key.attach(cnxn); // attach it to the key
            addCnxn(cnxn);    // maintain the connection-layer session
        } catch (IOException e) {
            // omitted
        }
    }
}
- Read/write events returned by select are handed to the handleIO method
private void handleIO(SelectionKey key) {
    IOWorkRequest workRequest = new IOWorkRequest(this, key);
    NIOServerCnxn cnxn = (NIOServerCnxn) key.attachment();
    // Stop selecting this key while processing on its connection
    cnxn.disableSelectable();
    key.interestOps(0);               // reset the interest set; it is re-registered after the IO completes
    touchCnxn(cnxn);                  // maintain the connection-layer session, refresh the expiry time
    workerPool.schedule(workRequest); // workRequest.doWork performs the read/write asynchronously
}
- Registers the interest set on the key
private void processInterestOpsUpdateRequests() {
    SelectionKey key;
    while (!stopped && (key = updateQueue.poll()) != null) {
        NIOServerCnxn cnxn = (NIOServerCnxn) key.attachment();
        if (cnxn.isSelectable()) {
            key.interestOps(cnxn.getInterestOps());
        }
    }
}
workRequest.doWork method
workRequest is an IOWorkRequest object; doWork reads the data and passes it to the upper layer:
public void doWork() throws InterruptedException {
    // omitted
    if (key.isReadable() || key.isWritable()) {
        cnxn.doIO(key); // runs on a workerPool thread
        // omitted
        touchCnxn(cnxn); // maintain the connection-layer session, refresh the expiry time
    }
    // omitted
}
Packets are framed as len + body; the read loop itself is not covered here. Once cnxn has read a complete packet it calls readConnectRequest or readRequest to pass the data to the upper layer (a minimal framing sketch follows the two methods below):
// establish the connection at the application layer
private void readConnectRequest() throws IOException, ClientCnxnLimitException {
    BinaryInputArchive bia = BinaryInputArchive.getArchive(new ByteBufferInputStream(incomingBuffer));
    // ConnectRequest carries:
    // protocolVersion(0), lastZxidSeen(0), timeOut(3s), sessionId(0), passwd(16-byte array), readOnly(false)
    ConnectRequest request = protocolManager.deserializeConnectRequest(bia);
    zkServer.processConnectRequest(this, request);
    initialized = true;
}
protected void readRequest() throws IOException {
    RequestHeader h = new RequestHeader();
    // request header carrying the client xid and type sent by the client
    ByteBufferInputStream.byteBuffer2Record(incomingBuffer, h);
    // wrapped into a ByteBufferRequestRecord holding the raw request bytes;
    // readRecord later deserializes the bytes into the requested Record implementation
    RequestRecord request = RequestRecord.fromBytes(incomingBuffer.slice());
    zkServer.processPacket(this, h, request);
}
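The len + body framing mentioned above can be illustrated with a minimal, self-contained sketch. This is not the ZooKeeper implementation (NIOServerCnxn reads incrementally and non-blocking); it is a simplified blocking version, and FrameReader/readFrame are illustrative names:
import java.io.EOFException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;

final class FrameReader {
    // Reads one frame: a 4-byte big-endian length prefix followed by that many payload bytes.
    static ByteBuffer readFrame(SocketChannel sc) throws IOException {
        ByteBuffer len = ByteBuffer.allocate(4);
        while (len.hasRemaining()) {
            if (sc.read(len) < 0) throw new EOFException("connection closed");
        }
        len.flip();
        ByteBuffer body = ByteBuffer.allocate(len.getInt()); // the "incomingBuffer" equivalent
        while (body.hasRemaining()) {
            if (sc.read(body) < 0) throw new EOFException("connection closed");
        }
        body.flip();
        return body; // ready to be deserialized into a RequestHeader + RequestRecord
    }
}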
NettyServerCnxnFactory
A Netty-based ServerCnxnFactory implementation.
CnxnChannelHandler class
The core network-layer handler; the important code is shown here:
class CnxnChannelHandler extends ChannelDuplexHandler {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
final Channel channel = ctx.channel();
// omitted
// create a NettyServerCnxn and bind it to the channel
NettyServerCnxn cnxn = new NettyServerCnxn(channel, zkServer, NettyServerCnxnFactory.this);
ctx.channel().attr(CONNECTION_ATTRIBUTE).set(cnxn);
// omitted
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
try {
try {
NettyServerCnxn cnxn = ctx.channel().attr(CONNECTION_ATTRIBUTE).get();
if (cnxn == null) {
LOG.error("channelRead() on a closed or closing NettyServerCnxn");
} else {
// read the request data
cnxn.processMessage((ByteBuf) msg);
}
} catch (Exception ex) {
throw ex;
}
} finally {
ReferenceCountUtil.release(msg);
}
}
}
cnxn reads the request data
void processMessage(ByteBuf buf) {
if (throttled.get()) {
// omitted
} else {
if (queuedBuffer != null) {
appendToQueuedBuffer(buf.retainedDuplicate());
processQueuedBuffer();
} else {
receiveMessage(buf); // the decoding logic is in this method
// Have to check !closingChannel, because an error in
// receiveMessage() could have led to close() being called.
if (!closingChannel && buf.isReadable()) {
if (queuedBuffer == null) {
queuedBuffer = channel.alloc().compositeBuffer();
}
appendToQueuedBuffer(buf.retainedSlice(buf.readerIndex(), buf.readableBytes()));
}
}
}
}
Once a complete packet has been read, the data is passed to the upper layer:
if (initialized) {
RequestHeader h = new RequestHeader();
ByteBufferInputStream.byteBuffer2Record(bb, h);
RequestRecord request = RequestRecord.fromBytes(bb.slice());
zks.processPacket(this, h, request);
} else {
// establish the connection at the application layer
BinaryInputArchive bia = BinaryInputArchive.getArchive(new ByteBufferInputStream(bb));
ConnectRequest request = protocolManager.deserializeConnectRequest(bia);
zks.processConnectRequest(this, request);
initialized = true;
}
ZooKeeperServer handling methods
processConnectRequest handles connection requests
ZooKeeperServer's processConnectRequest method handles connection requests:
public void processConnectRequest(
ServerCnxn cnxn, ConnectRequest request) throws IOException, ClientCnxnLimitException {
long sessionId = request.getSessionId(); // 0 by default
int tokensNeeded = 1;
// omitted
// read-only check
if (!request.getReadOnly() && this instanceof ReadOnlyZooKeeperServer) {
String msg = "Refusing session request for not-read-only client";
throw new CloseRequestException(msg, ServerCnxn.DisconnectReason.NOT_READ_ONLY_CLIENT);
}
// validate the client's last-seen zxid
if (request.getLastZxidSeen() > zkDb.dataTree.lastProcessedZxid) {
String msg = "Refusing session request for client "
+ cnxn.getRemoteSocketAddress()
+ " as it has seen zxid 0x"
+ Long.toHexString(request.getLastZxidSeen())
+ " our last zxid is 0x"
+ Long.toHexString(getZKDatabase().getDataTreeLastProcessedZxid())
+ " client must try another server";
throw new CloseRequestException(msg, ServerCnxn.DisconnectReason.CLIENT_ZXID_AHEAD);
}
int sessionTimeout = request.getTimeOut(); // client default is 30000
byte[] passwd = request.getPasswd();
int minSessionTimeout = getMinSessionTimeout(); // default tickTime * 2
if (sessionTimeout < minSessionTimeout) {
sessionTimeout = minSessionTimeout;
}
int maxSessionTimeout = getMaxSessionTimeout(); // default tickTime * 20
if (sessionTimeout > maxSessionTimeout) {
sessionTimeout = maxSessionTimeout;
}
cnxn.setSessionTimeout(sessionTimeout); // set the negotiated session timeout
// We don't want to receive any packets until we are sure that the session is setup
cnxn.disableRecv();
if (sessionId == 0) {
// create a new session
long id = createSession(cnxn, passwd, sessionTimeout);
} else {
validateSession(cnxn, sessionId);
// kill the old session and connection
if (serverCnxnFactory != null) {
serverCnxnFactory.closeSession(sessionId, ServerCnxn.DisconnectReason.CLIENT_RECONNECT);
}
if (secureServerCnxnFactory != null) {
secureServerCnxnFactory.closeSession(sessionId, ServerCnxn.DisconnectReason.CLIENT_RECONNECT);
}
// add the session
cnxn.setSessionId(sessionId);
// send back the connect response
reopenSession(cnxn, sessionId, passwd, sessionTimeout);
ServerMetrics.getMetrics().CONNECTION_REVALIDATE_COUNT.add(1);
}
}
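The min/max clamping above can be summed up with a small worked example; negotiateSessionTimeout is an illustrative helper, not a real ZooKeeper method, and the bounds assume the default tickTime of 2000 ms:
static int negotiateSessionTimeout(int requestedMs, int tickTimeMs) {
    int min = tickTimeMs * 2;  // minSessionTimeout
    int max = tickTimeMs * 20; // maxSessionTimeout
    return Math.max(min, Math.min(max, requestedMs));
}
// negotiateSessionTimeout(30000, 2000) == 30000  (within range, kept as-is)
// negotiateSessionTimeout(3000, 2000)  == 4000   (raised to the minimum)
// negotiateSessionTimeout(60000, 2000) == 40000  (capped at the maximum)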
// a closer look at creating a new session
long createSession(ServerCnxn cnxn, byte[] passwd, int timeout) {
if (passwd == null) {
passwd = new byte[0];
}
long sessionId = sessionTracker.createSession(timeout); // create the session and return its sessionId
Random r = new Random(sessionId ^ superSecret);
r.nextBytes(passwd); // passwd will be copied into request.passwd
CreateSessionTxn txn = new CreateSessionTxn(timeout);
cnxn.setSessionId(sessionId);
// submit a createSession request to the request-processor chain
// RequestRecord.fromRecord(txn) returns a SimpleRequestRecord wrapping the Record object
Request si = new Request(cnxn, sessionId, 0, OpCode.createSession, RequestRecord.fromRecord(txn), null);
submitRequest(si);
return sessionId;
}
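On reconnect the client sends this password back, and the server can validate it deterministically because it is derived only from the sessionId and the fixed superSecret seed. A sketch along the lines of the server's generatePasswd/checkPasswd helpers (from memory rather than quoted, assuming java.util.Random and java.util.Arrays plus the superSecret field):
byte[] generatePasswd(long sessionId) {
    Random r = new Random(sessionId ^ superSecret);
    byte[] p = new byte[16];
    r.nextBytes(p); // same seed => same bytes as handed out at session creation
    return p;
}

boolean checkPasswd(long sessionId, byte[] passwd) {
    return sessionId != 0 && Arrays.equals(passwd, generatePasswd(sessionId));
}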
processPacket handles business requests
ZooKeeperServer's processPacket method handles regular (non-connect) requests:
public void processPacket(ServerCnxn cnxn, RequestHeader h, RequestRecord request) throws IOException {
cnxn.incrOutstandingAndCheckThrottle(h);
if (h.getType() == OpCode.auth) {
// omitted
return;
} else if (h.getType() == OpCode.sasl) {
// omitted
} else {
if (!authHelper.enforceAuthentication(cnxn, h.getXid())) {
return;
} else {
Request si = new Request(
cnxn, cnxn.getSessionId(), h.getXid(), h.getType(), request, cnxn.getAuthInfo());
int length = request.limit();
// large-request check
if (isLargeRequest(length)) { // returns false by default
checkRequestSizeWhenMessageReceived(length);
si.setLargeRequestSize(length);
}
si.setOwner(ServerCnxn.me);
// submit to the request-processor chain
submitRequest(si);
}
}
}
submitRequest flow
- The request is first handed to the requestThrottler component
- requestThrottler is a throttling component (disabled by default); it buffers requests in a queue, and an asynchronous thread consumes the queue and hands each request to the processor chain (a minimal sketch of this pattern follows the snippet below)
- Up to the submitRequest method, processing still runs on the workerPool thread; handing off to the throttler is where it leaves that thread
if (request != null) {
if (request.isStale()) {
ServerMetrics.getMetrics().STALE_REQUESTS.add(1);
}
final long elapsedTime = Time.currentElapsedTime() - request.requestThrottleQueueTime;
ServerMetrics.getMetrics().REQUEST_THROTTLE_QUEUE_TIME.add(elapsedTime);
// throttling is disabled by default
if (shouldThrottleOp(request, elapsedTime)) {
request.setIsThrottled(true);
ServerMetrics.getMetrics().THROTTLED_OPS.add(1);
}
// submit
zks.submitRequestNow(request);
}
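The surrounding queue-plus-consumer structure of RequestThrottler looks roughly like the sketch below; the field names are illustrative and the throttle/stale checks from the snippet above are elided:
private final LinkedBlockingQueue<Request> submittedRequests = new LinkedBlockingQueue<>();

public void submitRequest(Request request) {           // called from the IO/worker thread
    request.requestThrottleQueueTime = Time.currentElapsedTime();
    submittedRequests.add(request);
}

@Override
public void run() {                                     // dedicated throttler thread
    try {
        while (true) {
            Request request = submittedRequests.take();
            if (request == Request.requestOfDeath) {
                break;
            }
            // throttle/stale checks omitted; see the snippet above
            zks.submitRequestNow(request);              // hand off to the processor chain
        }
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }
}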
The submitRequestNow method submits the request to the request-processor chain:
public void submitRequestNow(Request si) {
// omitted
try {
touch(si.cnxn); // refresh the session expiry time
boolean validpacket = Request.isValid(si.type);
if (validpacket) {
setLocalSessionFlag(si);
firstProcessor.processRequest(si); // hand off to the processor chain
if (si.cnxn != null) {
incInProcess();
}
} else {
// Update request accounting/throttling limits
requestFinished(si);
new UnimplementedRequestProcessor().processRequest(si);
}
} catch (MissingSessionException e) {
requestFinished(si);
} catch (RequestProcessorException e) {
requestFinished(si);
}
}
Leader client request-processor chain
As covered in an earlier article, the leader uses LeaderZooKeeperServer as its server implementation.
This section walks through how the leader processes client requests.
Processor chain
// build the processor chain
protected void setupRequestProcessors() {
RequestProcessor finalProcessor = new FinalRequestProcessor(this);
RequestProcessor toBeAppliedProcessor =
new Leader.ToBeAppliedRequestProcessor(finalProcessor, getLeader());
commitProcessor = new CommitProcessor(
toBeAppliedProcessor, Long.toString(getServerId()), false, getZooKeeperServerListener());
commitProcessor.start();
ProposalRequestProcessor proposalProcessor = new ProposalRequestProcessor(this, commitProcessor);
proposalProcessor.initialize();
prepRequestProcessor = new PrepRequestProcessor(this, proposalProcessor);
prepRequestProcessor.start();
firstProcessor = new LeaderRequestProcessor(this, prepRequestProcessor);
setupContainerManager();
}
- FinalRequestProcessor - applies the transaction associated with the request and serves queries; it sends the response to the client and sits at the end of the RequestProcessor chain
- ToBeAppliedRequestProcessor - maintains the toBeApplied list; it must be followed by a FinalRequestProcessor, and its processRequest must be handled synchronously
- CommitProcessor - waits for the commit to complete and then invokes the downstream RequestProcessor
- ProposalRequestProcessor - issues the proposal and also forwards the Request to its internal SyncRequestProcessor and AckRequestProcessor
public ProposalRequestProcessor(LeaderZooKeeperServer zks, RequestProcessor nextProcessor) {
    this.zks = zks;
    this.nextProcessor = nextProcessor;
    // it maintains its own SyncRequestProcessor and AckRequestProcessor
    AckRequestProcessor ackProcessor = new AckRequestProcessor(zks.getLeader());
    syncProcessor = new SyncRequestProcessor(zks, ackProcessor);
    forwardLearnerRequestsToCommitProcessorDisabled = Boolean.getBoolean(
        FORWARD_LEARNER_REQUESTS_TO_COMMIT_PROCESSOR_DISABLED);
}
- PrepRequestProcessor - usually sits at the head of the RequestProcessor chain and sets up the transaction associated with an update request
- LeaderRequestProcessor - performs local session upgrades; only requests submitted directly to the leader pass through this processor
LeaderRequestProcessor
public void processRequest(Request request) throws RequestProcessorException {
// omitted
// local sessions are not enabled by default
Request upgradeRequest = null;
try {
upgradeRequest = lzks.checkUpgradeSession(request);
} catch (KeeperException ke) {
// omitted
} catch (IOException ie) {
// omitted
}
// upgradeRequest is null here
if (upgradeRequest != null) {
nextProcessor.processRequest(upgradeRequest);
}
// invoke the downstream processor
nextProcessor.processRequest(request);
}
PrepRequestProcessor
Transaction setup:
- requests are buffered in a queue
- a consumer thread pulls requests from the queue and sets up the transaction
run method
public void run() {
try {
while (true) {
ServerMetrics.getMetrics().PREP_PROCESSOR_QUEUE_SIZE.add(submittedRequests.size());
Request request = submittedRequests.take();
ServerMetrics.getMetrics().PREP_PROCESSOR_QUEUE_TIME
.add(Time.currentElapsedTime() - request.prepQueueStartTime);
// omitted
if (Request.requestOfDeath == request) {
break;
}
request.prepStartTime = Time.currentElapsedTime();
pRequest(request);
}
} catch (Exception e) {
handleException(this.getName(), e);
}
}
protected void pRequest(Request request) throws RequestProcessorException {
request.setHdr(null);
request.setTxn(null);
if (!request.isThrottled()) {
pRequestHelper(request);
}
request.zxid = zks.getZxid(); // zxid
long timeFinishedPrepare = Time.currentElapsedTime();
ServerMetrics.getMetrics().PREP_PROCESS_TIME.add(timeFinishedPrepare - request.prepStartTime);
nextProcessor.processRequest(request); // invoke the downstream processor
ServerMetrics.getMetrics().PROPOSAL_PROCESS_TIME.add(Time.currentElapsedTime() - timeFinishedPrepare);
}
pRequestHelper method
private void pRequestHelper(Request request) {
try {
switch (request.type) {
case OpCode.createContainer:
case OpCode.create:
case OpCode.create2:
// create-node request carries path, data, acl, flags
CreateRequest create2Request = request.readRequestRecord(CreateRequest::new);
// zks.getNextZxid() returns the next (incrementing) zxid
pRequest2Txn(request.type, zks.getNextZxid(), request, create2Request);
break;
case OpCode.createTTL:
// createTTL request carries path, data, acl, flags, ttl
CreateTTLRequest createTtlRequest = request.readRequestRecord(CreateTTLRequest::new);
pRequest2Txn(request.type, zks.getNextZxid(), request, createTtlRequest);
break;
case OpCode.deleteContainer:
// carries path
DeleteContainerRequest deleteContainerRequest =
request.readRequestRecord(DeleteContainerRequest::new);
pRequest2Txn(request.type, zks.getNextZxid(), request, deleteContainerRequest);
break;
case OpCode.delete:
// delete-node request carries path, version
DeleteRequest deleteRequest = request.readRequestRecord(DeleteRequest::new);
pRequest2Txn(request.type, zks.getNextZxid(), request, deleteRequest);
break;
case OpCode.setData:
// setData request carries path, data, version
SetDataRequest setDataRequest = request.readRequestRecord(SetDataRequest::new);
pRequest2Txn(request.type, zks.getNextZxid(), request, setDataRequest);
break;
case OpCode.reconfig:
// reconfig request carries joiningServers, leavingServers, newMembers, curConfigId
ReconfigRequest reconfigRequest = request.readRequestRecord(ReconfigRequest::new);
pRequest2Txn(request.type, zks.getNextZxid(), request, reconfigRequest);
break;
case OpCode.setACL:
// setACL request carries path, acl, version
SetACLRequest setAclRequest = request.readRequestRecord(SetACLRequest::new);
pRequest2Txn(request.type, zks.getNextZxid(), request, setAclRequest);
break;
case OpCode.check:
// check request carries path, version
CheckVersionRequest checkRequest = request.readRequestRecord(CheckVersionRequest::new);
pRequest2Txn(request.type, zks.getNextZxid(), request, checkRequest);
break;
case OpCode.multi:
// iterate over the sub-operations, calling pRequest2Txn for each one
// pRequest2Txn(op.getType(), zxid, request, subrequest)
// the results are wrapped in a MultiTxn
break;
// create/close session don't require request record
case OpCode.createSession:
case OpCode.closeSession:
if (!request.isLocalSession()) { // not a local session
pRequest2Txn(request.type, zks.getNextZxid(), request, null);
}
break;
// All the rest don't need to create a Txn - just verify session
case OpCode.sync:
// sync,exists,getData,getACL,getChildren,getAllChildrenNumber,getChildren2,ping
// setWatches,setWatches2,checkWatches,removeWatches,getEphemerals,multiRead,addWatch
case OpCode.whoAmI:
zks.sessionTracker.checkSession(request.sessionId, request.getOwner());
break;
default:
break;
}
} catch (KeeperException e) {
// omitted
} catch (Exception e) {
// omitted
}
}
pRequest2Txn method flow
The method is large, so only the most important operation types are analyzed briefly.
The method first sets the TxnHeader on the request:
if (request.getHdr() == null) {
request.setHdr(new TxnHeader(request.sessionId, request.cxid, zxid, Time.currentWallTime(), type));
}
TxnHeader is the transaction request header:
public class TxnHeader implements Record {
    private long clientId; // session id
    private int cxid;      // client xid
    private long zxid;     // server-side zxid
    private long time;
    private int type;      // operation type
}
pRequest2Txn - create operations
create/create2/createTTL/createContainer:
- Builds the createMode from the flags and validates the ttl and ephemeral settings
- Checks the ACL
zks.checkACL(request.cnxn, parentRecord.acl, ZooDefs.Perms.CREATE, request.authInfo, path, listACL);
- Generates the path for sequential nodes (see the worked example after this list)
int parentCVersion = parentRecord.stat.getCversion();
if (createMode.isSequential()) {
    // e.g. /users/admin0000000001
    path = path + String.format(Locale.ENGLISH, "%010d", parentCVersion);
}
validatePath(path, request.sessionId);
- request.setTxn
int newCversion = parentRecord.stat.getCversion() + 1;
if (type == OpCode.createContainer) {
    request.setTxn(new CreateContainerTxn(path, data, listACL, newCversion));
} else if (type == OpCode.createTTL) {
    request.setTxn(new CreateTTLTxn(path, data, listACL, newCversion, ttl));
} else {
    request.setTxn(new CreateTxn(path, data, listACL, createMode.isEphemeral(), newCversion));
}
- Determines the ephemeralOwner
TxnHeader hdr = request.getHdr();
long ephemeralOwner = 0;
if (createMode.isContainer()) {
    ephemeralOwner = EphemeralType.CONTAINER_EPHEMERAL_OWNER;
} else if (createMode.isTTL()) {
    ephemeralOwner = EphemeralType.TTL.toEphemeralOwner(ttl);
} else if (createMode.isEphemeral()) {
    ephemeralOwner = request.sessionId;
}
- addChangeRecord
StatPersisted s = DataTree.createStat(hdr.getZxid(), hdr.getTime(), ephemeralOwner);
parentRecord = parentRecord.duplicate(request.getHdr().getZxid());
parentRecord.childCount++;
parentRecord.stat.setCversion(newCversion);
parentRecord.stat.setPzxid(request.getHdr().getZxid());
parentRecord.precalculatedDigest = precalculateDigest(
    DigestOpCode.UPDATE, parentPath, parentRecord.data, parentRecord.stat);
addChangeRecord(parentRecord); // maintains the outstandingChanges set
ChangeRecord nodeRecord = new ChangeRecord(request.getHdr().getZxid(), path, s, 0, listACL);
nodeRecord.data = data;
nodeRecord.precalculatedDigest = precalculateDigest(DigestOpCode.ADD, path, nodeRecord.data, s);
setTxnDigest(request, nodeRecord.precalculatedDigest);
addChangeRecord(nodeRecord); // maintains the outstandingChanges set
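A tiny standalone example of the sequence-suffix formatting used above; the path and parentCVersion values are made up for illustration:
import java.util.Locale;

public class SequentialPathDemo {
    public static void main(String[] args) {
        String path = "/users/admin";
        int parentCVersion = 1; // cversion of the parent node at create time
        // same zero-padded 10-digit suffix as in pRequest2Txn
        String sequentialPath = path + String.format(Locale.ENGLISH, "%010d", parentCVersion);
        System.out.println(sequentialPath); // prints /users/admin0000000001
    }
}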
pRequest2Txn - delete operation
- Checks the ACL and the version (see the sketch after this list)
checkAndIncVersion(nodeRecord.stat.getVersion(), deleteRequest.getVersion(), path); // stat.version must match delete.version
- Verifies there are no children; a node with children cannot be deleted
- Creates the DeleteTxn
request.setTxn(new DeleteTxn(path));
- addChangeRecord
parentRecord = parentRecord.duplicate(request.getHdr().getZxid());
parentRecord.childCount--;
parentRecord.stat.setPzxid(request.getHdr().getZxid());
parentRecord.precalculatedDigest = precalculateDigest(
    DigestOpCode.UPDATE, parentPath, parentRecord.data, parentRecord.stat);
addChangeRecord(parentRecord); // maintains the outstandingChanges set
nodeRecord = new ChangeRecord(request.getHdr().getZxid(), path, null, -1, null);
nodeRecord.precalculatedDigest = precalculateDigest(DigestOpCode.REMOVE, path);
setTxnDigest(request, nodeRecord.precalculatedDigest);
addChangeRecord(nodeRecord); // maintains the outstandingChanges set
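The version precondition amounts to the check sketched below (an approximation of checkAndIncVersion rather than a verbatim quote): an expected version of -1 means "any version"; otherwise it must equal the node's current stat.version.
private static int checkAndIncVersion(int currentVersion, int expectedVersion, String path)
        throws KeeperException.BadVersionException {
    if (expectedVersion != -1 && expectedVersion != currentVersion) {
        throw new KeeperException.BadVersionException(path); // optimistic-locking failure
    }
    return currentVersion + 1; // the new version recorded in the txn
}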
pRequest2Txn - setData operation
- Checks the ACL and computes newVersion
- Creates the SetDataTxn
request.setTxn(new SetDataTxn(path, setDataRequest.getData(), newVersion));
- addChangeRecord
nodeRecord = nodeRecord.duplicate(request.getHdr().getZxid());
nodeRecord.stat.setVersion(newVersion);
nodeRecord.stat.setMtime(request.getHdr().getTime());
nodeRecord.stat.setMzxid(zxid);
nodeRecord.data = setDataRequest.getData();
nodeRecord.precalculatedDigest = precalculateDigest(
    DigestOpCode.UPDATE, path, nodeRecord.data, nodeRecord.stat);
setTxnDigest(request, nodeRecord.precalculatedDigest);
addChangeRecord(nodeRecord);
pRequest2Txn - setACL operation
- Checks the ACL and computes newVersion
- Creates the SetACLTxn
request.setTxn(new SetACLTxn(path, listACL, newVersion));
- addChangeRecord
pRequest2Txn - createSession operation
CreateSessionTxn createSessionTxn = request.readRequestRecord(CreateSessionTxn::new);
request.setTxn(createSessionTxn);
// only add the global session tracker but not to ZKDb
zks.sessionTracker.trackSession(request.sessionId, createSessionTxn.getTimeOut());
zks.setOwner(request.sessionId, request.getOwner());
pRequest2Txn - closeSession operation
long startTime = Time.currentElapsedTime();
synchronized (zks.outstandingChanges) {
// get all ephemeral nodes of the session
Set<String> es = zks.getZKDatabase().getEphemerals(request.sessionId);
for (ChangeRecord c : zks.outstandingChanges) {
if (c.stat == null) {
// Doing a delete
es.remove(c.path);
} else if (c.stat.getEphemeralOwner() == request.sessionId) {
es.add(c.path);
}
}
for (String path2Delete : es) {
if (digestEnabled) {
parentPath = getParentPathAndValidate(path2Delete);
parentRecord = getRecordForPath(parentPath);
parentRecord = parentRecord.duplicate(request.getHdr().getZxid());
parentRecord.stat.setPzxid(request.getHdr().getZxid());
parentRecord.precalculatedDigest = precalculateDigest(
DigestOpCode.UPDATE, parentPath, parentRecord.data, parentRecord.stat);
addChangeRecord(parentRecord);
}
nodeRecord = new ChangeRecord(request.getHdr().getZxid(), path2Delete, null, 0, null);
nodeRecord.precalculatedDigest = precalculateDigest(DigestOpCode.REMOVE, path2Delete);
addChangeRecord(nodeRecord);
}
if (ZooKeeperServer.isCloseSessionTxnEnabled()) {
request.setTxn(new CloseSessionTxn(new ArrayList<String>(es)));
}
zks.sessionTracker.setSessionClosing(request.sessionId);
}
ProposalRequestProcessor
processRequest method
public void processRequest(Request request) throws RequestProcessorException {
if (request instanceof LearnerSyncRequest) { // handles the sync command; a separate analysis of sync will follow
zks.getLeader().processSync((LearnerSyncRequest) request);
} else {
if (shouldForwardToNextProcessor(request)) {
nextProcessor.processRequest(request); // invoke the downstream processor (CommitProcessor)
}
if (request.getHdr() != null) { // transactional requests need a proposal and a disk write
// We need to sync and get consensus on any transactions
try {
zks.getLeader().propose(request); // send the proposal to the followers
} catch (XidRolloverException e) {
throw new RequestProcessorException(e.getMessage(), e);
}
// syncProcessor's nextProcessor is the AckRequestProcessor
syncProcessor.processRequest(request);
}
}
}
Sending the proposal
Creates a proposal and sends it to all members:
public Proposal propose(Request request) throws XidRolloverException {
// the lower 32 bits of the zxid are exhausted: force a re-election to start a new epoch and a fresh zxid
if ((request.zxid & 0xffffffffL) == 0xffffffffL) {
String msg =
"zxid lower 32 bits have rolled over, forcing re-election, and therefore new epoch start";
shutdown(msg);
throw new XidRolloverException(msg);
}
// serialize the request
byte[] data = request.getSerializeData();
proposalStats.setLastBufferSize(data.length);
// build the quorum packet
QuorumPacket pp = new QuorumPacket(Leader.PROPOSAL, request.zxid, data, null);
// build the Proposal object
Proposal p = new Proposal();
p.packet = pp;
p.request = request;
synchronized (this) {
p.addQuorumVerifier(self.getQuorumVerifier());
if (request.getHdr().getType() == OpCode.reconfig) {
// this writes lastSeenQuorumVerifier to the zoo.cfg.dynamic.next file
self.setLastSeenQuorumVerifier(request.qv, true);
}
if (self.getQuorumVerifier().getVersion() < self.getLastSeenQuorumVerifier().getVersion()) {
p.addQuorumVerifier(self.getLastSeenQuorumVerifier());
}
lastProposed = p.packet.getZxid();
// cached in outstandingProposals; processAck uses the quorum state to decide whether to commit
outstandingProposals.put(lastProposed, p);
// send the packet to the followers
sendPacket(pp);
}
ServerMetrics.getMetrics().PROPOSAL_COUNT.add(1);
return p;
}
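The rollover check above relies on the zxid layout: the epoch is kept in the high 32 bits and a per-epoch counter in the low 32 bits. A small sketch in the spirit of ZxidUtils (illustrative helper methods, not quoted source):
static long makeZxid(long epoch, long counter) {
    return (epoch << 32L) | (counter & 0xffffffffL);
}
static long getEpochFromZxid(long zxid)   { return zxid >> 32L; }
static long getCounterFromZxid(long zxid) { return zxid & 0xffffffffL; }
// (zxid & 0xffffffffL) == 0xffffffffL means the per-epoch counter is exhausted,
// which is why propose() shuts down and forces a new election (and thus a new epoch)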
syncProcessor.processRequest method
processRequest puts the request into the queuedRequests queue; an asynchronous thread consumes it (a simplified sketch of this consumer loop follows below):
- pulls a request from queuedRequests
- appends it to the txn log
zks.getZKDatabase().append(si);
- rolls the txn log file and takes a snapshot when needed
// by default triggered when logCount exceeds 50000 or logSize exceeds 2GB
if (shouldSnapshot()) {
    resetSnapshotStats();
    // roll the txn log file
    zks.getZKDatabase().rollLog();
    // take a snapshot
    if (!snapThreadMutex.tryAcquire()) {
        LOG.warn("Too busy to snap, skipping");
    } else {
        // the snapshot is taken on a separate thread
        new ZooKeeperThread("Snapshot Thread") {
            public void run() {
                try {
                    zks.takeSnapshot();
                } catch (Exception e) {
                } finally {
                    snapThreadMutex.release();
                }
            }
        }.start();
    }
}
- finally the request is passed to nextProcessor (the AckRequestProcessor)
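Put together, the consumer loop looks roughly like the sketch below (the real SyncRequestProcessor batches appends and flushes the batch before forwarding; that batching is deliberately omitted here):
while (true) {
    Request si = queuedRequests.take();
    if (si == Request.requestOfDeath) {
        break;
    }
    zks.getZKDatabase().append(si);   // write to the txn log (buffered)
    if (shouldSnapshot()) {
        // roll the log and take an asynchronous snapshot, as shown above
    }
    // ... flush of the buffered log entries omitted ...
    nextProcessor.processRequest(si); // AckRequestProcessor on the leader
}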
AckRequestProcessor
public void processRequest(Request request) {
QuorumPeer self = leader.self;
if (self != null) {
request.logLatency(ServerMetrics.getMetrics().PROPOSAL_ACK_CREATION_LATENCY);
leader.processAck(self.getMyId(), request.zxid, null);
}
}
processAck
Keep a count of acks that are received by the leader for a particular proposal.
public synchronized void processAck(long sid, long zxid, SocketAddress followerAddr) {
// omitted
if ((zxid & 0xffffffffL) == 0) {
// We no longer process NEWLEADER ack with this method. However,
// the learner sends an ack back to the leader after it gets
// UPTODATE, so we just ignore the message.
return;
}
if (outstandingProposals.size() == 0) {
return;
}
// data up to this zxid has already been committed
if (lastCommitted >= zxid) {
// The proposal has already been committed
return;
}
Proposal p = outstandingProposals.get(zxid);
if (p == null) {
return;
}
// omitted
p.addAck(sid); // record the ack
boolean hasCommitted = tryToCommit(p, zxid, followerAddr);
// special handling for reconfig-type commands, omitted
}
tryToCommit checks the quorum state, i.e. whether more than half of the members have acked; once quorum is reached it:
- removes the proposal from the outstandingProposals set
- adds it to the toBeApplied set
- sends COMMIT to the followers
public void commit(long zxid) {
    synchronized (this) {
        lastCommitted = zxid;
    }
    QuorumPacket qp = new QuorumPacket(Leader.COMMIT, zxid, null, null);
    sendPacket(qp); // send to the followers
}
- commits locally
zk.commitProcessor.commit(p.request); // enqueues onto commitProcessor's committedRequests queue
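The "more than half" test itself is simple; a minimal sketch of a majority check (the real logic lives in the QuorumVerifier implementations such as QuorumMaj, which also handle weights and multiple voting groups):
static boolean containsQuorum(Set<Long> ackSet, int votingMembers) {
    return ackSet.size() > votingMembers / 2;
}
// with 5 voting members, acks from {1, 2, 3} give 3 > 2, so the proposal can be committed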
CommitProcessor
processRequest method
Called when the upstream processor forwards the request (on the leader this is ProposalRequestProcessor):
- the request is added to the queuedRequests queue
- write requests are additionally added to the queuedWriteRequests queue
public void processRequest(Request request) {
request.commitProcQueueStartTime = Time.currentElapsedTime();
queuedRequests.add(request); // all requests
// If the request will block, add it to the queue of blocking requests
if (needCommit(request)) { // write requests
queuedWriteRequests.add(request);
numWriteQueuedRequests.incrementAndGet();
} else {
numReadQueuedRequests.incrementAndGet();
}
wakeup();
}
commit method
Once the followers' acks have brought the proposal to quorum, this method is used to commit the transaction, which is then written to the ZKDatabase.
public void commit(Request request) {
request.commitRecvTime = Time.currentElapsedTime();
ServerMetrics.getMetrics().COMMITS_QUEUED.add(1);
committedRequests.add(request); // goes into the committedRequests queue
wakeup();
}
run method
Compares the queuedRequests, queuedWriteRequests, and committedRequests queues and forwards committed write requests and read requests to the downstream ToBeAppliedRequestProcessor.
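A greatly simplified sketch of that matching (the real run() also preserves per-session ordering, caps in-flight reads/writes, and hands requests to a worker pool; the loop below only shows the core idea):
while (!stopped) {
    Request request = queuedRequests.take();
    if (!needCommit(request)) {
        sendToNextProcessor(request);                 // reads go straight through
    } else {
        Request committed = committedRequests.take(); // wait for the commit (local or from the leader)
        queuedWriteRequests.poll();                   // the committed request matches the head write
        sendToNextProcessor(committed);
    }
}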
ToBeAppliedRequestProcessor
Maintains the toBeApplied list: cleans up the entries of requests that have been successfully committed and applied.
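Roughly, it forwards the request synchronously to the FinalRequestProcessor and then drops the matching proposal from the leader's toBeApplied list. A sketch of that shape (not the verbatim source; next and leader are the processor's own fields):
public void processRequest(Request request) throws RequestProcessorException {
    next.processRequest(request); // must be the synchronous FinalRequestProcessor
    if (request.getHdr() != null) {
        long zxid = request.getHdr().getZxid();
        Iterator<Proposal> iter = leader.toBeApplied.iterator();
        if (iter.hasNext() && iter.next().packet.getZxid() == zxid) {
            iter.remove(); // the transaction has now been applied
        }
    }
}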
FinalRequestProcessor
Applies the transaction associated with the request and serves queries; it sends the response to the client and sits at the end of the RequestProcessor chain.
- applies the transaction
- dispatches on the OpCode, runs the matching logic, and builds the response
- uses cnxn to send the response back to the client
Applying the transaction
if (!request.isThrottled()) {
rc = applyRequest(request);
}
// ProcessTxnResult rc = zks.processTxn(request);
createSession operation
zks.finishSessionInit(request.cnxn, true);
closeSession operation
Sends a closeConn packet to the client:
cnxn.sendCloseSession();
create operations
For create, create2, createTTL, and createContainer, the corresponding Response object is built.
delete operations
Omitted.
setData operation
Returns a SetDataResponse.
setACL operation
Returns a SetACLResponse.
getData operation
private Record handleGetDataRequest(
Record request, ServerCnxn cnxn, List<Id> authInfo) throws KeeperException, IOException {
GetDataRequest getDataRequest = (GetDataRequest) request;
String path = getDataRequest.getPath();
DataNode n = zks.getZKDatabase().getNode(path);
if (n == null) {
throw new KeeperException.NoNodeException();
}
// check permissions
zks.checkACL(cnxn, zks.getZKDatabase().aclForNode(n), ZooDefs.Perms.READ, authInfo, path, null);
Stat stat = new Stat();
// read the data and register the watcher if requested
byte[] b = zks.getZKDatabase().getData(path, stat, getDataRequest.getWatch() ? cnxn : null);
return new GetDataResponse(b, stat);
}
setWatches operations
setWatches and setWatches2:
SetWatches2 setWatches = request.readRequestRecord(SetWatches2::new);
long relativeZxid = setWatches.getRelativeZxid();
zks.getZKDatabase().setWatches(relativeZxid,
setWatches.getDataWatches(),
setWatches.getExistWatches(),
setWatches.getChildWatches(),
setWatches.getPersistentWatches(),
setWatches.getPersistentRecursiveWatches(),
cnxn);
addWatch operation
AddWatchRequest addWatcherRequest = request.readRequestRecord(AddWatchRequest::new);
zks.getZKDatabase().addWatch(addWatcherRequest.getPath(), cnxn, addWatcherRequest.getMode());
removeWatches operation
RemoveWatchesRequest removeWatches = request.readRequestRecord(RemoveWatchesRequest::new);
WatcherType type = WatcherType.fromInt(removeWatches.getType());
path = removeWatches.getPath();
boolean removed = zks.getZKDatabase().removeWatch(path, type, cnxn);
getChildren operations
getChildren and getChildren2:
GetChildren2Request getChildren2Request = request.readRequestRecord(GetChildren2Request::new);
Stat stat = new Stat();
path = getChildren2Request.getPath();
DataNode n = zks.getZKDatabase().getNode(path);
zks.checkACL(request.cnxn, zks.getZKDatabase().aclForNode(n),
ZooDefs.Perms.READ, request.authInfo, path, null);
List<String> children = zks.getZKDatabase()
.getChildren(path, stat, getChildren2Request.getWatch() ? cnxn : null);
rsp = new GetChildren2Response(children, stat);
How the follower handles data from the leader
Processor chain
protected void setupRequestProcessors() {
RequestProcessor finalProcessor = new FinalRequestProcessor(this);
commitProcessor = new CommitProcessor(
finalProcessor, Long.toString(getServerId()), true, getZooKeeperServerListener());
commitProcessor.start();
firstProcessor = new FollowerRequestProcessor(this, commitProcessor);
((FollowerRequestProcessor) firstProcessor).start();
syncProcessor = new SyncRequestProcessor(this, new SendAckRequestProcessor(getFollower()));
syncProcessor.start();
}
The processor chain:
FollowerRequestProcessor -> CommitProcessor -> FinalRequestProcessor
commitProcessor and syncProcessor handle the leader's proposal and commit messages.
processPacket method
As covered in "zookeeper源碼(08)leader、follower和observer", the follower handles packets from the leader in its processPacket method:
protected void processPacket(QuorumPacket qp) throws Exception {
switch (qp.getType()) {
case Leader.PING:
ping(qp);
break;
case Leader.PROPOSAL: // proposal
ServerMetrics.getMetrics().LEARNER_PROPOSAL_RECEIVED_COUNT.add(1);
TxnLogEntry logEntry = SerializeUtils.deserializeTxn(qp.getData());
TxnHeader hdr = logEntry.getHeader();
Record txn = logEntry.getTxn();
TxnDigest digest = logEntry.getDigest();
// omitted
lastQueued = hdr.getZxid();
// omitted
// record the log data
// syncProcessor persists the log entry and then sends an ack to the leader
fzk.logRequest(hdr, txn, digest);
// omitted
if (om != null) {
// omitted
}
break;
case Leader.COMMIT: // commit
ServerMetrics.getMetrics().LEARNER_COMMIT_RECEIVED_COUNT.add(1);
fzk.commit(qp.getZxid()); // commit the log entry via commitProcessor
if (om != null) {
// omitted
}
break;
case Leader.COMMITANDACTIVATE: // Similar to COMMIT, only for a reconfig operation
// get the new configuration from the request
Request request = fzk.pendingTxns.element();
SetDataTxn setDataTxn = (SetDataTxn) request.getTxn();
QuorumVerifier qv = self.configFromString(new String(setDataTxn.getData(), UTF_8));
// get new designated leader from (current) leader's message
ByteBuffer buffer = ByteBuffer.wrap(qp.getData());
long suggestedLeaderId = buffer.getLong();
final long zxid = qp.getZxid();
boolean majorChange = self.processReconfig(qv, suggestedLeaderId, zxid, true);
// commit (writes the new config to ZK tree (/zookeeper/config)
fzk.commit(zxid);
// omitted
break;
case Leader.UPTODATE:
// the leader tells the follower it is up to date and may start serving clients
// under normal circumstances this packet type should not appear again here
break;
case Leader.REVALIDATE:
if (om == null || !om.revalidateLearnerSession(qp)) {
revalidate(qp);
}
break;
case Leader.SYNC:
fzk.sync(); // the sync command
break;
default:
LOG.warn("Unknown packet type");
break;
}
}
Handling PROPOSAL
syncProcessor.processRequest method
processRequest puts the request into the queuedRequests queue; an asynchronous thread consumes it.
After the entry has been persisted locally, the downstream processor (a SendAckRequestProcessor) is invoked.
SendAckRequestProcessor
public void processRequest(Request si) {
if (si.type != OpCode.sync) {
// acknowledge that this zxid has been persisted
QuorumPacket qp = new QuorumPacket(Leader.ACK, si.getHdr().getZxid(), null, null);
try {
si.logLatency(ServerMetrics.getMetrics().PROPOSAL_ACK_CREATION_LATENCY);
learner.writePacket(qp, false);
} catch (IOException e) {
// learner.sock.close();
}
}
}
public void flush() throws IOException {
try {
learner.writePacket(null, true);
} catch (IOException e) {
// learner.sock.close();
}
}
Handling COMMIT
Handed to the commitProcessor, which passes it on downstream to the FinalRequestProcessor.
FinalRequestProcessor
Already covered above, so omitted here.
How the observer handles data from the leader
Processor chain
protected void setupRequestProcessors() {
// We might consider changing the processor behaviour of
// Observers to, for example, remove the disk sync requirements.
// Currently, they behave almost exactly the same as followers.
RequestProcessor finalProcessor = new FinalRequestProcessor(this);
commitProcessor = new CommitProcessor(
finalProcessor, Long.toString(getServerId()), true, getZooKeeperServerListener());
commitProcessor.start();
firstProcessor = new ObserverRequestProcessor(this, commitProcessor);
((ObserverRequestProcessor) firstProcessor).start();
if (syncRequestProcessorEnabled) {
syncProcessor = new SyncRequestProcessor(this, null);
syncProcessor.start();
}
}
processPacket method
protected void processPacket(QuorumPacket qp) throws Exception {
TxnLogEntry logEntry;
TxnHeader hdr;
TxnDigest digest;
Record txn;
switch (qp.getType()) {
case Leader.PING:
ping(qp);
break;
case Leader.PROPOSAL:
LOG.warn("Ignoring proposal");
break;
case Leader.COMMIT:
LOG.warn("Ignoring commit");
break;
case Leader.UPTODATE:
LOG.error("Received an UPTODATE message after Observer started");
break;
case Leader.REVALIDATE:
revalidate(qp);
break;
case Leader.SYNC:
((ObserverZooKeeperServer) zk).sync();
break;
case Leader.INFORM:
ServerMetrics.getMetrics().LEARNER_COMMIT_RECEIVED_COUNT.add(1);
logEntry = SerializeUtils.deserializeTxn(qp.getData());
hdr = logEntry.getHeader();
txn = logEntry.getTxn();
digest = logEntry.getDigest();
Request request = new Request(hdr.getClientId(), hdr.getCxid(), hdr.getType(), hdr, txn, 0);
request.logLatency(ServerMetrics.getMetrics().COMMIT_PROPAGATION_LATENCY);
request.setTxnDigest(digest);
ObserverZooKeeperServer obs = (ObserverZooKeeperServer) zk;
obs.commitRequest(request);
break;
case Leader.INFORMANDACTIVATE: // handles the reconfig request
// get new designated leader from (current) leader's message
ByteBuffer buffer = ByteBuffer.wrap(qp.getData());
long suggestedLeaderId = buffer.getLong();
byte[] remainingdata = new byte[buffer.remaining()];
buffer.get(remainingdata);
logEntry = SerializeUtils.deserializeTxn(remainingdata);
hdr = logEntry.getHeader();
txn = logEntry.getTxn();
digest = logEntry.getDigest();
QuorumVerifier qv = self.configFromString(new String(((SetDataTxn) txn).getData(), UTF_8));
request = new Request(hdr.getClientId(), hdr.getCxid(), hdr.getType(), hdr, txn, 0);
request.setTxnDigest(digest);
obs = (ObserverZooKeeperServer) zk;
boolean majorChange = self.processReconfig(qv, suggestedLeaderId, qp.getZxid(), true);
obs.commitRequest(request);
if (majorChange) {
throw new Exception("changes proposed in reconfig");
}
break;
default:
LOG.warn("Unknown packet type: {}", LearnerHandler.packetToString(qp));
break;
}
}