Strom框架基本概念就不提了,這裡主要講的是`Stream`自定義ID的消息流。預設spout、bolt都需實現介面方法`declareOutputFields`,這種情況下發的消息會被所有定義的bolts接收。我們如果需要根據得到的消息類型來選擇不同的bolt,就需要用到Stream Group... ...
Strom框架基本概念就不提了,這裡主要講的是Stream
自定義ID的消息流。預設spout、bolt都需實現介面方法declareOutputFields
,代碼如下:
@Override
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
outputFieldsDeclarer.declare(new Fields("body"));
}
這種情況下發的消息會被所有定義的bolts接收。我們如果需要根據得到的消息類型來選擇不同的bolt,就需要用到Stream Grouping。
- 首先通過消息源的
OutputFieldsDeclarer
來定義發射多條消息流stream
以下定義了兩種stream消息流:email郵件、sms簡訊
@Override
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
outputFieldsDeclarer.declareStream("email", new Fields("body"));
outputFieldsDeclarer.declareStream("sms", new Fields("body"));
}
- 然後我們通過對消息內容進行分析判斷來決定發射指定的stream類型
@Override
public void execute(Tuple tuple) {
String streamType;
String value = tuple.getStringByField("body");
# 邏輯判斷stub code
if (value.startsWith("email:")) {
streamType = "email";
} else {
streamType = "sms";
}
outputCollector.emit(streamType, new Values(value));
}
- topology設置bolt的消息源時通過localOrShuffleGrouping來設置只接收指定stream的消息
FilterBolt通過對消息進行加工處理,下發給bolts時會指定不同的stream,EmailNotifyBolt只接收email
類型的stream消息,SmsNotifyBolt只接收sms
類型的stream消息。
TopologyBuilder topologyBuilder = new TopologyBuilder();
topologyBuilder.setSpout("RabbitmqSpout", new RabbitmqSpout());
topologyBuilder.setBolt("FilterBolt", new FilterBolt()).shuffleGrouping("RabbitmqSpout");
topologyBuilder.setBolt("EmailNotifyBolt", new EmailNotifyBolt()).localOrShuffleGrouping("FilterBolt", "email");
topologyBuilder.setBolt("SmsNotifyBolt", new SmsNotifyBolt()).localOrShuffleGrouping("FilterBolt", "sms");