Kafka系列3:深入理解Kafka消費者

来源:https://www.cnblogs.com/mcbye/archive/2020/02/17/kafka-consumer-in-detail.html
-Advertisement-
Play Games

消費者和消費者組 如何創建消費者 如何消費消息 消費者配置 提交和偏移量 再均衡 結束消費 ...


上面兩篇聊了Kafka概況和Kafka生產者,包含了Kafka的基本概念、設計原理、設計核心以及生產者的核心原理。本篇單獨聊聊Kafka的消費者,包括如下內容:

  • 消費者和消費者組
  • 如何創建消費者
  • 如何消費消息
  • 消費者配置
  • 提交和偏移量
  • 再均衡
  • 結束消費

消費者和消費者組

概念

Kafka消費者對象訂閱主題並接收Kafka的消息,然後驗證消息並保存結果。
Kafka消費者消費者組的一部分。一個消費者組裡的消費者訂閱的是同一個主題,每個消費者接收主題一部分分區的消息。
消費者組的設計是對消費者進行的一個橫向伸縮,用於解決消費者消費數據的速度跟不上生產者生產數據的速度的問題,通過增加消費者,讓它們分擔負載,分別處理部分分區的消息。

消費者數目與分區數目

在一個消費者組中的消費者消費的是一個主題的部分分區的消息,而一個主題中包含若幹個分區,一個消費者組中也包含著若幹個消費者。當二者的數量關係處於不同的大小關係時,Kafka消費者的工作狀態也是不同的。看以下三種情況:

  1. 消費者數目<分區數目:此時不同分區的消息會被均衡地分配到這些消費者;
  2. 消費者數目=分區數目:每個消費者會負責一個分區的消息進行消費;
  3. 消費者數目>分區數目:此時會有多餘的消費者處於空閑狀態,其他的消費者與分區一對一地進行消費。

    分區再均衡

    當消費者數目與分區數目在以上三種關係間變化時,比如有新的消費者加入、或者有一個消費者發生崩潰時,會發生分區再均衡
    分區再均衡是指分區的所有權從一個消費者轉移到另一個消費者。再均衡為消費者組帶來了高可用性和伸縮性。但是同時,也會發生如下問題:
  • 在再均衡發生的時候,消費者無法讀取消息,會造成整個消費者組有一小段時間的不可用;
  • 當分區被重新分配給另一個消費者時,消費者當前的讀取狀態會丟失,它有可能需要去刷新緩存,在它重新恢復狀態之前會拖慢應用。
    因此也要儘量避免不必要的再均衡。
    那麼消費者組是怎麼知道一個消費者可不可用呢?
    消費者通過向被指派為群組協調器的Broker發送心跳來維持它們和群組的從屬關係以及它們對分區的所有權關係。只要消費者以正常的時間間隔發送心跳,就被認為是活躍的,說明它還在讀取分區里的消息。消費者會在輪詢消息或提交偏移量時發送心跳。如果消費者停止發送心跳的時間足夠長,會話就會過期,群組協調器認為它已經死亡,就會觸發一次再均衡。
    還有一點需要註意的是,當發生再均衡時,需要做一些清理工作,具體的操作方法可以通過在調用subscribe()方法時傳入一個ConsumerRebalanceListener實例即可。

    如何創建消費者

    創建Kafka的消費者對象的過程與創建生產者的過程是類似的,需要傳入必要的屬性。在創建消費者的時候以下以下三個選項是必選的:
  • bootstrap.servers :指定 broker 的地址清單,清單里不需要包含所有的 broker 地址,生產者會從給定的 broker 里查找 broker 的信息。不過建議至少要提供兩個 broker 的信息作為容錯;
  • key.deserializer :指定鍵的反序列化器;
  • value.deserializer :指定值的反序列化器。
    後兩個序列化器的說明與生產者的是一樣的。
    一個簡單的創建消費者的代碼樣例如下:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    String topic = "Hello";
    String group = "group1";
    Properties props = new Properties();
    props.put("bootstrap.servers", "server:9091");
    /*指定分組 ID*/
    props.put("group.id", group);
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

如何消費消息

訂閱主題

創建了Kafka消費者之後,接著就可以訂閱主題了。訂閱主題可以使用如下兩個 API :

  • consumer.subscribe(Collection topics) :指明需要訂閱的主題的集合;
  • consumer.subscribe(Pattern pattern) :使用正則來匹配需要訂閱的集合。
    代碼樣例:
    1
    consumer.subscribe(Collections.singletonList(topic));

輪詢消費

消息輪詢是消費者API的核心,消費者通過輪詢 API(poll) 向伺服器定時請求數據。一旦消費者訂閱了主題,輪詢就會處理所有的細節,包括群組協調、分區再均衡、發送心跳和獲取數據,這使得開發者只需要關註從分區返回的數據,然後進行業務處理。
一個簡單的消費者消費的代碼樣例如下:

1
2
3
4
5
6
7
8
9
10
11
12
try {
while (true) {
// 輪詢獲取數據
ConsumerRecords<String, String> records = consumer.poll(Duration.of(100, ChronoUnit.MILLIS));
for (ConsumerRecord<String, String> record : records) {
System.out.printf("topic = %s,partition = %d, key = %s, value = %s, offset = %d,\n",
record.topic(), record.partition(), record.key(), record.value(), record.offset());
}
}
} finally {
consumer.close();
}

 

消費者配置

與生產者類似,消費者也有完整的配置列表。接下來一一介紹這些重要的屬性。

fetch.min.byte

消費者從伺服器獲取記錄的最小位元組數。如果可用的數據量小於設置值,broker 會等待有足夠的可用數據時才會把它返回給消費者。主要是為了降低消費者和Broker的工作負載。

fetch.max.wait.ms

broker 返回給消費者數據的等待時間,預設是 500ms。如果消費者獲取最小數據量的要求得不到滿足,就會在等待最多該屬性所設置的時間後獲取到數據。實際要看二者哪個條件先滿足。

max.partition.fetch.bytes

該屬性指定了伺服器從每個分區返回給消費者的最大位元組數,預設為 1MB。

session.timeout.ms

消費者在被認為死亡之前可以與伺服器斷開連接的時間,預設是 3s。

auto.offset.reset

該屬性指定了消費者在讀取一個沒有偏移量的分區或者偏移量無效的情況下該作何處理:

  • latest (預設值) :在偏移量無效的情況下,消費者將從最新的記錄開始讀取數據(在消費者啟動之後生成的最新記錄);
  • earliest :在偏移量無效的情況下,消費者將從起始位置讀取分區的記錄。

    enable.auto.commit

    是否自動提交偏移量,預設值是 true。為了避免出現重覆消費和數據丟失,可以把它設置為 false。

    client.id

    客戶端 id,伺服器用來識別消息的來源。

    max.poll.records

    單次調用 poll() 方法能夠返回的記錄數量。

    receive.buffer.bytes & send.buffer.byte

    這兩個參數分別指定 TCP socket 接收和發送數據包緩衝區的大小,-1 代表使用操作系統的預設值。

    提交和偏移量

    提交是指更新分區當前位置的操作,分區當前的位置,也就是所謂的偏移量

    什麼是偏移量

    Kafka 的每一條消息都有一個偏移量屬性,記錄了其在分區中的位置,偏移量是一個單調遞增的整數。消費者通過往一個叫作 _consumer_offset 的特殊主題發送消息,消息里包含每個分區的偏移量。 如果消費者一直處於運行狀態,那麼偏移量就沒有 什麼用處。不過,如果有消費者退出或者新分區加入,此時就會觸發再均衡。完成再均衡之後,每個消費者可能分配到新的分區,而不是之前處理的那個。為了能夠繼續之前的工作,消費者需要讀取每個分區最後一次提交的偏移量,然後從偏移量指定的地方繼續處理。 因為這個原因,所以如果不能正確提交偏移量,就可能會導致數據丟失或者重覆出現消費,比如下麵情況:
  • 如果提交的偏移量小於客戶端處理的最後一個消息的偏移量 ,那麼處於兩個偏移量之間的消息就會被重覆消費;
  • 如果提交的偏移量大於客戶端處理的最後一個消息的偏移量,那麼處於兩個偏移量之間的消息將會丟失。

    偏移量提交

    那麼消費者如何提交偏移量呢?
    Kafka 支持自動提交和手動提交偏移量兩種方式。
    自動提交:
    只需要將消費者的 enable.auto.commit 屬性配置為 true 即可完成自動提交的配置。 此時每隔固定的時間,消費者就會把 poll() 方法接收到的最大偏移量進行提交,提交間隔由 auto.commit.interval.ms 屬性進行配置,預設值是 5s。
    使用自動提交是存在隱患的,假設我們使用預設的 5s 提交時間間隔,在最近一次提交之後的 3s 發生了再均衡,再均衡之後,消費者從最後一次提交的偏移量位置開始讀取消息。這個時候偏移量已經落後了 3s ,所以在這 3s 內到達的消息會被重覆處理。可以通過修改提交時間間隔來更頻繁地提交偏移量,減小可能出現重覆消息的時間窗,不過這種情況是無法完全避免的。基於這個原因,Kafka 也提供了手動提交偏移量的 API,使得用戶可以更為靈活的提交偏移量。
    手動提交:
    用戶可以通過將 enable.auto.commit 設為 false,然後手動提交偏移量。基於用戶需求手動提交偏移量可以分為兩大類:
    手動提交當前偏移量:即手動提交當前輪詢的最大偏移量;
    手動提交固定偏移量:即按照業務需求,提交某一個固定的偏移量。
    而按照 Kafka API,手動提交偏移量又可以分為同步提交和非同步提交。
    同步提交
    通過調用 consumer.commitSync() 來進行同步提交,不傳遞任何參數時提交的是當前輪詢的最大偏移量。
    1
    2
    3
    4
    5
    6
    7
    8
    while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.of(100, ChronoUnit.MILLIS));
    for (ConsumerRecord<String, String> record : records) {
    System.out.println(record);
    }
    // 同步提交
    consumer.commitSync();
    }

如果某個提交失敗,同步提交還會進行重試,這可以保證數據能夠最大限度提交成功,但是同時也會降低程式的吞吐量。
非同步提交
為瞭解決同步提交降低程式吞吐量的問題,又有了非同步提交的方案。
非同步提交可以提高程式的吞吐量,因為此時你可以儘管請求數據,而不用等待 Broker 的響應。代碼樣例如下:

1
2
3
4
5
6
7
8
9
10
11
12
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.of(100, ChronoUnit.MILLIS));
// 非同步提交並定義回調
consumer.commitAsync(new OffsetCommitCallback() {
@Override
public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
if (exception != null) {
offsets.forEach((x, y) -> System.out.printf("topic = %s,partition = %d, offset = %s \n", x.topic(), x.partition(), y.offset()));
}
}
});
}

 

非同步提交如果失敗,錯誤信息和偏移量都會被記錄下來。
儘管如此,非同步提交存在的問題是,如果提交失敗不能重試,因為重試可能會出現小偏移量覆蓋大偏移量的問題。
雖然程式不能在失敗時候進行自動重試,但是我們是可以手動進行重試。可以通過一個 Map<TopicPartition, Integer> offsets 來維護你提交的每個分區的偏移量,也就是非同步提交的順序,在每次提交偏移量之後或在回調里提交偏移量時遞增序列號。然後當失敗時候,你可以判斷失敗的偏移量是否小於你維護的同主題同分區的最後提交的偏移量,如果小於則代表你已經提交了更大的偏移量請求,此時不需要重試,否則就可以進行手動重試。
同步和非同步組合提交
當發生關閉消費者或者再均衡時,一定要確保能夠提交成功,為了保證性能和可靠性,又有了同步和非同步組合提交的方式。也就是在消費者關閉前組合使用commitAsync()方法和commitSync()方法。代碼樣例如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.of(100, ChronoUnit.MILLIS));
for (ConsumerRecord<String, String> record : records) {
System.out.println(record);
}
// 非同步提交
consumer.commitAsync();
}
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
// 因為即將要關閉消費者,所以要用同步提交保證提交成功
consumer.commitSync();
} finally {
consumer.close();
}
}

 

提交特定的偏移量

上面的提交方式都是提交當前最大的偏移量,但如果需要提交的是特定的一個偏移量呢?
只需要在重載的提交方法中傳入偏移量參數即可。代碼樣例如下:

1
2
3
4
// 同步提交特定偏移量
commitSync(Map<TopicPartition, OffsetAndMetadata> offsets)
// 非同步提交特定偏移量
commitAsync(Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback)

 

結束消費

上面的消費過程都是以無限迴圈的方式來演示的,那麼如何來優雅地停止消費者的輪詢呢。
Kafka 提供了 consumer.wakeup() 方法用於退出輪詢。
如果確定要退出迴圈,需要通過另一個線程調用consumer.wakeup()方法;如果迴圈運行在主線程里,可以在ShutdownHook里調用該方法。
它通過拋出 WakeupException 異常來跳出迴圈。需要註意的是,在退出線程時最好顯示的調用 consumer.close() , 此時消費者會提交任何還沒有提交的東西,並向群組協調器發送消息,告知自己要離開群組,接下來就會觸發再均衡 ,而不需要等待會話超時。
下麵的示例代碼為監聽控制台輸出,當輸入 exit 時結束輪詢,關閉消費者並退出程式:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
// 調用wakeup優雅的退出輪詢
final Thread mainThread = Thread.currentThread();
new Thread(() -> {
Scanner sc = new Scanner(System.in);
while (sc.hasNext()) {
if ("exit".equals(sc.next())) {
consumer.wakeup();
try {
// 等待主線程完成提交偏移量、關閉消費者等操作
mainThread.join();
break;
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}).start();

try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.of(100, ChronoUnit.MILLIS));
for (ConsumerRecord<String, String> rd : records) {
System.out.printf("topic = %s,partition = %d, key = %s, value = %s, offset = %d,\n", rd.topic(), rd.partition(), rd.key(), rd.value(), rd.offset());
}
}
} catch (WakeupException e) {
// 無需處理此異常
} finally {
consumer.close();
}

 

關註我的公眾號,獲取更多關於面試、技術的文章及福利資源。

Dali王的技術博客公眾號

【參考資料】
《Kafka 權威指南》


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 我用Go和gRPC創建了一個微服務項目,並試圖找出最好的程式結構,它可以作為我其他項目的模板。我還將程式設計和編程的最佳實踐應用於Go Microservice程式,例如清晰架構(Clean Architecture),依賴註入(Dependency Injection),日誌記錄,錯誤處理等。我有 ...
  • 近期因為疫情原因,一直是在家辦公了,也導致了和同事對接介面上出現了很多小問題,這也從側面反映出我個人對項目的設計不全面。 上面是對接介面時產生的一個問題:遠程伺服器返回錯誤:(414)Request-URI Too Large 這個問題主要是對方往項目介面中傳遞參數的時候,參數的長度特別長,而且程式 ...
  • collection裡面有什麼子類?(list和set是實現了collection介面的。) List: 1.可以允許重覆的對象(可重覆,有序集合)。2.可以插入多個null元素。3.常用的實現類有 ArrayList、LinkedList 和 Vector。ArrayList 最為流行,它提供了使 ...
  • 準備 Maven概述 什麼是Maven? Maven 的正確發音是[ˈmevən], 在美國是 一個口語化的詞語,代表專家、內行的意思。 Maven 是一個項目管理工具,它包含了一個項目對象模型 (POM:Project Object Model),一組標準集合,一個項目生命周 期(Project ...
  • 準備 1、官⽹地址:http://tomcat.apache.org下載。 2、解壓文件,並放到指定路徑,給該文件授權。 3、啟動和停止 Tomcat部署Java項目的3種方式: 部署方式1: 生成war包= webapps = 執行tomcat idea打包成war https://blog.cs ...
  • 一、JS加密 1.有的反爬蟲策略採用js對需要傳輸的數據進行加密處理。 2.經過加密,傳輸的就是密文 3.加密函數或者過程一定是在瀏覽器完成,也就是一定會把代碼(js代碼)暴露給使用者 4.通多閱讀加密演算法,就可以模擬出加密過程,從而達到破解。 5.舉一個案例 """ 破解有道詞典 """ from ...
  • 下麵的代碼主要用於使用python語言調用NASA官方的MODIS處理工具HEG進行投影坐標轉換與重採樣批量處理 主要參考 1. HEG的用戶手冊:https://newsroom.gsfc.nasa.gov/sdptoolkit/HEG/HEG215/EED2 TP 030_Rev01_HEG_U ...
  • 內部類的訪問規則 + 內部類可以直接訪問外部類中的成員,包括私有成員。 因為內部類中持有了一個外部類的引用,格式為:外部類名.this + 外部類要訪問內部類,必須要建立內部對象。 運行結果: 訪問 1. 當內部類定義在外部類的成員位置上,而且非私有,可以在外部其他類中直接建立內部類對象。 格式: ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...