在消費Kafka中分區的數據時,我們需要跟蹤哪些消息是讀取過的、哪些是沒有讀取過的。這是讀取消息不丟失的關鍵所在。Kafka是通過offset順序讀取事件的。如果一個消費者退出,再重啟的時候,它知道從哪兒繼續讀取消息進行處理。所以,消費者需要「提交」屬於它們自己的偏移量。如果消費者已經提交了偏移量,... ...
在消費Kafka中分區的數據時,我們需要跟蹤哪些消息是讀取過的、哪些是沒有讀取過的。這是讀取消息不丟失的關鍵所在。
Kafka是通過offset順序讀取事件的。如果一個消費者退出,再重啟的時候,它知道從哪兒繼續讀取消息進行處理。所以,消費者需要「提交」屬於它們自己的偏移量。如果消費者已經提交了偏移量,但消息沒有得到有效處理,此時就會造成消費者消息丟失。所以,我們應該重視偏移量提交的時間點以及提交的方式。
Kafka消費者的可靠性配置
1、group.id
- 如果兩個消費者有相同的 group.id,並且定義同一個主題,那麼每個消費者都會消費一個分區的數據
2、auto.offset.reset
- 這個參數的作用是:當沒有偏移量提交(例如:消費者第一次啟動、或者請求的偏移量在broker上不存在時),消費者會如何處理
- earliest:消費者從分區的開始位置讀取大量的重覆數據,可以保證個最少的數據丟失
- latest:消費者會從分區的末尾開始讀取數據,可以減少重覆讀,但很有可能會錯過一些消息
3、enable.auto.commit
- 可以設置自動提交偏移量,可以在代碼中手動提交偏移量
- 自動提交,可以讓消費者邏輯更簡單
- 但它無法控制重覆處理消息、或者如果消息交給另外一個後臺線程去處理,自動提交機制可能會在消息還沒有處理完就提交了偏移量
4、auto.commit.interval.ms
- 通過該參數,可以配置提交的頻率。預設:每5秒鐘提交一次
- 提交的頻率高,也是會增加額外的開銷的
顯示提交偏移量
如果我們希望能夠更有效地控制偏移量提交的時間點,就需要顯示地提交偏移量。
1、總是在處理完事件後再提交偏移量
如果所有的處理都是在輪詢里完成,無需在輪詢之間維護狀態,那麼可以使用自動提交,或者在輪詢結束後進行手動提交。
2、提交頻率是性能和重覆消息數量之間的權衡
這個意思是:提交頻率越高,重覆消息處理的數量越少,性能也是比較低的。提交頻率越低,重覆消息處理的數量越多,性能是比較好的。所以,要根據實際的情況,來衡量在什麼時機,來提交偏移量。即使是在最簡單的場景你,也需要在一個迴圈中多次提交偏移量。
3、確保對提交的偏移量心裡有數
一定要在處理完消息後,再提交偏移量,否則會出現某些消息會被處理。
4、消費者可能需要重試
但處理消息出現問題時,例如:把Kafka中的數據寫入到HBase中,此時HBase臨時不可用。我們想要重試。假設這條消息是:#30,#30處理失敗了。那大家想想?#31能提交嗎?
顯然是不能的,如果#31提交了,那麼#31之前的所有數據,都不會被處理了。我們可以使用以下幾種模式來處理:
模式一
① 但遇到可重試錯誤時,提交最後一個處理成功的偏移量
② 把沒有處理好的消息保存到緩衝區
③ 調用 pause() 方法,確保其他的輪詢不會返回數據
④ 嘗試重新處理緩存中的數據,如果重試成功,或者重試次數達到上限並決定放棄,把錯誤記錄下來並丟棄消息
⑤ 調用 resume() 方法讓消費者繼續從輪詢里獲取新數據
模式二
① 遇到可重試錯誤時,把錯誤寫入一個獨立的主題,然後繼續
② 用一個獨立的消費者組負責從該主題上讀取錯誤消息,併進行重試
5、長時間處理
有時候要進行比較複雜的處理,暫停輪詢的時間不能超過幾秒鐘。要保持輪詢,因為只有在輪詢過程中,才能往broker發送心跳。可以使用一個線程池來處理數據,可以讓輪詢不獲取新的數據,直到工作縣好吃呢個處理完成。消費者一直保持輪詢,心跳正常,就不會發生再均衡。
8、僅一次傳遞
有的程式不僅是需要“至少一次”(at least-once語義)(意味著沒有數據丟失),還需要僅一次(exactly-once)語義。實現一次性語義,最常用的辦法就是把結果寫入到一個支持唯一鍵的系統里,比如:k-v存儲、關係資料庫、ES或者其他數據存儲。可以使用主題、分區和偏移量來作為主鍵,這樣,可以碰巧讀取到同一個相同的消息,直接覆蓋寫入就可以了。這種稱為冪等性寫入。
還有一種,就是使用關係型資料庫,HDFS中一些被定義過的原子操作也經常用來達到相同的目的。把消息和偏移量放在同一個事務里,這樣讓它們保持同步。消費者啟動,獲取最近處理過的偏移量,調用seek()方法從偏移量位置繼續讀取數據
參考文件:
「Kafka權威指南」