kafka consumer 配置詳解

来源:https://www.cnblogs.com/snake23/archive/2019/03/15/10538927.html
-Advertisement-
Play Games

1、Consumer Group 與 topic 訂閱 每個Consumer 進程都會劃歸到一個邏輯的Consumer Group中,邏輯的訂閱者是Consumer Group。所以一條message可以被多個訂閱message 所在的topic的每一個Consumer Group,也就好像是這條m ...


1、Consumer Group 與 topic 訂閱

每個Consumer 進程都會劃歸到一個邏輯的Consumer Group中,邏輯的訂閱者是Consumer Group。所以一條message可以被多個訂閱message 所在的topic的每一個Consumer Group,也就好像是這條message被廣播到每個Consumer Group一樣。而每個Consumer Group中,類似於一個Queue(JMS中的Queue)的概念差不多,即一條消息只會被Consumer Group中的一個Consumer消費。

 

1.1 Consumer 與 partition

    其實上面所說的訂閱關係還不夠明確,其實topic中的partition被分配到某個consumer上,也就是某個consumer訂閱了某個partition。 再重覆一下:consumer訂閱的是partition,而不是message。所以在同一時間點上,訂閱到同一個partition的consumer必然屬於不同的Consumer Group。

 

    在官方網站上,給出了這樣一張圖:

 

一個kafka cluster中的某個topic,有4個partition。有兩個consumer group (A and B)訂閱了該topic。 Consumer Group A有2個partition:p0、p1,Consumer Group B有4個partition:c3,c4,c5,c6。經過分區分配後,consumer與partition的訂閱關係如下:

複製代碼
Topic 中的4partitionConsumer Group A中的分配情況如下:
C1 訂閱p0,p3
C2 訂閱p1,p2
Topic 中的4partitionConsumer Group B中的分配情況如下:
C3 訂閱p0
C4 訂閱p3
C5 訂閱p1
C6 訂閱p2
複製代碼

 

 另外要知道的是,partition分配的工作其實是在consumer leader中完成的。

1.2 Consumer 與Consumer Group

Consumer Group與Consumer的關係是動態維護的:

當一個Consumer 進程掛掉 或者是卡住時,該consumer所訂閱的partition會被重新分配到該group內的其它的consumer上。當一個consumer加入到一個consumer group中時,同樣會從其它的consumer中分配出一個或者多個partition 到這個新加入的consumer。

    當啟動一個Consumer時,會指定它要加入的group,使用的是配置項:group.id。

為了維持Consumer 與 Consumer Group的關係,需要Consumer周期性的發送heartbeat到coordinator(協調者,在早期版本,以zookeeper作為協調者。後期版本則以某個broker作為協調者)。當Consumer由於某種原因不能發Heartbeat到coordinator時,並且時間超過session.timeout.ms時,就會認為該consumer已退出,它所訂閱的partition會分配到同一group 內的其它的consumer上。而這個過程,被稱為rebalance。

 

那麼現在有這樣一個問題:如果一個consumer 進程一直在周期性的發送heartbeat,但是它就是不消費消息,這種狀態稱為livelock狀態。那麼在這種狀態下,它所訂閱的partition不消息是否就一直不能被消費呢?

 

1.3 Coordinator

    Coordinator 協調者,協調consumer、broker。早期版本中Coordinator,使用zookeeper實現,但是這樣做,rebalance的負擔太重。為瞭解決scalable的問題,不再使用zookeeper,而是讓每個broker來負責一些group的管理,這樣consumer就完全不再依賴zookeeper了。

 

1.3.1 Consumer連接到coordinator

    從Consumer的實現來看,在執行poll或者是join group之前,都要保證已連接到Coordinator。連接到coordinator的過程是:

    1)連接到最後一次連接的broker(如果是剛啟動的consumer,則要根據配置中的borker)。它會響應一個包含coordinator信息(host, port等)的response。

    2)連接到coordinator。

 

1.4 Consumer Group Management

    Consumer Group 管理中,也是需要coordinator的參與。一個Consumer要join到一個group中,或者一個consumer退出時,都要進行rebalance。進行rebalance的流程是:

1)會給一個coordinator發起Join請求(請求中要包括自己的一些元數據,例如自己感興趣的topics)

2)Coordinator 根據這些consumer的join請求,選擇出一個leader,並通知給各個consumer。這裡的leader是consumer group 內的leader,是由某個consumer擔任,不要與partition的leader混淆。

3)Consumer leader 根據這些consumer的metadata,重新為每個consumer member重新分配partition。分配完畢通過coordinator把最新分配情況同步給每個consumer。

4)Consumer拿到最新的分配後,繼續工作。

 

 

2、Consumer Fetch Message

   

在Kafka partition中,每個消息有一個唯一標識,即partition內的offset。每個consumer group中的訂閱到某個partition的consumer在從partition中讀取數據時,是依次讀取的。

   

    上圖中,Consumer A、B分屬於不用的Consumer Group。Consumer B讀取到offset =11,Consumer A讀取到offset=9 。這個值表示Consumer Group中的某個Consumer 在下次讀取該partition時會從哪個offset的 message開始讀取,即 Consumer Group A 中的Consumer下次會從offset = 9 的message 讀取, Consumer Group B 中的Consumer下次會從offset = 11 的message 讀取。

    這裡並沒有說是Consumer A 下次會從offset = 9 的message讀取,原因是Consumer A可能會退出Group ,然後Group A 進行rebalance,即重新分配分區。

 

2.1 poll 方法

 

Consumer讀取partition中的數據是通過調用發起一個fetch請求來執行的。而從KafkaConsumer來看,它有一個poll方法。但是這個poll方法只是可能會發起fetch請求。原因是:Consumer每次發起fetch請求時,讀取到的數據是有限制的,通過配置項max.partition.fetch.bytes來限制的。而在執行poll方法時,會根據配置項個max.poll.records來限制一次最多pool多少個record。

那麼就可能出現這樣的情況: 在滿足max.partition.fetch.bytes限制的情況下,假如fetch到了100個record,放到本地緩存後,由於max.poll.records限制每次只能poll出15個record。那麼KafkaConsumer就需要執行7次才能將這一次通過網路發起的fetch請求所fetch到的這100個record消費完畢。其中前6次是每次pool中15個record,最後一次是poll出10個record。

 

    在consumer中,還有另外一個配置項:max.poll.interval.ms ,它表示最大的poll數據間隔,如果超過這個間隔沒有發起pool請求,但heartbeat仍舊在發,就認為該consumer處於 livelock狀態。就會將該consumer退出consumer group。所以為了不使Consumer 自己被退出,Consumer 應該不停的發起poll(timeout)操作。而這個動作 KafkaConsumer Client是不會幫我們做的,這就需要自己在程式中不停的調用poll方法了。

 

2.2 commit offset

    當一個consumer因某種原因退出Group時,進行重新分配partition後,同一group中的另一個consumer在讀取該partition時,怎麼能夠知道上一個consumer該從哪個offset的message讀取呢?也是是如何保證同一個group內的consumer不重覆消費消息呢?上面說了一次走網路的fetch請求會拉取到一定量的數據,但是這些數據還沒有被消息完畢,Consumer就掛掉了,下一次進行數據fetch時,是否會從上次讀到的數據開始讀取,而導致Consumer消費的數據丟失嗎?

    為了做到這一點,當使用完poll從本地緩存拉取到數據之後,需要client調用commitSync方法(或者commitAsync方法)去commit 下一次該去讀取 哪一個offset的message。

    而這個commit方法會通過走網路的commit請求將offset在coordinator中保留,這樣就能夠保證下一次讀取(不論進行了rebalance)時,既不會重覆消費消息,也不會遺漏消息。

 

    對於offset的commit,Kafka Consumer Java Client支持兩種模式:由KafkaConsumer自動提交,或者是用戶通過調用commitSync、commitAsync方法的方式完成offset的提交。

 

自動提交的例子:

 

複製代碼
   Properties props = new Properties();

     props.put("bootstrap.servers", "localhost:9092");

     props.put("group.id", "test");

     props.put("enable.auto.commit", "true");

     props.put("auto.commit.interval.ms", "1000");

     props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

     props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

     KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

     consumer.subscribe(Arrays.asList("foo", "bar"));

     while (true) {

         ConsumerRecords<String, String> records = consumer.poll(100);

         for (ConsumerRecord<String, String> record : records)

             System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());

     }
複製代碼

手動提交的例子: 

複製代碼
Properties props = new Properties();
     props.put("bootstrap.servers", "localhost:9092");
     props.put("group.id", "test");
     props.put("enable.auto.commit", "false");
     props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
     props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
     KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
     consumer.subscribe(Arrays.asList("foo", "bar"));
     final int minBatchSize = 200;
     List<ConsumerRecord<String, String>> buffer = new ArrayList<>();
     while (true) {
         ConsumerRecords<String, String> records = consumer.poll(100);
         for (ConsumerRecord<String, String> record : records) {
             buffer.add(record);
         }
         if (buffer.size() >= minBatchSize) {
             insertIntoDb(buffer);
             consumer.commitSync();
             buffer.clear();
         }
     }
複製代碼

在手動提交時,需要註意的一點是:要提交的是下一次要讀取的offset,例如: 

複製代碼
try {
         while(running) {
            // 取得消息
             ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
            // 根據分區來遍曆數據:
             for (TopicPartition partition : records.partitions()) {
                 List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
                 // 數據處理
                 for (ConsumerRecord<String, String> record : partitionRecords) {
                     System.out.println(record.offset() + ": " + record.value());
                 }
                 // 取得當前讀取到的最後一條記錄的offset
                 long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
                // 提交offset,記得要 + 1
                 consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
             }
         }
     } finally {
       consumer.close();
     }
複製代碼

 

 

3、Consumer的線程安全性

KafkaProducer是線程安全的,上一節已經瞭解到。但Consumer卻沒有設計成線程安全的。當用戶想要在在多線程環境下使用kafkaConsumer時,需要自己來保證synchronized。如果沒有這樣的保證,就會拋出ConcurrentModificatinException的。

當你想要關閉Consumer或者為也其它的目的想要中斷Consumer的處理時,可以調用consumer的wakeup方法。這個方法會拋出WakeupException。

 

例如:

複製代碼
public class KafkaConsumerRunner implements Runnable {
     private final AtomicBoolean closed = new AtomicBoolean(false);
     private final KafkaConsumer consumer;
 
     public void run() {
         try {
             consumer.subscribe(Arrays.asList("topic"));
             while (!closed.get()) {
                 ConsumerRecords records = consumer.poll(10000);
                 // Handle new records
             }
         } catch (WakeupException e) {
             // Ignore exception if closing
             if (!closed.get()) throw e;
         } finally {
             consumer.close();
         }
     }
 
     // Shutdown hook which can be called from a separate thread
     public void shutdown() {
         closed.set(true);
         consumer.wakeup();
     }
 }
複製代碼

  

4、Consumer Configuration

    在kafka 0.9+使用Java Consumer替代了老版本的scala Consumer。新版的配置如下:

·bootstrap.servers

在啟動consumer時配置的broker地址的。不需要將cluster中所有的broker都配置上,因為啟動後會自動的發現cluster所有的broker。

    它配置的格式是:host1:port1;host2:port2…

·key.descrializervalue.descrializer

Message record 的key, value的反序列化類。

·group.id

用於表示該consumer想要加入到哪個group中。預設值是 “”。

·heartbeat.interval.ms

心跳間隔。心跳是在consumer與coordinator之間進行的。心跳是確定consumer存活,加入或者退出group的有效手段。

    這個值必須設置的小於session.timeout.ms,因為:

當Consumer由於某種原因不能發Heartbeat到coordinator時,並且時間超過session.timeout.ms時,就會認為該consumer已退出,它所訂閱的partition會分配到同一group 內的其它的consumer上。

    通常設置的值要低於session.timeout.ms的1/3。

    預設值是:3000 (3s)

·session.timeout.ms

Consumer session 過期時間。這個值必須設置在broker configuration中的group.min.session.timeout.ms 與 group.max.session.timeout.ms之間。

其預設值是:10000 (10 s)

 

·enable.auto.commit

Consumer 在commit offset時有兩種模式:自動提交,手動提交。手動提交在前面已經說過。自動提交:是Kafka Consumer會在後臺周期性的去commit。

預設值是true。

·auto.commit.interval.ms

    自動提交間隔。範圍:[0,Integer.MAX],預設值是 5000 (5 s)

 

·auto.offset.reset

    這個配置項,是告訴Kafka Broker在發現kafka在沒有初始offset,或者當前的offset是一個不存在的值(如果一個record被刪除,就肯定不存在了)時,該如何處理。它有4種處理方式:

1) earliest:自動重置到最早的offset。

2) latest:看上去重置到最晚的offset。

3) none:如果邊更早的offset也沒有的話,就拋出異常給consumer,告訴consumer在整個consumer group中都沒有發現有這樣的offset。

4) 如果不是上述3種,只拋出異常給consumer。

 

預設值是latest。

 

·connections.max.idle.ms

連接空閑超時時間。因為consumer只與broker有連接(coordinator也是一個broker),所以這個配置的是consumer到broker之間的。

預設值是:540000 (9 min)

 

·fetch.max.wait.ms

Fetch請求發給broker後,在broker中可能會被阻塞的(當topic中records的總size小於fetch.min.bytes時),此時這個fetch請求耗時就會比較長。這個配置就是來配置consumer最多等待response多久。

 

·fetch.min.bytes

當consumer向一個broker發起fetch請求時,broker返回的records的大小最小值。如果broker中數據量不夠的話會wait,直到數據大小滿足這個條件。

取值範圍是:[0, Integer.Max],預設值是1。

預設值設置為1的目的是:使得consumer的請求能夠儘快的返回。

 

·fetch.max.bytes

一次fetch請求,從一個broker中取得的records最大大小。如果在從topic中第一個非空的partition取消息時,如果取到的第一個record的大小就超過這個配置時,仍然會讀取這個record,也就是說在這片情況下,只會返回這一條record。

    broker、topic都會對producer發給它的message size做限制。所以在配置這值時,可以參考broker的message.max.bytes 和 topic的max.message.bytes的配置。

取值範圍是:[0, Integer.Max],預設值是:52428800 (5 MB)

 

 

·max.partition.fetch.bytes

一次fetch請求,從一個partition中取得的records最大大小。如果在從topic中第一個非空的partition取消息時,如果取到的第一個record的大小就超過這個配置時,仍然會讀取這個record,也就是說在這片情況下,只會返回這一條record。

    broker、topic都會對producer發給它的message size做限制。所以在配置這值時,可以參考broker的message.max.bytes 和 topic的max.message.bytes的配置。

 

·max.poll.interval.ms

前面說過要求程式中不間斷的調用poll()。如果長時間沒有調用poll,且間隔超過這個值時,就會認為這個consumer失敗了。

 

·max.poll.records

    Consumer每次調用poll()時取到的records的最大數。

 

·receive.buffer.byte

Consumer receiver buffer (SO_RCVBUF)的大小。這個值在創建Socket連接時會用到。

取值範圍是:[-1, Integer.MAX]。預設值是:65536 (64 KB)

如果值設置為-1,則會使用操作系統預設的值。

 

·request.timeout.ms

請求發起後,並不一定會很快接收到響應信息。這個配置就是來配置請求超時時間的。預設值是:305000 (305 s)

 

·client.id

Consumer進程的標識。如果設置一個人為可讀的值,跟蹤問題會比較方便。

 

·interceptor.classes

    用戶自定義interceptor。

·metadata.max.age.ms

Metadata數據的刷新間隔。即便沒有任何的partition訂閱關係變更也行執行。

範圍是:[0, Integer.MAX],預設值是:300000 (5 min)


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 下麵的代碼用於取4和2的中點,說明位移運算符優先順序低於加減: 而正確的寫法如下所示: 一般取中點的操作我們寫為 1 mid = low + ((high-low)>>1) 而不是 是考慮到了有可能會溢出的情況。 ...
  • 1. 日誌簡介 在Java項目中,日誌是必不可少的功能,日誌對於快速定位問題,檢查日常項目運行狀態等有非常重要的作用,但是目前Java日誌存在多種框架,如:Log4j、Log4j2、Commons Logging、Slf4j、Logback、Jul等。 2. 常用日誌框架 Log4j,Apache基 ...
  • 一個渣渣的分享: 立方變自身 觀察下麵的現象,某個數字的立方,按位累加仍然等於自身。1^3 = 1 8^3 = 512 5+1+2=817^3 = 4913 4+9+1+3=17... 請你計算包括1,8,17在內,符合這個性質的正整數一共有多少個? 請填寫該數字,不要填寫任何多餘的內容或說明性的文 ...
  • 本篇博文以 ipdb 模塊為例子,主要記錄源代碼剖析思想與思路,介紹分析源代碼的方法 ...
  • 大家好,今天我們接著上一節的全棧營銷個人博客製作,上一節我們把博客的模板給載入運行起來。今天我們主要講解後臺模板分類的添加,後臺導航的添加,及前臺導航的動態調用。一個好的博客,導航很重要,導航就像你網站的地圖,有導向作用,能引導用戶去他想去的地方,再一個好處是對seo優化有很大的好處,所以導航是我們... ...
  • 最近在學python,網上很難找到對應的演算法題網站,專業演算法網站大部分都是國外的,之前在w3cschool看到有三個級別的Javascript腳本演算法挑戰,嘗試用python實現,代碼量相對比較少,如果你有更好的解法,還請不吝賜教,初學python,希望和大家一起日有所長。 目錄 1.判斷電話號碼算 ...
  • 》》首先要去碼雲註冊個賬號 提示(儘量使用英文名)創建用戶名 使用郵箱登錄 》》然後創建庫 》填寫項目的基礎信息 》》之後在碼雲上就創建了項目 》》之後安裝 Git https://git-scm.com/download(一直下一步安裝) 》》然後開始設置pycharm 打開PyCharm 進入F ...
  • RPC,英文名稱Remote Procedure Call Protocol,即遠程過程通訊協議。 可以設想一種情況,有一個人,叫A,A想要翻開一本書,非常簡單,讓大腦控制自己兩隻手,輕易就可以看到書本內容。突然有一天,A想看另一本書,他發現自己沒有,而好朋友B有這本書,但是怎麼跟好朋友B借呢?於是 ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...