多租戶的概念是我在畢業後不久進第一家公司接觸到的,當時所在部門的業務是計劃建設一套基於自研的、基於開放 API 的、基於 PaaS 的、面向企業(ToB)的多租戶架構平臺,將我們的服務可以成規模地、穩定高效地交付給客戶使用。 ...
一、背景
Kafka的位點提交一直是Consumer端非常重要的一部分,業務上我們經常遇到的消息丟失、消息重覆也與其息息相關。位點提交說簡單也簡單,說複雜也確實複雜,沒有人能用一段簡短的話將其說清楚,最近團隊生產環境便遇到一個小概率的報錯
“Offset commit failed with a retriable exception. You should retry committing the latest consumed offsets. The coordinator is not available.”
此錯誤一齣,Consumer的流量直接跌0,且無法自愈,雖然客戶端重啟後可自動恢復,但影響及損失還是非常巨大的,當然最後定位就是「位點提交」一手炮製的,是開源的一個重大bug,受此影響的版本跨越2.6.x ~ 3.1.x :
https://issues.apache.org/jira/browse/KAFKA-13840
藉此bug正好來梳理一下Kafka有關位點提交的知識點
二、概述
關於位點提交(commit offset)大家最直觀的感受便是自動或手動提交,但是仔細一想,還是有很多細節問題,例如:
- 手動的同步提交與非同步提交有什麼區別?
- 使用自動提交模式時,提交動作是同步的還是非同步的?
- 消費模式使用assign或subscribe,在提交位點時有區別嗎?
- 同步提交與非同步提交能否混合使用?
- 手動提交與自動提交能否混合使用?
其實這些問題都是萬變不離其宗,我們把各個特征總結一下,這些問題自然也就迎刃而解
三、為什麼要提交位點?
在開始介紹各類位點提交的策略之前,我們先拋出一個靈魂拷問:“為什麼一定要提交位點?”。 Consumer會周期性的從Broker拉取消息,每次拉取消息的時候,順便提交位點不可以嗎?為什麼一定要讓用戶感知提交位點,還提供了各種各樣的策略?
其實回答這個問題,我們理解以下2個配置就夠了
fetch.max.bytes
andmax.partition.fetch.bytes
- 均作用於Broker端。首先需要明確的是,Consumer的一次拉取經常是針對多個partition的,因此max.partition.fetch.bytes控制的一個partition拉取消息的最大值,而fetch.max.bytes控制的則是本次請求整體的上限
max.poll.records
- 作用於Consumer端。而此參數控制的就是一次 poll 方法最多返回的消息條數,因此並不是每次調用 poll 方法都會發起一次網路請求的
因此也就導致了發起網路的頻次跟用戶處理業務數據的頻次是不一樣的
簡單總結一下,單次網路請求拉取的數據量可能是很大的,需要客戶端通過多次調用poll()
方法來消化,如果按照網路請求的頻次來提交位點的話,那這個提交頻次未免太粗了,Consumer一旦發生重啟,將會導致大量的消息重覆
其次按照網路請求的頻次來提交位點的話,程式將變得不夠靈活,業務端對於消息的處理會有自己的理解,將提交位點的發起動作放在Consumer,設計更有彈性
四、Consumer網路模型簡介
4.1、單線程的Consumer
在開始介紹Consumer端的網路模型之前,我們先看下Producer的
可見Producer是線程安全的,Producer內部維護了一個併發的緩存隊列,所有的數據都會先寫入隊列,然後由Sender線程負責將其發送至網路
而Consumer則不同,我們羅列一下Consumer的特點
- 單線程(業務處理與網路共用一個線程)
- 非線程安全
不過這裡說的單線程不夠嚴謹,在0.10.1版本以後:
- Subscribe模式下,Consumer將心跳邏輯放在了一個獨立線程中,如果消息處理邏輯不能在
max.poll.interval.ms
內完成,則consumer將停止發送心跳,然後發送LeaveGroup請求主動離組,從而引發coordinator開啟新一輪rebalance - Assign模式下,則只有一個Main線程
用戶處理業務邏輯的時間可能會很長,因此心跳線程的引入主要是為瞭解決心跳問題,其非常輕量,因此我們泛泛的講,Consumer其實就是單線程的,包括提交位點,那一個單線程的客戶端是如何保證高效的吞吐,又是如何與用戶處理數據的邏輯解耦呢?其實這是個很有意思,也很有深度的問題,但不是本文的重點,後續我們再展開詳聊
因此我們知道,所有提交位點的動作均是交由Consumer Main線程來提交的,但是單線程並不意味著阻塞,不要忘記,我們底層依賴的是JDK的NIO,因此網路發送、接受部分均是非同步執行的
4.2、網路模型
既然Consumer是單線程,而NIO是非同步的,那麼Consumer如何處理這些網路請求呢?Producer比較好理解,有一個專門負責交互的Sender線程,而單線程的Consumer如何處理呢
其實Consumer所有的網路發送動作,均放在main線程中,而在Consumer內部,為每個建聯的Broker都維護了一個unsent列表,這個列表中存放了待發送的請求,每次業務端程式執行consumer.poll()
方法時,會先後觸發2次網路發送的操作:
- 嘗試將所有Broker待發送區的數據發送出去
- 處理網路接收到的請求
- 嘗試將所有Broker待發送區的數據發送出去(again)
回到我們位點提交的case中,如果某個Broker積攢了大量的未發送請求,那提交位點的請求豈不是要等待很久才能發出去?是的,如果unsent列表中有很多請求確實會這樣,不過正常情況下,同一個Broker中不會積攢大量請求,如果一次從Broker中拉取的消息還沒有被消費完,是不會向該Broker再次發送請求的,因此業務poll()的頻率是要遠高於網路發送頻率的,而單次poll時,又會觸發2次trySend,因此可保證不存在unsent列表的數據過多而發不出的情況
BTW:Consumer的網路三件套:NetworkClient、Selector、KafkaChannel與Producer是完全一樣的。關於Consumer的核心組件,盜用一張網上的圖
有了上面的基礎,我們再來討論位點提交的方式,就會變得非常清晰明朗了
五、手動-非同步提交
執行非同步提交的代碼通常是這樣寫的
while (true) {
// 拉取消息
ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofSeconds(1));
// 如果拉取的消息不為空
if (!records.isEmpty()) {
// 執行常規業務處理邏輯
doBusiness(records);
// 非同步提交位點
kafkaConsumer.commitAsync();
}
}
kafkaConsumer.commitAsync()
負責拼接提交位點的request,然後將請求放在對應Broker的unsent列表中,程式將返回,待到下一次業務執行poll(),或合適的時機,會將此請求發出去,並不阻塞main線程
而對於提交位點的結果,如果指定了回調函數,如下:
kafkaConsumer.commitAsync(new OffsetCommitCallback() {
@Override
public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
}
});
可以對異常進行處理,也可以拿到實時提交的位點
而對於沒有指定回調函數的case,Consumer會提供一個預設的回調函數org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.DefaultOffsetCommitCallback
,在發生異常時,輸出error日誌
六、手動-同步提交
而對於同步提交
kafkaConsumer.commitSync();
首先需要明確的是,同步提交是會阻塞Consumer的Main線程的,手動提交會首先將提交請求放在對應Broker的unsent列表的尾部,繼而不斷地觸發調用,將網路待發送區的數據發送出去,同時不間斷接收網路請求,直到收到本次提交的響應;不過同步提交也有超時時間,預設為60s,如果超時,將會拋出TimeoutException異常
同步提交是低效的,會影響Consumer整體的消費吞吐,而有些相對嚴苛的業務場景,同步提交又是必不可少的,讀者根據自己的業務case來決定使用哪種策略
七、自動提交
與手動提交相對的,便是自動提交,首先明確一點,自動提交的模式,是非同步提交
自動提交並不是啟動一個全新的線程去提交位點,也不是嚴格按照固定時間間隔去提交。自動提交與手動提交一樣,也是由Consumer Main線程觸發的
由於位點提交、處理業務邏輯、網路收發、元數據更新等,都共用了Consumer的Main線程,因此並不能保證提交位點的時間間隔嚴格控制在auto.commit.interval.ms(預設5000,即5s)內,因此真實提交位點的時間間隔只會大於等於auto.commit.interval.ms
總結一下自動提交的特點:
- 非同步提交
- 提交操作由Consumer的Main線程發起
- 配置
auto.commit.interval.ms
只能保證提交的最小間隔,真實提交時間間隔通常大於此配置
至此,我們嘗試回答一下剛開始提出的問題
- 手動的同步提交與非同步提交有什麼區別?
- 同步提交會阻塞Consumer的Main線程,相對而言,非同步提交性能更高
- 使用自動提交模式時,提交動作是同步的還是非同步的?
- 非同步的
- 消費模式使用assign或subscribe,在提交位點時有區別嗎?
- subscribe模式會有心跳線程,心跳線程維護了與Coordinator的建聯
- 同步提交與非同步提交能否混合使用?
- 可以,通常在大部分場景使用非同步提交,而在需要明確拿到已提交位點的case下使用同步提交
- 手動提交與自動提交能否混合使用?
- 可以,不過語義上會有很多衝突,不建議混合使用
八、開源Bug
回到文章剛開始提到的異常報錯
“Offset commit failed with a retriable exception. You should retry committing the latest consumed offsets. The coordinator is not available.”
這個bug並不是在所有case下都會存在
- Subscribe
- 自動提交 -- 正常運行
- 手動-非同步提交 -- 正常運行
- 手動-同步提交 -- 正常運行
- Assign
- 自動提交 -- Bug
- 手動-非同步提交 -- Bug
- 手動-同步提交 -- 正常運行
為什麼會出現如何奇怪的情況呢?其實跟一下源碼便會有結論
8.1、Subscribe
在Subscribe模式下,Consumer與Coordinator的交互是通過線程org.apache.kafka.clients.consumer.internals.AbstractCoordinator.HeartbeatThread
進行的。當程式發現Coordinator找不到時,便會發起尋找Coordinator的網路請求,方法如下
// org.apache.kafka.clients.consumer.internals.AbstractCoordinator#lookupCoordinator
protected synchronized RequestFuture<Void> lookupCoordinator() {
if (findCoordinatorFuture == null) {
// find a node to ask about the coordinator
Node node = this.client.leastLoadedNode();
if (node == null) {
log.debug("No broker available to send FindCoordinator request");
return RequestFuture.noBrokersAvailable();
} else {
findCoordinatorFuture = sendFindCoordinatorRequest(node);
}
}
return findCoordinatorFuture;
}
而其中涉及一個findCoordinatorFuture
的成員變數,必須要滿足findCoordinatorFuture == null
,才會真正發起網路請求,因此在方法執行完,需要將其置空,如下方法
// org.apache.kafka.clients.consumer.internals.AbstractCoordinator#clearFindCoordinatorFuture
private synchronized void clearFindCoordinatorFuture() {
findCoordinatorFuture = null;
}
說白了,也就是每次調用,都需要lookupCoordinator()
與clearFindCoordinatorFuture()
成對兒出現;當然心跳線程也是這樣做的
if (coordinatorUnknown()) {
if (findCoordinatorFuture != null) {
// clear the future so that after the backoff, if the hb still sees coordinator unknown in
// the next iteration it will try to re-discover the coordinator in case the main thread cannot
// 清理輔助變數findCoordinatorFuture
clearFindCoordinatorFuture();
// backoff properly
AbstractCoordinator.this.wait(rebalanceConfig.retryBackoffMs);
} else {
// 尋找Coordinator
lookupCoordinator();
}
}
因此在Subscribe模式下,無論何種提交方式,都是沒有Bug的
8.2、Assign
因為自動提交也是非同步提交,因此我們只聚焦在同步提交與非同步提交。其實同步提交與非同步提交,它們構建入參、處理響應等均是調用的同一個方法,唯一不同的是發起調用處的邏輯。我們先看下同步提交的邏輯
// org.apache.kafka.clients.consumer.internals.AbstractCoordinator#ensureCoordinatorReady
protected synchronized boolean ensureCoordinatorReady(final Timer timer) {
if (!coordinatorUnknown())
return true;
do {
if (fatalFindCoordinatorException != null) {
final RuntimeException fatalException = fatalFindCoordinatorException;
fatalFindCoordinatorException = null;
throw fatalException;
}
final RequestFuture<Void> future = lookupCoordinator();
// some other business
// .......
clearFindCoordinatorFuture();
if (fatalException != null)
throw fatalException;
} while (coordinatorUnknown() && timer.notExpired());
return !coordinatorUnknown();
}
沒有問題,lookupCoordinator()
與clearFindCoordinatorFuture()
又成對兒出現
而非同步提交呢?
// org.apache.kafka.clients.consumer.internals.ConsumerCoordinator#commitOffsetsAsync
public void commitOffsetsAsync(final Map<TopicPartition, OffsetAndMetadata> offsets, final OffsetCommitCallback callback) {
invokeCompletedOffsetCommitCallbacks();
if (!coordinatorUnknown()) {
doCommitOffsetsAsync(offsets, callback);
} else {
// we don't know the current coordinator, so try to find it and then send the commit
// or fail (we don't want recursive retries which can cause offset commits to arrive
// out of order). Note that there may be multiple offset commits chained to the same
// coordinator lookup request. This is fine because the listeners will be invoked in
// the same order that they were added. Note also that AbstractCoordinator prevents
// multiple concurrent coordinator lookup requests.
pendingAsyncCommits.incrementAndGet();
lookupCoordinator().addListener(new RequestFutureListener<Void>() {
@Override
public void onSuccess(Void value) {
// do something
}
@Override
public void onFailure(RuntimeException e) {
// do something
}
});
}
// ensure the commit has a chance to be transmitted (without blocking on its completion).
// Note that commits are treated as heartbeats by the coordinator, so there is no need to
// explicitly allow heartbeats through delayed task execution.
client.pollNoWakeup();
}
非常遺憾,只有lookupCoordinator()
,卻沒有clearFindCoordinatorFuture()
,導致成員變數一直得不到重置,也就無法正常發起尋找Coordinator的請求,其實如果修複的話,也非常簡單,只需要在RequestFutureListener的回調結果中顯式調用clearFindCoordinatorFuture()
即可
這個bug隱藏的很深,只靠單測,感覺還是很難發現的,bug已經在3.2.1版本修複。雖然我們生產環境是2.8.2的Broker,但是還是可以直接通過升級Consumer版本來解決,即便client版本高於了server端。這個當然得益於Kafka靈活的版本策略,還是要為其點個贊的
參考文檔