前言 在上一篇 "Kafka使用Java實現數據的生產和消費demo" 中介紹如何簡單的使用kafka進行數據傳輸。本篇則重點介紹kafka中的 consumer 消費者的講解。 應用場景 在上一篇kafka的consumer消費者,我們使用的是自動提交offset下標。 但是offset下標自動提 ...
前言
在上一篇 Kafka使用Java實現數據的生產和消費demo 中介紹如何簡單的使用kafka進行數據傳輸。本篇則重點介紹kafka中的 consumer 消費者的講解。
應用場景
在上一篇kafka的consumer消費者,我們使用的是自動提交offset下標。
但是offset下標自動提交其實在很多場景都不適用,因為自動提交是在kafka拉取到數據之後就直接提交,這樣很容易丟失數據,尤其是在需要事物控制的時候。
很多情況下我們需要從kafka成功拉取數據之後,對數據進行相應的處理之後再進行提交。如拉取數據之後進行寫入mysql這種 , 所以這時我們就需要進行手動提交kafka的offset下標。
這裡順便說下offset具體是什麼。
offset:指的是kafka的topic中的每個消費組消費的下標。
簡單的來說就是一條消息對應一個offset下標,每次消費數據的時候如果提交offset,那麼下次消費就會從提交的offset加一那裡開始消費。
比如一個topic中有100條數據,我消費了50條並且提交了,那麼此時的kafka服務端記錄提交的offset就是49(offset從0開始),那麼下次消費的時候offset就從50開始消費。
測試
說了這麼,那麼我們開始進行手動提交測試。
首先,使用kafka 的producer 程式往kafka集群發送了100條測試數據。
程式列印中已經成功發送了,這裡我們在kafka伺服器使用命令中來查看是否成功發送.
命令如下:
kafka-console-consumer.sh --zookeeper master:2181 --topic KAFKA_TEST2 --from-beginning
註:
1.master 是我在linux中做了IP映射的關係,實際可以換成IP。
2.因為kafka是集群,所以也可以在集群的其他機器進行消費。
可以看到已經成功發送了100條。
成功發送消息之後,我們再使用kafka的consumer 進行數據消費。
因為是用來測試手動提交
所以 將 enable.auto.commit 改成 false 進行手動提交
並且設置每次拉取最大10條
props.put("enable.auto.commit", "false");
props.put("max.poll.records", 10);
將提交方式改成false之後
需要手動提交只需加上這段代碼
consumer.commitSync();
那麼首先嘗試消費不提交,測試能不能重覆消費。
右鍵運行main方法進行消費,不提交offset下標。
成功消費之後,結束程式,再次運行main方法進行消費,也不提交offset下標。
並未手動進行提交,而且並未更改消費組名,但是可以看到已經重覆消費了!
接下來,開始測試手動提交。
- 測試目的:
1.測試手動提交之後的offset,能不能再次消費。
2.測試未提交的offset,能不能再次進行消費。 - 測試方法: 當消費到50條的時候,進行手動提交,然後剩下的50條不進行提交。
- 希望達成的目的: 手動提交的offset不能再次消費,未提交的可以再次進行消費。
為了達到上述目的,我們測試只需添加如下代碼即可:
if(list.size()==50){
consumer.commitSync();
}
更改代碼之後,開始運行程式
測試示例圖如下:
簡單的一看,和之前未提交的一樣,貌似沒有什麼問題。
但是正常來說,未提交的下標不應該重覆進行消費,直到它提交為止嗎?
因為要進行重覆消費,但是messageNo 會一直累加,只會手動的提交前50條offset,
後面的50條offset會一直無法消費,所以列印的條數不應該是100,而是應該一直列印。
那麼測試的結果和預想的為什麼不一致呢?
之前不是已經測試過可以重覆消費未提交的offset嗎?
其實這點可以根據兩次啟動方式的不同而得出結論。
開始測試未提交重覆消費的時候,實際我是啟動-暫停-啟動,那麼本地的consumer實際是被初始化過兩次。
而剛剛測試的實際consumer只有初始化一次。
至於為什麼初始化一次就不行呢?
因為kafka的offset下標的記錄實際會有兩份,服務端會自己記錄一份,本地的消費者客戶端也會記錄一份,提交的offset會告訴服務端已經消費到這了,但是本地的並不會因此而改變offset進行再次消費。
簡單的來說假如有10條數據,在第5條的時候進行提交了offset下標,那麼服務端就知道該組消費的下標到第5條了,如果同組其他的consumer進行消費的時候就會從第6條開始進行消費。但是本地的消費者客戶端並不會因此而改變,它還是會繼續消費下去,並不會再次從第6條開始消費,所以會出現上圖情況。
但是項目中運行之後,是不會因此而重啟的,所以這時我們可以換一種思路。
就是如果觸發某個條件,所以導致offset未提交,我們就可以關閉之前的consumer,然後新new一個consumer,這樣就可以再次進行消費了! 當然配置要和之前的一樣。
那麼將之前的提交代碼更改如下:
if(list.size()==50){
consumer.commitSync();
}else if(list.size()>50){
consumer.close();
init();
list.clear();
list2.clear();
}
註:這裡因為是測試,為了簡單明瞭,所以條件我寫的很簡單。實際情況請根據個人的為準。
示例圖如下:
說明:
1.因為每次是拉取10條,所以在60條的時候kafka的配置初始化了,然後又從新拉取了50-60條的數據,但是沒有提交,所以並不會影響實際結果。
2.這裡為了方便截圖展示,所以列印條件改了,但是不影響程式!
從測試結果中,我們達到了之前想要測試的目的,未提交的offset可以重覆進行消費。
這種做法一般也可以滿足大部分需求。
例如從kafka獲取數據入庫,如果一批數據入庫成功,就提交offset,否則不提交,然後再次拉取。
但是這種做法並不能最大的保證數據的完整性。比如在運行的時候,程式掛了之類的。
所以還有一種方法是手動的指定offset下標進行獲取數據,直到kafka的數據處理成功之後,將offset記錄下來,比如寫在資料庫中。那麼這種做法,等到下一篇再進行嘗試吧!
該項目我放在github上了,有興趣的可以看看!
地址:https://github.com/xuwujing/kafka
到此,本文結束,謝謝閱讀!