storm從入門到放棄(三),放棄使用 StreamId 特性

来源:http://www.cnblogs.com/intsmaze/archive/2017/08/04/7283442.html
-Advertisement-
Play Games

序:StreamId是storm中實現DAG有向無環圖的重要一個特性,但是從實際生產環境來看,這個功能其實蠻影響生產環境的穩定性的,我們系統在迭代時會帶來整體服務的不可用。 StreamId是storm中實現DAG有向無環圖的重要一個特性,官方也提供對應的介面實現讓開發者自己靈活化構造自己的ADG圖 ...


  序:StreamId是storm中實現DAG有向無環圖的重要一個特性,但是從實際生產環境來看,這個功能其實蠻影響生產環境的穩定性的,我們系統在迭代時會帶來整體服務的不可用。

  StreamId是storm中實現DAG有向無環圖的重要一個特性,官方也提供對應的介面實現讓開發者自己靈活化構造自己的ADG圖。但是從我這一年從事流式計算的工作中得到的結果也很尷尬的,很多人不知道storm的這一個特性,甚至某些數據中也沒有提及。當然這也比較幸運,不知道這個特性就可以少踩點坑了。因為從實際生產環境來看,這個功能其實蠻影響生產環境的穩定性的,為什麼這麼說,hey,hey,look dowm。

  原文和作者一起討論:http://www.cnblogs.com/intsmaze/p/7283442.html

  合作微信:intsmaze

  

  實際開發中,很多人沒有用streamid,其實只是沒有顯示指定罷了,預設streamid的名稱為default,這也就是為什麼消息可以由一個bolt發往另一個bolt了。我們自己顯示指定streamid可以實現進入某一個bolt的消息,某些消息發給下游的Abolt,另一些消息發給下游的Bbolt。

  比如有這樣一個需求砸向你的臉上,有很多其他系統的消息發送到kafka某一個主題中,現在用storm去kafka消費該主題,在bolt-業務這個節點進行消息類型的判斷,然後根據判斷將消息發送到不同的下游bolt進行處理以便將這些消息發往不同的渠道介面中。這樣一個需求我們利用streamid很容易實現,看起來也沒有什麼問題。關於sreamid的使用可以文章末尾。

  為什麼在實際生產我不建議這樣使用,生產中經常會面對迭代開發的情況,業務不斷的變化,你的代碼也要不斷的修改,第三方介面的變動,你也要不斷的修改與第三方交互的程式。如果這周要修改bolt-微信,然後到發佈的時候,你必須停掉整個拓撲任務這明顯不是我們想要的,我們期望的是只停掉bolt-微信而不影響其他的業務線。這個時候就會發現這個實現方式很雞肋的。那我們應該怎麼做,看一下我在某信用卡中心的實現方案,看了後,你會替我慶幸我沒有為了圖前期的簡單而採用顯示streamid導致後面每該一處很小的功能導致整個拓撲任務不提供服務一段時間。

  我們的系統會收到交易信息,然後根據業務bolt進行處理,然後形成話術推送給不同的渠道bolt,這些渠道bolt對接各個部門(這些部門接受到我們的話術後,將話術推送給微信用戶,支付寶用戶等),而我們的對外渠道多大15個左右。同時應為業務的不斷提出,以及對接部門介面的變化我們這些渠道bolt也要跟隨變化。所以我們在業務bolt和渠道bolt中引入了第三方消息系統kafka隊列,而不是用storm內部的Disruptor隊列。這樣原本一個拓撲任務,我們進行拆分為一個業務拓撲,以及多個其渠道拓撲,渠道拓撲與業務拓撲通信通過kafka的主題來協調。如果某一天我們要修改微信渠道的業務,我們只需要停掉微信拓撲即可,整個系統並不會受到影響,原本推送給微信渠道的消息也不會因此丟失它保存在kafka主題中,一旦微信拓撲上線即可立馬消費掉。   

  後話,我這樣說有點絕對了,具體看系統的情況來權衡。

  streamid在storm中的正確打開方式。

public class ProduceRecordSpout extends BaseRichSpout {

    private static final long serialVersionUID = 1L;

    private SpoutOutputCollector collector;

    private String recordLines;
    private String type;

    public ProduceRecordSpout(String type, String lines) {
        this.type = type;
        recordLines = lines;
    }
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
    }
public void nextTuple() { Utils.sleep(5000); System.out.println("record is "+recordLines); List<Object> values = new Values(type, recordLines); collector.emit(values, values); } public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("type", "record")); } }

public class DistributeByTypeBolt extends BaseRichBolt { private static final long serialVersionUID = 1L; private OutputCollector collector; public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { this.collector = collector; } public void execute(Tuple input) { String type = input.getString(0); String word = input.getString(1); switch (type) { case Type.NUMBER: collector.emit("stream-number-saver", input, new Values(type, word)); collector.emit(input, new Values("other", "message coming")); break; case Type.STRING: collector.emit("stream-string-saver", input, new Values(type, word)); collector.emit(input, new Values("other", "message coming")); break; default: collector.emit(input, new Values(type, word)); } collector.ack(input); } public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declareStream("stream-number-saver", new Fields("type", "word")); declarer.declareStream("stream-string-saver", new Fields("type", "word")); declarer.declare(new Fields("type", "word")); } }

public class SaveBolt extends BaseRichBolt { private static final long serialVersionUID = 1L; private OutputCollector collector; public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { this.collector = collector; } public void execute(Tuple input) { System.out.println("個人微信:intsmaze"+ "SourceComponent=" + input.getSourceComponent() + ", SourceStreamId=" + input.getSourceStreamId() + ", type=" + input.getString(0) + ", value=" + input.getString(1)); } public void declareOutputFields(OutputFieldsDeclarer declarer) { } }

public class SaveDefaultBolt extends BaseRichBolt { private static final long serialVersionUID = 1L; private OutputCollector collector; public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { this.collector = collector; } public void execute(Tuple input) { System.out.println("個人微博:猥瑣發育的碼農"+ "SourceComponent=" + input.getSourceComponent() + ", SourceStreamId=" + input.getSourceStreamId() + ", type=" + input.getString(0) + ", value=" + input.getString(1)); } public void declareOutputFields(OutputFieldsDeclarer declarer) { } }

public class SaveTwoBolt extends BaseRichBolt { private static final long serialVersionUID = 1L; private OutputCollector collector; public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { this.collector = collector; } public void execute(Tuple input) { System.out.println("博客鏈接:http://www.cnblogs.com/intsmaze/p/7283442.html"+ "SourceComponent=" + input.getSourceComponent() + ", SourceStreamId=" + input.getSourceStreamId() + ", type=" + input.getString(0) + ", value=" + input.getString(1)); } public void declareOutputFields(OutputFieldsDeclarer declarer) { } }
public class StreamTopologyMain { public static void main(String[] args) throws Exception { TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("spout-number", new ProduceRecordSpout(Type.NUMBER, "80966 31"), 1); builder.setSpout("spout-string", new ProduceRecordSpout(Type.STRING, "hello the word"), 1); builder.setBolt("bolt-distributor", new DistributeByTypeBolt(), 2) .shuffleGrouping("spout-number") .shuffleGrouping("spout-string"); builder.setBolt("bolt-number-saver", new SaveBolt(), 1).shuffleGrouping("bolt-distributor", "stream-number-saver"); builder.setBolt("bolt-string-saver", new SaveTwoBolt(), 1).shuffleGrouping("bolt-distributor", "stream-string-saver"); builder.setBolt("bolt-default-saver", new SaveDefaultBolt(), 1).shuffleGrouping("bolt-distributor"); Config conf = new Config(); conf.setDebug(false); String name = StreamTopologyMain.class.getSimpleName(); LocalCluster cluster = new LocalCluster(); cluster.submitTopology(name, conf, builder.createTopology()); Thread.sleep(60 * 60 * 1000); cluster.shutdown(); } } interface Type { String NUMBER = "NUMBER"; String STRING = "STRING"; }

 


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 在本地裝了centos 7的虛擬機,先要裝環境,選擇的mysql是5.6,以下是安裝的過程: 到此資料庫安裝完畢 啟動的過程中報錯了 是因為/var/log/mariadb這文件夾不存在,我們創建 再啟動 成功 ...
  • mysql 命令導出 Select *from 表名 into outfile "地址";導入 Source D:/mydb.sql /*將外部的sql文件導入到資料庫中*/ dos 命令mysqldump -uroot –p123456 test > c:/a.sql 在dos 視窗下,輸入該命令 ...
  • 原因:中間存在回車符或者換行符,所以要先將此符號替換掉; LTRIM(RTRIM(REPLACE(REPLACE( A,char(13),''),char(10),'') )) LTRIM(A) 去換左邊空格 RTRIM(A) 去換右邊空格 REPLACE( A,char(13),'') 將回車符替 ...
  • 轉載微信公眾號“ 架構師之路“文章 其實看完我還是有些地方不明白,先留著以後慢慢消化~~ 本文將以“好友中心”為例,介紹“多對多”類業務,隨著數據量的逐步增大,資料庫性能顯著降低,資料庫水平切分相關的架構實踐。 一、什麼是多對多關係 所謂的“多對多”,來自資料庫設計中的“實體-關係”ER模型,用來描 ...
  • 1.啟動mysql:1、net start mysql(停止mysql:net stop mysql 其中,mysql是安裝mysql時服務的名字) 2.登錄mysql:cmd中輸入:mysql -h localhost -u username -p 輸入密碼。<-h -u -p 後面的參數緊跟> ...
  • (1)mysql是一個小型關係型資料庫管理系統。 (2)mysql是一個快速、多線程、多用戶、健壯的SQL資料庫伺服器。與其他資料庫管理系統比,mysql有以下的優勢: mysql是一個關係資料庫管理系統。 mysql是開源的。 mysql伺服器是一個快速的、可靠和易使用的資料庫伺服器。 mysql ...
  • 啟動和停止SQL Server服務三種形式 電腦—>右鍵—>管理—>服務和應用程式—>服務—>sql server(MSSQLSERVER) 開始—>安裝路徑—>配置工具—>sql server配置管理器 Windows(鍵)+r 啟動SQL Server服務:net start mssqlser ...
  • 一、sql中的group by 用法解析: Group By語句從英文的字面意義上理解就是“根據(by)一定的規則進行分組(Group)”。 作用:通過一定的規則將一個數據集劃分成若幹個小的區域,然後針對若幹個小區域進行數據處理。 註意:group by 是先排序後分組! 舉例說明:如果要用到gro ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...