Storm保證消息處理

来源:https://www.cnblogs.com/cjsblog/archive/2018/02/01/8400167.html
-Advertisement-
Play Games

Guaranteeing Message Processing Storm保證每一個tuple被完全處理。Strom中一個核心的機制是它提供了一種跟蹤tuple血統的能力,它使用了一種十分有效的方式跟蹤topology中的tuple。 Storm中最基本的抽象是提供了至少一次(at-least-on ...


Guaranteeing Message Processing

Storm保證每一個tuple被完全處理。Strom中一個核心的機制是它提供了一種跟蹤tuple血統的能力,它使用了一種十分有效的方式跟蹤topology中的tuple。

Storm中最基本的抽象是提供了至少一次(at-least-once)處理的保證,當你使用隊列系統的時候也可以提供相同的保證。

Messages are only replayed when there are failures.(消息只有在失敗的時候才會被重新投放)

Trident是在基本抽象之上的更高層面的抽象,它可以實現精準的只執行一次的處理。

對於保證消息處理,Storm提供了幾種不同的級別,包括盡最大努力、至少一次處理、精確的一次處理。( best effort、at least once、exactly once)

一個消息被“完全處理”是什麼意思?

從一個spout出來的一個tuple可能觸發上千個tuples。考慮下麵的例子:

TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("sentences", new KestrelSpout("kestrel.backtype.com",
                                               22133,
                                               "sentence_queue",
                                               new StringScheme()));
builder.setBolt("split", new SplitSentence(), 10)
        .shuffleGrouping("sentences");
builder.setBolt("count", new WordCount(), 20)
        .fieldsGrouping("split", new Fields("word"));

這個topology讀取從一個Kestrel隊列出來的句子,並且將句子分割成單詞,然後發送每個單詞已經它們出現的次數。從一個spout出來的一個tuple可能觸發上千個tuples在這裡的意思是:每個單詞都是一個tuple,並且每個每個單詞都要更新它們出現的次數。這個例子中,消息樹可能看起來是這樣的:

當這個tuple樹被耗盡並且樹中每個消息都已經被處理了,那麼這個時候這個從一個spout出來的一個tuple被稱之為“完全處理”。當一個消息樹中的消息在指定的超時時間內沒有被完全處理,則認為這個tuple失敗了。預設的超時時間是30秒。

如果一個消息被完全處理或者失敗了將會發生什麼?

為了理解這個問題,讓我們看一下從spout處理的一個tuple的生命周期。(let's take a look at the lifecycle of a tuple coming off of a spout)

public interface ISpout extends Serializable {
    void open(Map conf, TopologyContext context, SpoutOutputCollector collector);
    void close();
    void nextTuple();
    void ack(Object msgId);
    void fail(Object msgId);
}

首先,Storm通過在Spout中調用nextTuple方法從Spout中請求一個tuple。這個Spout使用open方法中提供的SpoutOutputCollector來發送一個tuple到其中一個streams。在發送一個tuple的時候,這個Spout提供一個“message id”用來表示這個tuple。發送消息的代碼可能看起來是這樣的:

_collector.emit(new Values("field1", "field2", 3) , msgId);

接下來,這個tuple達到bolt,並且Storm對於跟蹤這個tuple的消息樹很關心。如果Storm發現一個tuple被完全處理,那麼Storm將會調用原始的那個帶有message id的Spout中的ack方法。同樣的,如果一個tuple超時,那麼Storm將會調用那個Spoutfail方法。註意,一個tuple只能被創建它的那個Spout任務所acked或者failed。

讓我們用KestrelSpout再回顧一遍Spout是怎樣保證消息處理的。當KestrelSpout從Kestrel隊列中消費了一個消息以後,它“opens”這個消息。意思是這個消息並沒有實際的從隊列中刪除,代替的只是標記這個消息為“pending”狀態,等待消息被完成以後的確認。處於pending狀態的消息不會被髮送給隊列的其它消費者。另外,如果一個客戶端連接斷開了,那麼它所持有的所有pending狀態的消息將會被重新放回隊列。當一個消息被打開的時候,Kestrel提供這個消息數據給客戶端,並且給這個消息一個唯一的id。當發送tuple給SpoutOutputCollector的時候KestrelSpout用這個精確的id作為這個tuple的“message id”。隨後,當這個KestrelSpout上的ack或者fail被調用的時候,這個KestrelSpout發送ack或者fail消息給Kestrel,同時還帶上message id,以決定是從隊列中刪除這個消息還是將這個消息重新放回隊列。

Storm的可靠性API是什麼?

想要使用Storm的可靠性,有兩件事情是你必須做的。第一,無論何時你在為一個tuple樹創建一個新的鏈接的時候你必須告訴Storm。第二,你必須告訴Storm什麼時候一個單個的tuple算是完成處理了。通過做以上兩件事情,Storm就可以發現什麼時候這個tuple的樹是被完全處理了,並且在適當的時候ack或者fail這個tuple。Storm的API提供了一些簡潔的方式來做上面說的這些事情。

Specifying a link in the tuple tree is called anchoring. Anchoring is done at the same time you emit a new tuple.

讓我們用下麵這個bolt作為例子來具體看一下:

public class SplitSentence extends BaseRichBolt {
        OutputCollector _collector;

        public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
            _collector = collector;
        }

        public void execute(Tuple tuple) {
            String sentence = tuple.getString(0);
            for(String word: sentence.split(" ")) {
                _collector.emit(tuple, new Values(word));
            }
            _collector.ack(tuple);
        }

        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word"));
        }        
    }

Each word tuple is anchored by specifying the input tuple as the first argument to emit. Since the word tuple is anchored, the spout tuple at the root of the tree will be replayed later on if the word tuple failed to be processed downstream.(這段話的意思是,調用emit方法的時候指定它的第一個參數tuple的時候,這個被指定的tuple就被anchored了,一旦這個tuple被anchored以後它將作為它的消息tree的root,如果這個tuple在隨後處理失敗了,這個tuple會被重新投放)

與之相對的,我們再看一下下麵這種發送方式:

_collector.emit(new Values(word));

這種方式會造成這個unanchored,也就是說,用這種方式的話tuple就不能被anchored。如果這個tuple在下游被處理失敗了,那麼根tuple不會被重新投放。根據你的系統的容錯性要求來決定,有時候用一個unanchored的tuple也很合適。

一個輸出tuple可以被anchored到多個輸入tuple。在做stream的join或者聚集很有用。一個multi-anchored的tuple失敗會造成多個tuple被重放。例如:

List<Tuple> anchors = new ArrayList<Tuple>();
anchors.add(tuple1);
anchors.add(tuple2);
_collector.emit(anchors, new Values(1, 2, 3));

DAG和tree是一回事,只是在先前的版本中叫tree。

當你已經完成了對tuple樹中某一個tuple的處理的時候,你可以調用OutputCollectorack或者fail方法。

你可以在OutputCollector中用fail方法來立即失敗這個tuple。通過明確失敗這個tuple,這個tuple可以被快速重放而不是等待這個tuple超時以後再重放。

你處理的每一個tuple都必須被acked或者failed。Storm用記憶體來跟蹤每一個tuple,因此如果你不ack/fail每一個tuple,那麼這個任務將會一直存在於記憶體中。

大量的bolts有相同的行為模式:它們讀取一個輸入tuple,然後基於這個tuple發送tuple,在execute方法的最後ack這個tuple。Storm有一個叫做BasicBolt的介面已經為你封裝好了這種模式。下麵這個例子就是:

public class SplitSentence extends BaseBasicBolt {
        public void execute(Tuple tuple, BasicOutputCollector collector) {
            String sentence = tuple.getString(0);
            for(String word: sentence.split(" ")) {
                collector.emit(new Values(word));
            }
        }

        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word"));
        }        
 }

這種實現方式比前面的例子中那種方式更簡單,而且表達的語義完全相同。Tuple被髮送給BasicOutputCollector的時候自動被anchored,並且輸入tuple在execute方法執行完以後自動被acked。

考慮到tuple會被重放,我應該怎麼做才能使我的應用正常工作?

視情況而定

Storm如何有效地實現可靠性?

一個Storm topology有一組特殊的“acker”任務跟蹤每一個spout tuple的DAG(有向無環圖)。當一個acker看到一個DAG被完成的時候,它一個消息給那個創建這個spout tuple的spout task來確認這個消息。你可以設置一個topology的acker任務的數量,預設是每個woker一個這樣的task。

理解Storm的可靠性實現最好的方式就是看tuples的生命周期和tuple的DAG。當一個tuple在topology中被創建的時候,無論是在spout中還是在bolt中被創建,都會被指定一個64位的id。這些id被acker用來跟蹤每個spout tuple的DAG。

當你在一個bolt中發送一個新的tuple的時候,已經被ahchored過的那些tuple的id會被覆制到新的tuple中,因此,每一個tuple都知道它所在的tuple樹中所有spout tuples的id。當tuple被確認的時候,它給acker任務發一個消息告訴acker這個tuple樹怎樣變化的。特別的,它會告訴acker“我現在已經這個spout tuple的樹中完成了,這有一個新tuple被anchored”。

現在,你已經理解了可靠性的演算法,讓我們來複習一下所有失敗的情況,並且看一下對於每一中情況Strom是如何避免數據丟失的:

  • A tuple isn't acked because the task died:這種情況下,失敗的那個tuple的tuple樹的根tuple將會超時並且重放
  • Acker task dies:這種情況下,這個acker跟蹤的所有的spout tuple都將超時並且重放
  • Spout task dies:這種情況下,由spout的源來負責消息的重放。例如:隊列想Kestrel和RabbitMQ在客戶端連接斷開的時候將所有pending狀態的消息重新放回隊列。

 

本節重點

1、一個tuple的tuple樹被耗盡並且樹中的每個消息都被處理,則稱這個tuple被完全處理。反之,沒有被完全處理則認為這個tuple失敗。預設處理超時時間是30秒。

2、Strom通過調用Spot中的nextTuple方法請求一個tuple。在Spout中發送一個tuple的時候,Spout提供一個"message id"用於標識這個tuple。

3、如果Storm發現一個tuple被完成處理,那麼將會在原始的那個Spout中調用ack方法,同時將message id作為參數。同樣的,如果tuple失敗,則調用那個Spout中的fail方法。註意,tuple被acked或者failed都是在創建這個tuple的那同一個Spout中調用。

4、Specifying a link in the tuple tree is called anchoring. Anchoring is done at the same time you emit a new tuple. 

5、在Bolt中將特定的輸入tuple作為OutputCollector的emit方法第一個參數,那麼這個輸出Bolt是被anchored的,因而如果這個輸出tuple在下游被處理失敗的話,這個tuple所在的tuple樹的根tuple將會被重放。否則,如果沒有將輸入tuple作為第一個參數的話,則是unanchored,進而失敗的時候也不會被重放。

6、Storm topology有專門的task來跟蹤每一個spout tuple的DAG,這些特殊的task被叫做"acker"。預設每個worker一個acker。當acker發現一個DAG完成的時候,它發消息給創建這個spout tuple的spout task以此來確認這個消息。

7、在topology中,當一個tuple被創建的時候,都會被指定一個64位的id。acker用這個id來跟蹤每個tuple。

8、由於tuple可能會重放,因此在處理的時候需要根據message id去重

 

進一步精簡

每一個從Spout出來的tuple都帶著一個"message id",重放的時候會用到這個id。

每一個tuple在創建的時候都被指定了一個64位的id,在acker跟蹤tuple的DAG(也叫tuple tree)會用到。

每一個tuple被acked的時候,它會發一個消息告訴acker這個tuple tree發生了怎樣的改變

一個tuple從Spout出來,隨後會經歷很多個Bolt,在一步的Bolt中都可能產生新的tuple。因此,由這個原始的tuple開始將會衍生出很多tuple,這些tuple構成一個tuple tree(也叫tuple DAG)。只有當tree耗盡並且其中所有的tuple都處理完成,這個tuple才叫完全處理,否則tuple失敗。tuple完全處理後將會在原始那個Spout上調用ack方法,並且用message id作為參數,tuple失敗以後同樣在那個Spout上調用fail方法。當tuple tree中某一個tuple完成處理以後可以調用OutputCollector的ack方法確認,這個tuple便會從DAG中刪除。當acker發現DAG完成以後,發消息告訴那個Spout task來確認這個消息。

 

參考 http://storm.apache.org/releases/current/Guaranteeing-message-processing.html

 


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 創建新用戶 創建一個叫xiaoming的用戶: [root@192 ~]# adduser xiaoming 為這個用戶初始化密碼,linux會判斷密碼複雜度,不過可以強行忽略: [root@192 ~]# passwd xiaoming更改用戶 xiaoming 的密碼 。新的 密碼:無效的密碼: ...
  • 相關實驗示例 創建文件系統 創建和刪除swap分區 以文件方式創建和刪除swap分區 實現配額 創建和刪除軟RAID 邏輯捲相關 創建iso文件 ...
  • 相關命令工具 lsof sync mknod losetup uuidgen free df du lscpu dd convert ...
  • ## 1.下載xl2tpd.tar.gz源碼包 ```wget http://pkgs.fedoraproject.org/repo/pkgs/xl2tpd/xl2tpd-1.3.8.tar.gz/d244fdcd88f64601b64b7302870afca8/xl2tpd-1.3.8.tar.g ...
  • package com.test.test; import java.io.FileInputStream;import java.io.FileNotFoundException;import java.io.IOException;import java.sql.Connection;impor ...
  • eBay:使用MongoDB創建關鍵業務的多數據中心應用 作為全球前十的零售品牌,eBay的活躍用戶有一億七千多萬,並擁有跨越全世界190個市場的10億購物清單,這樣的規模下,eBay絕對不允許出現宕機的情況。這也就是為什麼公司會依賴於MongoDB提供企業級平臺標準以及面向用戶的應用。 在今年的M ...
  • mysql: dbs 資料庫系統 bdms 資料庫管理系統 bda 資料庫管理員 db 資料庫 dba通過dbms來操作db! 軟體項目開發周期中資料庫設計01.需求分析階段:分析客戶的業務和數據處理需求02.概要設計階段:設計資料庫的E-R模型圖,確認需求信息的正確和完整03.詳細設計階段:應用三 ...
  • 在使用關係資料庫時,表連接和對結果集的篩選是必不可少的查詢技能,對於他們的用法你都搞清楚了麽?請讓我們一起來過一遍。 表創建與初始化: Inner Join 結果集: 對於Inner Join, 條件在on里或者where 里效果相同 Left Join 結果集: Left Join 條件在On從句 ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...