在zookeeper中,follower也可以接收客戶端連接,處理客戶端請求,本文將分析follower處理客戶端請求的流程: 讀請求處理 寫請求轉發與響應 follower接收轉發客戶端請求 網路層接收客戶端數據包 leader、follower都會啟動ServerCnxnFactory組件,用來 ...
在zookeeper中,follower也可以接收客戶端連接,處理客戶端請求,本文將分析follower處理客戶端請求的流程:
- 讀請求處理
- 寫請求轉發與響應
follower接收轉發客戶端請求
網路層接收客戶端數據包
leader、follower都會啟動ServerCnxnFactory組件,用來接收客戶端連接、讀取客戶端數據包、將客戶端數據包轉發給zk應用層。
在"zookeeper源碼(08)請求處理及數據讀寫流程"一文中已經介紹,ServerCnxn在讀取到客戶端數據包之後,會調用zookeeperServer的processConnectRequest或processPacket方法:
- processConnectRequest方法:創建session
- processPacket方法:處理業務請求
processConnectRequest創建session
- 會使用sessionTracker生成sessionId、創建session對象
- 生成一個密碼
- 提交一個createSession類型Request並提交給業務處理器
long createSession(ServerCnxn cnxn, byte[] passwd, int timeout) {
// 生成sessionId、創建session對象
long sessionId = sessionTracker.createSession(timeout);
// 生成密碼
Random r = new Random(sessionId ^ superSecret);
r.nextBytes(passwd);
// 提交createSession類型Request
CreateSessionTxn txn = new CreateSessionTxn(timeout);
cnxn.setSessionId(sessionId);
Request si = new Request(cnxn, sessionId, 0, OpCode.createSession, RequestRecord.fromRecord(txn), null);
submitRequest(si);
return sessionId;
}
processPacket處理業務請求
- 封裝Request
- 驗證largeRequest
- 提交業務層處理器
Request si = new Request(cnxn, cnxn.getSessionId(), h.getXid(), h.getType(), request, cnxn.getAuthInfo());
int length = request.limit();
if (isLargeRequest(length)) {
// checkRequestSize will throw IOException if request is rejected
checkRequestSizeWhenMessageReceived(length);
si.setLargeRequestSize(length);
}
si.setOwner(ServerCnxn.me);
submitRequest(si);
FollowerRequestProcessor處理器
在follower端,客戶端請求會由FollowerRequestProcessor處理:
- 把請求提交下游CommitProcessor處理器
- 寫請求轉發給leader處理
- 讀請求經過CommitProcessor直接轉發給FinalRequestProcessor處理器,直接查詢數據返回給客戶端
public void run() {
try {
while (!finished) {
Request request = queuedRequests.take();
// Screen quorum requests against ACLs first 略
// 轉發給CommitProcessor處理器
// 提交到queuedRequests隊列
// 寫請求還會提交到queuedWriteRequests隊列
maybeSendRequestToNextProcessor(request);
// ...
// 寫請求需要轉發給leader處理
switch (request.type) {
case OpCode.sync:
zks.pendingSyncs.add(request); // 待同步命令
zks.getFollower().request(request);
break;
case OpCode.create:
case OpCode.create2:
case OpCode.createTTL:
case OpCode.createContainer:
case OpCode.delete:
case OpCode.deleteContainer:
case OpCode.setData:
case OpCode.reconfig:
case OpCode.setACL:
case OpCode.multi:
case OpCode.check:
zks.getFollower().request(request);
break;
case OpCode.createSession:
case OpCode.closeSession:
if (!request.isLocalSession()) {
zks.getFollower().request(request);
}
break;
}
}
} catch (Exception e) {
handleException(this.getName(), e);
}
}
轉發leader
zks.getFollower().request(request);
Learner轉發請求:
void request(Request request) throws IOException {
// 略
ByteArrayOutputStream baos = new ByteArrayOutputStream();
DataOutputStream oa = new DataOutputStream(baos);
oa.writeLong(request.sessionId); // sessionId
oa.writeInt(request.cxid); // 客戶端xid
oa.writeInt(request.type); // 業務類型
byte[] payload = request.readRequestBytes(); // 請求體
if (payload != null) {
oa.write(payload);
}
oa.close();
// 封裝REQUEST數據包
QuorumPacket qp = new QuorumPacket(Leader.REQUEST, -1, baos.toByteArray(), request.authInfo);
writePacket(qp, true); // 通過網路發給leader伺服器
}
leader處理follower請求
LearnerHandler接收REQUEST請求
case Leader.REQUEST:
bb = ByteBuffer.wrap(qp.getData());
sessionId = bb.getLong(); // 解析請求信息
cxid = bb.getInt();
type = bb.getInt();
bb = bb.slice();
Request si;
if (type == OpCode.sync) {
si = new LearnerSyncRequest(
this, sessionId, cxid, type, RequestRecord.fromBytes(bb), qp.getAuthinfo());
} else {
si = new Request(null, sessionId, cxid, type, RequestRecord.fromBytes(bb), qp.getAuthinfo());
}
si.setOwner(this); // 用來判斷請求來自follower
learnerMaster.submitLearnerRequest(si); // 提交給業務處理器
requestsReceived.incrementAndGet();
submitLearnerRequest提交業務處理器:
public void submitLearnerRequest(Request si) {
zk.submitLearnerRequest(si);
}
LeaderZooKeeperServer提交業務處理器:
public void submitLearnerRequest(Request request) {
// 提交給PrepRequestProcessor處理器
prepRequestProcessor.processRequest(request);
}
從此處開始走leader處理寫請求流程。
leader處理寫請求流程回顧
- PrepRequestProcessor - 做事務設置
- ProposalRequestProcessor - 發起proposal,將Request轉發給SyncRequestProcessor寫事務log、本地ack
- CommitProcessor - 讀請求直接調用下游處理器,寫請求需要等待足夠的ack之後commit再調用下游RequestProcessor處理器
- ToBeAppliedRequestProcessor - 維護toBeApplied列表
- FinalRequestProcessor - 把事務應用到ZKDatabase,提供查詢功能,返迴響應
follower處理leader數據
在follower中,Follower使用processPacket方法處理來自leader的數據包,此處看一下PROPOSAL和COMMIT的邏輯。
PROPOSAL數據包
fzk.logRequest(hdr, txn, digest);
logRequest會使用syncProcessor將事務寫入到txnlog文件,之後調用SendAckRequestProcessor處理器給leader發ack數據包。
leader收到超過半數的ack之後會發COMMIT數據包讓各個節點將事務應用到ZKDatabase中。
COMMIT數據包
fzk.commit(qp.getZxid());
CommitProcessor處理器會將其提交到committedRequests隊列,之後客戶端Request會繼續向下游FinalRequestProcessor處理器傳遞。
FinalRequestProcessor處理器
- 把事務應用到ZKDatabase中
- 提供查詢功能
- 給客戶端返迴響應