Flume + Kafka + Storm + HBase + HDFS + POI Integration
Requirement:
For a website, we need to log users' actions and extract the data that is useful to us from those logs.
For example, take the site www.hongten.com (a fictional e-commerce site of mine). On this site a user can perform many familiar actions: register, log in, view, click, double-click, buy, add to the shopping cart, add a record, edit a record, delete a record, comment, log out, and so on. All of these actions are recorded in the logs, and it is these logs we want to analyze.
In this article we analyze two of those actions: buying and adding to the shopping cart. We then generate a report showing at what times users tend to buy and at what times they tend to add items to the cart, so that we can act at the right moments: encourage users to buy, and recommend products for them to add to the cart (users who add to the cart are potential buyers).
After all, making the site profitable is what we are really after, right?
1. Abstracting User Actions

// user actions
public static final String[] USER_ACTION = { "Register", "Login", "View", "Click", "Double_Click", "Buy", "Shopping_Car", "Add", "Edit", "Delete", "Comment", "Logout" };
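The simulated log generator later in this article simply draws one of these actions at random for each log entry. A minimal sketch of that idea (class and method names here are illustrative, not part of the original project):

import java.util.Random;

// Minimal sketch: draw a uniformly random user action from the USER_ACTION array.
public class RandomActionSketch {

    private static final Random RANDOM = new Random();

    static String randomAction(String[] userActions) {
        // pick a random index into the action array
        return userActions[RANDOM.nextInt(userActions.length)];
    }

    public static void main(String[] args) {
        String[] userAction = { "Register", "Login", "View", "Click", "Double_Click", "Buy",
                "Shopping_Car", "Add", "Edit", "Delete", "Comment", "Logout" };
        System.out.println(randomAction(userAction));
    }
}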
2. Log Format Definition
115.19.62.102 海南 2018-12-20 1545286960749 1735787074662918890 www.hongten.com Edit
27.177.45.84 新疆 2018-12-20 1545286962255 6667636903937987930 www.hongten.com Delete
176.54.120.96 寧夏 2018-12-20 1545286962256 6988408478348165495 www.hongten.com Comment
175.117.33.187 遼寧 2018-12-20 1545286962257 8411202446705338969 www.hongten.com Shopping_Car
17.67.62.213 天津 2018-12-20 1545286962258 7787584752786413943 www.hongten.com Add
137.81.41.9 海南 2018-12-20 1545286962259 6218367085234099455 www.hongten.com Shopping_Car
125.187.107.57 山東 2018-12-20 1545286962260 3358658811146151155 www.hongten.com Double_Click
104.167.205.87 內蒙 2018-12-20 1545286962261 2303468282544965471 www.hongten.com Shopping_Car
64.106.149.83 河南 2018-12-20 1545286962262 8422202443986582525 www.hongten.com Delete
138.22.156.183 浙江 2018-12-20 1545286962263 7649154147863130337 www.hongten.com Shopping_Car
41.216.103.31 河北 2018-12-20 1545286962264 6785302169446728008 www.hongten.com Shopping_Car
132.144.93.20 廣東 2018-12-20 1545286962265 6444575166009004406 www.hongten.com Add
Log format:

// log format
String log = ip + "\t" + address + "\t" + d + "\t" + timestamp + "\t" + userid + "\t" + Common.WEB_SITE + "\t" + action;
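Since every downstream component relies on this tab-separated layout, here is a small sketch (my own illustration, not from the original project) of parsing one log line back into its seven fields:

// Minimal sketch: split a tab-separated log line back into its seven fields.
// The field order follows the log format above; class and variable names are illustrative.
public class LogLineParseSketch {

    public static void main(String[] args) {
        String log = "115.19.62.102\t海南\t2018-12-20\t1545286960749\t1735787074662918890\twww.hongten.com\tEdit";

        String[] fields = log.split("\t");
        String ip        = fields[0];
        String address   = fields[1];
        String date      = fields[2]; // yyyy-MM-dd
        long   timestamp = Long.parseLong(fields[3]);
        long   userId    = Long.parseLong(fields[4]);
        String site      = fields[5];
        String action    = fields[6]; // e.g. Buy, Shopping_Car, ...

        System.out.println(date + " " + action + " from " + ip + " (" + address + ") on " + site
                + ", user " + userId + ", ts " + timestamp);
    }
}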
3. System Architecture
4. Report Layout
Since the data is generated randomly, the resulting curves grow more or less linearly.
Here I only built an hourly report; daily, quarterly, yearly, three-year or five-year reports can be built the same way, depending on actual requirements.
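One way to support those coarser reports is to bucket the HBase row keys or qualifiers by period instead of by second. A rough sketch of deriving such a bucket (the date patterns below are my own assumption, not taken from the original project):

import java.text.SimpleDateFormat;
import java.util.Date;

// Rough sketch: derive a time bucket for different report granularities.
public class ReportBucketSketch {

    static String bucket(Date date, String granularity) {
        switch (granularity) {
            case "hour":  return new SimpleDateFormat("yyyyMMddHH").format(date);
            case "day":   return new SimpleDateFormat("yyyyMMdd").format(date);
            case "month": return new SimpleDateFormat("yyyyMM").format(date);
            case "year":  return new SimpleDateFormat("yyyy").format(date);
            default:      return new SimpleDateFormat("yyyyMMddHHmmss").format(date);
        }
    }

    public static void main(String[] args) {
        Date now = new Date();
        // counters accumulated per bucket could then be stored under e.g. "Buy_" + bucket(now, "day")
        System.out.println(bucket(now, "hour") + " / " + bucket(now, "day") + " / " + bucket(now, "year"));
    }
}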
5. Component Distribution
I set up four nodes in total: node1, node2, node3, node4 (note: the JDK must be installed on all four nodes).
Zookeeper is installed on node1, node2, node3
The Hadoop cluster runs on node1, node2, node3, node4
The HBase cluster runs on node1, node2, node3, node4
Flume is installed on node2
Kafka is installed on node1, node2, node3
Storm is installed on node1, node2, node3
6. Implementation
6.1. Configure Flume
-- On node2
cd flumedir
vi flume2kafka

-- node2 configuration:
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = avro
a1.sources.r1.bind = node2
a1.sources.r1.port = 41414

# Describe the sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.topic = all_my_log
a1.sinks.k1.brokerList = node1:9092,node2:9092,node3:9092
a1.sinks.k1.requiredAcks = 1
a1.sinks.k1.batchSize = 20

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000000
a1.channels.c1.transactionCapacity = 10000

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

:wq
6.2. Start Zookeeper

-- Stop the firewall on node1, node2, node3, node4
service iptables stop

-- Start Zookeeper on node1, node2, node3
zkServer.sh start
6.3. Start Kafka

-- Start Kafka on node1, node2, node3 respectively
cd /root/kafka/kafka_2.10-0.8.2.2
./start-kafka.sh
6.4. Start the Flume Agent

-- On node2, start the agent
cd /root/flumedir
flume-ng agent -n a1 -c conf -f flume2kafka -Dflume.root.logger=DEBUG,console
6.5. Generate Log Data and Send It to Flume
Run the Java code below to generate log entries and send them to the Flume server.
package com.b510.big.data.flume.client;

import java.nio.charset.Charset;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.api.RpcClient;
import org.apache.flume.api.RpcClientFactory;
import org.apache.flume.event.EventBuilder;

/**
 * @author Hongten
 *
 * Purpose: simulate user log entries and send them to Flume.
 */
public class FlumeClient {

    public static void main(String[] args) {
        ExecutorService exec = Executors.newCachedThreadPool();
        exec.execute(new GenerateDataAndSend2Flume());
        exec.shutdown();
    }
}

class GenerateDataAndSend2Flume implements Runnable {

    FlumeRPCClient flumeRPCClient;
    static Random random = new Random();

    GenerateDataAndSend2Flume() {
        // initialize the RPC client
        flumeRPCClient = new FlumeRPCClient();
        flumeRPCClient.init(Common.FLUME_HOST_NAME, Common.FLUME_PORT);
    }

    @Override
    public void run() {
        while (true) {
            Date date = new Date();
            SimpleDateFormat simpleDateFormat = new SimpleDateFormat(Common.DATE_FORMAT_YYYYDDMM);
            String d = simpleDateFormat.format(date);
            Long timestamp = new Date().getTime();

            // generate an IP address
            String ip = random.nextInt(Common.MAX_IP_NUMBER) + "." + random.nextInt(Common.MAX_IP_NUMBER) + "." + random.nextInt(Common.MAX_IP_NUMBER) + "." + random.nextInt(Common.MAX_IP_NUMBER);

            // the address "matching" the IP (fabricated data only; the address is not resolved from the real IP)
            String address = Common.ADDRESS[random.nextInt(Common.ADDRESS.length)];

            Long userid = Math.abs(random.nextLong());
            String action = Common.USER_ACTION[random.nextInt(Common.USER_ACTION.length)];

            // build the log entry
            // example : 199.80.45.117 雲南 2018-12-20 1545285957720 3086250439781555145 www.hongten.com Buy
            String data = ip + "\t" + address + "\t" + d + "\t" + timestamp + "\t" + userid + "\t" + Common.WEB_SITE + "\t" + action;
            //System.out.println(data);

            // send the data to Flume
            flumeRPCClient.sendData2Flume(data);

            try {
                TimeUnit.MICROSECONDS.sleep(random.nextInt(1000));
            } catch (InterruptedException e) {
                flumeRPCClient.cleanUp();
                System.out.println("interrupted exception : " + e);
            }
        }
    }
}

class FlumeRPCClient {

    private RpcClient client;
    private String hostname;
    private int port;

    public void init(String hostname, int port) {
        this.hostname = hostname;
        this.port = port;
        this.client = getRpcClient(hostname, port);
    }

    public void sendData2Flume(String data) {
        Event event = EventBuilder.withBody(data, Charset.forName(Common.CHAR_FORMAT));
        try {
            client.append(event);
        } catch (EventDeliveryException e) {
            // on delivery failure, close the connection and re-create the client
            cleanUp();
            client = null;
            client = getRpcClient(hostname, port);
        }
    }

    public RpcClient getRpcClient(String hostname, int port) {
        return RpcClientFactory.getDefaultInstance(hostname, port);
    }

    public void cleanUp() {
        // close the RPC connection
        client.close();
    }
}

// all constants
class Common {

    public static final String CHAR_FORMAT = "UTF-8";
    public static final String DATE_FORMAT_YYYYDDMM = "yyyy-MM-dd";

    // this is a test web site
    public static final String WEB_SITE = "www.hongten.com";

    // user actions
    public static final String[] USER_ACTION = { "Register", "Login", "View", "Click", "Double_Click", "Buy", "Shopping_Car", "Add", "Edit", "Delete", "Comment", "Logout" };

    public static final int MAX_IP_NUMBER = 224;

    // addresses paired with the generated IPs
    public static String[] ADDRESS = { "北京", "天津", "上海", "廣東", "重慶", "河北", "山東", "河南", "雲南", "山西", "甘肅", "安徽", "福建", "黑龍江", "海南", "四川", "貴州", "寧夏", "新疆", "湖北", "湖南", "山西", "遼寧", "吉林", "江蘇", "浙江", "青海", "江西", "西藏", "內蒙", "廣西", "香港", "澳門", "臺灣", };

    // Flume conf
    public static final String FLUME_HOST_NAME = "node2";
    public static final int FLUME_PORT = 41414;
}
6.6. Monitor Kafka

-- On node3, start a Kafka console consumer
cd /home/kafka-2.10/bin
./kafka-console-consumer.sh --zookeeper node1,node2,node3 --from-beginning --topic all_my_log
Sample output:
168.208.193.207 安徽 2018-12-20 1545287646527 5462770148222682599 www.hongten.com Login
103.143.79.127 新疆 2018-12-20 1545287646529 3389475301916412717 www.hongten.com Login
111.208.80.39 山東 2018-12-20 1545287646531 535601622597096753 www.hongten.com Shopping_Car
105.30.86.46 四川 2018-12-20 1545287646532 7825340079790811845 www.hongten.com Login
205.55.33.74 新疆 2018-12-20 1545287646533 4228838365367235561 www.hongten.com Logout
34.44.60.134 安徽 2018-12-20 1545287646536 702584874247456732 www.hongten.com Double_Click
154.169.15.145 廣東 2018-12-20 1545287646537 1683351753576425036 www.hongten.com View
126.28.192.28 湖南 2018-12-20 1545287646538 8319814684518483148 www.hongten.com Edit
5.140.156.73 臺灣 2018-12-20 1545287646539 7432409906375230025 www.hongten.com Logout
72.175.210.95 西藏 2018-12-20 1545287646540 5233707593244910849 www.hongten.com View
121.25.190.25 廣西 2018-12-20 1545287646541 268200251881841673 www.hongten.com Buy
6.7. Create a Kafka Topic

-- On node1, create a topic: filtered_log
-- with 3 partitions
-- and replication-factor = 3
./kafka-topics.sh --zookeeper node1,node2,node3 --create --topic filtered_log --partitions 3 --replication-factor 3
6.8. Cleaning the Data with Storm
- Storm consumes the data from Kafka
- Storm filters the data (Buy - completed purchases, Shopping_Car - potential purchases)
- Storm writes the filtered data back to Kafka
package com.b510.big.data.storm.process;

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.StringScheme;
import storm.kafka.ZkHosts;
import storm.kafka.bolt.KafkaBolt;
import storm.kafka.bolt.mapper.FieldNameBasedTupleToKafkaMapper;
import storm.kafka.bolt.selector.DefaultTopicSelector;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class LogFilterTopology {

    public static void main(String[] args) {
        ZkHosts zkHosts = new ZkHosts(Common.ZOOKEEPER_QUORUM);

        // the spout reads from the 'all_my_log' topic
        SpoutConfig spoutConfig = new SpoutConfig(zkHosts, Common.ALL_MY_LOG_TOPIC, Common.ZOOKEEPER_ROOT, Common.ZOOKEEPER_ID);

        List<String> zkServers = new ArrayList<>();
        for (String host : zkHosts.brokerZkStr.split(",")) {
            zkServers.add(host.split(":")[0]);
        }

        spoutConfig.zkServers = zkServers;
        spoutConfig.zkPort = Common.ZOOKEEPER_PORT;
        spoutConfig.forceFromStart = true;
        spoutConfig.socketTimeoutMs = 60 * 60 * 1000;
        spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());

        // create the KafkaSpout
        KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);

        TopologyBuilder builder = new TopologyBuilder();

        // Storm consumes data from Kafka
        builder.setSpout(Common.KAFKA_SPOUT, kafkaSpout, 3);

        // Storm filters the data (Buy - completed purchases, Shopping_Car - potential purchases)
        builder.setBolt(Common.FILTER_BOLT, new FilterBolt(), 8).shuffleGrouping(Common.KAFKA_SPOUT);

        // create the KafkaBolt
        @SuppressWarnings({ "unchecked", "rawtypes" })
        KafkaBolt kafkaBolt = new KafkaBolt().withTopicSelector(new DefaultTopicSelector(Common.FILTERED_LOG_TOPIC)).withTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper());

        // Storm writes the filtered data back to Kafka
        builder.setBolt(Common.KAFKA_BOLT, kafkaBolt, 2).shuffleGrouping(Common.FILTER_BOLT);

        Properties props = new Properties();
        props.put("metadata.broker.list", Common.STORM_METADATA_BROKER_LIST);
        props.put("request.required.acks", Common.STORM_REQUEST_REQUIRED_ACKS);
        props.put("serializer.class", Common.STORM_SERILIZER_CLASS);

        Config conf = new Config();
        conf.put("kafka.broker.properties", props);
        conf.put(Config.STORM_ZOOKEEPER_SERVERS, zkServers);

        if (args == null || args.length == 0) {
            // local mode
            LocalCluster localCluster = new LocalCluster();
            localCluster.submitTopology("storm-kafka-topology", conf, builder.createTopology());
        } else {
            // cluster mode
            conf.setNumWorkers(3);
            try {
                StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
            } catch (AlreadyAliveException | InvalidTopologyException e) {
                System.out.println("error : " + e);
            }
        }
    }
}

class FilterBolt extends BaseBasicBolt {

    private static final long serialVersionUID = 1L;

    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        String logStr = input.getString(0);
        // only keep log lines containing the keywords we care about,
        // i.e. 'Buy' and 'Shopping_Car'
        if (logStr.contains(Common.KEY_WORD_BUY) || logStr.contains(Common.KEY_WORD_SHOPPING_CAR)) {
            collector.emit(new Values(logStr));
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields(FieldNameBasedTupleToKafkaMapper.BOLT_MESSAGE));
    }
}

class Common {

    public static final String ALL_MY_LOG_TOPIC = "all_my_log";
    public static final String FILTERED_LOG_TOPIC = "filtered_log";

    public static final String DATE_FORMAT_YYYYDDMMHHMMSS = "yyyyMMddHHmmss";
    public static final String DATE_FORMAT_HHMMSS = "HHmmss";
    public static final String DATE_FORMAT_HHMMSS_DEFAULT_VALUE = "000001";

    public static final String HBASE_ZOOKEEPER_LIST = "node1:2888,node2:2888,node3:2888";

    public static final int ZOOKEEPER_PORT = 2181;
    public static final String ZOOKEEPER_QUORUM = "node1:" + ZOOKEEPER_PORT + ",node2:" + ZOOKEEPER_PORT + ",node3:" + ZOOKEEPER_PORT + "";
    public static final String ZOOKEEPER_ROOT = "/MyKafka";
    public static final String ZOOKEEPER_ID = "MyTrack";

    public static final String KAFKA_SPOUT = "kafkaSpout";
    public static final String FILTER_BOLT = "filterBolt";
    public static final String PROCESS_BOLT = "processBolt";
    public static final String HBASE_BOLT = "hbaseBolt";
    public static final String KAFKA_BOLT = "kafkaBolt";

    // Storm conf
    public static final String STORM_METADATA_BROKER_LIST = "node1:9092,node2:9092,node3:9092";
    public static final String STORM_REQUEST_REQUIRED_ACKS = "1";
    public static final String STORM_SERILIZER_CLASS = "kafka.serializer.StringEncoder";

    // key words
    public static final String KEY_WORD_BUY = "Buy";
    public static final String KEY_WORD_SHOPPING_CAR = "Shopping_Car";

    // hbase
    public static final String TABLE_USER_ACTION = "t_user_actions";
    public static final String COLUMN_FAMILY = "cf";

    // how often (in seconds) records are written to Hbase
    public static final int WRITE_RECORD_TO_TABLE_PER_SECOND = 1;
    public static final int TABLE_MAX_VERSION = (60 / WRITE_RECORD_TO_TABLE_PER_SECOND) * 60 * 24;
}
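Note that FilterBolt uses substring matching, so any line that merely contains the text "Buy" anywhere would pass. A stricter variant (a sketch of my own, not part of the original code) splits the line and compares the action field exactly; it declares the same output field, so the rest of the topology is unchanged:

import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import storm.kafka.bolt.mapper.FieldNameBasedTupleToKafkaMapper;

// Sketch: compare the action field (the seventh tab-separated field, see section 2) exactly
// instead of substring matching the whole line.
class StrictFilterBolt extends BaseBasicBolt {

    private static final long serialVersionUID = 1L;

    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        String logStr = input.getString(0);
        if (logStr == null) {
            return;
        }
        String[] fields = logStr.split("\\t");
        if (fields.length == 7 && ("Buy".equals(fields[6]) || "Shopping_Car".equals(fields[6]))) {
            collector.emit(new Values(logStr));
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields(FieldNameBasedTupleToKafkaMapper.BOLT_MESSAGE));
    }
}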
6.9.監聽Kafka
--進入node3,啟動kafka消費者 cd /home/kafka-2.10/bin ./kafka-console-consumer.sh --zookeeper node1,node2,node3 --from-beginning --topic filtered_log
Output:
87.26.135.185 黑龍江 2018-12-20 1545290594658 7290881731606227972 www.hongten.com Shopping_Car
60.96.96.38 青海 2018-12-20 1545290594687 6935901257286057015 www.hongten.com Shopping_Car
43.159.110.193 江蘇 2018-12-20 1545290594727 7096698224110515553 www.hongten.com Shopping_Car
21.103.139.11 山西 2018-12-20 1545290594693 7805867078876194442 www.hongten.com Shopping_Car
139.51.213.184 廣東 2018-12-20 1545290594729 8048796865619113514 www.hongten.com Buy
58.213.148.89 河北 2018-12-20 1545290594708 5176551342435592748 www.hongten.com Buy
36.205.221.116 湖南 2018-12-20 1545290594715 4484717918039766421 www.hongten.com Shopping_Car
135.194.103.53 北京 2018-12-20 1545290594769 4833011508087432349 www.hongten.com Shopping_Car
180.21.100.66 貴州 2018-12-20 1545290594752 5270357330431599426 www.hongten.com Buy
167.71.65.70 山西 2018-12-20 1545290594790 275898530145861990 www.hongten.com Buy
125.51.21.199 寧夏 2018-12-20 1545290594814 3613499600574777198 www.hongten.com Buy
6.10. Storm Consumes from Kafka Again and Saves the Results to HBase
- Storm consumes the data from Kafka again
- Storm aggregates the data (Buy - number of purchases, Shopping_Car - number of potential purchases)
- Storm writes the results into HBase
package com.b510.big.data.storm.process;

import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Put;

import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.StringScheme;
import storm.kafka.ZkHosts;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.IBasicBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class LogProcessTopology {

    public static void main(String[] args) {
        ZkHosts zkHosts = new ZkHosts(Common.ZOOKEEPER_QUORUM);

        // the spout reads from the 'filtered_log' topic
        SpoutConfig spoutConfig = new SpoutConfig(zkHosts, Common.FILTERED_LOG_TOPIC, Common.ZOOKEEPER_ROOT, Common.ZOOKEEPER_ID);

        List<String> zkServers = new ArrayList<>();
        for (String host : zkHosts.brokerZkStr.split(",")) {
            zkServers.add(host.split(":")[0]);
        }

        spoutConfig.zkServers = zkServers;
        spoutConfig.zkPort = Common.ZOOKEEPER_PORT;
        spoutConfig.forceFromStart = true;
        spoutConfig.socketTimeoutMs = 60 * 60 * 1000;
        spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());

        // create the KafkaSpout
        KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);

        TopologyBuilder builder = new TopologyBuilder();

        // Storm consumes the data from Kafka again
        builder.setSpout(Common.KAFKA_SPOUT, kafkaSpout, 3);

        // Storm aggregates the data (Buy - number of purchases, Shopping_Car - number of potential purchases)
        builder.setBolt(Common.PROCESS_BOLT, new ProcessBolt(), 3).shuffleGrouping(Common.KAFKA_SPOUT);

        // Storm writes the results into Hbase
        builder.setBolt(Common.HBASE_BOLT, new HbaseBolt(), 3).shuffleGrouping(Common.PROCESS_BOLT);

        Properties props = new Properties();
        props.put("metadata.broker.list", Common.STORM_METADATA_BROKER_LIST);
        props.put("request.required.acks", Common.STORM_REQUEST_REQUIRED_ACKS);
        props.put("serializer.class", Common.STORM_SERILIZER_CLASS);

        Config conf = new Config();
        conf.put("kafka.broker.properties", props);
        conf.put(Config.STORM_ZOOKEEPER_SERVERS, zkServers);

        if (args == null || args.length == 0) {
            // local mode
            LocalCluster localCluster = new LocalCluster();
            localCluster.submitTopology("storm-kafka-topology", conf, builder.createTopology());
        } else {
            // cluster mode
            conf.setNumWorkers(3);
            try {
                StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
            } catch (AlreadyAliveException | InvalidTopologyException e) {
                System.out.println("error : " + e);
            }
        }
    }
}

class ProcessBolt extends BaseBasicBolt {

    private static final long serialVersionUID = 1L;

    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        String logStr = input.getString(0);
        if (logStr != null) {
            String infos[] = logStr.split("\\t");
            // 180.21.100.66 貴州 2018-12-20 1545290594752 5270357330431599426 www.hongten.com Buy
            collector.emit(new Values(infos[2], infos[6]));
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("date", "user_action"));
    }
}

class HbaseBolt implements IBasicBolt {

    private static final long serialVersionUID = 1L;

    HBaseDAO hBaseDAO = null;
    SimpleDateFormat simpleDateFormat = null;
    SimpleDateFormat simpleDateFormatHHMMSS = null;

    int userBuyCount = 0;
    int userShoopingCarCount = 0;

    // avoid writing to hbase too frequently
    int writeToHbaseMaxNum = Common.WRITE_RECORD_TO_TABLE_PER_SECOND * 1000;
    long begin = System.currentTimeMillis();
    long end = 0;

    @SuppressWarnings("rawtypes")
    @Override
    public void prepare(Map map, TopologyContext context) {
        hBaseDAO = new HBaseDAOImpl();
        simpleDateFormat = new SimpleDateFormat(Common.DATE_FORMAT_YYYYDDMMHHMMSS);
        simpleDateFormatHHMMSS = new SimpleDateFormat(Common.DATE_FORMAT_HHMMSS);
        hBaseDAO.createTable(Common.TABLE_USER_ACTION, new String[] { Common.COLUMN_FAMILY }, Common.TABLE_MAX_VERSION);
    }

    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        // at the first second past midnight the counters are reset;
        // this check is not entirely accurate, because data from the previous day may still be in flight
        if (simpleDateFormatHHMMSS.format(new Date()).equals(Common.DATE_FORMAT_HHMMSS_DEFAULT_VALUE)) {
            userBuyCount = 0;
            userShoopingCarCount = 0;
        }

        if (input != null) {
            // based on ProcessBolt.declareOutputFields()
            String date = input.getString(0);
            String userAction = input.getString(1);

            if (userAction.equals(Common.KEY_WORD_BUY)) {
                // the same user may repeat the 'Buy' action several times within one day
                userBuyCount++;
            }

            if (userAction.equals(Common.KEY_WORD_SHOPPING_CAR)) {
                userShoopingCarCount++;
            }

            end = System.currentTimeMillis();

            if ((end - begin) > writeToHbaseMaxNum) {
                System.out.println("hbase_key: " + Common.KEY_WORD_BUY + "_" + date + " , userBuyCount: " + userBuyCount + ", userShoopingCarCount :" + userShoopingCarCount);

                // write the counters into hbase
                String quailifer = simpleDateFormat.format(new Date());
                hBaseDAO.insert(Common.TABLE_USER_ACTION, Common.KEY_WORD_BUY + "_" + date, Common.COLUMN_FAMILY, new String[] { quailifer }, new String[] { "{user_buy_count:" + userBuyCount + "}" });
                hBaseDAO.insert(Common.TABLE_USER_ACTION, Common.KEY_WORD_SHOPPING_CAR + "_" + date, Common.COLUMN_FAMILY, new String[] { quailifer }, new String[] { "{user_shopping_car_count:" + userShoopingCarCount + "}" });

                begin = System.currentTimeMillis();
            }
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }

    @Override
    public void cleanup() {
    }
}

interface HBaseDAO {

    public void createTable(String tableName, String[] columnFamilys, int maxVersion);

    public void insert(String tableName, String rowKey, String family, String quailifer[], String value[]);
}

class HBaseDAOImpl implements HBaseDAO {

    HConnection hConnection = null;
    static Configuration conf = null;

    public HBaseDAOImpl() {
        conf = new Configuration();
        conf.set("hbase.zookeeper.quorum", Common.HBASE_ZOOKEEPER_LIST);
        try {
            hConnection = HConnectionManager.createConnection(conf);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public void createTable(String tableName, String[] columnFamilys, int maxVersion) {
        try {
            HBaseAdmin admin = new HBaseAdmin(conf);
            if (admin.tableExists(tableName)) {
                System.err.println("table existing in hbase.");
            } else {
                HTableDescriptor tableDesc = new HTableDescriptor(TableName.valueOf(tableName));
                for (String columnFamily : columnFamilys) {
                    HColumnDescriptor hColumnDescriptor = new HColumnDescriptor(columnFamily);
                    hColumnDescriptor.setMaxVersions(maxVersion);
                    tableDesc.addFamily(hColumnDescriptor);
                }
                admin.createTable(tableDesc);
                System.err.println("table is created.");
            }
            admin.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    public void insert(String tableName, String rowKey, String family, String quailifer[], String value[]) {
        HTableInterface table = null;
        try {
            table = hConnection.getTable(tableName);
            Put put = new Put(rowKey.getBytes());
            for (int i = 0; i < quailifer.length; i++) {
                String col = quailifer[i];
                String val = value[i];
                put.add(family.getBytes(), col.getBytes(), val.getBytes());
            }
            table.put(put);
            System.err.println("save record successfully.");
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                table.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}
Storm processing logic:
1. The counters are written to HBase every second
2. The counters are reset shortly after midnight (a more robust variant is sketched below)
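The reset in HbaseBolt compares the current HHmmss string against 000001, which only triggers if a tuple happens to arrive during that exact second. A sketch of my own (not the original implementation) that remembers the last seen day and resets whenever the calendar day changes:

import java.text.SimpleDateFormat;
import java.util.Date;

// Sketch: reset the daily counters whenever the calendar day changes,
// instead of checking for the exact second after midnight.
public class DailyResetSketch {

    private final SimpleDateFormat dayFormat = new SimpleDateFormat("yyyy-MM-dd");
    private String lastDay = dayFormat.format(new Date());

    private int userBuyCount = 0;
    private int userShoppingCarCount = 0;

    // call this once per incoming tuple, before updating the counters
    void resetIfNewDay() {
        String today = dayFormat.format(new Date());
        if (!today.equals(lastDay)) {
            userBuyCount = 0;
            userShoppingCarCount = 0;
            lastDay = today;
        }
    }
}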
If we keep the programs above running, the system keeps writing data into HBase, which gives us all the data we need to generate the reports.
The report generation comes next.
6.11. Read the HBase Data and Generate an Excel Report with POI
- Read the data from HBase
- Generate the Excel report with POI (a rough sketch of both steps follows)
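As a rough sketch of these two steps, assuming the table layout written by HbaseBolt above (row keys such as Buy_2018-12-20, one qualifier per write in yyyyMMddHHmmss form), one might read a day's counters and dump them into an Excel sheet like this. Class, file, and sheet names here are my own, not the original report implementation:

import java.io.FileOutputStream;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Result;
import org.apache.poi.hssf.usermodel.HSSFRow;
import org.apache.poi.hssf.usermodel.HSSFSheet;
import org.apache.poi.hssf.usermodel.HSSFWorkbook;

// Rough sketch only: read the counters written by HbaseBolt for one day
// and write them into an Excel sheet with POI.
public class SimpleReportSketch {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // same quorum setting as HBaseDAOImpl above
        conf.set("hbase.zookeeper.quorum", "node1:2888,node2:2888,node3:2888");
        HConnection connection = HConnectionManager.createConnection(conf);
        HTableInterface table = connection.getTable("t_user_actions");

        // e.g. row key "Buy_2018-12-20", one qualifier (yyyyMMddHHmmss) per write
        Get get = new Get("Buy_2018-12-20".getBytes());
        get.addFamily("cf".getBytes());
        Result result = table.get(get);

        HSSFWorkbook workbook = new HSSFWorkbook();
        HSSFSheet sheet = workbook.createSheet("buy_report");

        int rowIndex = 0;
        List<Cell> cells = result.listCells();
        if (cells != null) {
            for (Cell cell : cells) {
                HSSFRow row = sheet.createRow(rowIndex++);
                // first column: snapshot time (the qualifier), second column: the stored counter value
                row.createCell(0).setCellValue(new String(CellUtil.cloneQualifier(cell)));
                row.createCell(1).setCellValue(new String(CellUtil.cloneValue(cell)));
            }
        }

        FileOutputStream out = new FileOutputStream("buy_report.xls");
        workbook.write(out);
        out.close();
        table.close();
        connection.close();
    }
}

The original report code begins below.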
package com.b510.big.data.poi; import<