新浪微博:intsmaze劉洋洋哥 KafkaSpout的核心邏輯都是由PartitionManager來實現的。但是這個類實現時候需要考慮的東西有些多,0.92至0.93,至當前(2015.3.14)的master一直在變化。在這裡,先分析一下最近的發佈版0.93里的邏輯。也提出一些問題,希望以後 ...
新浪微博:intsmaze劉洋洋哥
KafkaSpout的核心邏輯都是由PartitionManager來實現的。
但是這個類實現時候需要考慮的東西有些多,0.92至0.93,至當前(2015.3.14)的master一直在變化。在這裡,先分析一下最近的發佈版0.93里的邏輯。也提出一些問題,希望以後Apache Storm會把這個類實現地更完美一些。
PartitionManager的主要功能
PartitionManager用來管理單個Partition。提供持久化讀取進度、讀取消息功能,並提供Storm的spout需要實現的nextTuple, fail, ack等功能。
實現PartitionManager需要考慮的問題
有一些問題是設計PartitionManager時必須考慮的,先把他們提一下,然後看下0.93版PartitionManager的實現。
關於批量讀取消息以及緩存消息
由於Kafka的實現細節(為了高吞吐而設計的log格式,通訊協議),Kafka的SimpleConsumer每次讀取消息是會去讀取一批,而不能指定響應想要包含的具體的offset,並且由於消息按批壓縮,使得每次讀取的響應包含的offset可能比想要的最小的offset還要小(但不會更大)。所以,對於PartitoinManager來說,在內部構造一個緩存,保存每次讀取得到的一批message是一種自然而且高效的方式。
允許有超過一個message處於pendding(已發送但沒有ack)狀態?
如果在發射一個message的tuple之後,就開始等待。那麼ack、fail、commit的邏輯就會很簡單。但這樣消息的處理效率會被極大的降低,且不說還可能使得下游bolt的一些task沒事可做。所以一定得允許多個message正在被blot處理,也就是需要有pendding messages的集合。
有了pendding的messages集合,ack, fail, commit的邏輯就變得比較複雜,且需要做出一些折衷。
當有message對應的tuple失敗時,如何在處理其它正常的消息時,特殊處理失敗的這些message?
如果有message產生的tuple在多次重覆後仍然失敗,應該怎麼做?丟棄它嗎?
在Zookeeper中應該記錄什麼信息?
如果下游的bolt處理的進度太慢怎麼辦?如何衡量處理速度是否達到需求?
PartitionManager的具體實現
在Zookeeper中記錄信息的意義:
下麵是PartitionManager的commit方法的主要部分:
Map<Object, Object> data = (Map<Object, Object>) ImmutableMap.builder() .put("topology", ImmutableMap.of("id", _topologyInstanceId, "name", _stormConf.get(Config.TOPOLOGY_NAME))) .put("offset", lastCompletedOffset) .put("partition", _partition.partition) .put("broker", ImmutableMap.of("host", _partition.host.host, "port", _partition.host.port)) .put("topic", _spoutConfig.topic).build(); _state.writeJSON(committedPath(), data); _committedTo = lastCompletedOffset;
topology.id 記錄了這個topology實例的id。當PartitionManager的構造函數被調用時,它會從Zookeeper里獲取topology.id,以判斷當前的task是否跟記錄zookeeper里信息的是一個topology實例,如果不是,說明這是一個新提交的topology,這時,會判斷是否設置了forceFromStart,如果是同一個topology實例,就不理會forceFromStart
topology.name topology的名字,這個目前沒用到。
offset 在這個offset之前(不包括它)的所有消息都已經處理完成。
partition partition id
broker 此partition的leader
topic partition所屬的topic。註意,在PartitionManager初始化時,它並沒有判斷這個spout task的topic跟記錄里的一致。所以,如果兩個topology, 有同樣的SpoutConfig.id,但是不同的topic,會引發混亂。
另外,這個JSON數據寫的路徑committedPath也是很重要的。PartitionManager初始化時,會從這個committedPath路徑讀取信息。
private String committedPath() { return _spoutConfig.zkRoot + "/" + _spoutConfig.id + "/" + _partition.getId(); }
所以,如果spoutConfig.id配置得不當,KafkaSpout就無法獲取正確的進度記錄。
另外,在所有記錄里,最重要的就是offset這個記錄。它的意義,使得PartitionManager不得不做出很多權衡。
PartitionManager用到的集合和記錄
Long _emittedToOffset; SortedSet<Long> _pending = new TreeSet<Long>(); SortedSet<Long> failed = new TreeSet<Long>(); Long _committedTo; LinkedList<MessageAndRealOffset> _waitingToEmit = new LinkedList<MessageAndRealOffset>(); long numberFailed, numberAcked;
_pending 所有己讀取,但還沒有被ack的消息,都在這裡。
failed 所有己經認定為failed的tuple來自的message的offset都在這裡
_waitingToEmit 所有己經被讀取,但是還沒經過“解析,emit tuple"步驟的消息都在這。
_emittedToOffset offset小於它的消息都已經被讀取了
_comittedTo 所有offset小於它的消息都已被ack,或者由於某些原因不再需要被處理。
當PartitionManager的next方法被調用以emit新tuple時,它只會從_waitingToEmit取消息。那麼failed里的消息如何被再重試呢?原因在於_waitingToEmit為空時,next方法會調用fill方法,而fill方法會考慮到failed集合內的元素,不過是一種比較粗放的做法。
fill方法
fill方法的主要邏輯依次分為三個部分:
判斷該從哪個offset開始,從Kafka抓取消息
抓取消息,處理offset out of range 異常
把抓取到的消息放到_waitingToEmit集合中,同時與failed集合與pendding集合交互。
第一部分:
final boolean had_failed = !failed.isEmpty(); // Are there failed tuples? If so, fetch those first. if (had_failed) { offset = failed.first(); } else { offset = _emittedToOffset; }
這段代碼里,offset即是將要從Kafka里抓取消息的offset。當failed集合不為空時,就用failed集合的最小的offset做為下次要抓取的offset。
如果沒有failed消息,fill方法就會從之前讀取過的最大的offset繼續抓取。在知道了從何處抓取之後,開始真正的抓取過程:
try { msgs = KafkaUtils.fetchMessages(_spoutConfig, _consumer, _partition, offset); } catch (UpdateOffsetException e) { //如果是offset "out of range", 並且設置了useStartOffsetTimeIfOffsetOutOfRange _emittedToOffset = KafkaUtils.getOffset(_consumer, _spoutConfig.topic, _partition.partition, _spoutConfig); LOG.warn("Using new offset: {}", _emittedToOffset); // fetch failed, so don't update the metrics return; }
出現了UpdateOffsetException代表出現了這種情況:想要抓取的offset不在Kafka能提供的offset所在的範圍之內,並且已經在config里設置了useStartOffsetTimeIfOffsetOutOfRange為true。想要抓取的offset不在Kafka提供的範圍可能有幾種原因:這部分消息被Kafka的log retention功能給刪除了;leader變更,使得部分消息丟失(如果沒有設置ack為-1的話);以及其它異常。這時候,fill方法會調用KafkaUtils的getOffset方法,不過這個方法有些不符合useStartOffsetTimeIfOffsetOutOfRange的意思,即它並不是一定會從startOffsetTime配置中配置的offsetTime開始讀。
public static long getOffset(SimpleConsumer consumer, String topic, int partition, KafkaConfig config) { long startOffsetTime = kafka.api.OffsetRequest.LatestTime(); if ( config.forceFromStart ) { startOffsetTime = config.startOffsetTime; } return getOffset(consumer, topic, partition, startOffsetTime); }
可以看出,如果沒有設置forceFromStart,那麼這個方法返回的offset將會是當前最大的offset,而忽略KafkaConfig中startOffsetTime的配置,使得PartitionManager直接跳到最新的消息開始處理。這樣乍一看莫名其妙,但是試想,如果startOffsetTime對應的offset也out of range呢,這樣KafkaSpout就陷入了死迴圈。而LatestOffsetTime()是始終存在的。但是,這樣做而沒有單獨的配置,也沒有日誌記錄說明這種權衡,會給用戶帶來麻煩。
在獲取fetch到消息以後,獲取的消息集可能會包含了各種例外情況,需要細緻處理:
for (MessageAndOffset msg : msgs) { final Long cur_offset = msg.offset(); if (cur_offset < offset) { // Skip any old offsets. continue; } if (!had_failed failed.contains(cur_offset)) { numMessages += 1; _pending.add(cur_offset);//_pending表示已經讀取而未被ack的消息 _waitingToEmit.add(new MessageAndRealOffset(msg.message(), cur_offset)); _emittedToOffset = Math.max(msg.nextOffset(), _emittedToOffset); if (had_failed) {//說明此消息在failed集合里。從failed集合里remove掉它,因為它被重新加入了_waitingToEmit集合,將要被重新處理。 failed.remove(cur_offset);// } }
首先,需要考慮到FetchRequest指定的是返回集中最小的offset A,但是,實際上Kafka只保證返回的消息集中包括了offset為A的消息,這個消息集中也可能包括了offset比A更小的消息(由於壓縮)。所以,fill方法首先要skip掉這些offset更小的消息
如果failed集合為空,fill方法就把得到的消息集中所有offset大於A的消息加入_waitingToEmit集合,同時加入_pending集合。然後把_emittedToOffset設為當前讀取過的最大的offset。
如果had_failed並且讀取到的消息在failed集合中,它在把這條消息加入_waitingToEmit集合與_pending集合後,還要把它從failed集合中去掉,否則這條消息就會永遠在failed集合里。只有在fill方法中,failed集合中的元素才可能被移除,加入到_waitingToEmit集合,使它有機會被重新emit。這使得如果failed集合不為空時,此次抓取的消息里,只有failed的消息會被重發。
通過對fill方法的分析可以看到,如果一個消息始終fail,除非在PartitionManager的其它方法中把它移除,否則它會使PartitionManager的處理進度停止。下麵將要看到,在fail和ack方法中,這樣一直fail的消息還是有機會被丟棄的,但這取決於你的配置,而這些配置是很微妙的。
ack方法
ack方法的主要功能是把消息從_pending集合中去掉,表示這個消息處理完成。從_pending集合去除,PartitionManager才能獲取正確的處理進度的信息,以更新Zookeeper里的記錄。但是,它還有別的作用。
public void ack(Long offset) { if (!_pending.isEmpty() && _pending.first() < offset - _spoutConfig.maxOffsetBehind) { // Too many things pending! 已讀取但未確認的消息太多了,就把此次確認的offset - maxOffsetBehind之前的清除了 _pending.headSet(offset - _spoutConfig.maxOffsetBehind).clear(); } _pending.remove(offset);//從_pending集合中移除它,表示這個消息已被處理 numberAcked++; }
當一個offset被ack時,ack方法會把所有小於offset - _spoutConfig.maxOffsetBehind的消息從_pending中移除。也就是說,即使這些被移除的消息失敗了,也認為他們處理成功,使得在Zookeeper中記錄的進度忽略這些被移除的消息。所以,假如task重啟,那麼這些失敗但被移除出_pending集合的消息就不會被再處理。那麼,這些失敗了的消息,當Storm的acker發現它們處理失敗,會發生什麼呢?這由fail方法決定。
fail方法
public void fail(Long offset) { if (offset < _emittedToOffset - _spoutConfig.maxOffsetBehind) { LOG.info( "Skipping failed tuple at offset=" + offset + " because it's more than maxOffsetBehind=" + _spoutConfig.maxOffsetBehind + " behind _emittedToOffset=" + _emittedToOffset ); } else { LOG.debug("failing at offset=" + offset + " with _pending.size()=" + _pending.size() + " pending and _emittedToOffset=" + _emittedToOffset); failed.add(offset); numberFailed++; if (numberAcked == 0 && numberFailed > _spoutConfig.maxOffsetBehind) { throw new RuntimeException("Too many tuple failures"); } } }
當一個消息對應的tuple被fail時,fail方法會首先判斷這個消息是否落後太多。如果它的offset小於(當前讀取的最大offset - maxOffsetBehind), 那麼就不把它加到failed集合里,使得它很可能不會被重新處理。如果不落後太多,就把它加到failed集合里,使得它可以被重新處理。如果沒有消息ack,並且總的failed次數大於maxOffsetBehind,就拋出異常,代表PartitionManager工作出錯,而這種情況只有在處理第一批消息並且這批消息的個數大於maxOffsetBehind時才可能發生。這樣,有可能在某些情況下,使得PartitionManager卡住,但不會有異常。而且用numberFailed與spoutConfig.maxOffsetBehind比較,有些令人莫名其秒。
commit方法
commit方法被調用時,會調用lastCompletedOffset方法獲取當前PartitionManager處理的進度,並且將這個進度持久化。這個“進度”,是說在此之前的所有消息都已被ack,或“不需要ack”, 總之,是說這些消息已處理完畢。
public long lastCompletedOffset() { if (_pending.isEmpty()) { return _emittedToOffset; } else { return _pending.first(); } }
在此,體現了_pending的作用。_pend中最小的元素,代表之前的元素都已處理完成。如果_pending為空,說明所有已被讀取的元素都已處理完畢。
陷阱
failed方法,使得PartitonManager的有些行為非常隱晦。結合ack、fill和commit方法,可能會出現以下特殊情況,這些情況和KafkaConfig.maxOffBehind配置,及KafkaConfig.useStartOffsetTimeIfOffsetOutOfRange配置、KafkaConfig.fetchSizeBytes配置相關。
maxOffsetBehind設置得較小,而fetchSizeBytes相對較大,使得maxOffsetBehind小於一次fetch得到的消息總數。設這批fetch得到的消息的offset範圍為[a, b],那麼所有小於(b - maxOffsetBehind)的offset的消息,即使處理失敗,也不會被重試。設這樣失敗,但不會被重試的消息中的某個的offset為X, 那麼如果某個大於( X + maxOffsetBehind)的消息被ack時,offset為X的這個消息會被從_pending集合中移除。但是如果所有大於(X + maxOffsetBehind)的消息都被fail了,而在(_emmittedToOffset與_emittedToOffset - maxOffsetBehind之間) 有消息failed了,那麼failed集合中不會包括X,但會包括比X的offset大的元素,X不會被重試,但X會一直停留在_pending集合,造成commit無法更新實際進度,並且帶來記憶體泄漏。
如果maxOffsetBehind比較大,就可能有failed的消息永遠不會被忽略,而會一直重試,直到它成功。所以必須保證每個tuple最後都被ack。而fill方法使得在這個failed的消息中的所有tuple都被ack之前,PartitionManager無法處理後續的消息。這樣後續的blot必須保證每個tuple都被ack(或者自己加入邏輯判斷是否發生了這種有消息一直被重發的情況),否則這個partition就會卡在fail的這個消息上。
如果把useStartOffsetTimeIfOffsetOutOfRange設為true,同時forceFromStart設為false, 而startOffsetTime不為LatestTime,那麼PartitonManager想要獲取的消息out of range時,它會直接跳到LatestTime開始處理消息,而不會從startOffsetTime開始。這可能發生在Kafka進行了log retention之後。
如果一條消息被拆成多個tuple發送,那麼只要其中有一個tuple處理失敗,這條消息產生的所有tuple就可能被重新發送。
總之,當前PartitionManager的實現還有很多需要改進之處,而且有些情況容易給用戶帶來困擾。