Table of Contents
· Overview
· Manual Cluster Setup
· Introduction
· Installing Python
· Configuration Files
· Startup and Testing
· Application Deployment
· Parameter Configuration
· Storm Commands
· How Storm Works
· Storm Architecture
· Storm Components
· Stream Grouping
· Daemon Fault Tolerance
· Guaranteeing Message Processing
· Message Transport
· API
· Application Deployment Modes
· Component Interfaces
· Component Implementation Classes
· Data Connection Patterns
· Logging (Cluster Mode)
· Parallelism Settings
· The Tick Mechanism
· Serialization
· Integration with Other Systems
· Performance Tuning
Overview
1. Apache Storm is a distributed real-time computation framework open-sourced by Twitter.
2. Storm is written mainly in Java and Clojure: Java defines the skeleton, and Clojure implements the core logic.
3. A Storm application (Topology): a Spout is the tap that keeps reading messages from a source and emitting them; a Bolt is a joint in the pipe that forwards the message stream according to a stream grouping strategy.
4. Storm characteristics
a) Ease of use: by following the Topology, Spout, and Bolt programming conventions you can build highly scalable applications without having to deal with low-level RPC, redundancy between Workers, or data partitioning.
b) Fault tolerance: the daemons (Nimbus, Supervisor, etc.) are stateless, keep their state in ZooKeeper, and can be restarted at will; when a Worker fails or a machine goes down, Storm automatically assigns a new Worker to replace it.
c) Scalability: scales linearly.
d) Completeness: the acker mechanism guarantees that no data is lost, and the transaction mechanism guarantees data accuracy.
5. Typical uses of Storm
a) Stream processing: the most basic use; Storm processes a continuous stream of incoming messages and writes the results to some store.
b) Continuous computation: Storm keeps a computation running until the user terminates it.
c) Distributed RPC: Storm can be used as a distributed RPC framework.
6. Comparison with Spark Streaming
a) Joining with historical data: Spark Streaming splits the stream into small time slices (seconds to minutes) and processes each slice in a batch-like fashion, so the same logic and algorithms can serve both batch and real-time processing, which makes it easy to analyze historical and real-time data together.
b) Latency: Storm processes each incoming event as it arrives, while Spark Streaming processes the events within a time window, so Storm's latency is very low and Spark Streaming's is comparatively higher.
Manual Cluster Setup
Introduction
1. Environment:
Role | Host name
Nimbus | centos1
Supervisor | centos2, centos3
2. Assumes the JDK and a ZooKeeper cluster have already been installed.
Installing Python
1. [Nimbus, Supervisor] As root, install Python under /usr/local/python.
# On Ubuntu, install the build dependencies first: sudo apt-get install build-essential zlib1g-dev
tar zxvf Python-2.7.13.tgz
cd Python-2.7.13/
./configure --prefix=/usr/local/python
make
sudo make install
2. [Nimbus, Supervisor] Make the python command available.
ln -s /usr/local/python/bin/python /usr/bin/python    # symlink
python -V                                              # verify
Configuration Files
3. [Nimbus] Unpack Storm and edit storm.yaml.
tar zxvf apache-storm-1.1.0.tar.gz -C /opt/app
cd /opt/app/apache-storm-1.1.0
vi conf/storm.yaml
storm.zookeeper.servers:
  - "centos1"
  - "centos2"
storm.zookeeper.port: 2181
nimbus.seeds: ["centos1"]
supervisor.slots.ports:
  - 6700
  - 6701
  - 6702
  - 6703
storm.local.dir: "/opt/data/storm.local.dir"
ui.port: 8080
4. [Nimbus] Copy the Storm directory from Nimbus to each Supervisor.
scp -r /opt/app/apache-storm-1.1.0 hadoop@centos2:/opt/app
scp -r /opt/app/apache-storm-1.1.0 hadoop@centos3:/opt/app
Startup and Testing
5. [Nimbus, Supervisor] Configure the Storm environment variables.
export STORM_HOME=/opt/app/apache-storm-1.1.0
export PATH=$PATH:$STORM_HOME/bin
6. [Nimbus] Start the daemons.
nohup bin/storm nimbus 1>/dev/null 2>&1 &
nohup bin/storm ui 1>/dev/null 2>&1 &
nohup bin/storm logviewer 1>/dev/null 2>&1 &
jps
nimbus      # Nimbus daemon
core        # Storm UI daemon
logviewer   # LogViewer daemon
7. [Supervisor] Start the daemons.
nohup bin/storm supervisor 1>/dev/null 2>&1 &
nohup bin/storm logviewer 1>/dev/null 2>&1 &
jps
supervisor  # Supervisor daemon
logviewer   # LogViewer daemon
8. [Nimbus] Test.
storm jar teststorm.jar teststorm.WordCountTopology wordcount
9. Monitoring page.
http://centos1:8080 | Storm UI
10. [Nimbus] Stop the daemons.
kill -s TERM ${PID}    # PID is the process id of each daemon
Application Deployment
Parameter Configuration
1. Ways to set parameters
a) External component-specific configuration: call the methods on the SpoutDeclarer/BoltDeclarer objects returned by TopologyBuilder's setSpout and setBolt (see the sketch after this list).
b) Internal component-specific configuration: override getComponentConfiguration in the Spout/Bolt and return a Map.
c) Topology-specific configuration: pass values on the command line, e.g. "bin/storm -c conf1=v1 -c conf2=v2".
d) storm.yaml: "$STORM_HOME/conf/storm.yaml".
e) defaults.yaml: the "defaults.yaml" bundled inside "$STORM_HOME/lib/storm-core-x.y.z.jar".
2. Parameter precedence (lowest to highest):
defaults.yaml
< storm.yaml
< topology-specific configuration
< internal component-specific configuration
< external component-specific configuration
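A minimal Java sketch of options a) and b), assuming Storm 1.x; the component name "myBolt" and the config key "my.custom.conf" are illustrative only:
import java.util.HashMap;
import java.util.Map;

import org.apache.storm.Config;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;

public class ComponentConfigSketch {

    // b) Internal component-specific configuration: the component returns its own overrides.
    public static class MyBolt extends BaseRichBolt {

        private static final long serialVersionUID = 1L;

        @Override
        public Map<String, Object> getComponentConfiguration() {
            Map<String, Object> conf = new HashMap<>();
            conf.put("my.custom.conf", "internal-value"); // illustrative key
            return conf;
        }

        @Override
        @SuppressWarnings("rawtypes")
        public void prepare(Map conf, TopologyContext context, OutputCollector collector) { }

        @Override
        public void execute(Tuple tuple) { }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) { }
    }

    public static void main(String[] args) {
        TopologyBuilder builder = new TopologyBuilder();
        // a) External component-specific configuration: set on the BoltDeclarer returned by setBolt;
        //    it overrides the value returned by getComponentConfiguration above.
        builder.setBolt("myBolt", new MyBolt(), 2)
               .addConfiguration("my.custom.conf", "external-value")
               .addConfiguration(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 10);
    }

}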
Storm Commands
Commonly used commands; see the official documentation for details.
a) storm jar topology-jar-path class ...
b) storm kill topology-name [-w wait-time-secs]
c) storm activate topology-name
d) storm deactivate topology-name
e) storm rebalance topology-name [-w wait-time-secs] [-n new-num-workers] [-e component=parallelism]*
f) storm classpath
g) storm localconfvalue conf-name
h) storm remoteconfvalue conf-name
i) storm nimbus
j) storm supervisor
k) storm ui
l) storm get-errors topology-name
m) storm kill_workers
n) storm list
o) storm logviewer
p) storm set_log_level -l [logger name]=[log level][:optional timeout] -r [logger name] topology-name
q) storm version
r) storm help [command]
How Storm Works
Storm Architecture
Storm Components
Name | Description
Nimbus | Responsible for resource allocation and task scheduling, similar to Hadoop's JobTracker.
Supervisor | Accepts the tasks assigned by Nimbus and starts/stops the Worker processes it manages, similar to Hadoop's TaskTracker.
Worker | A process that runs the actual processing logic of the components.
Executor | A physical thread inside a Worker process. Tasks of the same Spout/Bolt may share one thread, and an Executor only runs Tasks belonging to a single Spout/Bolt.
Task | The actual unit of work of a Spout/Bolt; it is also the unit between which stream grouping takes place.
Topology | The logic of a real-time application is packaged into a Topology object, similar to a Hadoop job. Unlike a job, a Topology runs until it is explicitly killed.
Spout | a) Produces the source data streams of a Topology. b) A Spout usually reads data from an external source (such as an MQ); Storm calls its nextTuple method and the Spout emits tuples for Bolts to consume. c) One or more streams can be declared with OutputFieldsDeclarer's declareStream method, and data can be emitted to a specific stream with the collector's emit method.
Bolt | a) Receives data from Spouts (or other Bolts) in a Topology and processes it. b) Complex logic can be split across multiple Bolts. c) When a Bolt receives a message its execute method is called; there it can filter, aggregate, write to a database, and so on. d) One or more streams can be declared with OutputFieldsDeclarer's declareStream method, and data can be emitted to a specific stream with OutputCollector's emit method.
Tuple | The basic unit of message passing.
Stream | An unbounded sequence of Tuples forms a Stream.
Stream Grouping | The partitioning of messages; seven grouping strategies are built in.
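To make the Worker/Executor/Task terms concrete, here is a minimal sketch against the Java API; it reuses the FileReadSpout and LineSplitBolt classes from the WordCount example in the API section below, and the parallelism numbers are illustrative only:
import org.apache.storm.Config;
import org.apache.storm.topology.TopologyBuilder;

public class ParallelismSketch {

    public static void main(String[] args) {
        Config conf = new Config();
        conf.setNumWorkers(2);  // 2 Worker processes will be requested for this Topology

        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("fileRead", new FileReadSpout("/opt/app/apache-storm-1.1.0/LICENSE"), 1); // 1 Executor for the Spout
        builder.setBolt("lineSplit", new LineSplitBolt(), 2) // 2 Executors (threads) for the Bolt...
               .setNumTasks(4)                               // ...running 4 Tasks, i.e. 2 Tasks per Executor
               .shuffleGrouping("fileRead");                 // the Stream Grouping between the two components
    }

}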
Stream Grouping
1. A stream grouping defines how a data stream is partitioned among the Tasks of the consuming Bolt.
2. Seven built-in stream grouping strategies
a) Shuffle grouping: Tuples are distributed randomly so that each Bolt Task receives roughly the same number of Tuples.
b) Fields grouping: groups by one or more fields of the Tuple; Tuples with the same values for the grouping fields go to the same Bolt Task.
c) All grouping: the stream is replicated to every Bolt Task; use with caution.
d) Global grouping: the entire stream goes to the single Bolt Task with the lowest task ID.
e) None grouping: you do not care how the stream is grouped; currently equivalent to Shuffle grouping.
f) Direct grouping: the Spout/Bolt that produces a Tuple decides which consumer Task receives it, using OutputCollector's emitDirect method.
g) Local or shuffle grouping: if the target Bolt has one or more Tasks in the same Worker process as the producing Task, the Tuple is sent to one of those; otherwise it behaves like Shuffle grouping.
3. Custom stream grouping strategies: implement the CustomStreamGrouping interface, as sketched below.
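A minimal sketch of a custom grouping, assuming the org.apache.storm.grouping.CustomStreamGrouping interface of Storm 1.x; it routes each Tuple by hashing its first field, roughly what Fields grouping does for a single field:
import java.util.Collections;
import java.util.List;

import org.apache.storm.generated.GlobalStreamId;
import org.apache.storm.grouping.CustomStreamGrouping;
import org.apache.storm.task.WorkerTopologyContext;

public class FirstFieldHashGrouping implements CustomStreamGrouping {

    private static final long serialVersionUID = 1L;

    private List<Integer> targetTasks;

    @Override
    public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) {
        this.targetTasks = targetTasks; // task ids of the consuming Bolt
    }

    @Override
    public List<Integer> chooseTasks(int taskId, List<Object> values) {
        // Pick one target task based on the hash of the first output field.
        int index = Math.floorMod(values.get(0).hashCode(), targetTasks.size());
        return Collections.singletonList(targetTasks.get(index));
    }

}
It would be wired into a topology with something like builder.setBolt("wordCount", new WordCountBolt(outputDir), 2).customGrouping("lineSplit", new FirstFieldHashGrouping()).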
Daemon Fault Tolerance
1. Worker: if a Worker fails, its Supervisor restarts it; if it keeps failing, Nimbus reassigns the Worker to another node.
2. Node: if a machine fails, the Tasks running on it time out and Nimbus reassigns them to other nodes.
3. Nimbus and Supervisor: both are fail-fast (the process kills itself on error) and stateless (their state lives in ZooKeeper or on disk), so after a failure they simply need to be restarted; Worker processes are not affected by a Nimbus or Supervisor failure, although moving Workers to other nodes is.
4. Nimbus: Nimbus HA was introduced in Storm 1.0.0.
Guaranteeing Message Processing
1. MessageId: when a Spout emits a new Tuple, the user may tag it with a MessageId (of type Object); several Tuples can share the same MessageId, which means they form one message unit (see the spout sketch at the end of this section).
2. Reliability: a message counts as fully processed when, within the Tuple timeout, the Tuple bound to that MessageId and all the Tuples derived from it have been processed by every Bolt they are supposed to reach; Storm uses ackers to track this (components call OutputCollector's ack and fail methods to tell Storm whether a Tuple was processed successfully).
3. Tuple timeout: configured with "topology.message.timeout.secs", 30 seconds by default.
4. Anchoring
a) From the Spout through the Bolts the Tuples form a tuple tree; take WordCount as an example:
b) Anchored: if an anchored Tuple is not acked downstream, the Spout at the root will re-emit it later.
c) Anchored emit:
// Spout
collector.emit(new Values(content1), uniqueMessageId);
// Bolt
collector.emit(tuple, new Values(content2));
collector.ack(tuple);
d) Unanchored: if an unanchored Tuple is not acked downstream, the Spout at the root will not re-emit it.
e) Unanchored emit:
// Bolt
collector.emit(new Values(content));
collector.ack(tuple);
f) Multi-anchoring: an output Tuple can be anchored to more than one input Tuple. Multi-anchoring breaks the tree structure and turns it into a directed acyclic graph (DAG).
g) Multi-anchored emit:
// Bolt
List<Tuple> anchors = new ArrayList<>();
anchors.add(tuple1);
anchors.add(tuple2);
collector.emit(anchors, new Values(content));
collector.ack(tuple);
h) ack and fail: every Tuple must be acked or failed; Storm tracks each Tuple in memory, so if Tuples are never acked or failed the task will eventually run out of memory.
i) Acker tasks: a Topology has a set of special acker tasks that track the tuple tree/DAG; their number is configured with "topology.acker.executors" or "Config.TOPOLOGY_ACKER_EXECUTORS" and defaults to 1. Increase it for high-throughput topologies.
5. Disabling reliability: when strong reliability is not needed, it can be turned off to improve performance.
a) Option 1: set "Config.TOPOLOGY_ACKER_EXECUTORS" to 0.
b) Option 2: use the unanchored emit style shown above.
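To tie these pieces together, here is a minimal sketch of a reliable spout, assuming Storm 1.x; the hard-coded sentence stands in for a real source (such as an MQ), and the replay strategy is illustrative only:
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.storm.Config;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

public class ReliableSentenceSpout extends BaseRichSpout {

    private static final long serialVersionUID = 1L;

    private SpoutOutputCollector collector;

    // Tuples waiting to be acked, keyed by MessageId, so fail() can replay them.
    private Map<Object, String> pending;

    @Override
    @SuppressWarnings("rawtypes")
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
        this.pending = new ConcurrentHashMap<>();
    }

    @Override
    public void nextTuple() {
        String sentence = "the quick brown fox"; // stand-in for a real data source
        Object messageId = UUID.randomUUID();
        pending.put(messageId, sentence);
        collector.emit(new Values(sentence), messageId); // emitting with a MessageId makes the tuple tree tracked
    }

    @Override
    public void ack(Object messageId) {
        pending.remove(messageId); // fully processed within the timeout
    }

    @Override
    public void fail(Object messageId) {
        String sentence = pending.get(messageId);
        if (sentence != null) {
            collector.emit(new Values(sentence), messageId); // replay on failure or timeout
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("sentence"));
    }

    // The reliability-related settings mentioned above.
    public static Config reliabilityConfig() {
        Config conf = new Config();
        conf.setMessageTimeoutSecs(30); // topology.message.timeout.secs (default 30)
        conf.setNumAckers(2);           // topology.acker.executors; set to 0 to disable acking
        return conf;
    }

}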
Message Transport
Since Storm 0.9.0, Netty has been used for inter-worker messaging; ZeroMQ is no longer required.
API
WordCount example
1. WordCountTopology.java
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.AlreadyAliveException;
import org.apache.storm.generated.AuthorizationException;
import org.apache.storm.generated.InvalidTopologyException;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class WordCountTopology {

    private static final Logger logger = LoggerFactory.getLogger(WordCountTopology.class);

    public static void main(String[] args) throws InterruptedException {
        final String inputFile = "/opt/app/apache-storm-1.1.0/LICENSE";
        final String outputDir = "/opt/workspace/wordcount";

        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout(FileReadSpout.class.getSimpleName(), new FileReadSpout(inputFile));
        builder.setBolt(LineSplitBolt.class.getSimpleName(), new LineSplitBolt())
                .shuffleGrouping(FileReadSpout.class.getSimpleName());
        // setNumTasks(4): four output files are produced in the end
        builder.setBolt(WordCountBolt.class.getSimpleName(), new WordCountBolt(outputDir), 2)
                .setNumTasks(4)
                .fieldsGrouping(LineSplitBolt.class.getSimpleName(), new Fields("word"));

        Config conf = new Config();
        conf.setDebug(true);
        if (args != null && args.length > 0) {
            try {
                StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
            } catch (AlreadyAliveException | InvalidTopologyException | AuthorizationException e) {
                logger.error("Failed to submit " + WordCountTopology.class.getName() + ".", e);
            }
        } else {
            conf.setDebug(true);
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology(WordCountTopology.class.getSimpleName(), conf, builder.createTopology());
            Thread.sleep(30 * 1000);
            cluster.shutdown();
        }
    }

}
2. FileReadSpout.java
import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.util.Map;

import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

public class FileReadSpout extends BaseRichSpout {

    private static final long serialVersionUID = 8543601286964250940L;

    private String inputFile;

    private BufferedReader reader;

    private SpoutOutputCollector collector;

    public FileReadSpout(String inputFile) {
        this.inputFile = inputFile;
    }

    @Override
    @SuppressWarnings("rawtypes")
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        try {
            reader = new BufferedReader(new FileReader(inputFile));
        } catch (FileNotFoundException e) {
            throw new RuntimeException("Cannot find file [" + inputFile + "].", e);
        }
        this.collector = collector;
    }

    @Override
    public void nextTuple() {
        try {
            String line = null;
            while ((line = reader.readLine()) != null) {
                collector.emit(new Values(line));
            }
        } catch (IOException e) {
            throw new RuntimeException("Encountered a file read error.", e);
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("line"));
    }

    @Override
    public void close() {
        if (reader != null) {
            try {
                reader.close();
            } catch (IOException e) {
                // Ignore
            }
        }
        super.close();
    }

}
3. LineSplitBolt.java
import java.util.Map;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

public class LineSplitBolt extends BaseRichBolt {

    private static final long serialVersionUID = -2045688041930588092L;

    private OutputCollector collector;

    @Override
    @SuppressWarnings("rawtypes")
    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(Tuple tuple) {
        String line = tuple.getStringByField("line");
        String[] words = line.split(" ");
        for (String word : words) {
            word = word.trim();
            if (!word.isEmpty()) {
                word = word.toLowerCase();
                collector.emit(new Values(word, 1));
            }
        }

        collector.ack(tuple);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word", "count"));
    }

    @Override
    public void cleanup() {
        super.cleanup();
    }

}
4. WordCountBolt.java
import java.io.IOException;
import java.io.RandomAccessFile;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class WordCountBolt extends BaseRichBolt {

    private static final long serialVersionUID = 8239697869626573368L;

    private static final Logger logger = LoggerFactory.getLogger(WordCountBolt.class);

    private String outputDir;

    private OutputCollector collector;

    private Map<String, Integer> wordCounter;

    public WordCountBolt(String outputDir) {
        this.outputDir = outputDir;
    }

    @Override
    @SuppressWarnings("rawtypes")
    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
        wordCounter = new HashMap<>();
    }

    @Override
    public void execute(Tuple tuple) {
        String word = tuple.getStringByField("word");
        Integer count = tuple.getIntegerByField("count");
        Integer wordCount = wordCounter.get(word);
        if (wordCount == null) {
            wordCounter.put(word, count);
        } else {
            wordCounter.put(word, count + wordCount);
        }

        collector.ack(tuple);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
    }

    @Override
    public void cleanup() {
        if (wordCounter != null) {
            outputResult(wordCounter);
            wordCounter.clear();
        }
        super.cleanup();
    }

    private void outputResult(Map<String, Integer> wordCounter) {
        String filePath = outputDir + "/" + UUID.randomUUID().toString();
        RandomAccessFile randomAccessFile = null;
        try {
            randomAccessFile = new RandomAccessFile(filePath, "rw");
            for (Map.Entry<String, Integer> entry : wordCounter.entrySet()) {
                randomAccessFile.writeChars(entry.getKey());
                randomAccessFile.writeChar('\t');
                randomAccessFile.writeChars(String.valueOf(entry.getValue()));
                randomAccessFile.writeChar('\n');
            }
        } catch (IOException e) {
            logger.error("Failed to write file [" + filePath + "].", e);
        } finally {
            if (randomAccessFile != null) {
                try {
                    randomAccessFile.close();
                } catch (IOException e) {
                    logger.warn("Failed to close output stream.", e);
                }
            }
        }
    }

}
Application Deployment Modes
How an application (Topology) is submitted
a) Local mode: simulates a Storm cluster inside the current process; used for development and testing.
b) Cluster mode: used in production.
Component Interfaces
1. IComponent
package org.apache.storm.topology;

import java.io.Serializable;
import java.util.Map;

/**
 * Common methods for all possible components in a topology. This interface is used
 * when defining topologies using the Java API.
 */
public interface IComponent extends Serializable {

    /**
     * Declare the output schema for all the streams of this topology.
     *
     * @param declarer this is used to declare output stream ids, output fields, and whether or not each output stream is a direct stream
     */
    void declareOutputFields(OutputFieldsDeclarer declarer);

    /**
     * Declare configuration specific to this component. Only a subset of the "topology.*" configs can
     * be overridden. The component configuration can be further overridden when constructing the
     * topology using {@link TopologyBuilder}
     *
     */
    Map<String, Object> getComponentConfiguration();

}
2. ISpout
package org.apache.storm.spout;

import org.apache.storm.task.TopologyContext;
import java.util.Map;
import java.io.Serializable;

/**
 * ISpout is the core interface for implementing spouts. A Spout is responsible
 * for feeding messages into the topology for processing. For every tuple emitted by
 * a spout, Storm will track the (potentially very large) DAG of tuples generated
 * based on a tuple emitted by the spout. When Storm detects that every tuple in
 * that DAG has been successfully processed, it will send an ack message to the Spout.
 *
 * If a tuple fails to be fully processed within the configured timeout for the
 * topology (see {@link org.apache.storm.Config}), Storm will send a fail message to the spout
 * for the message.
 *
 * When a Spout emits a tuple, it can tag the tuple with a message id. The message id
 * can be any type. When Storm acks or fails a message, it will pass back to the
 * spout the same message id to identify which tuple it's referring to. If the spout leaves out
 * the message id, or sets it to null, then Storm will not track the message and the spout
 * will not receive any ack or fail callbacks for the message.
 *
 * Storm executes ack, fail, and nextTuple all on the same thread. This means that an implementor
 * of an ISpout does not need to worry about concurrency issues between those methods. However, it
 * also means that an implementor must ensure that nextTuple is non-blocking: otherwise
 * the method could block acks and fails that are pending to be processed.
 */
public interface ISpout extends Serializable {
    /**
     * Called when a task for this component is initialized within a worker on the cluster.
     * It provides the spout with the environment in which the spout executes.
     *
     * This includes the:
     *
     * @param conf The Storm configuration for this spout. This is the configuration provided to the topology merged in with cluster configuration on this machine.
     * @param context This object can be used to get information about this task's place within the topology, including the task id and component id of this task, input and output information,