KafkaSpout的核心邏輯PartitionManager

来源:http://www.cnblogs.com/intsmaze/archive/2016/10/13/5958199.html
-Advertisement-
Play Games

新浪微博:intsmaze劉洋洋哥 KafkaSpout的核心邏輯都是由PartitionManager來實現的。但是這個類實現時候需要考慮的東西有些多,0.92至0.93,至當前(2015.3.14)的master一直在變化。在這裡,先分析一下最近的發佈版0.93里的邏輯。也提出一些問題,希望以後 ...


新浪微博:intsmaze劉洋洋哥

 

KafkaSpout的核心邏輯都是由PartitionManager來實現的。

但是這個類實現時候需要考慮的東西有些多,0.92至0.93,至當前(2015.3.14)的master一直在變化。在這裡,先分析一下最近的發佈版0.93里的邏輯。也提出一些問題,希望以後Apache Storm會把這個類實現地更完美一些。

PartitionManager的主要功能

PartitionManager用來管理單個Partition。提供持久化讀取進度、讀取消息功能,並提供Storm的spout需要實現的nextTuple, fail, ack等功能。

實現PartitionManager需要考慮的問題

有一些問題是設計PartitionManager時必須考慮的,先把他們提一下,然後看下0.93版PartitionManager的實現。

關於批量讀取消息以及緩存消息

由於Kafka的實現細節(為了高吞吐而設計的log格式,通訊協議),Kafka的SimpleConsumer每次讀取消息是會去讀取一批,而不能指定響應想要包含的具體的offset,並且由於消息按批壓縮,使得每次讀取的響應包含的offset可能比想要的最小的offset還要小(但不會更大)。所以,對於PartitoinManager來說,在內部構造一個緩存,保存每次讀取得到的一批message是一種自然而且高效的方式。

允許有超過一個message處於pendding(已發送但沒有ack)狀態?

如果在發射一個message的tuple之後,就開始等待。那麼ack、fail、commit的邏輯就會很簡單。但這樣消息的處理效率會被極大的降低,且不說還可能使得下游bolt的一些task沒事可做。所以一定得允許多個message正在被blot處理,也就是需要有pendding messages的集合。

有了pendding的messages集合,ack, fail, commit的邏輯就變得比較複雜,且需要做出一些折衷。

    當有message對應的tuple失敗時,如何在處理其它正常的消息時,特殊處理失敗的這些message?
    如果有message產生的tuple在多次重覆後仍然失敗,應該怎麼做?丟棄它嗎?
    在Zookeeper中應該記錄什麼信息?
    如果下游的bolt處理的進度太慢怎麼辦?如何衡量處理速度是否達到需求?

PartitionManager的具體實現
在Zookeeper中記錄信息的意義:

下麵是PartitionManager的commit方法的主要部分:

Map<Object, Object> data = (Map<Object, Object>) ImmutableMap.builder()
                    .put("topology", ImmutableMap.of("id", _topologyInstanceId,
                            "name", _stormConf.get(Config.TOPOLOGY_NAME)))
                    .put("offset", lastCompletedOffset)
                    .put("partition", _partition.partition)
                    .put("broker", ImmutableMap.of("host", _partition.host.host,
                            "port", _partition.host.port))
                    .put("topic", _spoutConfig.topic).build();
 _state.writeJSON(committedPath(), data);

_committedTo = lastCompletedOffset;

    topology.id 記錄了這個topology實例的id。當PartitionManager的構造函數被調用時,它會從Zookeeper里獲取topology.id,以判斷當前的task是否跟記錄zookeeper里信息的是一個topology實例,如果不是,說明這是一個新提交的topology,這時,會判斷是否設置了forceFromStart,如果是同一個topology實例,就不理會forceFromStart
    topology.name topology的名字,這個目前沒用到。
    offset 在這個offset之前(不包括它)的所有消息都已經處理完成。
    partition partition id
    broker 此partition的leader
    topic partition所屬的topic。註意,在PartitionManager初始化時,它並沒有判斷這個spout task的topic跟記錄里的一致。所以,如果兩個topology, 有同樣的SpoutConfig.id,但是不同的topic,會引發混亂。

另外,這個JSON數據寫的路徑committedPath也是很重要的。PartitionManager初始化時,會從這個committedPath路徑讀取信息。

    private String committedPath() {
        return _spoutConfig.zkRoot + "/" + _spoutConfig.id + "/" + _partition.getId();
    }


所以,如果spoutConfig.id配置得不當,KafkaSpout就無法獲取正確的進度記錄。

另外,在所有記錄里,最重要的就是offset這個記錄。它的意義,使得PartitionManager不得不做出很多權衡。
PartitionManager用到的集合和記錄

    Long _emittedToOffset;
    SortedSet<Long> _pending = new TreeSet<Long>();
    SortedSet<Long> failed = new TreeSet<Long>();
    Long _committedTo;
    LinkedList<MessageAndRealOffset> _waitingToEmit = new LinkedList<MessageAndRealOffset>();
    long numberFailed, numberAcked;

    _pending 所有己讀取,但還沒有被ack的消息,都在這裡。
    failed 所有己經認定為failed的tuple來自的message的offset都在這裡
    _waitingToEmit 所有己經被讀取,但是還沒經過“解析,emit tuple"步驟的消息都在這。
    _emittedToOffset offset小於它的消息都已經被讀取了
    _comittedTo 所有offset小於它的消息都已被ack,或者由於某些原因不再需要被處理。

當PartitionManager的next方法被調用以emit新tuple時,它只會從_waitingToEmit取消息。那麼failed里的消息如何被再重試呢?原因在於_waitingToEmit為空時,next方法會調用fill方法,而fill方法會考慮到failed集合內的元素,不過是一種比較粗放的做法。
fill方法

fill方法的主要邏輯依次分為三個部分:

    判斷該從哪個offset開始,從Kafka抓取消息
    抓取消息,處理offset out of range 異常
    把抓取到的消息放到_waitingToEmit集合中,同時與failed集合與pendding集合交互。

第一部分:

        final boolean had_failed = !failed.isEmpty();
        // Are there failed tuples? If so, fetch those first.
        if (had_failed) {
            offset = failed.first();
        } else {
            offset = _emittedToOffset;
        }


這段代碼里,offset即是將要從Kafka里抓取消息的offset。當failed集合不為空時,就用failed集合的最小的offset做為下次要抓取的offset。

如果沒有failed消息,fill方法就會從之前讀取過的最大的offset繼續抓取。在知道了從何處抓取之後,開始真正的抓取過程:

     try {
            msgs = KafkaUtils.fetchMessages(_spoutConfig, _consumer, _partition, offset);
        } catch (UpdateOffsetException e) { //如果是offset "out of range", 並且設置了useStartOffsetTimeIfOffsetOutOfRange
            _emittedToOffset = KafkaUtils.getOffset(_consumer, _spoutConfig.topic, _partition.partition, _spoutConfig);
            LOG.warn("Using new offset: {}", _emittedToOffset);
            // fetch failed, so don't update the metrics
            return;
        }


出現了UpdateOffsetException代表出現了這種情況:想要抓取的offset不在Kafka能提供的offset所在的範圍之內,並且已經在config里設置了useStartOffsetTimeIfOffsetOutOfRange為true。想要抓取的offset不在Kafka提供的範圍可能有幾種原因:這部分消息被Kafka的log retention功能給刪除了;leader變更,使得部分消息丟失(如果沒有設置ack為-1的話);以及其它異常。這時候,fill方法會調用KafkaUtils的getOffset方法,不過這個方法有些不符合useStartOffsetTimeIfOffsetOutOfRange的意思,即它並不是一定會從startOffsetTime配置中配置的offsetTime開始讀。

    public static long getOffset(SimpleConsumer consumer, String topic, int partition, KafkaConfig config) {
        long startOffsetTime = kafka.api.OffsetRequest.LatestTime();
        if ( config.forceFromStart ) {
            startOffsetTime = config.startOffsetTime;
        }
        return getOffset(consumer, topic, partition, startOffsetTime);
    }


可以看出,如果沒有設置forceFromStart,那麼這個方法返回的offset將會是當前最大的offset,而忽略KafkaConfig中startOffsetTime的配置,使得PartitionManager直接跳到最新的消息開始處理。這樣乍一看莫名其妙,但是試想,如果startOffsetTime對應的offset也out of range呢,這樣KafkaSpout就陷入了死迴圈。而LatestOffsetTime()是始終存在的。但是,這樣做而沒有單獨的配置,也沒有日誌記錄說明這種權衡,會給用戶帶來麻煩。

在獲取fetch到消息以後,獲取的消息集可能會包含了各種例外情況,需要細緻處理:

   for (MessageAndOffset msg : msgs) {
                final Long cur_offset = msg.offset();
                if (cur_offset < offset) {
                    // Skip any old offsets.
                    continue;
                }
                if (!had_failed  failed.contains(cur_offset)) {
                    numMessages += 1;
                    _pending.add(cur_offset);//_pending表示已經讀取而未被ack的消息
                    _waitingToEmit.add(new MessageAndRealOffset(msg.message(), cur_offset));
                    _emittedToOffset = Math.max(msg.nextOffset(), _emittedToOffset);
                    if (had_failed) {//說明此消息在failed集合里。從failed集合里remove掉它,因為它被重新加入了_waitingToEmit集合,將要被重新處理。
                        failed.remove(cur_offset);//
                    }
                }


    首先,需要考慮到FetchRequest指定的是返回集中最小的offset A,但是,實際上Kafka只保證返回的消息集中包括了offset為A的消息,這個消息集中也可能包括了offset比A更小的消息(由於壓縮)。所以,fill方法首先要skip掉這些offset更小的消息
    如果failed集合為空,fill方法就把得到的消息集中所有offset大於A的消息加入_waitingToEmit集合,同時加入_pending集合。然後把_emittedToOffset設為當前讀取過的最大的offset。
    如果had_failed並且讀取到的消息在failed集合中,它在把這條消息加入_waitingToEmit集合與_pending集合後,還要把它從failed集合中去掉,否則這條消息就會永遠在failed集合里。只有在fill方法中,failed集合中的元素才可能被移除,加入到_waitingToEmit集合,使它有機會被重新emit。這使得如果failed集合不為空時,此次抓取的消息里,只有failed的消息會被重發。

通過對fill方法的分析可以看到,如果一個消息始終fail,除非在PartitionManager的其它方法中把它移除,否則它會使PartitionManager的處理進度停止。下麵將要看到,在fail和ack方法中,這樣一直fail的消息還是有機會被丟棄的,但這取決於你的配置,而這些配置是很微妙的。
ack方法

ack方法的主要功能是把消息從_pending集合中去掉,表示這個消息處理完成。從_pending集合去除,PartitionManager才能獲取正確的處理進度的信息,以更新Zookeeper里的記錄。但是,它還有別的作用。

    public void ack(Long offset) {
        if (!_pending.isEmpty() && _pending.first() < offset - _spoutConfig.maxOffsetBehind) {
            // Too many things pending! 已讀取但未確認的消息太多了,就把此次確認的offset - maxOffsetBehind之前的清除了
            _pending.headSet(offset - _spoutConfig.maxOffsetBehind).clear();
        }
        _pending.remove(offset);//從_pending集合中移除它,表示這個消息已被處理
        numberAcked++;
    }

當一個offset被ack時,ack方法會把所有小於offset - _spoutConfig.maxOffsetBehind的消息從_pending中移除。也就是說,即使這些被移除的消息失敗了,也認為他們處理成功,使得在Zookeeper中記錄的進度忽略這些被移除的消息。所以,假如task重啟,那麼這些失敗但被移除出_pending集合的消息就不會被再處理。那麼,這些失敗了的消息,當Storm的acker發現它們處理失敗,會發生什麼呢?這由fail方法決定。
fail方法

public void fail(Long offset) {
        if (offset < _emittedToOffset - _spoutConfig.maxOffsetBehind) {
            LOG.info(
                    "Skipping failed tuple at offset=" + offset +
                            " because it's more than maxOffsetBehind=" + _spoutConfig.maxOffsetBehind +
                            " behind _emittedToOffset=" + _emittedToOffset
            );
        } else {
            LOG.debug("failing at offset=" + offset + " with _pending.size()=" + _pending.size() + " pending and _emittedToOffset=" + _emittedToOffset);
            failed.add(offset);
            numberFailed++;
            if (numberAcked == 0 && numberFailed > _spoutConfig.maxOffsetBehind) {
                throw new RuntimeException("Too many tuple failures");
            }
        }
    }

當一個消息對應的tuple被fail時,fail方法會首先判斷這個消息是否落後太多。如果它的offset小於(當前讀取的最大offset - maxOffsetBehind), 那麼就不把它加到failed集合里,使得它很可能不會被重新處理。如果不落後太多,就把它加到failed集合里,使得它可以被重新處理。如果沒有消息ack,並且總的failed次數大於maxOffsetBehind,就拋出異常,代表PartitionManager工作出錯,而這種情況只有在處理第一批消息並且這批消息的個數大於maxOffsetBehind時才可能發生。這樣,有可能在某些情況下,使得PartitionManager卡住,但不會有異常。而且用numberFailed與spoutConfig.maxOffsetBehind比較,有些令人莫名其秒。
commit方法

commit方法被調用時,會調用lastCompletedOffset方法獲取當前PartitionManager處理的進度,並且將這個進度持久化。這個“進度”,是說在此之前的所有消息都已被ack,或“不需要ack”, 總之,是說這些消息已處理完畢。

    public long lastCompletedOffset() {
        if (_pending.isEmpty()) {
            return _emittedToOffset;
        } else {
            return _pending.first();
        }
    }

在此,體現了_pending的作用。_pend中最小的元素,代表之前的元素都已處理完成。如果_pending為空,說明所有已被讀取的元素都已處理完畢。

陷阱

failed方法,使得PartitonManager的有些行為非常隱晦。結合ack、fill和commit方法,可能會出現以下特殊情況,這些情況和KafkaConfig.maxOffBehind配置,及KafkaConfig.useStartOffsetTimeIfOffsetOutOfRange配置、KafkaConfig.fetchSizeBytes配置相關。

    maxOffsetBehind設置得較小,而fetchSizeBytes相對較大,使得maxOffsetBehind小於一次fetch得到的消息總數。設這批fetch得到的消息的offset範圍為[a, b],那麼所有小於(b - maxOffsetBehind)的offset的消息,即使處理失敗,也不會被重試。設這樣失敗,但不會被重試的消息中的某個的offset為X, 那麼如果某個大於( X + maxOffsetBehind)的消息被ack時,offset為X的這個消息會被從_pending集合中移除。但是如果所有大於(X + maxOffsetBehind)的消息都被fail了,而在(_emmittedToOffset與_emittedToOffset - maxOffsetBehind之間) 有消息failed了,那麼failed集合中不會包括X,但會包括比X的offset大的元素,X不會被重試,但X會一直停留在_pending集合,造成commit無法更新實際進度,並且帶來記憶體泄漏。
    如果maxOffsetBehind比較大,就可能有failed的消息永遠不會被忽略,而會一直重試,直到它成功。所以必須保證每個tuple最後都被ack。而fill方法使得在這個failed的消息中的所有tuple都被ack之前,PartitionManager無法處理後續的消息。這樣後續的blot必須保證每個tuple都被ack(或者自己加入邏輯判斷是否發生了這種有消息一直被重發的情況),否則這個partition就會卡在fail的這個消息上。
    如果把useStartOffsetTimeIfOffsetOutOfRange設為true,同時forceFromStart設為false, 而startOffsetTime不為LatestTime,那麼PartitonManager想要獲取的消息out of range時,它會直接跳到LatestTime開始處理消息,而不會從startOffsetTime開始。這可能發生在Kafka進行了log retention之後。
    如果一條消息被拆成多個tuple發送,那麼只要其中有一個tuple處理失敗,這條消息產生的所有tuple就可能被重新發送。

總之,當前PartitionManager的實現還有很多需要改進之處,而且有些情況容易給用戶帶來困擾。


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 在實際開發中可能需要用到兩個Service相互監視的情況,本示例就是實現此功能以作參考。 服務B: IBridgeInterface.aidl 界面: AndroidManifest.xml 由於涉及到跨進程,onServiceConnected() 方法中使用 而不能直接類型轉換 onStartC ...
  • package com.sinostride.smartcity.util;import java.io.UnsupportedEncodingException;/** * Created by lhd on 2016/10/12. * 基本類型轉換的工具類 */ public class Bas ...
  • JS調用OC 很多應用裡面或多或少的調用了網頁,來達到絢麗的效果,所謂的JS調用OC.....舉個例子吧,網頁上有個按鈕 點擊按鈕跳轉界面,跳轉的動作由OC的代碼實現。 OC調用JS 還是舉個例子,我們OC代碼創建了輸入框比如輸入用戶名,輸入完成後顯示在網頁上,顯示用戶的用戶名 一.利用webVie ...
  • 在知乎的“Android自定義控制項,你們是如何系統學習的?”問題中看到的一個答案,答主為GcsSloop,他的博客上有一系列的自定義View學習博文,今天閱讀並實踐了部分,對自定義View有更完整的認知。推薦沒有系統瞭解的Android開發者閱讀。在這裡不做內容的搬運工了,請到作者博客查看。//其中 ...
  • 本文實現如下幾個界面之間的平移動畫實現 本文地址:http://www.cnblogs.com/wuyudong/p/5954847.html,轉載請註明出處。 分析: 導航界面移動過程中,平移動畫 上一頁移入動畫 (-屏幕寬度,y) >(0,y) 上一頁移出動畫 (0,y) >(屏幕寬度,y) 下 ...
  • 先提一下需求,用一個自定義EditText實現禁止輸入表情。談一下自定義控制項中自定義屬性的定義和使用方式。 第一步當然是自定義類EditTextNoEmoji繼承EditText,同時重寫三個構造方法。這裡需要註意的是,三個構造方法中必須分別實現父類對應的構造方法,就是三個super();而不能出現 ...
  • 最近遇到一個問題,用戶數據丟失,拿到用戶資料庫文件以後,發現資料庫損壞。database disk image is malformed因此希望可以找到一種方法,可以檢測出來資料庫是否損壞,經過google,找到了一種方法,先記錄下來。+ (BOOL)checkIntegrity { NSStrin... ...
  • 目錄: 數學函數 統計函數 應用示例 控制流 數學函數 ceiling(x): 大於等於 x 的最小整數, 如: ceiling(3.213) --> 4 floor(x): 小於等於 x 的最大整數,如: floor(3.6534) --> 3 trunc(x): 取x的整數部分, 如: trun ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...