Storm1.1.1 對 0.10.x 版 Kafka之commit offsets

来源:https://www.cnblogs.com/divenswu/archive/2018/02/08/8431345.html
-Advertisement-
Play Games

由於 0.10.x 版 Kafka 與 0.8.x 版有很大的變化,這種變化對下游 Storm 有非常大的影響,0.10.x 版的 Kafka 不但增加了許可權管理的功能,而且還將 simple 和 high consumer 的 offsets 進行統一管理,也就意味著在 0.8.x 中 Storm ...


由於 0.10.x 版 Kafka 與 0.8.x 版有很大的變化,這種變化對下游 Storm 有非常大的影響,0.10.x 版的 Kafka 不但增加了許可權管理的功能,而且還將 simple 和 high consumer 的 offsets 進行統一管理,也就意味著在 0.8.x 中 Storm 需要去負責管理 offsets,而在 0.10.x 中,Storm 不需要關心 consumer 的 offsets 的問題,這對 KafkaSpout 的設計有很大的影響,本文就是對 Storm 對 0.10.x 版 Kafka 支持的實現部分的解析。

0.10.x 版 KafkaSpout 的實現

社區對新版 Kafka 的支持,總體分為兩種情況:

  1. 一種是選擇自動 commit 機制;
  2. 另一種是非自動 commit,就是將 commit 的權利交與 Storm 來控制。

下麵分別對這兩種情況進行分析。

Kafka Consumer 的一些配置會對 Storm 的性能很大影響,下麵的三個參數的設置對其性能的影響最大(預設值是根據MICROBENCHMARKING APACHE STORM 1.0 PERFORMANCE測試得到):

  • fetch.min.bytes:預設值 1;
  • fetch.max.wait.ms:預設值 500(ms);
  • Kafka Consumer instance poll timeout, 它可以在通過 KafkaSpoutConfig 的方法 setPollTimeoutMs 來配置,預設值是 200ms;

自動 commit 模式

自動 commit 模式就是 commit 的時機由 Consumer 來控制,本質上是非同步 commit,當定時達到時,就進行 commit。而 Storm 端並沒有進行任何記錄,也就是這部分的容錯完全由 Consumer 端來控制,而 Consumer 並不會關心數據的處理成功與否,只關心數據是否 commit,如果未 commit,就會重新發送數據,那麼就有可能導致下麵這個後果:

造成那些已經 commit、但 Storm 端處理失敗的數據丟失

丟失的原因

一些數據發送到 Spout 之後,恰好 commit 的定時到達,進行了 commit,但是這中間有某條或者幾條數據處理失敗,這就是說,這幾條處理失敗的數據已經進行 commit 了,Kafka 端也就不會重新進行發送。

可能出現的這種後果也確定了自動 commit 模式不能滿足我們的需求,為了保證數據不丟,需要數據在 Storm 中 ack 之後才能被 commit,因此,commit 還是應該由 Storm 端來進行控制,才能保證數據被正確處理。

非自動 commit 模式

當選用非自動的 commit 機制(實際上就是使用 Consumer 的同步 commit 機制)時,需要手動去設置 commit 的參數,有以下兩項需要設置:

  • offset.commit.period.ms:設置 spout 多久向 Kafka commit一次,在 KafkaSpoutConfig 的 setOffsetCommitPeriodMs 中配置;
  • max.uncommitted.offsets:控制在下一次拉取數據之前最多可以有多少數據在等待 commit,在 KafkaSpoutConfig 的 setMaxUncommittedOffsets 中配置;

spout 的處理過程

關於 Kafka 的幾個 offset 的概念,可以參考 offset的一些相關概念

KafkaSpout 的處理過程主要是在 nextTuple() 方法,其處理過程如下:

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 public void nextTuple() {   if (initialized) {     if (commit()) {// Step1 非自動 commit,並且定時達到       commitOffsetsForAckedTuples();// 對所有已經 ack 的 msgs 進行 commit     }       if (poll()) {//Step2 拉取的數據都已經發送,並且未 commit 的消息數小於設置的最大 uncommit 數       setWaitingToEmit(pollKafkaBroker());       //將拉取的所有 record 都放到 waitingToEmit 集合中,可能會重覆拉取數據(由於一些 msg 需要重試,通過修改 Last Committed Offset 的值來實現的)     }       if (waitingToEmit()) {//Step3 waitingToEmit 中還有數據       emit();//發送數據,但會跳過已經 ack 或者已經發送的消息     }   } else {     LOG.debug("Spout not initialized. Not sending tuples until initialization completes");   } }

上面主要分為三步:

  1. 如果是非自動 commit,並且 commit 定時達到,那麼就將所有已經 ack 的數據(這些數據的 offset 必須是連續的,不連續的數據不會進行 commit)進行 commit;
  2. 如果拉取的數據都已經發送,並且未 commit 的消息數(記錄在 numUncommittedOffsets 中)小於設置的最大 uncommit 數,那麼就根據更新後的 offset (將 offset 重置到需要重試的 msg 的最小 offset,這樣該 offset 後面的 msg 還是會被重新拉取)拉取數據,並將拉取到的數據存儲到 waitingToEmit 集合中;
  3. 如果 waitingToEmit 集合中還有數據,就發送數據,但在發送數據的過程中,會進行判斷,只發送沒有 ack 的數據。

KafkaSpout 如何進行容錯

舉個示例,如下圖所示

consumer offset

  1. 圖1表示一個 nextTuple() 迴圈結束之後,offset 為14那條數據處理失敗,而offset 為15-18的數據處理成功;
  2. 圖2表示在下次迴圈 Step 1 結束之後、Step 2 開始之前,Consumer 會將 the last committed offset 重置到 offset 為14的位置。

也就是說從 offset 為14開始,後面的數據會重新發送。

有人可能會問,那樣的話會不會造成數據重覆發送?

Storm 是如何解決這個問題的呢?答案就是 Storm 會用一個 map 記錄已經 ack 的數據(acked),Storm 在進行 commit 的時候也是根據這個 map 的數據進行 commit 的,不過 commit 數據的 offset 必須是連續的,如上圖所示,只能將 offset 為11-13的數據 commit,而15-18的數據由於 offset 為14的數據未處理成功而不能 commit。offset 為11-13的數據在 commit 成功後會從 map 中移除,而 offset 為15-18的數據依然在 map 中,Storm 在將從 Kafka 拉取的數據加入到 waitingToEmit 集合時後,進行 emit 數據時,會先檢測該數據是否存在 acked 中,如果存在的話,就證明該條數據已經處理過了,不會在進行發送。

這裡有幾點需要註意的:

  1. 對已經 ack 的 msg 進行 commit 時,所 commit 的 msg 的 offset 必須是連續的(該 msg 存儲在一個 TreeMap 中,按 offset 排序),斷續的數據會暫時接著保存在集合中,不會進行 commit,如果出現斷續,那就證明中間有數據處理失敗,需要重新處理;
  2. storm 處理 failed 的 msg,會保存到一個專門的集合中,在每次拉取數據時(是拉取數據,不是發送數據,發送數據時會檢測該數據是否已經成功處理),會遍歷該集合中包含的所有 TopicPartiion,獲取該 partition 的 Last Committed Offset;

這樣設計有一個副作用就是:如果有一個 msg 一直不成功,就會導致 KafkaSpout 因為這一條數據的影響而不斷地重覆拉取這批數據,造成整個拓撲卡在這裡。

Kafka Rebalance 的影響

Kafka Rebalance 可以參考Consumer Rebalance.

KafkaSpout 實現了一個內部類用來監控 Group Rebalance 的情況,實現了兩個回調函數,一旦發現 group 的狀態變為 preparingRabalance 之後

  1. onPartitionsRevoked 這個方法會在 Consumer 停止拉取數據之後、group 進行 rebalance 操作之前調用,作用是對已經 ack 的 msg 進行 commit;
  2. onPartitionsAssigned 這個方法 group 已經進行 reassignment 之後,開始拉取數據之前調用,作用是清理記憶體中不屬於這個線程的 msg、獲取 partition 的 last committed offset。

潛在的風險點

這部分還是有可能導致數據重覆發送的,設想下麵一種情況:

如果之前由於一個條消息處理失敗(Partition 1),造成部分數據沒有 commit 成功,在進行 rebalance 後,恰好 Partition 1 被分配到其他 spout 線程時,那麼當前的 spout 就會關於 Partition 1 的相關數據刪除掉,導致部分已經 commit 成功的數據(記錄在 acked 中)被刪除,而另外的 spout 就會重新拉取這部分數據進行處理,那麼就會導致這部分已經成功處理的數據重覆處理


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 前言: 親身用了大半年,沒出過重大毛病,也就是服務掛了,跟伺服器也沒啥關係。如果想更深入學習cloudstack可以試試高級網路,我是一直用的簡單網路(扁平網路)。由來:CloudStack的前身是Cloud com,後被思傑收購。英特爾、阿爾卡特-朗迅、瞻博網路、博科等都已宣佈支持CloudSta ...
  • root@test:/# dpkg -l | grep cobbler root@test:/# sudo dpkg --purge cobbler ...
  • Centos使用AD賬戶進行驗證,網上查有很多種,包括samba+winbind,sssd,nss-pam-ldapd等多種方式。今天介紹通過nss-pam-ldap驗證AD賬號。 一.實驗環境: 兩台主機:一臺windows server2012 R2 域控,一臺centos7.2客戶端使用AD賬 ...
  • 操作系統 : CentOS7.3.1611_x64 go語言版本:1.8.3 linux/amd64 InfluxDB版本:1.1.0 服務模塊介紹 源碼路徑: github.com/influxdata/influxdb/services/snapshotter service.go : snap ...
  • 學習目標 描述使用ASM的好處 管理ASM實例 創建和刪除ASM磁碟組 擴展ASM磁碟組 通過使用各種實用程式檢索ASM元數據 ASM對於管理員的好處 使用ASM可以免除: -I/O性能優化:ASM採用條帶化和鏡像所有數據的策略,且執行自動重新平衡操作。 -數據文件移動和重新組織:不再需要更改數據文 ...
  • 轉自腳本之家: 看看下麵的1.判斷是否有註入;and 1=1;and 1=22.初步判斷是否是mssql;and user>03.判斷資料庫系統;and (select count(*) from sysobjects)>0 mssql;and (select count(*) from msyso ...
  • 讀取數據如下: http://www.dofactory.com/reference/connection-strings ...
  • 偶爾對比起2016以下的版本(比如ssms2014),ssms2016有一個小地方有區別。就是報錯的行號有區別 舉個例子,下麵同樣的語句在ssms2014和ssms2016裡面運行。就是如下的效果 顯而易見,這裡的行9和 xml 變數的定義都是有問題的。所以ssms2014裡面直接給出 這樣的提示錯 ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...