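How Storm WordCount works
Storm version:
1. The spout reads data from an external data source and emits a (randomly chosen) tuple;
2. SplitBolt receives the tuples emitted by the spout, splits the text in each tuple into words, and emits each word;
3. CountBolt receives the words emitted by SplitBolt, accumulates the frequency of each word, and outputs the running totals.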
Plain Java version:
1. Read the data from the file line by line;
2. Split each line that was read;
3. Count the words in the resulting arrays.
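A minimal sketch of the three plain-Java steps above, assuming words are separated by whitespace. The class name `JavaWordCount` is hypothetical, and a `StringReader` stands in for the file so the example runs on its own; for a real file, a `FileReader` would be passed instead.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;
import java.util.Map;
import java.util.TreeMap;

public class JavaWordCount {

    /** Reads the source line by line and counts whitespace-separated words. */
    public static Map<String, Integer> count(Reader source) throws IOException {
        Map<String, Integer> counts = new TreeMap<String, Integer>(); // sorted for readable output
        try (BufferedReader reader = new BufferedReader(source)) {
            String line;
            // 1. Read the data one line at a time
            while ((line = reader.readLine()) != null) {
                // 2. Split the line; a run of spaces or tabs counts as one separator
                for (String word : line.trim().split("\\s+")) {
                    if (word.isEmpty()) {
                        continue; // a blank line produces a single empty token
                    }
                    // 3. Accumulate the word's frequency
                    Integer c = counts.get(word);
                    counts.put(word, c == null ? 1 : c + 1);
                }
            }
        }
        return counts;
    }

    public static void main(String[] args) throws IOException {
        // For a real file: count(new FileReader("input.txt")), where the path is hypothetical
        Map<String, Integer> result = count(new StringReader("hello world\nhello storm\n"));
        System.out.println(result); // {hello=2, storm=1, world=1}
    }
}
```

Unlike the Storm version, this processes a finite input once and then stops; Storm keeps counting as long as the spout keeps emitting.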
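Hadoop version:
1. Read the file's data line by line;
2. In the Mapper's map() function, split each line and emit every word (as a (word, 1) pair);
3. The Reducer's reduce() function receives the mapper output grouped by word, sums the counts for each word, and outputs the totals.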
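The Hadoop data flow can be imitated in plain Java to show what each phase does. This sketch does not use the Hadoop API (`org.apache.hadoop.mapreduce.Mapper`/`Reducer`); the class and method names are hypothetical, and the "shuffle" step that groups values by key is something the Hadoop framework does for you between the two phases.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class MapReduceWordCount {

    /** Map phase: one input line in, one (word, 1) pair out per word. */
    static List<Map.Entry<String, Integer>> map(String line) {
        List<Map.Entry<String, Integer>> out = new ArrayList<Map.Entry<String, Integer>>();
        for (String word : line.trim().split("\\s+")) {
            if (!word.isEmpty()) {
                out.add(new SimpleEntry<String, Integer>(word, 1));
            }
        }
        return out;
    }

    /** Shuffle: group every emitted value by its key (done by the framework in real Hadoop). */
    static Map<String, List<Integer>> shuffle(List<Map.Entry<String, Integer>> pairs) {
        Map<String, List<Integer>> grouped = new TreeMap<String, List<Integer>>();
        for (Map.Entry<String, Integer> p : pairs) {
            List<Integer> values = grouped.get(p.getKey());
            if (values == null) {
                values = new ArrayList<Integer>();
                grouped.put(p.getKey(), values);
            }
            values.add(p.getValue());
        }
        return grouped;
    }

    /** Reduce phase: one key with all of its values in, one total out (key kept to mirror reduce()). */
    static int reduce(String word, List<Integer> values) {
        int sum = 0;
        for (int v : values) {
            sum += v;
        }
        return sum;
    }

    /** Runs map, shuffle and reduce over the given lines. */
    public static Map<String, Integer> run(List<String> lines) {
        List<Map.Entry<String, Integer>> pairs = new ArrayList<Map.Entry<String, Integer>>();
        for (String line : lines) {
            pairs.addAll(map(line));
        }
        Map<String, Integer> result = new TreeMap<String, Integer>();
        for (Map.Entry<String, List<Integer>> e : shuffle(pairs).entrySet()) {
            result.put(e.getKey(), reduce(e.getKey(), e.getValue()));
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(run(Arrays.asList("hello world", "hello hadoop")));
        // {hadoop=1, hello=2, world=1}
    }
}
```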
Source code
Setting up Storm, configuring Maven in Eclipse, and creating the project are omitted here.
MainClass
package com.test.stormwordcount;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;
import backtype.storm.utils.Utils;
public class MainClass {
public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException {
//Create a TopologyBuilder
TopologyBuilder tb = new TopologyBuilder();
//Spout with a parallelism hint of 2
tb.setSpout("SpoutBolt", new SpoutBolt(), 2);
//SplitBolt: shuffle grouping distributes sentences randomly across its 2 tasks
tb.setBolt("SplitBolt", new SplitBolt(), 2).shuffleGrouping("SpoutBolt");
//CountBolt: fields grouping on "word" sends every occurrence of a word to the same task
tb.setBolt("CountBolt", new CountBolt(), 4).fieldsGrouping("SplitBolt", new Fields("word"));
//Create the configuration
Config conf = new Config();
//Set the number of workers
conf.setNumWorkers(2);
//Submit the topology
//Cluster submission:
//StormSubmitter.submitTopology("myWordcount", conf, tb.createTopology());
//Local submission:
LocalCluster localCluster = new LocalCluster();
localCluster.submitTopology("myWordcount", conf, tb.createTopology());
//Let the topology run for 10 seconds, then shut the local cluster down
Utils.sleep(10000);
localCluster.shutdown();
}
}
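Why `fieldsGrouping` on `"word"` matters: CountBolt runs as 4 tasks, and each task keeps its own `HashMap`. With shuffle grouping, occurrences of one word would be scattered over several tasks and no single map would hold its full count. Fields grouping derives the target task from a hash of the grouping field, so equal words always land on the same task. The sketch below only illustrates that idea; using `String.hashCode()` is an assumption, and Storm's internal hash is an implementation detail. The class name is hypothetical.

```java
import java.util.Arrays;
import java.util.List;

public class FieldsGroupingSketch {

    /**
     * Maps the value of the grouping field to a task index in [0, numTasks).
     * String.hashCode() is an assumption for illustration only.
     */
    public static int taskFor(String word, int numTasks) {
        int h = word.hashCode();
        // hashCode() may be negative; this keeps the index non-negative (Java 7 has no Math.floorMod)
        return ((h % numTasks) + numTasks) % numTasks;
    }

    public static void main(String[] args) {
        int numTasks = 4; // same parallelism hint as CountBolt in MainClass
        List<String> words = Arrays.asList("hello", "world", "hello", "this", "hello");
        for (String w : words) {
            // every "hello" prints the same task index
            System.out.println(w + " -> CountBolt task " + taskFor(w, numTasks));
        }
    }
}
```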
SplitBolt
package com.test.stormwordcount;
import java.util.Map;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
public class SplitBolt extends BaseRichBolt{
OutputCollector collector;
/** Initialization: store the collector for use in execute() */
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
this.collector = collector;
}
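/** Called once for every incoming tuple */
public void execute(Tuple input) {
String line = input.getString(0);
String[] split = line.split(" ");
for (String word : split) {
collector.emit(new Values(word));
}
//BaseRichBolt does not ack automatically; acknowledge the input tuple
collector.ack(input);
}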
/** Declare the single output field "word" (used by fieldsGrouping in MainClass) */
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word"));
}
}
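SplitBolt splits on a single space, which works for the spout's fixed sentence but yields empty "words" when the input contains consecutive spaces, and those empty strings would then be counted by CountBolt. The sketch below compares that behavior with splitting on runs of whitespace; the class and method names are hypothetical.

```java
import java.util.Arrays;

public class SplitSketch {

    /** Splits the way SplitBolt does: on a single space character. */
    public static String[] splitOnSpace(String line) {
        return line.split(" ");
    }

    /** Splits on runs of whitespace and ignores leading/trailing blanks. */
    public static String[] splitOnWhitespace(String line) {
        String trimmed = line.trim();
        return trimmed.isEmpty() ? new String[0] : trimmed.split("\\s+");
    }

    public static void main(String[] args) {
        String line = "hello  world storm ";
        System.out.println(Arrays.toString(splitOnSpace(line)));      // [hello, , world, storm]
        System.out.println(Arrays.toString(splitOnWhitespace(line))); // [hello, world, storm]
    }
}
```

Note that `String.split` drops trailing empty strings but keeps the one between two adjacent spaces.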
CountBolt
package com.test.stormwordcount;
import java.util.HashMap;
import java.util.Map;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple;
public class CountBolt extends BaseRichBolt{
OutputCollector collector;
Map<String, Integer> map = new HashMap<String, Integer>();
/** Initialization: store the collector */
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
this.collector = collector;
}
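/** Called once for every incoming word */
public void execute(Tuple input) {
String word = input.getString(0);
if(map.containsKey(word)){
Integer c = map.get(word);
map.put(word, c+1);
}else{
map.put(word, 1);
}
//Test output: print the whole map after every update
System.out.println("Result: "+map);
//BaseRichBolt does not ack automatically; acknowledge the input tuple
collector.ack(input);
}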
/** No output fields: this bolt only prints its counts */
public void declareOutputFields(OutputFieldsDeclarer declarer) {
}
}
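The counting step of `CountBolt.execute()` can be pulled out into a small class that runs and tests without Storm. This is a sketch with a hypothetical `WordCounter` class; it does a single map lookup instead of `containsKey` plus `get`, and returns the updated total, which is the value a variant of CountBolt could emit downstream (that variant would also need to declare output fields, for example `"word"` and `"count"`).

```java
import java.util.HashMap;
import java.util.Map;

public class WordCounter {

    private final Map<String, Integer> counts = new HashMap<String, Integer>();

    /** Adds one occurrence of word and returns its new total. */
    public int add(String word) {
        Integer current = counts.get(word); // null if the word has not been seen yet
        int updated = (current == null) ? 1 : current + 1;
        counts.put(word, updated);
        return updated;
    }

    /** Current total for word, or 0 if it has never been seen. */
    public int get(String word) {
        Integer c = counts.get(word);
        return c == null ? 0 : c;
    }

    public static void main(String[] args) {
        WordCounter counter = new WordCounter();
        for (String w : "hello world hello".split(" ")) {
            System.out.println(w + " -> " + counter.add(w));
        }
        // prints: hello -> 1, world -> 1, hello -> 2 (one per line)
    }
}
```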
SpoutBolt (a spout, despite the name)
package com.test.stormwordcount;
import java.util.Map;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
public class SpoutBolt extends BaseRichSpout{
SpoutOutputCollector collector;
/** Initialization: store the collector for use in nextTuple() */
public void open(Map map, TopologyContext context, SpoutOutputCollector collector) {
this.collector = collector;
}
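/** Called repeatedly by Storm; each call emits one sentence */
public void nextTuple() {
collector.emit(new Values("hello world this is a test"));
//Throttle the spout so it does not emit as fast as possible in a tight loop
backtype.storm.utils.Utils.sleep(1000);
}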
/** Declare the single output field (SplitBolt reads it by position) */
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("test"));
}
}
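Step 1 of the Storm description says the spout emits a randomly chosen tuple, while SpoutBolt always emits the same sentence. The sketch below shows random selection on its own, without Storm, so it can run standalone; the class name and the sample sentences are hypothetical. Inside `nextTuple()`, the chosen sentence would be passed to `collector.emit(new Values(...))` as in SpoutBolt.

```java
import java.util.Random;

public class RandomSentenceSource {

    // Hypothetical sample data, not taken from the original project
    private static final String[] SENTENCES = {
        "hello world this is a test",
        "the quick brown fox",
        "storm counts words"
    };

    private final Random random;

    public RandomSentenceSource(long seed) {
        this.random = new Random(seed); // a fixed seed makes runs reproducible
    }

    /** Returns one of the sample sentences, chosen uniformly at random. */
    public String next() {
        return SENTENCES[random.nextInt(SENTENCES.length)];
    }

    public static void main(String[] args) {
        RandomSentenceSource source = new RandomSentenceSource(42L);
        for (int i = 0; i < 5; i++) {
            System.out.println(source.next());
        }
    }
}
```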
pom.xml contents
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.test</groupId>
<artifactId>stormwordcount</artifactId>
<version>0.9.6</version>
<packaging>jar</packaging>
<name>stormwordcount</name>
<url>http://maven.apache.org</url>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>3.8.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<version>0.9.6</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
<archive>
<manifest>
<mainClass>com.test.stormwordcount.MainClass</mainClass>
</manifest>
</archive>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>1.7</source>
<target>1.7</target>
</configuration>
</plugin>
</plugins>
</build>
</project>
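Problems encountered
The Storm WordCount project requires Eclipse with the Maven plugin installed. The Eclipse I had installed for earlier big-data exercises was Eclipse IDE for Eclipse Committers 4.5.2, which does not bundle the Maven plugin, and my first few attempts to install it failed (many online tutorials are out of date). Here is the method that finally worked for me: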
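Install from an update site via Help -> Install New Software.
Click Add, enter any name, and in Location enter the update-site URL of the Eclipse Maven plugin. The URL can be found as follows:
Go to http://www.eclipse.org/m2e/index.html, click Download, and scroll to the bottom, where the Eclipse Maven plugin releases are listed with their release dates; copy the link for the release that matches your Eclipse version. I recommend unchecking "Contact all update sites during install to find required software" (it takes far too long).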
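Even after the plugin installed successfully, I still could not configure it (I never worked out why or found a fix), so I downloaded the Java EE edition of Eclipse from the official site instead, which bundles the Maven plugin.
The rest of the Maven configuration went smoothly. My first attempt to create a maven-archetype-quickstart project failed with an error, and none of the fixes I found online helped. Then I opened Window -> Preferences -> Maven -> Installations and found that the Maven installation path I had configured earlier was gone; after configuring it again, the project could be created.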
Output of the final successful run: