zookeeper源碼(09)follower處理客戶端請求

来源:https://www.cnblogs.com/xugf/p/18033571
-Advertisement-
Play Games

在zookeeper中,follower也可以接收客戶端連接,處理客戶端請求,本文將分析follower處理客戶端請求的流程: 讀請求處理 寫請求轉發與響應 follower接收轉發客戶端請求 網路層接收客戶端數據包 leader、follower都會啟動ServerCnxnFactory組件,用來 ...


在zookeeper中,follower也可以接收客戶端連接,處理客戶端請求,本文將分析follower處理客戶端請求的流程:

  • 讀請求處理
  • 寫請求轉發與響應

follower接收轉發客戶端請求

網路層接收客戶端數據包

leader、follower都會啟動ServerCnxnFactory組件,用來接收客戶端連接、讀取客戶端數據包、將客戶端數據包轉發給zk應用層。

在"zookeeper源碼(08)請求處理及數據讀寫流程"一文中已經介紹,ServerCnxn在讀取到客戶端數據包之後,會調用zookeeperServer的processConnectRequest或processPacket方法:

  • processConnectRequest方法:創建session
  • processPacket方法:處理業務請求

processConnectRequest創建session

  • 會使用sessionTracker生成sessionId、創建session對象
  • 生成一個密碼
  • 提交一個createSession類型Request並提交給業務處理器
long createSession(ServerCnxn cnxn, byte[] passwd, int timeout) {
    // 生成sessionId、創建session對象
    long sessionId = sessionTracker.createSession(timeout);
    // 生成密碼
    Random r = new Random(sessionId ^ superSecret);
    r.nextBytes(passwd);
    // 提交createSession類型Request
    CreateSessionTxn txn = new CreateSessionTxn(timeout);
    cnxn.setSessionId(sessionId);
    Request si = new Request(cnxn, sessionId, 0, OpCode.createSession, RequestRecord.fromRecord(txn), null);
    submitRequest(si);
    return sessionId;
}

processPacket處理業務請求

  • 封裝Request
  • 驗證largeRequest
  • 提交業務層處理器
Request si = new Request(cnxn, cnxn.getSessionId(), h.getXid(), h.getType(), request, cnxn.getAuthInfo());
int length = request.limit();
if (isLargeRequest(length)) {
    // checkRequestSize will throw IOException if request is rejected
    checkRequestSizeWhenMessageReceived(length);
    si.setLargeRequestSize(length);
}
si.setOwner(ServerCnxn.me);
submitRequest(si);

FollowerRequestProcessor處理器

在follower端,客戶端請求會由FollowerRequestProcessor處理:

  1. 把請求提交下游CommitProcessor處理器
  2. 寫請求轉發給leader處理
  3. 讀請求經過CommitProcessor直接轉發給FinalRequestProcessor處理器,直接查詢數據返回給客戶端
public void run() {
    try {
        while (!finished) {

            Request request = queuedRequests.take();

            // Screen quorum requests against ACLs first 略

            // 轉發給CommitProcessor處理器
            // 提交到queuedRequests隊列
            // 寫請求還會提交到queuedWriteRequests隊列
            maybeSendRequestToNextProcessor(request);

            // ...

            // 寫請求需要轉發給leader處理
            switch (request.type) {
            case OpCode.sync:
                zks.pendingSyncs.add(request); // 待同步命令
                zks.getFollower().request(request);
                break;
            case OpCode.create:
            case OpCode.create2:
            case OpCode.createTTL:
            case OpCode.createContainer:
            case OpCode.delete:
            case OpCode.deleteContainer:
            case OpCode.setData:
            case OpCode.reconfig:
            case OpCode.setACL:
            case OpCode.multi:
            case OpCode.check:
                zks.getFollower().request(request);
                break;
            case OpCode.createSession:
            case OpCode.closeSession:
                if (!request.isLocalSession()) {
                    zks.getFollower().request(request);
                }
                break;
            }
        }
    } catch (Exception e) {
        handleException(this.getName(), e);
    }
}

轉發leader

zks.getFollower().request(request);

Learner轉發請求:

void request(Request request) throws IOException {
    // 略

    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    DataOutputStream oa = new DataOutputStream(baos);
    oa.writeLong(request.sessionId); // sessionId
    oa.writeInt(request.cxid); // 客戶端xid
    oa.writeInt(request.type); // 業務類型
    byte[] payload = request.readRequestBytes(); // 請求體
    if (payload != null) {
        oa.write(payload);
    }
    oa.close();
    // 封裝REQUEST數據包
    QuorumPacket qp = new QuorumPacket(Leader.REQUEST, -1, baos.toByteArray(), request.authInfo);
    writePacket(qp, true); // 通過網路發給leader伺服器
}

leader處理follower請求

LearnerHandler接收REQUEST請求

case Leader.REQUEST:
    bb = ByteBuffer.wrap(qp.getData());
    sessionId = bb.getLong(); // 解析請求信息
    cxid = bb.getInt();
    type = bb.getInt();
    bb = bb.slice();
    Request si;
    if (type == OpCode.sync) {
        si = new LearnerSyncRequest(
            this, sessionId, cxid, type, RequestRecord.fromBytes(bb), qp.getAuthinfo());
    } else {
        si = new Request(null, sessionId, cxid, type, RequestRecord.fromBytes(bb), qp.getAuthinfo());
    }
    si.setOwner(this); // 用來判斷請求來自follower
    learnerMaster.submitLearnerRequest(si); // 提交給業務處理器
    requestsReceived.incrementAndGet();

submitLearnerRequest提交業務處理器:

public void submitLearnerRequest(Request si) {
    zk.submitLearnerRequest(si);
}

LeaderZooKeeperServer提交業務處理器:

public void submitLearnerRequest(Request request) {
    // 提交給PrepRequestProcessor處理器
    prepRequestProcessor.processRequest(request);
}

從此處開始走leader處理寫請求流程。

leader處理寫請求流程回顧

  • PrepRequestProcessor - 做事務設置
  • ProposalRequestProcessor - 發起proposal,將Request轉發給SyncRequestProcessor寫事務log、本地ack
  • CommitProcessor - 讀請求直接調用下游處理器,寫請求需要等待足夠的ack之後commit再調用下游RequestProcessor處理器
  • ToBeAppliedRequestProcessor - 維護toBeApplied列表
  • FinalRequestProcessor - 把事務應用到ZKDatabase,提供查詢功能,返迴響應

follower處理leader數據

在follower中,Follower使用processPacket方法處理來自leader的數據包,此處看一下PROPOSAL和COMMIT的邏輯。

PROPOSAL數據包

fzk.logRequest(hdr, txn, digest);

logRequest會使用syncProcessor將事務寫入到txnlog文件,之後調用SendAckRequestProcessor處理器給leader發ack數據包。

leader收到超過半數的ack之後會發COMMIT數據包讓各個節點將事務應用到ZKDatabase中。

COMMIT數據包

fzk.commit(qp.getZxid());

CommitProcessor處理器會將其提交到committedRequests隊列,之後客戶端Request會繼續向下游FinalRequestProcessor處理器傳遞。

FinalRequestProcessor處理器

  • 把事務應用到ZKDatabase中
  • 提供查詢功能
  • 給客戶端返迴響應

您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 這裡給大家分享我在網上總結出來的一些知識,希望對大家有所幫助 一、NextTick是什麼 官方對其的定義 在下次 DOM 更新迴圈結束之後執行延遲回調。在修改數據之後立即使用這個方法,獲取更新後的 DOM 什麼意思呢? 我們可以理解成,Vue 在更新 DOM 時是非同步執行的。當數據發生變化,Vue將 ...
  • 本文主要討論了訂單履約系統的應用架構。首先提出了訂單履約系統的三大核心能力,分別是履約服務表達、履約調度和物流配送。文中還詳細介紹了訂單履約系統的應用架構,包括C端履約服務和B端管理模塊,以及領域層的能力。 ...
  • 軟體架構是成功開發軟體產品的基礎。精心設計的軟體架構可以大大提高系統的質量。它還有助於降低出錯風險,並使將來添加新特性和功能變得更加容易。在這篇博文中,我將為您列出 2024 年最值得一讀的軟體架構書籍,以及 2024 年將出版哪些有趣的軟體架構書籍。當然,這些書籍中的大多數也是 2023 年最佳軟 ...
  • Redis主要在記憶體中操作數據,記憶體是一種臨時存儲,一旦斷電(或者硬體故障、軟體錯誤等),記憶體中的數據就會煙消雲散。有的同學會說,數據不是會保存到硬碟嗎?是的,但是還是可能會有一些數據來不及寫入硬碟,這是Redis的持久化機制導致的。而且,即使Redis將全部數據都及時保存到了硬碟,硬碟出現問題也可... ...
  • 簡介 抽象工廠模式是一種創建型設計模式,它提供了一種創建一系列相關或相互依賴對象的介面,而無需指定它們具體的類。抽象工廠模式將一組具有共同主題的單個工廠封裝起來,它提供介面用於創建相關或依賴對象的家族,而不需要指定具體的類。 抽象工廠模式包含以下幾個核心角色: 抽象工廠(Abstract Facto ...
  • 本文分享自華為雲社區《java代碼實現非同步返回結果如何判斷非同步執行完成》,作者: 皮牙子抓飯。 在許多應用程式中,我們經常使用非同步操作來提高性能和響應度。在Java中,我們可以使用多線程或者非同步任務來執行耗時操作,並且在後臺處理過程完成後獲取結果。但是,在使用非同步操作時,我們通常需要知道非同步任務何時 ...
  • 本文介紹基於C++語言GDAL庫,為CreateCopy()函數創建的柵格圖像添加更多波段的方法。 在C++語言的GDAL庫中,我們可以基於CreateCopy()函數與Create()函數創建新的柵格圖像文件。其中,CreateCopy()函數需要基於一個已有的柵格圖像文件作為模板,將模板文件的各 ...
  • 項目簡介 IExcel 用於優雅地讀取和寫入 excel。 避免大 excel 出現 oom,簡約而不簡單。 特性 一行代碼搞定一切 OO 的方式操作 excel,編程更加方便優雅。 sax 模式讀取,SXSS 模式寫入。避免 excel 大文件 OOM。 基於註解,編程更加靈活。 設計簡單,註釋完 ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...