Kafka系列3:深入理解Kafka消費者

来源:https://www.cnblogs.com/mcbye/archive/2020/02/17/kafka-consumer-in-detail.html
-Advertisement-
Play Games

消費者和消費者組 如何創建消費者 如何消費消息 消費者配置 提交和偏移量 再均衡 結束消費 ...


上面兩篇聊了Kafka概況和Kafka生產者,包含了Kafka的基本概念、設計原理、設計核心以及生產者的核心原理。本篇單獨聊聊Kafka的消費者,包括如下內容:

  • 消費者和消費者組
  • 如何創建消費者
  • 如何消費消息
  • 消費者配置
  • 提交和偏移量
  • 再均衡
  • 結束消費

消費者和消費者組

概念

Kafka消費者對象訂閱主題並接收Kafka的消息,然後驗證消息並保存結果。
Kafka消費者消費者組的一部分。一個消費者組裡的消費者訂閱的是同一個主題,每個消費者接收主題一部分分區的消息。
消費者組的設計是對消費者進行的一個橫向伸縮,用於解決消費者消費數據的速度跟不上生產者生產數據的速度的問題,通過增加消費者,讓它們分擔負載,分別處理部分分區的消息。

消費者數目與分區數目

在一個消費者組中的消費者消費的是一個主題的部分分區的消息,而一個主題中包含若幹個分區,一個消費者組中也包含著若幹個消費者。當二者的數量關係處於不同的大小關係時,Kafka消費者的工作狀態也是不同的。看以下三種情況:

  1. 消費者數目<分區數目:此時不同分區的消息會被均衡地分配到這些消費者;
  2. 消費者數目=分區數目:每個消費者會負責一個分區的消息進行消費;
  3. 消費者數目>分區數目:此時會有多餘的消費者處於空閑狀態,其他的消費者與分區一對一地進行消費。

    分區再均衡

    當消費者數目與分區數目在以上三種關係間變化時,比如有新的消費者加入、或者有一個消費者發生崩潰時,會發生分區再均衡
    分區再均衡是指分區的所有權從一個消費者轉移到另一個消費者。再均衡為消費者組帶來了高可用性和伸縮性。但是同時,也會發生如下問題:
  • 在再均衡發生的時候,消費者無法讀取消息,會造成整個消費者組有一小段時間的不可用;
  • 當分區被重新分配給另一個消費者時,消費者當前的讀取狀態會丟失,它有可能需要去刷新緩存,在它重新恢復狀態之前會拖慢應用。
    因此也要儘量避免不必要的再均衡。
    那麼消費者組是怎麼知道一個消費者可不可用呢?
    消費者通過向被指派為群組協調器的Broker發送心跳來維持它們和群組的從屬關係以及它們對分區的所有權關係。只要消費者以正常的時間間隔發送心跳,就被認為是活躍的,說明它還在讀取分區里的消息。消費者會在輪詢消息或提交偏移量時發送心跳。如果消費者停止發送心跳的時間足夠長,會話就會過期,群組協調器認為它已經死亡,就會觸發一次再均衡。
    還有一點需要註意的是,當發生再均衡時,需要做一些清理工作,具體的操作方法可以通過在調用subscribe()方法時傳入一個ConsumerRebalanceListener實例即可。

    如何創建消費者

    創建Kafka的消費者對象的過程與創建生產者的過程是類似的,需要傳入必要的屬性。在創建消費者的時候以下以下三個選項是必選的:
  • bootstrap.servers :指定 broker 的地址清單,清單里不需要包含所有的 broker 地址,生產者會從給定的 broker 里查找 broker 的信息。不過建議至少要提供兩個 broker 的信息作為容錯;
  • key.deserializer :指定鍵的反序列化器;
  • value.deserializer :指定值的反序列化器。
    後兩個序列化器的說明與生產者的是一樣的。
    一個簡單的創建消費者的代碼樣例如下:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    String topic = "Hello";
    String group = "group1";
    Properties props = new Properties();
    props.put("bootstrap.servers", "server:9091");
    /*指定分組 ID*/
    props.put("group.id", group);
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

如何消費消息

訂閱主題

創建了Kafka消費者之後,接著就可以訂閱主題了。訂閱主題可以使用如下兩個 API :

  • consumer.subscribe(Collection topics) :指明需要訂閱的主題的集合;
  • consumer.subscribe(Pattern pattern) :使用正則來匹配需要訂閱的集合。
    代碼樣例:
    1
    consumer.subscribe(Collections.singletonList(topic));

輪詢消費

消息輪詢是消費者API的核心,消費者通過輪詢 API(poll) 向伺服器定時請求數據。一旦消費者訂閱了主題,輪詢就會處理所有的細節,包括群組協調、分區再均衡、發送心跳和獲取數據,這使得開發者只需要關註從分區返回的數據,然後進行業務處理。
一個簡單的消費者消費的代碼樣例如下:

1
2
3
4
5
6
7
8
9
10
11
12
try {
while (true) {
// 輪詢獲取數據
ConsumerRecords<String, String> records = consumer.poll(Duration.of(100, ChronoUnit.MILLIS));
for (ConsumerRecord<String, String> record : records) {
System.out.printf("topic = %s,partition = %d, key = %s, value = %s, offset = %d,\n",
record.topic(), record.partition(), record.key(), record.value(), record.offset());
}
}
} finally {
consumer.close();
}

 

消費者配置

與生產者類似,消費者也有完整的配置列表。接下來一一介紹這些重要的屬性。

fetch.min.byte

消費者從伺服器獲取記錄的最小位元組數。如果可用的數據量小於設置值,broker 會等待有足夠的可用數據時才會把它返回給消費者。主要是為了降低消費者和Broker的工作負載。

fetch.max.wait.ms

broker 返回給消費者數據的等待時間,預設是 500ms。如果消費者獲取最小數據量的要求得不到滿足,就會在等待最多該屬性所設置的時間後獲取到數據。實際要看二者哪個條件先滿足。

max.partition.fetch.bytes

該屬性指定了伺服器從每個分區返回給消費者的最大位元組數,預設為 1MB。

session.timeout.ms

消費者在被認為死亡之前可以與伺服器斷開連接的時間,預設是 3s。

auto.offset.reset

該屬性指定了消費者在讀取一個沒有偏移量的分區或者偏移量無效的情況下該作何處理:

  • latest (預設值) :在偏移量無效的情況下,消費者將從最新的記錄開始讀取數據(在消費者啟動之後生成的最新記錄);
  • earliest :在偏移量無效的情況下,消費者將從起始位置讀取分區的記錄。

    enable.auto.commit

    是否自動提交偏移量,預設值是 true。為了避免出現重覆消費和數據丟失,可以把它設置為 false。

    client.id

    客戶端 id,伺服器用來識別消息的來源。

    max.poll.records

    單次調用 poll() 方法能夠返回的記錄數量。

    receive.buffer.bytes & send.buffer.byte

    這兩個參數分別指定 TCP socket 接收和發送數據包緩衝區的大小,-1 代表使用操作系統的預設值。

    提交和偏移量

    提交是指更新分區當前位置的操作,分區當前的位置,也就是所謂的偏移量

    什麼是偏移量

    Kafka 的每一條消息都有一個偏移量屬性,記錄了其在分區中的位置,偏移量是一個單調遞增的整數。消費者通過往一個叫作 _consumer_offset 的特殊主題發送消息,消息里包含每個分區的偏移量。 如果消費者一直處於運行狀態,那麼偏移量就沒有 什麼用處。不過,如果有消費者退出或者新分區加入,此時就會觸發再均衡。完成再均衡之後,每個消費者可能分配到新的分區,而不是之前處理的那個。為了能夠繼續之前的工作,消費者需要讀取每個分區最後一次提交的偏移量,然後從偏移量指定的地方繼續處理。 因為這個原因,所以如果不能正確提交偏移量,就可能會導致數據丟失或者重覆出現消費,比如下麵情況:
  • 如果提交的偏移量小於客戶端處理的最後一個消息的偏移量 ,那麼處於兩個偏移量之間的消息就會被重覆消費;
  • 如果提交的偏移量大於客戶端處理的最後一個消息的偏移量,那麼處於兩個偏移量之間的消息將會丟失。

    偏移量提交

    那麼消費者如何提交偏移量呢?
    Kafka 支持自動提交和手動提交偏移量兩種方式。
    自動提交:
    只需要將消費者的 enable.auto.commit 屬性配置為 true 即可完成自動提交的配置。 此時每隔固定的時間,消費者就會把 poll() 方法接收到的最大偏移量進行提交,提交間隔由 auto.commit.interval.ms 屬性進行配置,預設值是 5s。
    使用自動提交是存在隱患的,假設我們使用預設的 5s 提交時間間隔,在最近一次提交之後的 3s 發生了再均衡,再均衡之後,消費者從最後一次提交的偏移量位置開始讀取消息。這個時候偏移量已經落後了 3s ,所以在這 3s 內到達的消息會被重覆處理。可以通過修改提交時間間隔來更頻繁地提交偏移量,減小可能出現重覆消息的時間窗,不過這種情況是無法完全避免的。基於這個原因,Kafka 也提供了手動提交偏移量的 API,使得用戶可以更為靈活的提交偏移量。
    手動提交:
    用戶可以通過將 enable.auto.commit 設為 false,然後手動提交偏移量。基於用戶需求手動提交偏移量可以分為兩大類:
    手動提交當前偏移量:即手動提交當前輪詢的最大偏移量;
    手動提交固定偏移量:即按照業務需求,提交某一個固定的偏移量。
    而按照 Kafka API,手動提交偏移量又可以分為同步提交和非同步提交。
    同步提交
    通過調用 consumer.commitSync() 來進行同步提交,不傳遞任何參數時提交的是當前輪詢的最大偏移量。
    1
    2
    3
    4
    5
    6
    7
    8
    while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.of(100, ChronoUnit.MILLIS));
    for (ConsumerRecord<String, String> record : records) {
    System.out.println(record);
    }
    // 同步提交
    consumer.commitSync();
    }

如果某個提交失敗,同步提交還會進行重試,這可以保證數據能夠最大限度提交成功,但是同時也會降低程式的吞吐量。
非同步提交
為瞭解決同步提交降低程式吞吐量的問題,又有了非同步提交的方案。
非同步提交可以提高程式的吞吐量,因為此時你可以儘管請求數據,而不用等待 Broker 的響應。代碼樣例如下:

1
2
3
4
5
6
7
8
9
10
11
12
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.of(100, ChronoUnit.MILLIS));
// 非同步提交並定義回調
consumer.commitAsync(new OffsetCommitCallback() {
@Override
public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
if (exception != null) {
offsets.forEach((x, y) -> System.out.printf("topic = %s,partition = %d, offset = %s \n", x.topic(), x.partition(), y.offset()));
}
}
});
}

 

非同步提交如果失敗,錯誤信息和偏移量都會被記錄下來。
儘管如此,非同步提交存在的問題是,如果提交失敗不能重試,因為重試可能會出現小偏移量覆蓋大偏移量的問題。
雖然程式不能在失敗時候進行自動重試,但是我們是可以手動進行重試。可以通過一個 Map<TopicPartition, Integer> offsets 來維護你提交的每個分區的偏移量,也就是非同步提交的順序,在每次提交偏移量之後或在回調里提交偏移量時遞增序列號。然後當失敗時候,你可以判斷失敗的偏移量是否小於你維護的同主題同分區的最後提交的偏移量,如果小於則代表你已經提交了更大的偏移量請求,此時不需要重試,否則就可以進行手動重試。
同步和非同步組合提交
當發生關閉消費者或者再均衡時,一定要確保能夠提交成功,為了保證性能和可靠性,又有了同步和非同步組合提交的方式。也就是在消費者關閉前組合使用commitAsync()方法和commitSync()方法。代碼樣例如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.of(100, ChronoUnit.MILLIS));
for (ConsumerRecord<String, String> record : records) {
System.out.println(record);
}
// 非同步提交
consumer.commitAsync();
}
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
// 因為即將要關閉消費者,所以要用同步提交保證提交成功
consumer.commitSync();
} finally {
consumer.close();
}
}

 

提交特定的偏移量

上面的提交方式都是提交當前最大的偏移量,但如果需要提交的是特定的一個偏移量呢?
只需要在重載的提交方法中傳入偏移量參數即可。代碼樣例如下:

1
2
3
4
// 同步提交特定偏移量
commitSync(Map<TopicPartition, OffsetAndMetadata> offsets)
// 非同步提交特定偏移量
commitAsync(Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback)

 

結束消費

上面的消費過程都是以無限迴圈的方式來演示的,那麼如何來優雅地停止消費者的輪詢呢。
Kafka 提供了 consumer.wakeup() 方法用於退出輪詢。
如果確定要退出迴圈,需要通過另一個線程調用consumer.wakeup()方法;如果迴圈運行在主線程里,可以在ShutdownHook里調用該方法。
它通過拋出 WakeupException 異常來跳出迴圈。需要註意的是,在退出線程時最好顯示的調用 consumer.close() , 此時消費者會提交任何還沒有提交的東西,並向群組協調器發送消息,告知自己要離開群組,接下來就會觸發再均衡 ,而不需要等待會話超時。
下麵的示例代碼為監聽控制台輸出,當輸入 exit 時結束輪詢,關閉消費者並退出程式:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
// 調用wakeup優雅的退出輪詢
final Thread mainThread = Thread.currentThread();
new Thread(() -> {
Scanner sc = new Scanner(System.in);
while (sc.hasNext()) {
if ("exit".equals(sc.next())) {
consumer.wakeup();
try {
// 等待主線程完成提交偏移量、關閉消費者等操作
mainThread.join();
break;
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}).start();

try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.of(100, ChronoUnit.MILLIS));
for (ConsumerRecord<String, String> rd : records) {
System.out.printf("topic = %s,partition = %d, key = %s, value = %s, offset = %d,\n", rd.topic(), rd.partition(), rd.key(), rd.value(), rd.offset());
}
}
} catch (WakeupException e) {
// 無需處理此異常
} finally {
consumer.close();
}

 

關註我的公眾號,獲取更多關於面試、技術的文章及福利資源。

Dali王的技術博客公眾號

【參考資料】
《Kafka 權威指南》


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 我用Go和gRPC創建了一個微服務項目,並試圖找出最好的程式結構,它可以作為我其他項目的模板。我還將程式設計和編程的最佳實踐應用於Go Microservice程式,例如清晰架構(Clean Architecture),依賴註入(Dependency Injection),日誌記錄,錯誤處理等。我有 ...
  • 近期因為疫情原因,一直是在家辦公了,也導致了和同事對接介面上出現了很多小問題,這也從側面反映出我個人對項目的設計不全面。 上面是對接介面時產生的一個問題:遠程伺服器返回錯誤:(414)Request-URI Too Large 這個問題主要是對方往項目介面中傳遞參數的時候,參數的長度特別長,而且程式 ...
  • collection裡面有什麼子類?(list和set是實現了collection介面的。) List: 1.可以允許重覆的對象(可重覆,有序集合)。2.可以插入多個null元素。3.常用的實現類有 ArrayList、LinkedList 和 Vector。ArrayList 最為流行,它提供了使 ...
  • 準備 Maven概述 什麼是Maven? Maven 的正確發音是[ˈmevən], 在美國是 一個口語化的詞語,代表專家、內行的意思。 Maven 是一個項目管理工具,它包含了一個項目對象模型 (POM:Project Object Model),一組標準集合,一個項目生命周 期(Project ...
  • 準備 1、官⽹地址:http://tomcat.apache.org下載。 2、解壓文件,並放到指定路徑,給該文件授權。 3、啟動和停止 Tomcat部署Java項目的3種方式: 部署方式1: 生成war包= webapps = 執行tomcat idea打包成war https://blog.cs ...
  • 一、JS加密 1.有的反爬蟲策略採用js對需要傳輸的數據進行加密處理。 2.經過加密,傳輸的就是密文 3.加密函數或者過程一定是在瀏覽器完成,也就是一定會把代碼(js代碼)暴露給使用者 4.通多閱讀加密演算法,就可以模擬出加密過程,從而達到破解。 5.舉一個案例 """ 破解有道詞典 """ from ...
  • 下麵的代碼主要用於使用python語言調用NASA官方的MODIS處理工具HEG進行投影坐標轉換與重採樣批量處理 主要參考 1. HEG的用戶手冊:https://newsroom.gsfc.nasa.gov/sdptoolkit/HEG/HEG215/EED2 TP 030_Rev01_HEG_U ...
  • 內部類的訪問規則 + 內部類可以直接訪問外部類中的成員,包括私有成員。 因為內部類中持有了一個外部類的引用,格式為:外部類名.this + 外部類要訪問內部類,必須要建立內部對象。 運行結果: 訪問 1. 當內部類定義在外部類的成員位置上,而且非私有,可以在外部其他類中直接建立內部類對象。 格式: ...
一周排行
    -Advertisement-
    Play Games
  • 概述:在C#中,++i和i++都是自增運算符,其中++i先增加值再返回,而i++先返回值再增加。應用場景根據需求選擇,首碼適合先增後用,尾碼適合先用後增。詳細示例提供清晰的代碼演示這兩者的操作時機和實際應用。 在C#中,++i 和 i++ 都是自增運算符,但它們在操作上有細微的差異,主要體現在操作的 ...
  • 上次發佈了:Taurus.MVC 性能壓力測試(ap 壓測 和 linux 下wrk 壓測):.NET Core 版本,今天計劃準備壓測一下 .NET 版本,來測試並記錄一下 Taurus.MVC 框架在 .NET 版本的性能,以便後續持續優化改進。 為了方便對比,本文章的電腦環境和測試思路,儘量和... ...
  • .NET WebAPI作為一種構建RESTful服務的強大工具,為開發者提供了便捷的方式來定義、處理HTTP請求並返迴響應。在設計API介面時,正確地接收和解析客戶端發送的數據至關重要。.NET WebAPI提供了一系列特性,如[FromRoute]、[FromQuery]和[FromBody],用 ...
  • 原因:我之所以想做這個項目,是因為在之前查找關於C#/WPF相關資料時,我發現講解圖像濾鏡的資源非常稀缺。此外,我註意到許多現有的開源庫主要基於CPU進行圖像渲染。這種方式在處理大量圖像時,會導致CPU的渲染負擔過重。因此,我將在下文中介紹如何通過GPU渲染來有效實現圖像的各種濾鏡效果。 生成的效果 ...
  • 引言 上一章我們介紹了在xUnit單元測試中用xUnit.DependencyInject來使用依賴註入,上一章我們的Sample.Repository倉儲層有一個批量註入的介面沒有做單元測試,今天用這個示例來演示一下如何用Bogus創建模擬數據 ,和 EFCore 的種子數據生成 Bogus 的優 ...
  • 一、前言 在自己的項目中,涉及到實時心率曲線的繪製,項目上的曲線繪製,一般很難找到能直接用的第三方庫,而且有些還是定製化的功能,所以還是自己繪製比較方便。很多人一聽到自己畫就害怕,感覺很難,今天就分享一個完整的實時心率數據繪製心率曲線圖的例子;之前的博客也分享給DrawingVisual繪製曲線的方 ...
  • 如果你在自定義的 Main 方法中直接使用 App 類並啟動應用程式,但發現 App.xaml 中定義的資源沒有被正確載入,那麼問題可能在於如何正確配置 App.xaml 與你的 App 類的交互。 確保 App.xaml 文件中的 x:Class 屬性正確指向你的 App 類。這樣,當你創建 Ap ...
  • 一:背景 1. 講故事 上個月有個朋友在微信上找到我,說他們的軟體在客戶那邊隔幾天就要崩潰一次,一直都沒有找到原因,讓我幫忙看下怎麼回事,確實工控類的軟體環境複雜難搞,朋友手上有一個崩潰的dump,剛好丟給我來分析一下。 二:WinDbg分析 1. 程式為什麼會崩潰 windbg 有一個厲害之處在於 ...
  • 前言 .NET生態中有許多依賴註入容器。在大多數情況下,微軟提供的內置容器在易用性和性能方面都非常優秀。外加ASP.NET Core預設使用內置容器,使用很方便。 但是筆者在使用中一直有一個頭疼的問題:服務工廠無法提供請求的服務類型相關的信息。這在一般情況下並沒有影響,但是內置容器支持註冊開放泛型服 ...
  • 一、前言 在項目開發過程中,DataGrid是經常使用到的一個數據展示控制項,而通常表格的最後一列是作為操作列存在,比如會有編輯、刪除等功能按鈕。但WPF的原始DataGrid中,預設只支持固定左側列,這跟大家習慣性操作列放最後不符,今天就來介紹一種簡單的方式實現固定右側列。(這裡的實現方式參考的大佬 ...