序:StreamId是storm中實現DAG有向無環圖的重要一個特性,但是從實際生產環境來看,這個功能其實蠻影響生產環境的穩定性的,我們系統在迭代時會帶來整體服務的不可用。 StreamId是storm中實現DAG有向無環圖的重要一個特性,官方也提供對應的介面實現讓開發者自己靈活化構造自己的ADG圖 ...
序:StreamId是storm中實現DAG有向無環圖的重要一個特性,但是從實際生產環境來看,這個功能其實蠻影響生產環境的穩定性的,我們系統在迭代時會帶來整體服務的不可用。
StreamId是storm中實現DAG有向無環圖的重要一個特性,官方也提供對應的介面實現讓開發者自己靈活化構造自己的ADG圖。但是從我這一年從事流式計算的工作中得到的結果也很尷尬的,很多人不知道storm的這一個特性,甚至某些數據中也沒有提及。當然這也比較幸運,不知道這個特性就可以少踩點坑了。因為從實際生產環境來看,這個功能其實蠻影響生產環境的穩定性的,為什麼這麼說,hey,hey,look dowm。
原文和作者一起討論:http://www.cnblogs.com/intsmaze/p/7283442.html
合作微信:intsmaze
實際開發中,很多人沒有用streamid,其實只是沒有顯示指定罷了,預設streamid的名稱為default,這也就是為什麼消息可以由一個bolt發往另一個bolt了。我們自己顯示指定streamid可以實現進入某一個bolt的消息,某些消息發給下游的Abolt,另一些消息發給下游的Bbolt。
比如有這樣一個需求砸向你的臉上,有很多其他系統的消息發送到kafka某一個主題中,現在用storm去kafka消費該主題,在bolt-業務這個節點進行消息類型的判斷,然後根據判斷將消息發送到不同的下游bolt進行處理以便將這些消息發往不同的渠道介面中。這樣一個需求我們利用streamid很容易實現,看起來也沒有什麼問題。關於sreamid的使用可以文章末尾。
為什麼在實際生產我不建議這樣使用,生產中經常會面對迭代開發的情況,業務不斷的變化,你的代碼也要不斷的修改,第三方介面的變動,你也要不斷的修改與第三方交互的程式。如果這周要修改bolt-微信,然後到發佈的時候,你必須停掉整個拓撲任務這明顯不是我們想要的,我們期望的是只停掉bolt-微信而不影響其他的業務線。這個時候就會發現這個實現方式很雞肋的。那我們應該怎麼做,看一下我在某信用卡中心的實現方案,看了後,你會替我慶幸我沒有為了圖前期的簡單而採用顯示streamid導致後面每該一處很小的功能導致整個拓撲任務不提供服務一段時間。
我們的系統會收到交易信息,然後根據業務bolt進行處理,然後形成話術推送給不同的渠道bolt,這些渠道bolt對接各個部門(這些部門接受到我們的話術後,將話術推送給微信用戶,支付寶用戶等),而我們的對外渠道多大15個左右。同時應為業務的不斷提出,以及對接部門介面的變化我們這些渠道bolt也要跟隨變化。所以我們在業務bolt和渠道bolt中引入了第三方消息系統kafka隊列,而不是用storm內部的Disruptor隊列。這樣原本一個拓撲任務,我們進行拆分為一個業務拓撲,以及多個其渠道拓撲,渠道拓撲與業務拓撲通信通過kafka的主題來協調。如果某一天我們要修改微信渠道的業務,我們只需要停掉微信拓撲即可,整個系統並不會受到影響,原本推送給微信渠道的消息也不會因此丟失它保存在kafka主題中,一旦微信拓撲上線即可立馬消費掉。
後話,我這樣說有點絕對了,具體看系統的情況來權衡。
streamid在storm中的正確打開方式。
public class ProduceRecordSpout extends BaseRichSpout { private static final long serialVersionUID = 1L; private SpoutOutputCollector collector; private String recordLines; private String type; public ProduceRecordSpout(String type, String lines) { this.type = type; recordLines = lines; } public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { this.collector = collector; }
public void nextTuple() { Utils.sleep(5000); System.out.println("record is "+recordLines); List<Object> values = new Values(type, recordLines); collector.emit(values, values); } public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("type", "record")); } }
public class DistributeByTypeBolt extends BaseRichBolt { private static final long serialVersionUID = 1L; private OutputCollector collector; public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { this.collector = collector; } public void execute(Tuple input) { String type = input.getString(0); String word = input.getString(1); switch (type) { case Type.NUMBER: collector.emit("stream-number-saver", input, new Values(type, word)); collector.emit(input, new Values("other", "message coming")); break; case Type.STRING: collector.emit("stream-string-saver", input, new Values(type, word)); collector.emit(input, new Values("other", "message coming")); break; default: collector.emit(input, new Values(type, word)); } collector.ack(input); } public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declareStream("stream-number-saver", new Fields("type", "word")); declarer.declareStream("stream-string-saver", new Fields("type", "word")); declarer.declare(new Fields("type", "word")); } }
public class SaveBolt extends BaseRichBolt { private static final long serialVersionUID = 1L; private OutputCollector collector; public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { this.collector = collector; } public void execute(Tuple input) { System.out.println("個人微信:intsmaze"+ "SourceComponent=" + input.getSourceComponent() + ", SourceStreamId=" + input.getSourceStreamId() + ", type=" + input.getString(0) + ", value=" + input.getString(1)); } public void declareOutputFields(OutputFieldsDeclarer declarer) { } }
public class SaveDefaultBolt extends BaseRichBolt { private static final long serialVersionUID = 1L; private OutputCollector collector; public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { this.collector = collector; } public void execute(Tuple input) { System.out.println("個人微博:猥瑣發育的碼農"+ "SourceComponent=" + input.getSourceComponent() + ", SourceStreamId=" + input.getSourceStreamId() + ", type=" + input.getString(0) + ", value=" + input.getString(1)); } public void declareOutputFields(OutputFieldsDeclarer declarer) { } }
public class SaveTwoBolt extends BaseRichBolt { private static final long serialVersionUID = 1L; private OutputCollector collector; public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { this.collector = collector; } public void execute(Tuple input) { System.out.println("博客鏈接:http://www.cnblogs.com/intsmaze/p/7283442.html"+ "SourceComponent=" + input.getSourceComponent() + ", SourceStreamId=" + input.getSourceStreamId() + ", type=" + input.getString(0) + ", value=" + input.getString(1)); } public void declareOutputFields(OutputFieldsDeclarer declarer) { } }
public class StreamTopologyMain { public static void main(String[] args) throws Exception { TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("spout-number", new ProduceRecordSpout(Type.NUMBER, "80966 31"), 1); builder.setSpout("spout-string", new ProduceRecordSpout(Type.STRING, "hello the word"), 1); builder.setBolt("bolt-distributor", new DistributeByTypeBolt(), 2) .shuffleGrouping("spout-number") .shuffleGrouping("spout-string"); builder.setBolt("bolt-number-saver", new SaveBolt(), 1).shuffleGrouping("bolt-distributor", "stream-number-saver"); builder.setBolt("bolt-string-saver", new SaveTwoBolt(), 1).shuffleGrouping("bolt-distributor", "stream-string-saver"); builder.setBolt("bolt-default-saver", new SaveDefaultBolt(), 1).shuffleGrouping("bolt-distributor"); Config conf = new Config(); conf.setDebug(false); String name = StreamTopologyMain.class.getSimpleName(); LocalCluster cluster = new LocalCluster(); cluster.submitTopology(name, conf, builder.createTopology()); Thread.sleep(60 * 60 * 1000); cluster.shutdown(); } } interface Type { String NUMBER = "NUMBER"; String STRING = "STRING"; }