Kafka原理剖析之「位點提交」

来源:https://www.cnblogs.com/xijiu/p/18119723
-Advertisement-
Play Games

多租戶的概念是我在畢業後不久進第一家公司接觸到的,當時所在部門的業務是計劃建設一套基於自研的、基於開放 API 的、基於 PaaS 的、面向企業(ToB)的多租戶架構平臺,將我們的服務可以成規模地、穩定高效地交付給客戶使用。 ...


一、背景

Kafka的位點提交一直是Consumer端非常重要的一部分,業務上我們經常遇到的消息丟失、消息重覆也與其息息相關。位點提交說簡單也簡單,說複雜也確實複雜,沒有人能用一段簡短的話將其說清楚,最近團隊生產環境便遇到一個小概率的報錯

Offset commit failed with a retriable exception. You should retry committing the latest consumed offsets. The coordinator is not available.

此錯誤一齣,Consumer的流量直接跌0,且無法自愈,雖然客戶端重啟後可自動恢復,但影響及損失還是非常巨大的,當然最後定位就是「位點提交」一手炮製的,是開源的一個重大bug,受此影響的版本跨越2.6.x ~ 3.1.x :

https://issues.apache.org/jira/browse/KAFKA-13840

藉此bug正好來梳理一下Kafka有關位點提交的知識點

二、概述

關於位點提交(commit offset)大家最直觀的感受便是自動或手動提交,但是仔細一想,還是有很多細節問題,例如:

  • 手動的同步提交與非同步提交有什麼區別?
  • 使用自動提交模式時,提交動作是同步的還是非同步的?
  • 消費模式使用assign或subscribe,在提交位點時有區別嗎?
  • 同步提交與非同步提交能否混合使用?
  • 手動提交與自動提交能否混合使用?

其實這些問題都是萬變不離其宗,我們把各個特征總結一下,這些問題自然也就迎刃而解

三、為什麼要提交位點?

在開始介紹各類位點提交的策略之前,我們先拋出一個靈魂拷問:“為什麼一定要提交位點?”。 Consumer會周期性的從Broker拉取消息,每次拉取消息的時候,順便提交位點不可以嗎?為什麼一定要讓用戶感知提交位點,還提供了各種各樣的策略?

其實回答這個問題,我們理解以下2個配置就夠了

  • fetch.max.bytes and max.partition.fetch.bytes
    • 均作用於Broker端。首先需要明確的是,Consumer的一次拉取經常是針對多個partition的,因此max.partition.fetch.bytes控制的一個partition拉取消息的最大值,而fetch.max.bytes控制的則是本次請求整體的上限
  • max.poll.records
    • 作用於Consumer端。而此參數控制的就是一次 poll 方法最多返回的消息條數,因此並不是每次調用 poll 方法都會發起一次網路請求的

因此也就導致了發起網路的頻次跟用戶處理業務數據的頻次是不一樣的

簡單總結一下,單次網路請求拉取的數據量可能是很大的,需要客戶端通過多次調用poll()方法來消化,如果按照網路請求的頻次來提交位點的話,那這個提交頻次未免太粗了,Consumer一旦發生重啟,將會導致大量的消息重覆

其次按照網路請求的頻次來提交位點的話,程式將變得不夠靈活,業務端對於消息的處理會有自己的理解,將提交位點的發起動作放在Consumer,設計更有彈性

四、Consumer網路模型簡介

4.1、單線程的Consumer

在開始介紹Consumer端的網路模型之前,我們先看下Producer的

可見Producer是線程安全的,Producer內部維護了一個併發的緩存隊列,所有的數據都會先寫入隊列,然後由Sender線程負責將其發送至網路

而Consumer則不同,我們羅列一下Consumer的特點

  • 單線程(業務處理與網路共用一個線程)
  • 非線程安全

不過這裡說的單線程不夠嚴謹,在0.10.1版本以後:

  • Subscribe模式下,Consumer將心跳邏輯放在了一個獨立線程中,如果消息處理邏輯不能在 max.poll.interval.ms 內完成,則consumer將停止發送心跳,然後發送LeaveGroup請求主動離組,從而引發coordinator開啟新一輪rebalance
  • Assign模式下,則只有一個Main線程

用戶處理業務邏輯的時間可能會很長,因此心跳線程的引入主要是為瞭解決心跳問題,其非常輕量,因此我們泛泛的講,Consumer其實就是單線程的,包括提交位點,那一個單線程的客戶端是如何保證高效的吞吐,又是如何與用戶處理數據的邏輯解耦呢?其實這是個很有意思,也很有深度的問題,但不是本文的重點,後續我們再展開詳聊

因此我們知道,所有提交位點的動作均是交由Consumer Main線程來提交的,但是單線程並不意味著阻塞,不要忘記,我們底層依賴的是JDK的NIO,因此網路發送、接受部分均是非同步執行的

4.2、網路模型

既然Consumer是單線程,而NIO是非同步的,那麼Consumer如何處理這些網路請求呢?Producer比較好理解,有一個專門負責交互的Sender線程,而單線程的Consumer如何處理呢

其實Consumer所有的網路發送動作,均放在main線程中,而在Consumer內部,為每個建聯的Broker都維護了一個unsent列表,這個列表中存放了待發送的請求,每次業務端程式執行consumer.poll()方法時,會先後觸發2次網路發送的操作:

  1. 嘗試將所有Broker待發送區的數據發送出去
  2. 處理網路接收到的請求
  3. 嘗試將所有Broker待發送區的數據發送出去(again)

回到我們位點提交的case中,如果某個Broker積攢了大量的未發送請求,那提交位點的請求豈不是要等待很久才能發出去?是的,如果unsent列表中有很多請求確實會這樣,不過正常情況下,同一個Broker中不會積攢大量請求,如果一次從Broker中拉取的消息還沒有被消費完,是不會向該Broker再次發送請求的,因此業務poll()的頻率是要遠高於網路發送頻率的,而單次poll時,又會觸發2次trySend,因此可保證不存在unsent列表的數據過多而發不出的情況

BTW:Consumer的網路三件套:NetworkClient、Selector、KafkaChannel與Producer是完全一樣的。關於Consumer的核心組件,盜用一張網上的圖

有了上面的基礎,我們再來討論位點提交的方式,就會變得非常清晰明朗了

五、手動-非同步提交

執行非同步提交的代碼通常是這樣寫的

while (true) {
    // 拉取消息
    ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofSeconds(1));
    // 如果拉取的消息不為空
    if (!records.isEmpty()) {
        // 執行常規業務處理邏輯
        doBusiness(records);
        // 非同步提交位點
        kafkaConsumer.commitAsync();
    }
}

kafkaConsumer.commitAsync() 負責拼接提交位點的request,然後將請求放在對應Broker的unsent列表中,程式將返回,待到下一次業務執行poll(),或合適的時機,會將此請求發出去,並不阻塞main線程

而對於提交位點的結果,如果指定了回調函數,如下:

kafkaConsumer.commitAsync(new OffsetCommitCallback() {
    @Override
    public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
    }
});

可以對異常進行處理,也可以拿到實時提交的位點

而對於沒有指定回調函數的case,Consumer會提供一個預設的回調函數org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.DefaultOffsetCommitCallback,在發生異常時,輸出error日誌

六、手動-同步提交

而對於同步提交

kafkaConsumer.commitSync();

首先需要明確的是,同步提交是會阻塞Consumer的Main線程的,手動提交會首先將提交請求放在對應Broker的unsent列表的尾部,繼而不斷地觸發調用,將網路待發送區的數據發送出去,同時不間斷接收網路請求,直到收到本次提交的響應;不過同步提交也有超時時間,預設為60s,如果超時,將會拋出TimeoutException異常

同步提交是低效的,會影響Consumer整體的消費吞吐,而有些相對嚴苛的業務場景,同步提交又是必不可少的,讀者根據自己的業務case來決定使用哪種策略

七、自動提交

與手動提交相對的,便是自動提交,首先明確一點,自動提交的模式,是非同步提交

自動提交並不是啟動一個全新的線程去提交位點,也不是嚴格按照固定時間間隔去提交。自動提交與手動提交一樣,也是由Consumer Main線程觸發的

由於位點提交、處理業務邏輯、網路收發、元數據更新等,都共用了Consumer的Main線程,因此並不能保證提交位點的時間間隔嚴格控制在auto.commit.interval.ms(預設5000,即5s)內,因此真實提交位點的時間間隔只會大於等於auto.commit.interval.ms

總結一下自動提交的特點:

  • 非同步提交
  • 提交操作由Consumer的Main線程發起
  • 配置 auto.commit.interval.ms 只能保證提交的最小間隔,真實提交時間間隔通常大於此配置

至此,我們嘗試回答一下剛開始提出的問題

  • 手動的同步提交與非同步提交有什麼區別?
    • 同步提交會阻塞Consumer的Main線程,相對而言,非同步提交性能更高
  • 使用自動提交模式時,提交動作是同步的還是非同步的?
    • 非同步的
  • 消費模式使用assign或subscribe,在提交位點時有區別嗎?
    • subscribe模式會有心跳線程,心跳線程維護了與Coordinator的建聯
  • 同步提交與非同步提交能否混合使用?
    • 可以,通常在大部分場景使用非同步提交,而在需要明確拿到已提交位點的case下使用同步提交
  • 手動提交與自動提交能否混合使用?
    • 可以,不過語義上會有很多衝突,不建議混合使用

八、開源Bug

回到文章剛開始提到的異常報錯

Offset commit failed with a retriable exception. You should retry committing the latest consumed offsets. The coordinator is not available.

這個bug並不是在所有case下都會存在

  • Subscribe
    • 自動提交 -- 正常運行
    • 手動-非同步提交 -- 正常運行
    • 手動-同步提交 -- 正常運行
  • Assign
    • 自動提交 -- Bug
    • 手動-非同步提交 -- Bug
    • 手動-同步提交 -- 正常運行

為什麼會出現如何奇怪的情況呢?其實跟一下源碼便會有結論

8.1、Subscribe

在Subscribe模式下,Consumer與Coordinator的交互是通過線程org.apache.kafka.clients.consumer.internals.AbstractCoordinator.HeartbeatThread進行的。當程式發現Coordinator找不到時,便會發起尋找Coordinator的網路請求,方法如下

// org.apache.kafka.clients.consumer.internals.AbstractCoordinator#lookupCoordinator
protected synchronized RequestFuture<Void> lookupCoordinator() {
    if (findCoordinatorFuture == null) {
        // find a node to ask about the coordinator
        Node node = this.client.leastLoadedNode();
        if (node == null) {
            log.debug("No broker available to send FindCoordinator request");
            return RequestFuture.noBrokersAvailable();
        } else {
            findCoordinatorFuture = sendFindCoordinatorRequest(node);
        }
    }
    return findCoordinatorFuture;
}

而其中涉及一個findCoordinatorFuture的成員變數,必須要滿足findCoordinatorFuture == null,才會真正發起網路請求,因此在方法執行完,需要將其置空,如下方法

// org.apache.kafka.clients.consumer.internals.AbstractCoordinator#clearFindCoordinatorFuture
private synchronized void clearFindCoordinatorFuture() {
    findCoordinatorFuture = null;
}

說白了,也就是每次調用,都需要lookupCoordinator()clearFindCoordinatorFuture()成對兒出現;當然心跳線程也是這樣做的

if (coordinatorUnknown()) {
    if (findCoordinatorFuture != null) {
        // clear the future so that after the backoff, if the hb still sees coordinator unknown in
        // the next iteration it will try to re-discover the coordinator in case the main thread cannot
        // 清理輔助變數findCoordinatorFuture
        clearFindCoordinatorFuture();

        // backoff properly
        AbstractCoordinator.this.wait(rebalanceConfig.retryBackoffMs);
    } else {
        // 尋找Coordinator
        lookupCoordinator();
    }
}

因此在Subscribe模式下,無論何種提交方式,都是沒有Bug的

8.2、Assign

因為自動提交也是非同步提交,因此我們只聚焦在同步提交與非同步提交。其實同步提交與非同步提交,它們構建入參、處理響應等均是調用的同一個方法,唯一不同的是發起調用處的邏輯。我們先看下同步提交的邏輯


// org.apache.kafka.clients.consumer.internals.AbstractCoordinator#ensureCoordinatorReady
protected synchronized boolean ensureCoordinatorReady(final Timer timer) {
    if (!coordinatorUnknown())
        return true;

    do {
        if (fatalFindCoordinatorException != null) {
            final RuntimeException fatalException = fatalFindCoordinatorException;
            fatalFindCoordinatorException = null;
            throw fatalException;
        }
        final RequestFuture<Void> future = lookupCoordinator();

        // some other business
        // .......

        clearFindCoordinatorFuture();
        if (fatalException != null)
            throw fatalException;
    } while (coordinatorUnknown() && timer.notExpired());

    return !coordinatorUnknown();
}

沒有問題,lookupCoordinator()clearFindCoordinatorFuture()又成對兒出現

而非同步提交呢?

// org.apache.kafka.clients.consumer.internals.ConsumerCoordinator#commitOffsetsAsync
public void commitOffsetsAsync(final Map<TopicPartition, OffsetAndMetadata> offsets, final OffsetCommitCallback callback) {
    invokeCompletedOffsetCommitCallbacks();

    if (!coordinatorUnknown()) {
        doCommitOffsetsAsync(offsets, callback);
    } else {
        // we don't know the current coordinator, so try to find it and then send the commit
        // or fail (we don't want recursive retries which can cause offset commits to arrive
        // out of order). Note that there may be multiple offset commits chained to the same
        // coordinator lookup request. This is fine because the listeners will be invoked in
        // the same order that they were added. Note also that AbstractCoordinator prevents
        // multiple concurrent coordinator lookup requests.
        pendingAsyncCommits.incrementAndGet();
        lookupCoordinator().addListener(new RequestFutureListener<Void>() {
            @Override
            public void onSuccess(Void value) {
                // do something
            }

            @Override
            public void onFailure(RuntimeException e) {
                // do something
            }
        });
    }

    // ensure the commit has a chance to be transmitted (without blocking on its completion).
    // Note that commits are treated as heartbeats by the coordinator, so there is no need to
    // explicitly allow heartbeats through delayed task execution.
    client.pollNoWakeup();
}

非常遺憾,只有lookupCoordinator(),卻沒有clearFindCoordinatorFuture(),導致成員變數一直得不到重置,也就無法正常發起尋找Coordinator的請求,其實如果修複的話,也非常簡單,只需要在RequestFutureListener的回調結果中顯式調用clearFindCoordinatorFuture()即可

這個bug隱藏的很深,只靠單測,感覺還是很難發現的,bug已經在3.2.1版本修複。雖然我們生產環境是2.8.2的Broker,但是還是可以直接通過升級Consumer版本來解決,即便client版本高於了server端。這個當然得益於Kafka靈活的版本策略,還是要為其點個贊的

參考文檔


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 一、是什麼 WebSocket,是一種網路傳輸協議,位於OSI模型的應用層。可在單個TCP連接上進行全雙工通信,能更好的節省伺服器資源和帶寬並達到實時通迅 客戶端和伺服器只需要完成一次握手,兩者之間就可以創建持久性的連接,併進行雙向數據傳輸 從上圖可見,websocket伺服器與客戶端通過握手連接, ...
  • 1. Module Module是NestJS 的基本組織單位。 模塊系統基於 Node.js 的 CommonJS 模塊系統,但提供了更高級別的抽象和組織方式。通過使用模塊,你可以將應用程式拆分成多個獨立且可復用的部分,每個模塊都負責實現特定的功能或業務邏輯。 模塊可以封裝相關的代碼、配置和依賴關 ...
  • 使用defineModel時,為什麼子組件內沒有任何關於props的定義和emit事件觸發的代碼?修改defineModel返回值會修改父組件上綁定的變數,這是否破壞了vue的單向數據流呢? ...
  • React 學習之 createElement React 元素 在 React 中,元素是 React 應用的最小構建塊。 一個 React 元素是 React 對象的一個輕量級、靜態的表示。 它們被 React 用於知道屏幕上什麼應該被渲染,併在數據改變時保持 UI 的更新。 React 元素是 ...
  • 一、三次握手 三次握手(Three-way Handshake)其實就是指建立一個TCP連接時,需要客戶端和伺服器總共發送3個包 主要作用就是為了確認雙方的接收能力和發送能力是否正常、指定自己的初始化序列號為後面的可靠性傳送做準備 過程如下: 第一次握手:客戶端給服務端發一個 SYN 報文,並指明客 ...
  • 首先寫一個bind的簡單示例: 'use strict' function fn() { console.log('this::', this) console.log('arguments::', arguments) } // fn() // 這裡調用時this 在嚴格模式下是undefined ...
  • React 學習之 Hello World React 簡介 React是一個用於構建用戶界面的JavaScript庫,由Facebook開發並維護。React通過聲明式的方式來構建UI,使得代碼更易於理解和測試。React的核心概念包括組件(Component)和虛擬DOM(Virtual DOM ...
  • nvm nvm(Node Version Manager)是一個Node.js的版本管理器。 安裝nvm windows安裝nvm 1. 下載nvm 下載地址:nvm-windows,下載 nvm-noinstall 或者 nvm-setup.exe 如果使用 nvm-noinstall 可以運行 ...
一周排行
    -Advertisement-
    Play Games
  • GoF之工廠模式 @目錄GoF之工廠模式每博一文案1. 簡單說明“23種設計模式”1.2 介紹工廠模式的三種形態1.3 簡單工廠模式(靜態工廠模式)1.3.1 簡單工廠模式的優缺點:1.4 工廠方法模式1.4.1 工廠方法模式的優缺點:1.5 抽象工廠模式1.6 抽象工廠模式的優缺點:2. 總結:3 ...
  • 新改進提供的Taurus Rpc 功能,可以簡化微服務間的調用,同時可以不用再手動輸出模塊名稱,或調用路徑,包括負載均衡,這一切,由框架實現並提供了。新的Taurus Rpc 功能,將使得服務間的調用,更加輕鬆、簡約、高效。 ...
  • 本章將和大家分享ES的數據同步方案和ES集群相關知識。廢話不多說,下麵我們直接進入主題。 一、ES數據同步 1、數據同步問題 Elasticsearch中的酒店數據來自於mysql資料庫,因此mysql數據發生改變時,Elasticsearch也必須跟著改變,這個就是Elasticsearch與my ...
  • 引言 在我們之前的文章中介紹過使用Bogus生成模擬測試數據,今天來講解一下功能更加強大自動生成測試數據的工具的庫"AutoFixture"。 什麼是AutoFixture? AutoFixture 是一個針對 .NET 的開源庫,旨在最大程度地減少單元測試中的“安排(Arrange)”階段,以提高 ...
  • 經過前面幾個部分學習,相信學過的同學已經能夠掌握 .NET Emit 這種中間語言,並能使得它來編寫一些應用,以提高程式的性能。隨著 IL 指令篇的結束,本系列也已經接近尾聲,在這接近結束的最後,會提供幾個可供直接使用的示例,以供大伙分析或使用在項目中。 ...
  • 當從不同來源導入Excel數據時,可能存在重覆的記錄。為了確保數據的準確性,通常需要刪除這些重覆的行。手動查找並刪除可能會非常耗費時間,而通過編程腳本則可以實現在短時間內處理大量數據。本文將提供一個使用C# 快速查找並刪除Excel重覆項的免費解決方案。 以下是實現步驟: 1. 首先安裝免費.NET ...
  • C++ 異常處理 C++ 異常處理機制允許程式在運行時處理錯誤或意外情況。它提供了捕獲和處理錯誤的一種結構化方式,使程式更加健壯和可靠。 異常處理的基本概念: 異常: 程式在運行時發生的錯誤或意外情況。 拋出異常: 使用 throw 關鍵字將異常傳遞給調用堆棧。 捕獲異常: 使用 try-catch ...
  • 優秀且經驗豐富的Java開發人員的特征之一是對API的廣泛瞭解,包括JDK和第三方庫。 我花了很多時間來學習API,尤其是在閱讀了Effective Java 3rd Edition之後 ,Joshua Bloch建議在Java 3rd Edition中使用現有的API進行開發,而不是為常見的東西編 ...
  • 框架 · 使用laravel框架,原因:tp的框架路由和orm沒有laravel好用 · 使用強制路由,方便介面多時,分多版本,分文件夾等操作 介面 · 介面開發註意欄位類型,欄位是int,查詢成功失敗都要返回int(對接java等強類型語言方便) · 查詢介面用GET、其他用POST 代碼 · 所 ...
  • 正文 下午找企業的人去鎮上做貸後。 車上聽同事跟那個司機對罵,火星子都快出來了。司機跟那同事更熟一些,連我在內一共就三個人,同事那一手指桑罵槐給我都聽愣了。司機也是老社會人了,馬上聽出來了,為那個無辜的企業經辦人辯護,實際上是為自己辯護。 “這個事情你不能怪企業。”“但他們總不能讓銀行的人全權負責, ...