Kafka單線程Consumer及參數詳解

来源:https://www.cnblogs.com/tree1123/archive/2019/08/16/11362252.html
-Advertisement-
Play Games

請使用0.9以後的版本: 示例代碼 1、只需要配置kafka的server groupid autocommit 序列化 autooffsetreset(其中 bootstrap.server group.id key.deserializer value.deserializer 必須指定); 2 ...


請使用0.9以後的版本:

示例代碼

 Properties props = new Properties();
        props.put("bootstrap.servers", "kafka01:9092,kafka02:9092");
        props.put("group.id", "test");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        
        props.put("auto.offset.reset","earliest");
        
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("foo", "bar"));
      try{  
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(1000);
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
            }
         }
        }finally{
          consumer.close();
        }

1、只需要配置kafka的server groupid autocommit 序列化 autooffsetreset(其中 bootstrap.server group.id key.deserializer value.deserializer 必須指定);

2、用這些Properties構建consumer對象(KafkaConsumer還有其他構造,可以把序列化傳進去);

3、subscribe訂閱topic列表(可以用正則訂閱Pattern.compile("kafka.*")

使用正則必須指定一個listener subscribe(Pattern pattern, ConsumerRebalanceListener listener)); 可以重寫這個介面來實現 分區變更時的邏輯。如果設置了enable.auto.commit = true 就不用理會這個邏輯。

4、然後迴圈poll消息(這裡的1000是超時設定,如果沒有很多數據,也就等一秒);

5、處理消息(列印了offset key value 這裡寫處理邏輯)。

6、關閉KafkaConsumer(可以傳一個timeout值 等待秒數 預設是30)。

參數詳解

bootstrap.server(最好用主機名不用ip kafka內部用的主機名 除非自己配置了ip)

deserializer 反序列化consumer從broker端獲取的是位元組數組,還原回對象類型。

預設有十幾種:StringDeserializer LongDeserializer DoubleDeserializer。。

也可以自定義:定義serializer格式 創建自定義deserializer類實現Deserializer 介面 重寫邏輯

除了四個必傳的 bootstrap.server group.id key.deserializer value.deserializer

還有session.timeout.ms "coordinator檢測失敗的時間"

是檢測consumer掛掉的時間 為了可以及時的rebalance 預設是10秒 可以設置更小的值避免消息延遲。

max.poll.interval.ms "consumer處理邏輯最大時間"

處理邏輯比較複雜的時候 可以設置這個值 避免造成不必要的 rebalance ,因為兩次poll時間超過了這個參數,kafka認為這個consumer已經跟不上了,會踢出組,而且不能提交offset,就會重覆消費。預設是5分鐘。

auto.offset.reset "無位移或者位移越界時kafka的應對策略"

所以如果啟動了一個group從頭消費 成功提交位移後 重啟後還是接著消費 這個參數無效

所以3個值的解釋是:

earliset 當各分區下有已提交的offset時,從提交的offset開始消費;無提交的offset時,從最早的位移消費

latest 當各分區下有已提交的offset時,從提交的offset開始消費;無提交的offset時,消費新產生的該分區下的數據 none topic各分區都存在已提交的offset時,從offset後開始消費;只要有一個分區不存在已提交的offset,則拋出異常

(註意kafka-0.10.1.X版本之前: auto.offset.reset 的值為smallest,和,largest.(offest保存在zk中) 、

我們這是說的是新版本:kafka-0.10.1.X版本之後: auto.offset.reset 的值更改為:earliest,latest,和none (offest保存在kafka的一個特殊的topic名為:__consumer_offsets裡面))

enable.auto.commit 是否自動提交位移

true 自動提交 false需要用戶手動提交 有隻處理一次需要的 最近設置為false自己控制。

fetch.max.bytes consumer單次獲取最大位元組數

max.poll.records 單次poll返回的最大消息數

預設500條 如果消費很輕量 可以適當提高這個值 增加消費速度。

hearbeat.interval.ms consumer其他組員感知rabalance的時間

該值必須小於 session.timeout.ms 如果檢測到 consumer掛掉 也就根本無法感知rabalance了

connections.max.idle.ms 定期關閉連接的時間

預設是9分鐘 可以設置為-1 永不關閉

更多實時計算,Kafka等相關技術博文,歡迎關註實時流式計算


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 操作系統 : windows7_x64 創建vhd 磁碟管理 --> 操作 --> 創建vhd 掛載vhd 腳本: Python版本: https://github.com/mike-zhang/pyExamples/blob/master/tools/vhdFileOpt/load_vhd.py ...
  • 1、通過位置變數創建linux系統賬戶及密碼$1 是執行腳本的第一個參數,$2 是執行腳本的第二個參數 2、每周5使用tar命令備份/var/log 下的所有日誌文件,防止文件覆蓋 3、實時監控本機記憶體和硬碟剩餘空間,剩餘記憶體小於500M,根分區剩餘空間小於1000M的時候發送報警郵件 4、腳本生成 ...
  • 系統環境及架構 在master和slave上分別進行資料庫的安裝 創建資料庫文件存放路徑 配置mysql配置文件 #在mysqlMaster上配置mysql配置文件 在mysqlSlave上配置mysql配置文件 #若是不寫上這個欄位,在本機用 命令 進入mysql會報錯,提示預設路徑/var/li ...
  • [學習筆記] 2.4 DataInputStream的用法 馬 克-to-win:DataInputStream顧名思義:就是專門用來讀各種各樣的數據的,比如(int,char,long等),一定要註意 DataOutputStream 與DataInputStream配合使用,而且二者讀寫的順序要 ...
  • into用法: DECLARE a NUMBER; b number; c number;BEGIN SELECT MAX(SAL),MIN(SAL),AVG(SAL) INTO A,B,C FROM EMP; DBMS_OUTPUT.PUT_LINE('最高工資:'|| a); DBMS_OUTP ...
  • 有時候我們會通過mongo shell 運行一些腳本,去執行更新或運維需求。mongo shell 可執行的代碼可以實現比較複雜的功能,代碼也可以比較豐富。當執行報錯時,如果可以快速定位到錯誤點,對解決bug, 可以事半功倍。 我們先測試一下: Case 1 簡單的向集合中插入一筆數據 執行代碼: ...
  • 進去root許可權(su) 1.從https://mirrors.tuna.tsinghua.edu.cn/apache/hive/hive-1.2.2/apache-hive-1.2.2-bin.tar.gz獲取鏡像地址選擇版本下載(此處使用清華開源的Apache-hive1.2.2版本) wget ...
  • 一.首先解釋一下可能會查詢的基礎問題: 1.1db2 “with ur”是什麼意思: 在DB2中,共有四種隔離級:RS,RR,CS,UR.以下對四種隔離級進行一些描述,同時附上個人做試驗的結果。隔離級是影響加鎖策略的重要環節,它直接影響加鎖的範圍及鎖的持續時間。兩個應用程式即使執行的相同的操作,也可 ...
一周排行
    -Advertisement-
    Play Games
  • C#TMS系統代碼-基礎頁面BaseCity學習 本人純新手,剛進公司跟領導報道,我說我是java全棧,他問我會不會C#,我說大學學過,他說這個TMS系統就給你來管了。外包已經把代碼給我了,這幾天先把增刪改查的代碼背一下,說不定後面就要趕鴨子上架了 Service頁面 //using => impo ...
  • 委托與事件 委托 委托的定義 委托是C#中的一種類型,用於存儲對方法的引用。它允許將方法作為參數傳遞給其他方法,實現回調、事件處理和動態調用等功能。通俗來講,就是委托包含方法的記憶體地址,方法匹配與委托相同的簽名,因此通過使用正確的參數類型來調用方法。 委托的特性 引用方法:委托允許存儲對方法的引用, ...
  • 前言 這幾天閑來沒事看看ABP vNext的文檔和源碼,關於關於依賴註入(屬性註入)這塊兒產生了興趣。 我們都知道。Volo.ABP 依賴註入容器使用了第三方組件Autofac實現的。有三種註入方式,構造函數註入和方法註入和屬性註入。 ABP的屬性註入原則參考如下: 這時候我就開始疑惑了,因為我知道 ...
  • C#TMS系統代碼-業務頁面ShippingNotice學習 學一個業務頁面,ok,領導開完會就被裁掉了,很突然啊,他收拾東西的時候我還以為他要旅游提前請假了,還在尋思為什麼回家連自己買的幾箱飲料都要叫跑腿帶走,怕被偷嗎?還好我在他開會之前拿了兩瓶芬達 感覺感覺前面的BaseCity差不太多,這邊的 ...
  • 概述:在C#中,通過`Expression`類、`AndAlso`和`OrElse`方法可組合兩個`Expression<Func<T, bool>>`,實現多條件動態查詢。通過創建表達式樹,可輕鬆構建複雜的查詢條件。 在C#中,可以使用AndAlso和OrElse方法組合兩個Expression< ...
  • 閑來無聊在我的Biwen.QuickApi中實現一下極簡的事件匯流排,其實代碼還是蠻簡單的,對於初學者可能有些幫助 就貼出來,有什麼不足的地方也歡迎板磚交流~ 首先定義一個事件約定的空介面 public interface IEvent{} 然後定義事件訂閱者介面 public interface I ...
  • 1. 案例 成某三甲醫預約系統, 該項目在2024年初進行上線測試,在正常運行了兩天後,業務系統報錯:The connection pool has been exhausted, either raise MaxPoolSize (currently 800) or Timeout (curren ...
  • 背景 我們有些工具在 Web 版中已經有了很好的實踐,而在 WPF 中重新開發也是一種費時費力的操作,那麼直接集成則是最省事省力的方法了。 思路解釋 為什麼要使用 WPF?莫問為什麼,老 C# 開發的堅持,另外因為 Windows 上已經裝了 Webview2/edge 整體打包比 electron ...
  • EDP是一套集組織架構,許可權框架【功能許可權,操作許可權,數據訪問許可權,WebApi許可權】,自動化日誌,動態Interface,WebApi管理等基礎功能於一體的,基於.net的企業應用開發框架。通過友好的編碼方式實現數據行、列許可權的管控。 ...
  • .Net8.0 Blazor Hybird 桌面端 (WPF/Winform) 實測可以完整運行在 win7sp1/win10/win11. 如果用其他工具打包,還可以運行在mac/linux下, 傳送門BlazorHybrid 發佈為無依賴包方式 安裝 WebView2Runtime 1.57 M ...