關於Kafka 的 consumer 消費者處理的一些見解

来源:https://www.cnblogs.com/xuwujing/archive/2018/02/08/8432984.html
-Advertisement-
Play Games

前言 在上一篇 "Kafka使用Java實現數據的生產和消費demo" 中介紹如何簡單的使用kafka進行數據傳輸。本篇則重點介紹kafka中的 consumer 消費者的講解。 應用場景 在上一篇kafka的consumer消費者,我們使用的是自動提交offset下標。 但是offset下標自動提 ...


前言

在上一篇 Kafka使用Java實現數據的生產和消費demo 中介紹如何簡單的使用kafka進行數據傳輸。本篇則重點介紹kafka中的 consumer 消費者的講解。

應用場景

在上一篇kafka的consumer消費者,我們使用的是自動提交offset下標。
但是offset下標自動提交其實在很多場景都不適用,因為自動提交是在kafka拉取到數據之後就直接提交,這樣很容易丟失數據,尤其是在需要事物控制的時候。
很多情況下我們需要從kafka成功拉取數據之後,對數據進行相應的處理之後再進行提交。如拉取數據之後進行寫入mysql這種 , 所以這時我們就需要進行手動提交kafka的offset下標。

這裡順便說下offset具體是什麼。
offset:指的是kafka的topic中的每個消費組消費的下標。
簡單的來說就是一條消息對應一個offset下標,每次消費數據的時候如果提交offset,那麼下次消費就會從提交的offset加一那裡開始消費。
比如一個topic中有100條數據,我消費了50條並且提交了,那麼此時的kafka服務端記錄提交的offset就是49(offset從0開始),那麼下次消費的時候offset就從50開始消費。

測試

說了這麼,那麼我們開始進行手動提交測試。
首先,使用kafka 的producer 程式往kafka集群發送了100條測試數據。

這裡寫圖片描述
程式列印中已經成功發送了,這裡我們在kafka伺服器使用命令中來查看是否成功發送.
命令如下:

 kafka-console-consumer.sh  --zookeeper master:2181  --topic KAFKA_TEST2 --from-beginning

這裡寫圖片描述

註:
1.master 是我在linux中做了IP映射的關係,實際可以換成IP。
2.因為kafka是集群,所以也可以在集群的其他機器進行消費。

可以看到已經成功發送了100條。

成功發送消息之後,我們再使用kafka的consumer 進行數據消費。

因為是用來測試手動提交
所以 將 enable.auto.commit 改成 false 進行手動提交
並且設置每次拉取最大10條

props.put("enable.auto.commit", "false");
props.put("max.poll.records", 10);

將提交方式改成false之後
需要手動提交只需加上這段代碼

consumer.commitSync();

那麼首先嘗試消費不提交,測試能不能重覆消費。
右鍵運行main方法進行消費,不提交offset下標。
這裡寫圖片描述

成功消費之後,結束程式,再次運行main方法進行消費,也不提交offset下標。
這裡寫圖片描述

並未手動進行提交,而且並未更改消費組名,但是可以看到已經重覆消費了!

接下來,開始測試手動提交。

  1. 測試目的:
    1.測試手動提交之後的offset,能不能再次消費。
    2.測試未提交的offset,能不能再次進行消費。
  2. 測試方法: 當消費到50條的時候,進行手動提交,然後剩下的50條不進行提交。
  3. 希望達成的目的: 手動提交的offset不能再次消費,未提交的可以再次進行消費。

為了達到上述目的,我們測試只需添加如下代碼即可:

if(list.size()==50){
    consumer.commitSync();
}

更改代碼之後,開始運行程式
測試示例圖如下:
這裡寫圖片描述

簡單的一看,和之前未提交的一樣,貌似沒有什麼問題。
但是正常來說,未提交的下標不應該重覆進行消費,直到它提交為止嗎?
因為要進行重覆消費,但是messageNo 會一直累加,只會手動的提交前50條offset,
後面的50條offset會一直無法消費,所以列印的條數不應該是100,而是應該一直列印。

那麼測試的結果和預想的為什麼不一致呢?
之前不是已經測試過可以重覆消費未提交的offset嗎?
其實這點可以根據兩次啟動方式的不同而得出結論。
開始測試未提交重覆消費的時候,實際我是啟動-暫停-啟動,那麼本地的consumer實際是被初始化過兩次。
而剛剛測試的實際consumer只有初始化一次。
至於為什麼初始化一次就不行呢?
因為kafka的offset下標的記錄實際會有兩份,服務端會自己記錄一份,本地的消費者客戶端也會記錄一份,提交的offset會告訴服務端已經消費到這了,但是本地的並不會因此而改變offset進行再次消費。

簡單的來說假如有10條數據,在第5條的時候進行提交了offset下標,那麼服務端就知道該組消費的下標到第5條了,如果同組其他的consumer進行消費的時候就會從第6條開始進行消費。但是本地的消費者客戶端並不會因此而改變,它還是會繼續消費下去,並不會再次從第6條開始消費,所以會出現上圖情況。

但是項目中運行之後,是不會因此而重啟的,所以這時我們可以換一種思路。
就是如果觸發某個條件,所以導致offset未提交,我們就可以關閉之前的consumer,然後新new一個consumer,這樣就可以再次進行消費了! 當然配置要和之前的一樣。

那麼將之前的提交代碼更改如下:

if(list.size()==50){
    consumer.commitSync();
}else if(list.size()>50){
    consumer.close();
    init();
    list.clear();
    list2.clear();
}

註:這裡因為是測試,為了簡單明瞭,所以條件我寫的很簡單。實際情況請根據個人的為準。

示例圖如下:
這裡寫圖片描述
說明:
1.因為每次是拉取10條,所以在60條的時候kafka的配置初始化了,然後又從新拉取了50-60條的數據,但是沒有提交,所以並不會影響實際結果。
2.這裡為了方便截圖展示,所以列印條件改了,但是不影響程式!

從測試結果中,我們達到了之前想要測試的目的,未提交的offset可以重覆進行消費。
這種做法一般也可以滿足大部分需求。
例如從kafka獲取數據入庫,如果一批數據入庫成功,就提交offset,否則不提交,然後再次拉取。
但是這種做法並不能最大的保證數據的完整性。比如在運行的時候,程式掛了之類的。
所以還有一種方法是手動的指定offset下標進行獲取數據,直到kafka的數據處理成功之後,將offset記錄下來,比如寫在資料庫中。那麼這種做法,等到下一篇再進行嘗試吧!

該項目我放在github上了,有興趣的可以看看!
地址:https://github.com/xuwujing/kafka

到此,本文結束,謝謝閱讀!


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • hiernate4入門 【實戰】本文將講述hibernate4的入門,我寫是以實用入門為主,不作過多展開,我寫東西如果開頭標了實戰二字,必然會保證讀者能夠獲取源代碼並且能夠按要求做的話一定能跑起來,否則沒有意義。 【環境說明:】 jdk8,eclipse4.7,hibernate4.3.11,mys ...
  • 上一篇介紹了Servlet初始化,以及如何處理HTTP請求,實際上在這兩個過程中,都伴隨著Servlet的生命周期,都是Servlet生命周期的一部分。同時,由於Tomcat容器預設是採用單實例多線程的方式處理多個請求,這一特性就導致了線程安全問題的存在。因此,本篇主要講述Servlet生命周期與線 ...
  • 一、整體流程 1、客戶端註冊Watcher 2、服務端處理Watcher 3、客戶端回調Watcher 二、Watcher 通知狀態與事件類型 state type path WatcheEvent 只有三個屬性 state type path 不會告訴原始數據內容和更新內容,需要client再次主 ...
  • "minio java client" 使用okhttp作為底層的http實現,在產品包裡面區域網上傳文件的速度一直只有400~800KB/s,經過一天排查發現是 禁用了即時編譯導致。 發現問題的場景 minio java的使用架構圖是這樣的: [Minio Server] ...
  • Python擅長的領域: WEB開發:Django\pyramid\Tornado\Bottle\Flask\WebPy網路編程:Twisted\Requests\Scrapy\Paramiko科學運算:SciPy\Pandas\IpythonGUI圖形開發:wxPythin\PyQT\Kivy運維 ...
  • 1.1 基於UDP協議實現簡單的套接字通信 udp是無鏈接的,先啟動哪一端都不會報錯 udp套接字簡單示例 1.1.1.1 客戶端: from socket import * client=socket(AF_INET,SOCK_DGRAM) #數據報協議,創建一個客戶的套接字 while True ...
  • 在python中,可以使用try...except語句來處理異常。做法是,把可能引發異常的語句放在try 塊中,把處理異常的語句放在except 塊中。 當程式在try內部打開文件引發異常時,會跳過try中剩下的代碼,直接跳轉到except中的語句處理異常。於是輸出了“File not exists ...
  • Python讀寫csv文件 覺得有用的話,歡迎一起討論相互學習~ "Follow Me" 前言 逗號分隔值(Comma Separated Values,CSV,有時也稱為字元分隔值,因為分隔字元也可以不是逗號),其文件以純文本形式存儲表格數據(數字和文本)。純文本意味著該文件是一個字元序列,不含必 ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...