消費者和消費者組 如何創建消費者 如何消費消息 消費者配置 提交和偏移量 再均衡 結束消費 ...
上面兩篇聊了Kafka概況和Kafka生產者,包含了Kafka的基本概念、設計原理、設計核心以及生產者的核心原理。本篇單獨聊聊Kafka的消費者,包括如下內容:
- 消費者和消費者組
- 如何創建消費者
- 如何消費消息
- 消費者配置
- 提交和偏移量
- 再均衡
- 結束消費
消費者和消費者組
概念
Kafka消費者對象訂閱主題並接收Kafka的消息,然後驗證消息並保存結果。
Kafka消費者是消費者組的一部分。一個消費者組裡的消費者訂閱的是同一個主題,每個消費者接收主題一部分分區的消息。
消費者組的設計是對消費者進行的一個橫向伸縮,用於解決消費者消費數據的速度跟不上生產者生產數據的速度的問題,通過增加消費者,讓它們分擔負載,分別處理部分分區的消息。
消費者數目與分區數目
在一個消費者組中的消費者消費的是一個主題的部分分區的消息,而一個主題中包含若幹個分區,一個消費者組中也包含著若幹個消費者。當二者的數量關係處於不同的大小關係時,Kafka消費者的工作狀態也是不同的。看以下三種情況:
- 消費者數目<分區數目:此時不同分區的消息會被均衡地分配到這些消費者;
- 消費者數目=分區數目:每個消費者會負責一個分區的消息進行消費;
- 消費者數目>分區數目:此時會有多餘的消費者處於空閑狀態,其他的消費者與分區一對一地進行消費。
分區再均衡
當消費者數目與分區數目在以上三種關係間變化時,比如有新的消費者加入、或者有一個消費者發生崩潰時,會發生分區再均衡。
分區再均衡是指分區的所有權從一個消費者轉移到另一個消費者。再均衡為消費者組帶來了高可用性和伸縮性。但是同時,也會發生如下問題:
- 在再均衡發生的時候,消費者無法讀取消息,會造成整個消費者組有一小段時間的不可用;
- 當分區被重新分配給另一個消費者時,消費者當前的讀取狀態會丟失,它有可能需要去刷新緩存,在它重新恢復狀態之前會拖慢應用。
因此也要儘量避免不必要的再均衡。
那麼消費者組是怎麼知道一個消費者可不可用呢?
消費者通過向被指派為群組協調器的Broker發送心跳來維持它們和群組的從屬關係以及它們對分區的所有權關係。只要消費者以正常的時間間隔發送心跳,就被認為是活躍的,說明它還在讀取分區里的消息。消費者會在輪詢消息或提交偏移量時發送心跳。如果消費者停止發送心跳的時間足夠長,會話就會過期,群組協調器認為它已經死亡,就會觸發一次再均衡。
還有一點需要註意的是,當發生再均衡時,需要做一些清理工作,具體的操作方法可以通過在調用subscribe()方法時傳入一個ConsumerRebalanceListener實例即可。如何創建消費者
創建Kafka的消費者對象的過程與創建生產者的過程是類似的,需要傳入必要的屬性。在創建消費者的時候以下以下三個選項是必選的: - bootstrap.servers :指定 broker 的地址清單,清單里不需要包含所有的 broker 地址,生產者會從給定的 broker 里查找 broker 的信息。不過建議至少要提供兩個 broker 的信息作為容錯;
- key.deserializer :指定鍵的反序列化器;
- value.deserializer :指定值的反序列化器。
後兩個序列化器的說明與生產者的是一樣的。
一個簡單的創建消費者的代碼樣例如下:1
2
3
4
5
6
7
8
9String topic = "Hello";
String group = "group1";
Properties props = new Properties();
props.put("bootstrap.servers", "server:9091");
/*指定分組 ID*/
props.put("group.id", group);
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
如何消費消息
訂閱主題
創建了Kafka消費者之後,接著就可以訂閱主題了。訂閱主題可以使用如下兩個 API :
- consumer.subscribe(Collection topics) :指明需要訂閱的主題的集合;
- consumer.subscribe(Pattern pattern) :使用正則來匹配需要訂閱的集合。
代碼樣例:1
consumer.subscribe(Collections.singletonList(topic));
輪詢消費
消息輪詢是消費者API的核心,消費者通過輪詢 API(poll) 向伺服器定時請求數據。一旦消費者訂閱了主題,輪詢就會處理所有的細節,包括群組協調、分區再均衡、發送心跳和獲取數據,這使得開發者只需要關註從分區返回的數據,然後進行業務處理。
一個簡單的消費者消費的代碼樣例如下:
1
|
try {
|
消費者配置
與生產者類似,消費者也有完整的配置列表。接下來一一介紹這些重要的屬性。
fetch.min.byte
消費者從伺服器獲取記錄的最小位元組數。如果可用的數據量小於設置值,broker 會等待有足夠的可用數據時才會把它返回給消費者。主要是為了降低消費者和Broker的工作負載。
fetch.max.wait.ms
broker 返回給消費者數據的等待時間,預設是 500ms。如果消費者獲取最小數據量的要求得不到滿足,就會在等待最多該屬性所設置的時間後獲取到數據。實際要看二者哪個條件先滿足。
max.partition.fetch.bytes
該屬性指定了伺服器從每個分區返回給消費者的最大位元組數,預設為 1MB。
session.timeout.ms
消費者在被認為死亡之前可以與伺服器斷開連接的時間,預設是 3s。
auto.offset.reset
該屬性指定了消費者在讀取一個沒有偏移量的分區或者偏移量無效的情況下該作何處理:
- latest (預設值) :在偏移量無效的情況下,消費者將從最新的記錄開始讀取數據(在消費者啟動之後生成的最新記錄);
- earliest :在偏移量無效的情況下,消費者將從起始位置讀取分區的記錄。
enable.auto.commit
是否自動提交偏移量,預設值是 true。為了避免出現重覆消費和數據丟失,可以把它設置為 false。client.id
客戶端 id,伺服器用來識別消息的來源。max.poll.records
單次調用 poll() 方法能夠返回的記錄數量。receive.buffer.bytes & send.buffer.byte
這兩個參數分別指定 TCP socket 接收和發送數據包緩衝區的大小,-1 代表使用操作系統的預設值。提交和偏移量
提交是指更新分區當前位置的操作,分區當前的位置,也就是所謂的偏移量。什麼是偏移量
Kafka 的每一條消息都有一個偏移量屬性,記錄了其在分區中的位置,偏移量是一個單調遞增的整數。消費者通過往一個叫作 _consumer_offset 的特殊主題發送消息,消息里包含每個分區的偏移量。 如果消費者一直處於運行狀態,那麼偏移量就沒有 什麼用處。不過,如果有消費者退出或者新分區加入,此時就會觸發再均衡。完成再均衡之後,每個消費者可能分配到新的分區,而不是之前處理的那個。為了能夠繼續之前的工作,消費者需要讀取每個分區最後一次提交的偏移量,然後從偏移量指定的地方繼續處理。 因為這個原因,所以如果不能正確提交偏移量,就可能會導致數據丟失或者重覆出現消費,比如下麵情況: - 如果提交的偏移量小於客戶端處理的最後一個消息的偏移量 ,那麼處於兩個偏移量之間的消息就會被重覆消費;
- 如果提交的偏移量大於客戶端處理的最後一個消息的偏移量,那麼處於兩個偏移量之間的消息將會丟失。
偏移量提交
那麼消費者如何提交偏移量呢?
Kafka 支持自動提交和手動提交偏移量兩種方式。自動提交:
只需要將消費者的 enable.auto.commit 屬性配置為 true 即可完成自動提交的配置。 此時每隔固定的時間,消費者就會把 poll() 方法接收到的最大偏移量進行提交,提交間隔由 auto.commit.interval.ms 屬性進行配置,預設值是 5s。
使用自動提交是存在隱患的,假設我們使用預設的 5s 提交時間間隔,在最近一次提交之後的 3s 發生了再均衡,再均衡之後,消費者從最後一次提交的偏移量位置開始讀取消息。這個時候偏移量已經落後了 3s ,所以在這 3s 內到達的消息會被重覆處理。可以通過修改提交時間間隔來更頻繁地提交偏移量,減小可能出現重覆消息的時間窗,不過這種情況是無法完全避免的。基於這個原因,Kafka 也提供了手動提交偏移量的 API,使得用戶可以更為靈活的提交偏移量。手動提交:
用戶可以通過將 enable.auto.commit 設為 false,然後手動提交偏移量。基於用戶需求手動提交偏移量可以分為兩大類:
手動提交當前偏移量:即手動提交當前輪詢的最大偏移量;
手動提交固定偏移量:即按照業務需求,提交某一個固定的偏移量。
而按照 Kafka API,手動提交偏移量又可以分為同步提交和非同步提交。
同步提交:
通過調用 consumer.commitSync() 來進行同步提交,不傳遞任何參數時提交的是當前輪詢的最大偏移量。1
2
3
4
5
6
7
8while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.of(100, ChronoUnit.MILLIS));
for (ConsumerRecord<String, String> record : records) {
System.out.println(record);
}
// 同步提交
consumer.commitSync();
}
如果某個提交失敗,同步提交還會進行重試,這可以保證數據能夠最大限度提交成功,但是同時也會降低程式的吞吐量。
非同步提交
為瞭解決同步提交降低程式吞吐量的問題,又有了非同步提交的方案。
非同步提交可以提高程式的吞吐量,因為此時你可以儘管請求數據,而不用等待 Broker 的響應。代碼樣例如下:
1
|
while (true) {
|
非同步提交如果失敗,錯誤信息和偏移量都會被記錄下來。
儘管如此,非同步提交存在的問題是,如果提交失敗不能重試,因為重試可能會出現小偏移量覆蓋大偏移量的問題。
雖然程式不能在失敗時候進行自動重試,但是我們是可以手動進行重試。可以通過一個 Map<TopicPartition, Integer> offsets 來維護你提交的每個分區的偏移量,也就是非同步提交的順序,在每次提交偏移量之後或在回調里提交偏移量時遞增序列號。然後當失敗時候,你可以判斷失敗的偏移量是否小於你維護的同主題同分區的最後提交的偏移量,如果小於則代表你已經提交了更大的偏移量請求,此時不需要重試,否則就可以進行手動重試。
同步和非同步組合提交:
當發生關閉消費者或者再均衡時,一定要確保能夠提交成功,為了保證性能和可靠性,又有了同步和非同步組合提交的方式。也就是在消費者關閉前組合使用commitAsync()方法和commitSync()方法。代碼樣例如下:
1
|
try {
|
提交特定的偏移量
上面的提交方式都是提交當前最大的偏移量,但如果需要提交的是特定的一個偏移量呢?
只需要在重載的提交方法中傳入偏移量參數即可。代碼樣例如下:
1
|
// 同步提交特定偏移量
|
結束消費
上面的消費過程都是以無限迴圈的方式來演示的,那麼如何來優雅地停止消費者的輪詢呢。
Kafka 提供了 consumer.wakeup() 方法用於退出輪詢。
如果確定要退出迴圈,需要通過另一個線程調用consumer.wakeup()方法;如果迴圈運行在主線程里,可以在ShutdownHook里調用該方法。
它通過拋出 WakeupException 異常來跳出迴圈。需要註意的是,在退出線程時最好顯示的調用 consumer.close() , 此時消費者會提交任何還沒有提交的東西,並向群組協調器發送消息,告知自己要離開群組,接下來就會觸發再均衡 ,而不需要等待會話超時。
下麵的示例代碼為監聽控制台輸出,當輸入 exit 時結束輪詢,關閉消費者並退出程式:
1
|
// 調用wakeup優雅的退出輪詢
|
關註我的公眾號,獲取更多關於面試、技術的文章及福利資源。
【參考資料】
《Kafka 權威指南》