請使用0.9以後的版本: 示例代碼 1、只需要配置kafka的server groupid autocommit 序列化 autooffsetreset(其中 bootstrap.server group.id key.deserializer value.deserializer 必須指定); 2 ...
請使用0.9以後的版本:
示例代碼
Properties props = new Properties();
props.put("bootstrap.servers", "kafka01:9092,kafka02:9092");
props.put("group.id", "test");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("auto.offset.reset","earliest");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("foo", "bar"));
try{
while (true) {
ConsumerRecords<String, String> records = consumer.poll(1000);
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
}
}finally{
consumer.close();
}
1、只需要配置kafka的server groupid autocommit 序列化 autooffsetreset(其中 bootstrap.server group.id key.deserializer value.deserializer 必須指定);
2、用這些Properties構建consumer對象(KafkaConsumer還有其他構造,可以把序列化傳進去);
3、subscribe訂閱topic列表(可以用正則訂閱Pattern.compile("kafka.*")
使用正則必須指定一個listener subscribe(Pattern pattern, ConsumerRebalanceListener listener)); 可以重寫這個介面來實現 分區變更時的邏輯。如果設置了enable.auto.commit = true 就不用理會這個邏輯。
4、然後迴圈poll消息(這裡的1000是超時設定,如果沒有很多數據,也就等一秒);
5、處理消息(列印了offset key value 這裡寫處理邏輯)。
6、關閉KafkaConsumer(可以傳一個timeout值 等待秒數 預設是30)。
參數詳解
bootstrap.server(最好用主機名不用ip kafka內部用的主機名 除非自己配置了ip)
deserializer 反序列化consumer從broker端獲取的是位元組數組,還原回對象類型。
預設有十幾種:StringDeserializer LongDeserializer DoubleDeserializer。。
也可以自定義:定義serializer格式 創建自定義deserializer類實現Deserializer 介面 重寫邏輯
除了四個必傳的 bootstrap.server group.id key.deserializer value.deserializer
還有session.timeout.ms "coordinator檢測失敗的時間"
是檢測consumer掛掉的時間 為了可以及時的rebalance 預設是10秒 可以設置更小的值避免消息延遲。
max.poll.interval.ms "consumer處理邏輯最大時間"
處理邏輯比較複雜的時候 可以設置這個值 避免造成不必要的 rebalance ,因為兩次poll時間超過了這個參數,kafka認為這個consumer已經跟不上了,會踢出組,而且不能提交offset,就會重覆消費。預設是5分鐘。
auto.offset.reset "無位移或者位移越界時kafka的應對策略"
所以如果啟動了一個group從頭消費 成功提交位移後 重啟後還是接著消費 這個參數無效
所以3個值的解釋是:
earliset 當各分區下有已提交的offset時,從提交的offset開始消費;無提交的offset時,從最早的位移消費
latest 當各分區下有已提交的offset時,從提交的offset開始消費;無提交的offset時,消費新產生的該分區下的數據 none topic各分區都存在已提交的offset時,從offset後開始消費;只要有一個分區不存在已提交的offset,則拋出異常
(註意kafka-0.10.1.X版本之前: auto.offset.reset 的值為smallest,和,largest.(offest保存在zk中) 、
我們這是說的是新版本:kafka-0.10.1.X版本之後: auto.offset.reset 的值更改為:earliest,latest,和none (offest保存在kafka的一個特殊的topic名為:__consumer_offsets裡面))
enable.auto.commit 是否自動提交位移
true 自動提交 false需要用戶手動提交 有隻處理一次需要的 最近設置為false自己控制。
fetch.max.bytes consumer單次獲取最大位元組數
max.poll.records 單次poll返回的最大消息數
預設500條 如果消費很輕量 可以適當提高這個值 增加消費速度。
hearbeat.interval.ms consumer其他組員感知rabalance的時間
該值必須小於 session.timeout.ms 如果檢測到 consumer掛掉 也就根本無法感知rabalance了
connections.max.idle.ms 定期關閉連接的時間
預設是9分鐘 可以設置為-1 永不關閉
更多實時計算,Kafka等相關技術博文,歡迎關註實時流式計算