Guaranteeing Message Processing Storm保證每一個tuple被完全處理。Strom中一個核心的機制是它提供了一種跟蹤tuple血統的能力,它使用了一種十分有效的方式跟蹤topology中的tuple。 Storm中最基本的抽象是提供了至少一次(at-least-on ...
Guaranteeing Message Processing
Storm保證每一個tuple被完全處理。Strom中一個核心的機制是它提供了一種跟蹤tuple血統的能力,它使用了一種十分有效的方式跟蹤topology中的tuple。
Storm中最基本的抽象是提供了至少一次(at-least-once)處理的保證,當你使用隊列系統的時候也可以提供相同的保證。
Messages are only replayed when there are failures.(消息只有在失敗的時候才會被重新投放)
Trident是在基本抽象之上的更高層面的抽象,它可以實現精準的只執行一次的處理。
對於保證消息處理,Storm提供了幾種不同的級別,包括盡最大努力、至少一次處理、精確的一次處理。( best effort、at least once、exactly once)
一個消息被“完全處理”是什麼意思?
從一個spout出來的一個tuple可能觸發上千個tuples。考慮下麵的例子:
TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("sentences", new KestrelSpout("kestrel.backtype.com", 22133, "sentence_queue", new StringScheme())); builder.setBolt("split", new SplitSentence(), 10) .shuffleGrouping("sentences"); builder.setBolt("count", new WordCount(), 20) .fieldsGrouping("split", new Fields("word"));
這個topology讀取從一個Kestrel隊列出來的句子,並且將句子分割成單詞,然後發送每個單詞已經它們出現的次數。從一個spout出來的一個tuple可能觸發上千個tuples在這裡的意思是:每個單詞都是一個tuple,並且每個每個單詞都要更新它們出現的次數。這個例子中,消息樹可能看起來是這樣的:
當這個tuple樹被耗盡並且樹中每個消息都已經被處理了,那麼這個時候這個從一個spout出來的一個tuple被稱之為“完全處理”。當一個消息樹中的消息在指定的超時時間內沒有被完全處理,則認為這個tuple失敗了。預設的超時時間是30秒。
如果一個消息被完全處理或者失敗了將會發生什麼?
為了理解這個問題,讓我們看一下從spout處理的一個tuple的生命周期。(let's take a look at the lifecycle of a tuple coming off of a spout)
public interface ISpout extends Serializable { void open(Map conf, TopologyContext context, SpoutOutputCollector collector); void close(); void nextTuple(); void ack(Object msgId); void fail(Object msgId); }
首先,Storm通過在Spout中調用nextTuple方法從Spout中請求一個tuple。這個Spout使用open方法中提供的SpoutOutputCollector來發送一個tuple到其中一個streams。在發送一個tuple的時候,這個Spout提供一個“message id”用來表示這個tuple。發送消息的代碼可能看起來是這樣的:
_collector.emit(new Values("field1", "field2", 3) , msgId);
接下來,這個tuple達到bolt,並且Storm對於跟蹤這個tuple的消息樹很關心。如果Storm發現一個tuple被完全處理,那麼Storm將會調用原始的那個帶有message id的Spout中的ack方法。同樣的,如果一個tuple超時,那麼Storm將會調用那個Spout的fail方法。註意,一個tuple只能被創建它的那個Spout任務所acked或者failed。
讓我們用KestrelSpout再回顧一遍Spout是怎樣保證消息處理的。當KestrelSpout從Kestrel隊列中消費了一個消息以後,它“opens”這個消息。意思是這個消息並沒有實際的從隊列中刪除,代替的只是標記這個消息為“pending”狀態,等待消息被完成以後的確認。處於pending狀態的消息不會被髮送給隊列的其它消費者。另外,如果一個客戶端連接斷開了,那麼它所持有的所有pending狀態的消息將會被重新放回隊列。當一個消息被打開的時候,Kestrel提供這個消息數據給客戶端,並且給這個消息一個唯一的id。當發送tuple給SpoutOutputCollector的時候KestrelSpout用這個精確的id作為這個tuple的“message id”。隨後,當這個KestrelSpout上的ack或者fail被調用的時候,這個KestrelSpout發送ack或者fail消息給Kestrel,同時還帶上message id,以決定是從隊列中刪除這個消息還是將這個消息重新放回隊列。
Storm的可靠性API是什麼?
想要使用Storm的可靠性,有兩件事情是你必須做的。第一,無論何時你在為一個tuple樹創建一個新的鏈接的時候你必須告訴Storm。第二,你必須告訴Storm什麼時候一個單個的tuple算是完成處理了。通過做以上兩件事情,Storm就可以發現什麼時候這個tuple的樹是被完全處理了,並且在適當的時候ack或者fail這個tuple。Storm的API提供了一些簡潔的方式來做上面說的這些事情。
Specifying a link in the tuple tree is called anchoring. Anchoring is done at the same time you emit a new tuple.
讓我們用下麵這個bolt作為例子來具體看一下:
public class SplitSentence extends BaseRichBolt { OutputCollector _collector; public void prepare(Map conf, TopologyContext context, OutputCollector collector) { _collector = collector; } public void execute(Tuple tuple) { String sentence = tuple.getString(0); for(String word: sentence.split(" ")) { _collector.emit(tuple, new Values(word)); } _collector.ack(tuple); } public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("word")); } }
Each word tuple is anchored by specifying the input tuple as the first argument to emit
. Since the word tuple is anchored, the spout tuple at the root of the tree will be replayed later on if the word tuple failed to be processed downstream.(這段話的意思是,調用emit方法的時候指定它的第一個參數tuple的時候,這個被指定的tuple就被anchored了,一旦這個tuple被anchored以後它將作為它的消息tree的root,如果這個tuple在隨後處理失敗了,這個tuple會被重新投放)
與之相對的,我們再看一下下麵這種發送方式:
_collector.emit(new Values(word));
這種方式會造成這個unanchored,也就是說,用這種方式的話tuple就不能被anchored。如果這個tuple在下游被處理失敗了,那麼根tuple不會被重新投放。根據你的系統的容錯性要求來決定,有時候用一個unanchored的tuple也很合適。
一個輸出tuple可以被anchored到多個輸入tuple。在做stream的join或者聚集很有用。一個multi-anchored的tuple失敗會造成多個tuple被重放。例如:
List<Tuple> anchors = new ArrayList<Tuple>(); anchors.add(tuple1); anchors.add(tuple2); _collector.emit(anchors, new Values(1, 2, 3));
DAG和tree是一回事,只是在先前的版本中叫tree。
當你已經完成了對tuple樹中某一個tuple的處理的時候,你可以調用OutputCollector的ack或者fail方法。
你可以在OutputCollector中用fail方法來立即失敗這個tuple。通過明確失敗這個tuple,這個tuple可以被快速重放而不是等待這個tuple超時以後再重放。
你處理的每一個tuple都必須被acked或者failed。Storm用記憶體來跟蹤每一個tuple,因此如果你不ack/fail每一個tuple,那麼這個任務將會一直存在於記憶體中。
大量的bolts有相同的行為模式:它們讀取一個輸入tuple,然後基於這個tuple發送tuple,在execute方法的最後ack這個tuple。Storm有一個叫做BasicBolt的介面已經為你封裝好了這種模式。下麵這個例子就是:
public class SplitSentence extends BaseBasicBolt { public void execute(Tuple tuple, BasicOutputCollector collector) { String sentence = tuple.getString(0); for(String word: sentence.split(" ")) { collector.emit(new Values(word)); } } public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("word")); } }
這種實現方式比前面的例子中那種方式更簡單,而且表達的語義完全相同。Tuple被髮送給BasicOutputCollector的時候自動被anchored,並且輸入tuple在execute方法執行完以後自動被acked。
考慮到tuple會被重放,我應該怎麼做才能使我的應用正常工作?
視情況而定
Storm如何有效地實現可靠性?
一個Storm topology有一組特殊的“acker”任務跟蹤每一個spout tuple的DAG(有向無環圖)。當一個acker看到一個DAG被完成的時候,它一個消息給那個創建這個spout tuple的spout task來確認這個消息。你可以設置一個topology的acker任務的數量,預設是每個woker一個這樣的task。
理解Storm的可靠性實現最好的方式就是看tuples的生命周期和tuple的DAG。當一個tuple在topology中被創建的時候,無論是在spout中還是在bolt中被創建,都會被指定一個64位的id。這些id被acker用來跟蹤每個spout tuple的DAG。
當你在一個bolt中發送一個新的tuple的時候,已經被ahchored過的那些tuple的id會被覆制到新的tuple中,因此,每一個tuple都知道它所在的tuple樹中所有spout tuples的id。當tuple被確認的時候,它給acker任務發一個消息告訴acker這個tuple樹怎樣變化的。特別的,它會告訴acker“我現在已經這個spout tuple的樹中完成了,這有一個新tuple被anchored”。
現在,你已經理解了可靠性的演算法,讓我們來複習一下所有失敗的情況,並且看一下對於每一中情況Strom是如何避免數據丟失的:
- A tuple isn't acked because the task died:這種情況下,失敗的那個tuple的tuple樹的根tuple將會超時並且重放
- Acker task dies:這種情況下,這個acker跟蹤的所有的spout tuple都將超時並且重放
- Spout task dies:這種情況下,由spout的源來負責消息的重放。例如:隊列想Kestrel和RabbitMQ在客戶端連接斷開的時候將所有pending狀態的消息重新放回隊列。
本節重點
1、一個tuple的tuple樹被耗盡並且樹中的每個消息都被處理,則稱這個tuple被完全處理。反之,沒有被完全處理則認為這個tuple失敗。預設處理超時時間是30秒。
2、Strom通過調用Spot中的nextTuple方法請求一個tuple。在Spout中發送一個tuple的時候,Spout提供一個"message id"用於標識這個tuple。
3、如果Storm發現一個tuple被完成處理,那麼將會在原始的那個Spout中調用ack方法,同時將message id作為參數。同樣的,如果tuple失敗,則調用那個Spout中的fail方法。註意,tuple被acked或者failed都是在創建這個tuple的那同一個Spout中調用。
4、Specifying a link in the tuple tree is called anchoring. Anchoring is done at the same time you emit a new tuple.
5、在Bolt中將特定的輸入tuple作為OutputCollector的emit方法第一個參數,那麼這個輸出Bolt是被anchored的,因而如果這個輸出tuple在下游被處理失敗的話,這個tuple所在的tuple樹的根tuple將會被重放。否則,如果沒有將輸入tuple作為第一個參數的話,則是unanchored,進而失敗的時候也不會被重放。
6、Storm topology有專門的task來跟蹤每一個spout tuple的DAG,這些特殊的task被叫做"acker"。預設每個worker一個acker。當acker發現一個DAG完成的時候,它發消息給創建這個spout tuple的spout task以此來確認這個消息。
7、在topology中,當一個tuple被創建的時候,都會被指定一個64位的id。acker用這個id來跟蹤每個tuple。
8、由於tuple可能會重放,因此在處理的時候需要根據message id去重
進一步精簡
每一個從Spout出來的tuple都帶著一個"message id",重放的時候會用到這個id。
每一個tuple在創建的時候都被指定了一個64位的id,在acker跟蹤tuple的DAG(也叫tuple tree)會用到。
每一個tuple被acked的時候,它會發一個消息告訴acker這個tuple tree發生了怎樣的改變
一個tuple從Spout出來,隨後會經歷很多個Bolt,在一步的Bolt中都可能產生新的tuple。因此,由這個原始的tuple開始將會衍生出很多tuple,這些tuple構成一個tuple tree(也叫tuple DAG)。只有當tree耗盡並且其中所有的tuple都處理完成,這個tuple才叫完全處理,否則tuple失敗。tuple完全處理後將會在原始那個Spout上調用ack方法,並且用message id作為參數,tuple失敗以後同樣在那個Spout上調用fail方法。當tuple tree中某一個tuple完成處理以後可以調用OutputCollector的ack方法確認,這個tuple便會從DAG中刪除。當acker發現DAG完成以後,發消息告訴那個Spout task來確認這個消息。
參考 http://storm.apache.org/releases/current/Guaranteeing-message-processing.html