Kafka原理剖析之「位點提交」

来源:https://www.cnblogs.com/xijiu/p/18119723
-Advertisement-
Play Games

多租戶的概念是我在畢業後不久進第一家公司接觸到的,當時所在部門的業務是計劃建設一套基於自研的、基於開放 API 的、基於 PaaS 的、面向企業(ToB)的多租戶架構平臺,將我們的服務可以成規模地、穩定高效地交付給客戶使用。 ...


一、背景

Kafka的位點提交一直是Consumer端非常重要的一部分,業務上我們經常遇到的消息丟失、消息重覆也與其息息相關。位點提交說簡單也簡單,說複雜也確實複雜,沒有人能用一段簡短的話將其說清楚,最近團隊生產環境便遇到一個小概率的報錯

Offset commit failed with a retriable exception. You should retry committing the latest consumed offsets. The coordinator is not available.

此錯誤一齣,Consumer的流量直接跌0,且無法自愈,雖然客戶端重啟後可自動恢復,但影響及損失還是非常巨大的,當然最後定位就是「位點提交」一手炮製的,是開源的一個重大bug,受此影響的版本跨越2.6.x ~ 3.1.x :

https://issues.apache.org/jira/browse/KAFKA-13840

藉此bug正好來梳理一下Kafka有關位點提交的知識點

二、概述

關於位點提交(commit offset)大家最直觀的感受便是自動或手動提交,但是仔細一想,還是有很多細節問題,例如:

  • 手動的同步提交與非同步提交有什麼區別?
  • 使用自動提交模式時,提交動作是同步的還是非同步的?
  • 消費模式使用assign或subscribe,在提交位點時有區別嗎?
  • 同步提交與非同步提交能否混合使用?
  • 手動提交與自動提交能否混合使用?

其實這些問題都是萬變不離其宗,我們把各個特征總結一下,這些問題自然也就迎刃而解

三、為什麼要提交位點?

在開始介紹各類位點提交的策略之前,我們先拋出一個靈魂拷問:“為什麼一定要提交位點?”。 Consumer會周期性的從Broker拉取消息,每次拉取消息的時候,順便提交位點不可以嗎?為什麼一定要讓用戶感知提交位點,還提供了各種各樣的策略?

其實回答這個問題,我們理解以下2個配置就夠了

  • fetch.max.bytes and max.partition.fetch.bytes
    • 均作用於Broker端。首先需要明確的是,Consumer的一次拉取經常是針對多個partition的,因此max.partition.fetch.bytes控制的一個partition拉取消息的最大值,而fetch.max.bytes控制的則是本次請求整體的上限
  • max.poll.records
    • 作用於Consumer端。而此參數控制的就是一次 poll 方法最多返回的消息條數,因此並不是每次調用 poll 方法都會發起一次網路請求的

因此也就導致了發起網路的頻次跟用戶處理業務數據的頻次是不一樣的

簡單總結一下,單次網路請求拉取的數據量可能是很大的,需要客戶端通過多次調用poll()方法來消化,如果按照網路請求的頻次來提交位點的話,那這個提交頻次未免太粗了,Consumer一旦發生重啟,將會導致大量的消息重覆

其次按照網路請求的頻次來提交位點的話,程式將變得不夠靈活,業務端對於消息的處理會有自己的理解,將提交位點的發起動作放在Consumer,設計更有彈性

四、Consumer網路模型簡介

4.1、單線程的Consumer

在開始介紹Consumer端的網路模型之前,我們先看下Producer的

可見Producer是線程安全的,Producer內部維護了一個併發的緩存隊列,所有的數據都會先寫入隊列,然後由Sender線程負責將其發送至網路

而Consumer則不同,我們羅列一下Consumer的特點

  • 單線程(業務處理與網路共用一個線程)
  • 非線程安全

不過這裡說的單線程不夠嚴謹,在0.10.1版本以後:

  • Subscribe模式下,Consumer將心跳邏輯放在了一個獨立線程中,如果消息處理邏輯不能在 max.poll.interval.ms 內完成,則consumer將停止發送心跳,然後發送LeaveGroup請求主動離組,從而引發coordinator開啟新一輪rebalance
  • Assign模式下,則只有一個Main線程

用戶處理業務邏輯的時間可能會很長,因此心跳線程的引入主要是為瞭解決心跳問題,其非常輕量,因此我們泛泛的講,Consumer其實就是單線程的,包括提交位點,那一個單線程的客戶端是如何保證高效的吞吐,又是如何與用戶處理數據的邏輯解耦呢?其實這是個很有意思,也很有深度的問題,但不是本文的重點,後續我們再展開詳聊

因此我們知道,所有提交位點的動作均是交由Consumer Main線程來提交的,但是單線程並不意味著阻塞,不要忘記,我們底層依賴的是JDK的NIO,因此網路發送、接受部分均是非同步執行的

4.2、網路模型

既然Consumer是單線程,而NIO是非同步的,那麼Consumer如何處理這些網路請求呢?Producer比較好理解,有一個專門負責交互的Sender線程,而單線程的Consumer如何處理呢

其實Consumer所有的網路發送動作,均放在main線程中,而在Consumer內部,為每個建聯的Broker都維護了一個unsent列表,這個列表中存放了待發送的請求,每次業務端程式執行consumer.poll()方法時,會先後觸發2次網路發送的操作:

  1. 嘗試將所有Broker待發送區的數據發送出去
  2. 處理網路接收到的請求
  3. 嘗試將所有Broker待發送區的數據發送出去(again)

回到我們位點提交的case中,如果某個Broker積攢了大量的未發送請求,那提交位點的請求豈不是要等待很久才能發出去?是的,如果unsent列表中有很多請求確實會這樣,不過正常情況下,同一個Broker中不會積攢大量請求,如果一次從Broker中拉取的消息還沒有被消費完,是不會向該Broker再次發送請求的,因此業務poll()的頻率是要遠高於網路發送頻率的,而單次poll時,又會觸發2次trySend,因此可保證不存在unsent列表的數據過多而發不出的情況

BTW:Consumer的網路三件套:NetworkClient、Selector、KafkaChannel與Producer是完全一樣的。關於Consumer的核心組件,盜用一張網上的圖

有了上面的基礎,我們再來討論位點提交的方式,就會變得非常清晰明朗了

五、手動-非同步提交

執行非同步提交的代碼通常是這樣寫的

while (true) {
    // 拉取消息
    ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofSeconds(1));
    // 如果拉取的消息不為空
    if (!records.isEmpty()) {
        // 執行常規業務處理邏輯
        doBusiness(records);
        // 非同步提交位點
        kafkaConsumer.commitAsync();
    }
}

kafkaConsumer.commitAsync() 負責拼接提交位點的request,然後將請求放在對應Broker的unsent列表中,程式將返回,待到下一次業務執行poll(),或合適的時機,會將此請求發出去,並不阻塞main線程

而對於提交位點的結果,如果指定了回調函數,如下:

kafkaConsumer.commitAsync(new OffsetCommitCallback() {
    @Override
    public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
    }
});

可以對異常進行處理,也可以拿到實時提交的位點

而對於沒有指定回調函數的case,Consumer會提供一個預設的回調函數org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.DefaultOffsetCommitCallback,在發生異常時,輸出error日誌

六、手動-同步提交

而對於同步提交

kafkaConsumer.commitSync();

首先需要明確的是,同步提交是會阻塞Consumer的Main線程的,手動提交會首先將提交請求放在對應Broker的unsent列表的尾部,繼而不斷地觸發調用,將網路待發送區的數據發送出去,同時不間斷接收網路請求,直到收到本次提交的響應;不過同步提交也有超時時間,預設為60s,如果超時,將會拋出TimeoutException異常

同步提交是低效的,會影響Consumer整體的消費吞吐,而有些相對嚴苛的業務場景,同步提交又是必不可少的,讀者根據自己的業務case來決定使用哪種策略

七、自動提交

與手動提交相對的,便是自動提交,首先明確一點,自動提交的模式,是非同步提交

自動提交並不是啟動一個全新的線程去提交位點,也不是嚴格按照固定時間間隔去提交。自動提交與手動提交一樣,也是由Consumer Main線程觸發的

由於位點提交、處理業務邏輯、網路收發、元數據更新等,都共用了Consumer的Main線程,因此並不能保證提交位點的時間間隔嚴格控制在auto.commit.interval.ms(預設5000,即5s)內,因此真實提交位點的時間間隔只會大於等於auto.commit.interval.ms

總結一下自動提交的特點:

  • 非同步提交
  • 提交操作由Consumer的Main線程發起
  • 配置 auto.commit.interval.ms 只能保證提交的最小間隔,真實提交時間間隔通常大於此配置

至此,我們嘗試回答一下剛開始提出的問題

  • 手動的同步提交與非同步提交有什麼區別?
    • 同步提交會阻塞Consumer的Main線程,相對而言,非同步提交性能更高
  • 使用自動提交模式時,提交動作是同步的還是非同步的?
    • 非同步的
  • 消費模式使用assign或subscribe,在提交位點時有區別嗎?
    • subscribe模式會有心跳線程,心跳線程維護了與Coordinator的建聯
  • 同步提交與非同步提交能否混合使用?
    • 可以,通常在大部分場景使用非同步提交,而在需要明確拿到已提交位點的case下使用同步提交
  • 手動提交與自動提交能否混合使用?
    • 可以,不過語義上會有很多衝突,不建議混合使用

八、開源Bug

回到文章剛開始提到的異常報錯

Offset commit failed with a retriable exception. You should retry committing the latest consumed offsets. The coordinator is not available.

這個bug並不是在所有case下都會存在

  • Subscribe
    • 自動提交 -- 正常運行
    • 手動-非同步提交 -- 正常運行
    • 手動-同步提交 -- 正常運行
  • Assign
    • 自動提交 -- Bug
    • 手動-非同步提交 -- Bug
    • 手動-同步提交 -- 正常運行

為什麼會出現如何奇怪的情況呢?其實跟一下源碼便會有結論

8.1、Subscribe

在Subscribe模式下,Consumer與Coordinator的交互是通過線程org.apache.kafka.clients.consumer.internals.AbstractCoordinator.HeartbeatThread進行的。當程式發現Coordinator找不到時,便會發起尋找Coordinator的網路請求,方法如下

// org.apache.kafka.clients.consumer.internals.AbstractCoordinator#lookupCoordinator
protected synchronized RequestFuture<Void> lookupCoordinator() {
    if (findCoordinatorFuture == null) {
        // find a node to ask about the coordinator
        Node node = this.client.leastLoadedNode();
        if (node == null) {
            log.debug("No broker available to send FindCoordinator request");
            return RequestFuture.noBrokersAvailable();
        } else {
            findCoordinatorFuture = sendFindCoordinatorRequest(node);
        }
    }
    return findCoordinatorFuture;
}

而其中涉及一個findCoordinatorFuture的成員變數,必須要滿足findCoordinatorFuture == null,才會真正發起網路請求,因此在方法執行完,需要將其置空,如下方法

// org.apache.kafka.clients.consumer.internals.AbstractCoordinator#clearFindCoordinatorFuture
private synchronized void clearFindCoordinatorFuture() {
    findCoordinatorFuture = null;
}

說白了,也就是每次調用,都需要lookupCoordinator()clearFindCoordinatorFuture()成對兒出現;當然心跳線程也是這樣做的

if (coordinatorUnknown()) {
    if (findCoordinatorFuture != null) {
        // clear the future so that after the backoff, if the hb still sees coordinator unknown in
        // the next iteration it will try to re-discover the coordinator in case the main thread cannot
        // 清理輔助變數findCoordinatorFuture
        clearFindCoordinatorFuture();

        // backoff properly
        AbstractCoordinator.this.wait(rebalanceConfig.retryBackoffMs);
    } else {
        // 尋找Coordinator
        lookupCoordinator();
    }
}

因此在Subscribe模式下,無論何種提交方式,都是沒有Bug的

8.2、Assign

因為自動提交也是非同步提交,因此我們只聚焦在同步提交與非同步提交。其實同步提交與非同步提交,它們構建入參、處理響應等均是調用的同一個方法,唯一不同的是發起調用處的邏輯。我們先看下同步提交的邏輯


// org.apache.kafka.clients.consumer.internals.AbstractCoordinator#ensureCoordinatorReady
protected synchronized boolean ensureCoordinatorReady(final Timer timer) {
    if (!coordinatorUnknown())
        return true;

    do {
        if (fatalFindCoordinatorException != null) {
            final RuntimeException fatalException = fatalFindCoordinatorException;
            fatalFindCoordinatorException = null;
            throw fatalException;
        }
        final RequestFuture<Void> future = lookupCoordinator();

        // some other business
        // .......

        clearFindCoordinatorFuture();
        if (fatalException != null)
            throw fatalException;
    } while (coordinatorUnknown() && timer.notExpired());

    return !coordinatorUnknown();
}

沒有問題,lookupCoordinator()clearFindCoordinatorFuture()又成對兒出現

而非同步提交呢?

// org.apache.kafka.clients.consumer.internals.ConsumerCoordinator#commitOffsetsAsync
public void commitOffsetsAsync(final Map<TopicPartition, OffsetAndMetadata> offsets, final OffsetCommitCallback callback) {
    invokeCompletedOffsetCommitCallbacks();

    if (!coordinatorUnknown()) {
        doCommitOffsetsAsync(offsets, callback);
    } else {
        // we don't know the current coordinator, so try to find it and then send the commit
        // or fail (we don't want recursive retries which can cause offset commits to arrive
        // out of order). Note that there may be multiple offset commits chained to the same
        // coordinator lookup request. This is fine because the listeners will be invoked in
        // the same order that they were added. Note also that AbstractCoordinator prevents
        // multiple concurrent coordinator lookup requests.
        pendingAsyncCommits.incrementAndGet();
        lookupCoordinator().addListener(new RequestFutureListener<Void>() {
            @Override
            public void onSuccess(Void value) {
                // do something
            }

            @Override
            public void onFailure(RuntimeException e) {
                // do something
            }
        });
    }

    // ensure the commit has a chance to be transmitted (without blocking on its completion).
    // Note that commits are treated as heartbeats by the coordinator, so there is no need to
    // explicitly allow heartbeats through delayed task execution.
    client.pollNoWakeup();
}

非常遺憾,只有lookupCoordinator(),卻沒有clearFindCoordinatorFuture(),導致成員變數一直得不到重置,也就無法正常發起尋找Coordinator的請求,其實如果修複的話,也非常簡單,只需要在RequestFutureListener的回調結果中顯式調用clearFindCoordinatorFuture()即可

這個bug隱藏的很深,只靠單測,感覺還是很難發現的,bug已經在3.2.1版本修複。雖然我們生產環境是2.8.2的Broker,但是還是可以直接通過升級Consumer版本來解決,即便client版本高於了server端。這個當然得益於Kafka靈活的版本策略,還是要為其點個贊的

參考文檔


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 一、是什麼 WebSocket,是一種網路傳輸協議,位於OSI模型的應用層。可在單個TCP連接上進行全雙工通信,能更好的節省伺服器資源和帶寬並達到實時通迅 客戶端和伺服器只需要完成一次握手,兩者之間就可以創建持久性的連接,併進行雙向數據傳輸 從上圖可見,websocket伺服器與客戶端通過握手連接, ...
  • 1. Module Module是NestJS 的基本組織單位。 模塊系統基於 Node.js 的 CommonJS 模塊系統,但提供了更高級別的抽象和組織方式。通過使用模塊,你可以將應用程式拆分成多個獨立且可復用的部分,每個模塊都負責實現特定的功能或業務邏輯。 模塊可以封裝相關的代碼、配置和依賴關 ...
  • 使用defineModel時,為什麼子組件內沒有任何關於props的定義和emit事件觸發的代碼?修改defineModel返回值會修改父組件上綁定的變數,這是否破壞了vue的單向數據流呢? ...
  • React 學習之 createElement React 元素 在 React 中,元素是 React 應用的最小構建塊。 一個 React 元素是 React 對象的一個輕量級、靜態的表示。 它們被 React 用於知道屏幕上什麼應該被渲染,併在數據改變時保持 UI 的更新。 React 元素是 ...
  • 一、三次握手 三次握手(Three-way Handshake)其實就是指建立一個TCP連接時,需要客戶端和伺服器總共發送3個包 主要作用就是為了確認雙方的接收能力和發送能力是否正常、指定自己的初始化序列號為後面的可靠性傳送做準備 過程如下: 第一次握手:客戶端給服務端發一個 SYN 報文,並指明客 ...
  • 首先寫一個bind的簡單示例: 'use strict' function fn() { console.log('this::', this) console.log('arguments::', arguments) } // fn() // 這裡調用時this 在嚴格模式下是undefined ...
  • React 學習之 Hello World React 簡介 React是一個用於構建用戶界面的JavaScript庫,由Facebook開發並維護。React通過聲明式的方式來構建UI,使得代碼更易於理解和測試。React的核心概念包括組件(Component)和虛擬DOM(Virtual DOM ...
  • nvm nvm(Node Version Manager)是一個Node.js的版本管理器。 安裝nvm windows安裝nvm 1. 下載nvm 下載地址:nvm-windows,下載 nvm-noinstall 或者 nvm-setup.exe 如果使用 nvm-noinstall 可以運行 ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...