一、Storm集群構建 編寫storm 與 zookeeper的yml文件 storm yml文件的編寫 具體如下: version: '2' services: zookeeper1: image: registry.aliyuncs.com/denverdino/zookeeper:3.4.8 ...
一、Storm集群構建
編寫storm 與 zookeeper的yml文件
storm yml文件的編寫
具體如下:
# docker-compose file for a 3-node ZooKeeper ensemble plus Storm (ui / nimbus / supervisor).
# NOTE(review): reconstructed from a single collapsed line — indentation restored to valid
# compose v2 structure; keys and values are unchanged. Requires an external network `zk-net`.
version: '2'
services:
  zookeeper1:
    image: registry.aliyuncs.com/denverdino/zookeeper:3.4.8
    container_name: zk1.cloud
    environment:
      - SERVER_ID=1
      # each node binds its own quorum ports on 0.0.0.0, peers by hostname
      - ADDITIONAL_ZOOKEEPER_1=server.1=0.0.0.0:2888:3888
      - ADDITIONAL_ZOOKEEPER_2=server.2=zk2.cloud:2888:3888
      - ADDITIONAL_ZOOKEEPER_3=server.3=zk3.cloud:2888:3888
  zookeeper2:
    image: registry.aliyuncs.com/denverdino/zookeeper:3.4.8
    container_name: zk2.cloud
    environment:
      - SERVER_ID=2
      - ADDITIONAL_ZOOKEEPER_1=server.1=zk1.cloud:2888:3888
      - ADDITIONAL_ZOOKEEPER_2=server.2=0.0.0.0:2888:3888
      - ADDITIONAL_ZOOKEEPER_3=server.3=zk3.cloud:2888:3888
  zookeeper3:
    image: registry.aliyuncs.com/denverdino/zookeeper:3.4.8
    container_name: zk3.cloud
    environment:
      - SERVER_ID=3
      - ADDITIONAL_ZOOKEEPER_1=server.1=zk1.cloud:2888:3888
      - ADDITIONAL_ZOOKEEPER_2=server.2=zk2.cloud:2888:3888
      - ADDITIONAL_ZOOKEEPER_3=server.3=0.0.0.0:2888:3888
  ui:
    image: registry.aliyuncs.com/denverdino/baqend-storm:1.0.0
    command: ui -c nimbus.host=nimbus
    environment:
      - STORM_ZOOKEEPER_SERVERS=zk1.cloud,zk2.cloud,zk3.cloud
    restart: always
    container_name: ui
    ports:
      - 8080:8080
    depends_on:
      - nimbus
  nimbus:
    image: registry.aliyuncs.com/denverdino/baqend-storm:1.0.0
    command: nimbus -c nimbus.host=nimbus
    restart: always
    environment:
      - STORM_ZOOKEEPER_SERVERS=zk1.cloud,zk2.cloud,zk3.cloud
    container_name: nimbus
    ports:
      - 6627:6627
  supervisor:
    image: registry.aliyuncs.com/denverdino/baqend-storm:1.0.0
    command: supervisor -c nimbus.host=nimbus -c supervisor.slots.ports=[6700,6701,6702,6703]
    restart: always
    environment:
      # scheduler hint: spread supervisors across hosts
      - affinity:role!=supervisor
      - STORM_ZOOKEEPER_SERVERS=zk1.cloud,zk2.cloud,zk3.cloud
    depends_on:
      - nimbus
networks:
  default:
    external:
      name: zk-net
拉取Storm搭建需要的鏡像,這裡我選擇鏡像版本為 zookeeper:3.4.8 storm:1.0.0
鍵入命令:
docker pull registry.aliyuncs.com/denverdino/zookeeper:3.4.8 docker pull registry.aliyuncs.com/denverdino/baqend-storm:1.0.0
storm鏡像 獲取
使用docker-compose 構建集群
在power shell中執行以下命令:
docker-compose -f storm.yml up -d
docker-compose 構建集群
在瀏覽器中打開localhost:8080 可以看到storm集群的詳細情況
storm UI 展示
二、Storm統計任務
統計股票交易情況交易量和交易總金額 (數據文件存儲在csv文件中)
編寫DataSourceSpout類
DataSourceSpout類
編寫bolt類
編寫topology類
需要註意的是 Storm Java API 下有本地模式和遠端模式
在本地模式下的調試不依賴於集群環境,可以進行簡單的調試
如果需要使用生產模式,則需要將
1、 編寫和自身業務相關的spout和bolt類,並將其打包成一個jar包
2、將上述的jar包放到客戶端代碼能讀到的任何位置,
3、使用如下方式定義一個拓撲(Topology)
演示結果:
本地模式下的調試:
正在執行:
根據24小時
根據股票種類
生產模式:
向集群提交topology
三、核心計算bolt的代碼
1.統計不同類型的股票交易量和交易總金額:
package bolt; import java.io.IOException; import java.util.HashMap; import java.util.Map; import java.util.Set; import org.apache.storm.task.OutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.topology.base.BaseRichBolt; import org.apache.storm.tuple.Tuple; import org.apache.storm.tuple.Values; @SuppressWarnings("serial") public class TypeCountBolt extends BaseRichBolt { OutputCollector collector; Map<String,Integer> map = new HashMap<String, Integer>(); Map<String,Float> map2 = new HashMap<String, Float>(); public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { this.collector = collector; } public void execute(Tuple input) { String line = input.getStringByField("line"); String[] data = line.split(","); Integer count = map.get(data[2]); Float total_amount = map2.get(data[2]); if(count==null){ count = 0; } if(total_amount==null){ total_amount = 0.0f; } count++; total_amount+=Float.parseFloat(data[3]) * Integer.parseInt(data[4]); map.put(data[2],count); map2.put(data[2],total_amount); System.out.println("~~~~~~~~~~~~~~~~~~~~~~~"); Set<Map.Entry<String,Integer>> entrySet = map.entrySet(); for(Map.Entry<String,Integer> entry :entrySet){ System.out.println("交易量:"); System.out.println(entry); } System.out.println(); Set<Map.Entry<String,Float>> entrySet2 = map2.entrySet(); for(Map.Entry<String,Float> entry :entrySet2){ System.out.println("交易總金額:"); System.out.println(entry); } } public void declareOutputFields(OutputFieldsDeclarer declarer) { } }
2. 統計每個小時的交易量和交易總金額
package bolt; import org.apache.storm.task.OutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.topology.base.BaseRichBolt; import org.apache.storm.tuple.Tuple; import java.text.ParseException; import java.text.SimpleDateFormat; import java.util.Date; import java.util.HashMap; import java.util.Map; import java.util.Set; public class TimeCountBolt extends BaseRichBolt { OutputCollector collector; Map<Integer,Integer> map = new HashMap<Integer, Integer>(); Map<Integer,Float> map2 = new HashMap<Integer, Float>(); public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { this.collector = collector; } public void execute(Tuple input) { String line = input.getStringByField("line"); String[] data = line.split(","); Date date = new Date(); SimpleDateFormat dateFormat= new SimpleDateFormat("yyyy-MM-dd hh:mm:ss"); try { date = dateFormat.parse(data[0]); } catch (ParseException e) { e.printStackTrace(); } Integer count = map.get(date.getHours()); Float total_amount = map2.get(date.getHours()); if(count==null){ count = 0; } if(total_amount==null){ total_amount = 0.0f; } count++; total_amount+=Float.parseFloat(data[3]) * Integer.parseInt(data[4]); map.put(date.getHours(),count); map2.put(date.getHours(),total_amount); System.out.println("~~~~~~~~~~~~~~~~~~~~~~~"); Set<Map.Entry<Integer,Integer>> entrySet = map.entrySet(); for(Map.Entry<Integer,Integer> entry :entrySet){ System.out.println("交易量:"); System.out.println(entry); } System.out.println(); Set<Map.Entry<Integer,Float>> entrySet2 = map2.entrySet(); for(Map.Entry<Integer,Float> entry :entrySet2){ System.out.println("交易總金額:"); System.out.println(entry); } } public void declareOutputFields(OutputFieldsDeclarer declarer) { } }