Kafka2.0消費者協調器源碼

来源:https://www.cnblogs.com/bigshark/archive/2019/07/17/11198481.html
-Advertisement-
Play Games

消費組和消費者 1. 消費組和消費者是一對多的關係。 2. 同一個消費組的消費者可以消費多個分區,且是獨占的。 3. 消費者的分區分配策略由介面 定義,內置三種分配策略 、`RoundRobinAssignor StickyAssignor`,支持自定義策略。 4. 不同消費組可以消費相同的分區,互 ...


消費組和消費者

  1. 消費組和消費者是一對多的關係。
  2. 同一個消費組的消費者可以消費多個分區,且是獨占的。
  3. 消費者的分區分配策略由介面PartitionAssignor定義,內置三種分配策略RangeAssignorRoundRobinAssignorStickyAssignor,支持自定義策略。
  4. 不同消費組可以消費相同的分區,互不幹擾。

消費者協調器和組協調器

  1. 客戶端的消費者協調器ConsumerCoordinator和服務端的組協調器GroupCoordinator通過心跳不斷保持通信。
  2. 消費者進行消費之前,需要確保協調器是 ready 的。
    1. 選擇具有最少請求的節點Node,即具有最少的InFlightRequests的節點。
    2. 向該節點發送獲取協調器節點的請求,發送流程類似發送拉取請求。
    3. 向找到的協調器節點發送加入組請求,此時會禁止心跳線程。
    4. 加入組響應處理器JoinGroupResponseHandler對響應進行處理,響應包含generationIdmemberIdleaderIdprotocol
    5. 如果是 leader 消費者,即memberId=leaderId,則需要根據分配策略protocol計算分區分配。
    6. 將分區分配結果封裝到同步組請求,再向協調器節點發送同步組請求。
    7. 同步組響應處理器SyncGroupResponseHandler對上述請求的響應進行處理。
    8. 如果第5步判斷不是 follower 消費者,同樣需要向協調器發送同步組請求,只是請求中不需要封裝分區分配結果,而是從組協調器獲取。
    9. 加入組成功後,啟動心跳線程。
    10. 更新本地緩存的分區分配,此處會調用消費者再平衡監聽器。

消費者狀態

  • UNJOINED:消費者初始狀態為UNJOINED,表示未加入消費組。
  • REBALANCING:消費者向協調器發送加入組請求之前,狀態變更為REBALANCING,表示再平衡狀態
  • STABLE:消費者監聽到消息成功返回,狀態變更為STABLE,表示穩定狀態,如果是失敗的消息,狀態重置為UNJOINED

心跳線程

  1. 消費者加入消費組之後會啟動心跳線程,並保持和組協調器的通信。
  2. 如果消費者狀態不是STABLE,則不發送心跳。
  3. 如果組協調器未知,則等待一段時間重試。
  4. 如果心跳會話超時,則標記協調器節點未知。
  5. 如果心跳輪詢超時,則發送離開組請求。
  6. 如果暫不需要發送心跳,則等待一段時間重試。
  7. 發送心跳,註冊響應監聽器,接收到響應後,設置接收時間,併進行下一輪的心跳。

偏移量

拉取偏移量

  1. 如果有指定的分區,消費者協調器從組協調器拉取一組分區和已提交偏移量的映射關係,緩存到SubscriptionState
  2. 設置偏移量重置策略:LATEST, EARLIEST,NONE
  3. 非同步地更新消費的偏移量位置。

提交偏移量

  1. 消費者協調器獲取當前的協調器節點。
  2. 向該節點發送提交偏移量請求,返回Future

加入組流程

加入組流程

消費者加入組流程的源碼分析

boolean updateAssignmentMetadataIfNeeded(final long timeoutMs) {
    final long startMs = time.milliseconds();
    if (!coordinator.poll(timeoutMs)) { // 獲取協調器
        return false;
    }
    // 更新偏移量
    return updateFetchPositions(remainingTimeAtLeastZero(timeoutMs, time.milliseconds() - startMs));
}
// 獲取協調器
public boolean poll(final long timeoutMs) {
    final long startTime = time.milliseconds();
    long currentTime = startTime;
    long elapsed = 0L;

    if (subscriptions.partitionsAutoAssigned()) { // 是自動分配主題類型
        // 更新心跳的上一次的輪詢時間
        pollHeartbeat(currentTime);

        if (coordinatorUnknown()) { // 協調器未知
            // 確保協調器已經 ready
            if (!ensureCoordinatorReady(remainingTimeAtLeastZero(timeoutMs, elapsed))) {
                return false;
            }
        }

        if (rejoinNeededOrPending()) { // 需要加入消費組
            // 加入組、同步組
            if (!ensureActiveGroup(remainingTimeAtLeastZero(timeoutMs, elapsed))) {
                return false;
            }
            currentTime = time.milliseconds();
        }
    } else { // 指定分區類型
        if (metadata.updateRequested() && !client.hasReadyNodes(startTime)) {// 如果沒有準備就緒的節點
            // 阻塞等待元數據更新
            final boolean metadataUpdated = client.awaitMetadataUpdate(remainingTimeAtLeastZero(timeoutMs, elapsed));
            if (!metadataUpdated && !client.hasReadyNodes(time.milliseconds())) {
                return false; // 更新元數據失敗
            }

            currentTime = time.milliseconds();
        }
    }

    maybeAutoCommitOffsetsAsync(currentTime); // 非同步自動提交偏移量
    return true;
}
// 確保協調器已經 ready
protected synchronized boolean ensureCoordinatorReady(final long timeoutMs) {
    final long startTimeMs = time.milliseconds();
    long elapsedTime = 0L;

    while (coordinatorUnknown()) { // 如果協調器未知
        final RequestFuture<Void> future = lookupCoordinator(); // 向當前請求隊列最少的節點,發送獲取協調器的請求
        client.poll(future, remainingTimeAtLeastZero(timeoutMs, elapsedTime));
        if (!future.isDone()) {
            break; // 響應未完成,退出
        }
    }

    return !coordinatorUnknown();
}
// 加入組、同步組
boolean ensureActiveGroup(long timeoutMs, long startMs) {
    startHeartbeatThreadIfNeeded(); // 啟動心跳線程
    return joinGroupIfNeeded(joinTimeoutMs, joinStartMs);
}
boolean joinGroupIfNeeded(final long timeoutMs, final long startTimeMs) {
    long elapsedTime = 0L;

    while (rejoinNeededOrPending()) {
        // 發送加入組請求
        final RequestFuture<ByteBuffer> future = initiateJoinGroup();
        client.poll(future, remainingTimeAtLeastZero(timeoutMs, elapsedTime));
        if (!future.isDone()) {
            // we ran out of time
            return false;
        }

        if (future.succeeded()) { // 加入成功,回調處理響應,更新緩存的分區分配
            ByteBuffer memberAssignment = future.value().duplicate();
            onJoinComplete(generation.generationId, generation.memberId, generation.protocol, memberAssignment);
        }
    }
    return true;
}
// 發送加入組請求
private synchronized RequestFuture<ByteBuffer> initiateJoinGroup() {
    if (joinFuture == null) {
        disableHeartbeatThread(); // 暫停心跳線程

        state = MemberState.REBALANCING; // 狀態改為 REBALANCING
        joinFuture = sendJoinGroupRequest(); // 向協調器發送加入組請求
        joinFuture.addListener(new RequestFutureListener<ByteBuffer>() { // 響應監聽器
            @Override
            public void onSuccess(ByteBuffer value) { // 成功
                synchronized (AbstractCoordinator.this) {
                    state = MemberState.STABLE; // 狀態改為 STABLE
                    rejoinNeeded = false; // 不需要加入了

                    if (heartbeatThread != null)
                        heartbeatThread.enable(); // 啟動暫停了的心跳
                }
            }

            @Override
            public void onFailure(RuntimeException e) { // 失敗
                synchronized (AbstractCoordinator.this) {
                    state = MemberState.UNJOINED; // 狀態改為 UNJOINED
                }
            }
        });
    }
    return joinFuture;
}
// 向協調器發送加入組請求
RequestFuture<ByteBuffer> sendJoinGroupRequest() {

    JoinGroupRequest.Builder requestBuilder = new JoinGroupRequest.Builder(
            groupId,
            this.sessionTimeoutMs,
            this.generation.memberId,
            protocolType(),
            metadata()).setRebalanceTimeout(this.rebalanceTimeoutMs);

    int joinGroupTimeoutMs = Math.max(rebalanceTimeoutMs, rebalanceTimeoutMs + 5000);
    return client.send(coordinator, requestBuilder, joinGroupTimeoutMs)
            .compose(new JoinGroupResponseHandler()); // 非同步回調響應處理類
}
// 非同步回調響應處理類
private class JoinGroupResponseHandler extends CoordinatorResponseHandler<JoinGroupResponse, ByteBuffer> {
    @Override
    public void handle(JoinGroupResponse joinResponse, RequestFuture<ByteBuffer> future) {
        Errors error = joinResponse.error();
        if (error == Errors.NONE) {
            synchronized (AbstractCoordinator.this) {
                if (state != MemberState.REBALANCING) { // 如果是 REBALANCING,狀態異常
                    future.raise(new UnjoinedGroupException());
                } else {
                    AbstractCoordinator.this.generation = new Generation(joinResponse.generationId(), joinResponse.memberId(), joinResponse.groupProtocol());
                    if (joinResponse.isLeader()) { // 當前消費組是 leader
                        onJoinLeader(joinResponse).chain(future);
                    } else { // 當消費者是 follower
                        onJoinFollower().chain(future);
                    }
                }
            }
        }
    }
}
// 發送 leader 消費者同步組請求
private RequestFuture<ByteBuffer> onJoinLeader(JoinGroupResponse joinResponse) {
    try {
        // 根據響應的分配策略,給消費者分配分區
        Map<String, ByteBuffer> groupAssignment = performAssignment(joinResponse.leaderId(), joinResponse.groupProtocol(), joinResponse.members());

        SyncGroupRequest.Builder requestBuilder = new SyncGroupRequest.Builder(groupId, generation.generationId, generation.memberId, groupAssignment);
        return sendSyncGroupRequest(requestBuilder);
    } catch (RuntimeException e) {
        return RequestFuture.failure(e);
    }
}
// 發送 follower 消費者同步組請求
private RequestFuture<ByteBuffer> onJoinFollower() {
    SyncGroupRequest.Builder requestBuilder =
            new SyncGroupRequest.Builder(groupId, generation.generationId, generation.memberId,
                    Collections.<String, ByteBuffer>emptyMap()); // 發送不帶分配信息的請求
    return sendSyncGroupRequest(requestBuilder);
}

您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 摘要: 性能彪悍的V8引擎。 《 "JavaScript深入淺出" 》系列 : "JavaScript深入淺出第1課:箭頭函數中的this究竟是什麼鬼?" "JavaScript深入淺出第2課:函數是一等公民是什麼意思呢?" "JavaScript深入淺出第3課:什麼是垃圾回收演算法?" "JavaS ...
  • SRP = Single Responsibility Principle 定義:就一個類而言,應該只有一個能引起他變化的原因。通俗的說,即一個類只負責一項職責。 作用: 1、減少了類之間的耦合 2、最簡單最單純的事情才是最容易控制,最有效 3、當需求變化時,只需要修改一個地方 4、 避免寫臃腫的方 ...
  • Spring Cloud Alibaba | Nacos集群部署 [TOC] 1. Nacos支持三種部署模式 單機模式 用於測試和單機試用。 集群模式 用於生產環境,確保高可用。 多集群模式 用於多數據中心場景。 以上是官方提供的三種部署方式:單機模式對於企業來講,僅可用於測試環境或者開發環境,不 ...
  • <?phpclass db{ public $table=null; public $pdo; public $where=null; //where 條件 public $field=null; //要查詢的條件 public function __construct() { $this->pdo ...
  • Spring Cloud Alibaba | Nacos配置管理 Springboot: 2.1.6.RELEASE SpringCloud: Greenwich.SR1 如無特殊說明,本系列文章全採用以上版本 [TOC] 上一篇 "《Spring Cloud Alibaba | Nacos服務註冊 ...
  • 解壓壓縮包會有一個種子文件。直接迅雷下載即可,包含了韓順平老師的java入門視頻,jdbc,jsp,servlet,oracle,hibermate,spring,SHH框架,struct,linux,等十套視頻,並且還包含了配套的源碼。迅雷下載速度很快。20兆的寬頻下載速度大概是2.3MB/s,沒 ...
  • vector 是最簡單、最常用的數據存儲形式。 vector 似乎一組可以通過索引來訪問的順序存儲的數據元素。 我們可以用 vector 名和索引號的組合來表示一個具體的數據元素 例如:v[0]是5,v[1]是7。 vector 的索引號總是從“0”開始,每次加1. vector “知道自己的大小” ...
  • 經過接近1個月的時間,ElasticSearch6.x實戰教程終於成冊。這本實戰教程小冊有很多不足(甚至可能有錯誤),也是第一次完整推出一個系列的教程。 1年前,我開始真正接觸ES,在此之前僅停留在知道的階段,甚至連瞭解都算不上。1年後跳槽,新的知識新的領域爆炸式的噴涌而出,分散式、ES、Redis ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...