Kafka單線程Consumer及參數詳解

来源:https://www.cnblogs.com/tree1123/archive/2019/08/16/11362252.html
-Advertisement-
Play Games

請使用0.9以後的版本: 示例代碼 1、只需要配置kafka的server groupid autocommit 序列化 autooffsetreset(其中 bootstrap.server group.id key.deserializer value.deserializer 必須指定); 2 ...


請使用0.9以後的版本:

示例代碼

 Properties props = new Properties();
        props.put("bootstrap.servers", "kafka01:9092,kafka02:9092");
        props.put("group.id", "test");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        
        props.put("auto.offset.reset","earliest");
        
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("foo", "bar"));
      try{  
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(1000);
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
            }
         }
        }finally{
          consumer.close();
        }

1、只需要配置kafka的server groupid autocommit 序列化 autooffsetreset(其中 bootstrap.server group.id key.deserializer value.deserializer 必須指定);

2、用這些Properties構建consumer對象(KafkaConsumer還有其他構造,可以把序列化傳進去);

3、subscribe訂閱topic列表(可以用正則訂閱Pattern.compile("kafka.*")

使用正則必須指定一個listener subscribe(Pattern pattern, ConsumerRebalanceListener listener)); 可以重寫這個介面來實現 分區變更時的邏輯。如果設置了enable.auto.commit = true 就不用理會這個邏輯。

4、然後迴圈poll消息(這裡的1000是超時設定,如果沒有很多數據,也就等一秒);

5、處理消息(列印了offset key value 這裡寫處理邏輯)。

6、關閉KafkaConsumer(可以傳一個timeout值 等待秒數 預設是30)。

參數詳解

bootstrap.server(最好用主機名不用ip kafka內部用的主機名 除非自己配置了ip)

deserializer 反序列化consumer從broker端獲取的是位元組數組,還原回對象類型。

預設有十幾種:StringDeserializer LongDeserializer DoubleDeserializer。。

也可以自定義:定義serializer格式 創建自定義deserializer類實現Deserializer 介面 重寫邏輯

除了四個必傳的 bootstrap.server group.id key.deserializer value.deserializer

還有session.timeout.ms "coordinator檢測失敗的時間"

是檢測consumer掛掉的時間 為了可以及時的rebalance 預設是10秒 可以設置更小的值避免消息延遲。

max.poll.interval.ms "consumer處理邏輯最大時間"

處理邏輯比較複雜的時候 可以設置這個值 避免造成不必要的 rebalance ,因為兩次poll時間超過了這個參數,kafka認為這個consumer已經跟不上了,會踢出組,而且不能提交offset,就會重覆消費。預設是5分鐘。

auto.offset.reset "無位移或者位移越界時kafka的應對策略"

所以如果啟動了一個group從頭消費 成功提交位移後 重啟後還是接著消費 這個參數無效

所以3個值的解釋是:

earliset 當各分區下有已提交的offset時,從提交的offset開始消費;無提交的offset時,從最早的位移消費

latest 當各分區下有已提交的offset時,從提交的offset開始消費;無提交的offset時,消費新產生的該分區下的數據 none topic各分區都存在已提交的offset時,從offset後開始消費;只要有一個分區不存在已提交的offset,則拋出異常

(註意kafka-0.10.1.X版本之前: auto.offset.reset 的值為smallest,和,largest.(offest保存在zk中) 、

我們這是說的是新版本:kafka-0.10.1.X版本之後: auto.offset.reset 的值更改為:earliest,latest,和none (offest保存在kafka的一個特殊的topic名為:__consumer_offsets裡面))

enable.auto.commit 是否自動提交位移

true 自動提交 false需要用戶手動提交 有隻處理一次需要的 最近設置為false自己控制。

fetch.max.bytes consumer單次獲取最大位元組數

max.poll.records 單次poll返回的最大消息數

預設500條 如果消費很輕量 可以適當提高這個值 增加消費速度。

hearbeat.interval.ms consumer其他組員感知rabalance的時間

該值必須小於 session.timeout.ms 如果檢測到 consumer掛掉 也就根本無法感知rabalance了

connections.max.idle.ms 定期關閉連接的時間

預設是9分鐘 可以設置為-1 永不關閉

更多實時計算,Kafka等相關技術博文,歡迎關註實時流式計算


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 操作系統 : windows7_x64 創建vhd 磁碟管理 --> 操作 --> 創建vhd 掛載vhd 腳本: Python版本: https://github.com/mike-zhang/pyExamples/blob/master/tools/vhdFileOpt/load_vhd.py ...
  • 1、通過位置變數創建linux系統賬戶及密碼$1 是執行腳本的第一個參數,$2 是執行腳本的第二個參數 2、每周5使用tar命令備份/var/log 下的所有日誌文件,防止文件覆蓋 3、實時監控本機記憶體和硬碟剩餘空間,剩餘記憶體小於500M,根分區剩餘空間小於1000M的時候發送報警郵件 4、腳本生成 ...
  • 系統環境及架構 在master和slave上分別進行資料庫的安裝 創建資料庫文件存放路徑 配置mysql配置文件 #在mysqlMaster上配置mysql配置文件 在mysqlSlave上配置mysql配置文件 #若是不寫上這個欄位,在本機用 命令 進入mysql會報錯,提示預設路徑/var/li ...
  • [學習筆記] 2.4 DataInputStream的用法 馬 克-to-win:DataInputStream顧名思義:就是專門用來讀各種各樣的數據的,比如(int,char,long等),一定要註意 DataOutputStream 與DataInputStream配合使用,而且二者讀寫的順序要 ...
  • into用法: DECLARE a NUMBER; b number; c number;BEGIN SELECT MAX(SAL),MIN(SAL),AVG(SAL) INTO A,B,C FROM EMP; DBMS_OUTPUT.PUT_LINE('最高工資:'|| a); DBMS_OUTP ...
  • 有時候我們會通過mongo shell 運行一些腳本,去執行更新或運維需求。mongo shell 可執行的代碼可以實現比較複雜的功能,代碼也可以比較豐富。當執行報錯時,如果可以快速定位到錯誤點,對解決bug, 可以事半功倍。 我們先測試一下: Case 1 簡單的向集合中插入一筆數據 執行代碼: ...
  • 進去root許可權(su) 1.從https://mirrors.tuna.tsinghua.edu.cn/apache/hive/hive-1.2.2/apache-hive-1.2.2-bin.tar.gz獲取鏡像地址選擇版本下載(此處使用清華開源的Apache-hive1.2.2版本) wget ...
  • 一.首先解釋一下可能會查詢的基礎問題: 1.1db2 “with ur”是什麼意思: 在DB2中,共有四種隔離級:RS,RR,CS,UR.以下對四種隔離級進行一些描述,同時附上個人做試驗的結果。隔離級是影響加鎖策略的重要環節,它直接影響加鎖的範圍及鎖的持續時間。兩個應用程式即使執行的相同的操作,也可 ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...