Flume + Kafka + Storm + HBase + HDFS + POI Integration

Source: https://www.cnblogs.com/hongten/archive/2018/12/21/hongten_flume_kafka_storm_hbase_hdfs_poi.html

Requirements:

For a website, we want to log user behavior and analyze the data that is useful to us.

For example, take the site www.hongten.com (a fictional e-commerce site of mine). Users can perform many familiar actions there: register, log in, view, click, double-click, buy, add to the shopping cart, add records, edit records, delete records, comment, log out, and so on. All of these actions are recorded in the log, and we want to analyze that log data.

In this article we analyze two of those actions: buying and adding to the shopping cart. We then generate a report showing at what times users tend to buy and at what times they tend to add items to the cart, so that we can act at the right moments: encourage users to buy, and recommend products for them to add to the cart (users who add to the cart are potential buyers).

After all, a profitable website is what we are aiming for, right?

 

1. Modeling User Actions

    // user actions
    public static final String[] USER_ACTION = { "Register", "Login", "View", "Click", "Double_Click", "Buy", "Shopping_Car", "Add", "Edit", "Delete", "Comment", "Logout" };

 

2. Log Format

115.19.62.102    海南    2018-12-20    1545286960749    1735787074662918890    www.hongten.com    Edit
27.177.45.84    新疆    2018-12-20    1545286962255    6667636903937987930    www.hongten.com    Delete
176.54.120.96    寧夏    2018-12-20    1545286962256    6988408478348165495    www.hongten.com    Comment
175.117.33.187    遼寧    2018-12-20    1545286962257    8411202446705338969    www.hongten.com    Shopping_Car
17.67.62.213    天津    2018-12-20    1545286962258    7787584752786413943    www.hongten.com    Add
137.81.41.9    海南    2018-12-20    1545286962259    6218367085234099455    www.hongten.com    Shopping_Car
125.187.107.57    山東    2018-12-20    1545286962260    3358658811146151155    www.hongten.com    Double_Click
104.167.205.87    內蒙    2018-12-20    1545286962261    2303468282544965471    www.hongten.com    Shopping_Car
64.106.149.83    河南    2018-12-20    1545286962262    8422202443986582525    www.hongten.com    Delete
138.22.156.183    浙江    2018-12-20    1545286962263    7649154147863130337    www.hongten.com    Shopping_Car
41.216.103.31    河北    2018-12-20    1545286962264    6785302169446728008    www.hongten.com    Shopping_Car
132.144.93.20    廣東    2018-12-20    1545286962265    6444575166009004406    www.hongten.com    Add

Log format:

// log format
String log = ip + "\t" + address + "\t" + d + "\t" + timestamp + "\t" + userid + "\t" + Common.WEB_SITE + "\t" + action;
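
For reference, here is a minimal stand-alone sketch (the class name is illustrative, not from the original code) that splits one such line back into its seven tab-separated fields; the Storm bolts later rely on exactly these positions (fields[2] = date, fields[6] = action):

public class LogLineParseDemo {
    public static void main(String[] args) {
        // one sample line: ip, region, date, timestamp, userid, site, action
        String line = "115.19.62.102\t海南\t2018-12-20\t1545286960749\t1735787074662918890\twww.hongten.com\tEdit";
        String[] fields = line.split("\\t");
        System.out.println("date = " + fields[2] + ", action = " + fields[6]);
    }
}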

 

3. System Architecture

Data flows from the log-generating client into Flume, from Flume into Kafka, is filtered by a Storm topology back into Kafka, aggregated by a second Storm topology into HBase, and finally read from HBase to build the POI report.

 

 

4. Report Layout

Because the data is randomly generated, the results we see grow roughly linearly.

Here I only implemented a one-hour report; daily, quarterly, yearly, three-year or five-year reports can of course be built the same way, according to actual needs.

 

5. Component Layout

I set up four nodes in total: node1, node2, node3, node4 (note: all four nodes need a JDK).

ZooKeeper is installed on node1, node2, node3

The Hadoop cluster runs on node1, node2, node3, node4

The HBase cluster runs on node1, node2, node3, node4

Flume is installed on node2

Kafka is installed on node1, node2, node3

Storm is installed on node1, node2, node3

 

6. Implementation

6.1. Configure Flume

-- on node2
cd flumedir

vi flume2kafka

-- node2 configuration is as follows
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = avro
a1.sources.r1.bind = node2
a1.sources.r1.port = 41414

# Describe the sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.topic = all_my_log
a1.sinks.k1.brokerList = node1:9092,node2:9092,node3:9092
a1.sinks.k1.requiredAcks = 1
a1.sinks.k1.batchSize = 20

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000000
a1.channels.c1.transactionCapacity = 10000

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

:wq

 

6.2. Start ZooKeeper

-- stop the firewall on node1, node2, node3, node4
service iptables stop

-- start ZooKeeper on node1, node2, node3
zkServer.sh start

 

6.3. Start Kafka

-- start Kafka
-- on each of node1, node2, node3
cd /root/kafka/kafka_2.10-0.8.2.2
./start-kafka.sh

 

6.4. Start the Flume Agent

-- on node2, start the Flume agent
cd /root/flumedir
flume-ng agent -n a1 -c conf -f flume2kafka -Dflume.root.logger=DEBUG,console

 

6.5. Generate Log Data and Send It to Flume

Run the Java code below to generate log records and send them to the Flume server.

package com.b510.big.data.flume.client;

import java.nio.charset.Charset;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.api.RpcClient;
import org.apache.flume.api.RpcClientFactory;
import org.apache.flume.event.EventBuilder;

/**
 * @author Hongten
 * 
 *         Purpose: simulate user log records and send them to Flume
 */
public class FlumeClient {

    public static void main(String[] args) {
        ExecutorService exec = Executors.newCachedThreadPool();
        exec.execute(new GenerateDataAndSend2Flume());

        exec.shutdown();
    }

}

class GenerateDataAndSend2Flume implements Runnable {

    FlumeRPCClient flumeRPCClient;
    static Random random = new Random();

    GenerateDataAndSend2Flume() {
        // initialize the RPC client
        flumeRPCClient = new FlumeRPCClient();
        flumeRPCClient.init(Common.FLUME_HOST_NAME, Common.FLUME_PORT);
    }

    @Override
    public void run() {
        while (true) {
            Date date = new Date();
            SimpleDateFormat simpleDateFormat = new SimpleDateFormat(Common.DATE_FORMAT_YYYYDDMM);
            String d = simpleDateFormat.format(date);
            Long timestamp = new Date().getTime();
            // generate a random IP address
            String ip = random.nextInt(Common.MAX_IP_NUMBER) + "." + random.nextInt(Common.MAX_IP_NUMBER) + "." + random.nextInt(Common.MAX_IP_NUMBER) + "." + random.nextInt(Common.MAX_IP_NUMBER);
            // region corresponding to the IP (synthetic data only; no real IP-to-region lookup is performed)
            String address = Common.ADDRESS[random.nextInt(Common.ADDRESS.length)];

            Long userid = Math.abs(random.nextLong());
            String action = Common.USER_ACTION[random.nextInt(Common.USER_ACTION.length)];
            // build the log record
            // example : 199.80.45.117 雲南 2018-12-20 1545285957720 3086250439781555145 www.hongten.com Buy
            String data = ip + "\t" + address + "\t" + d + "\t" + timestamp + "\t" + userid + "\t" + Common.WEB_SITE + "\t" + action;
            //System.out.println(data);

            // send the record to Flume
            flumeRPCClient.sendData2Flume(data);

            try {
                TimeUnit.MICROSECONDS.sleep(random.nextInt(1000));
            } catch (InterruptedException e) {
                flumeRPCClient.cleanUp();
                System.out.println("interrupted exception : " + e);
            }
        }
    }
}

class FlumeRPCClient {

    private RpcClient client;
    private String hostname;
    private int port;

    public void init(String hostname, int port) {
        this.hostname = hostname;
        this.port = port;
        this.client = getRpcClient(hostname, port);
    }

    public void sendData2Flume(String data) {
        Event event = EventBuilder.withBody(data, Charset.forName(Common.CHAR_FORMAT));

        try {
            client.append(event);
        } catch (EventDeliveryException e) {
            cleanUp();
            client = null;
            client = getRpcClient(hostname, port);
        }
    }

    public RpcClient getRpcClient(String hostname, int port) {
        return RpcClientFactory.getDefaultInstance(hostname, port);
    }

    public void cleanUp() {
        // Close the RPC connection
        client.close();
    }
}

// all constants
class Common {
    public static final String CHAR_FORMAT = "UTF-8";

    public static final String DATE_FORMAT_YYYYDDMM = "yyyy-MM-dd";

    // this is a test web site
    public static final String WEB_SITE = "www.hongten.com";

    // user actions
    public static final String[] USER_ACTION = { "Register", "Login", "View", "Click", "Double_Click", "Buy", "Shopping_Car", "Add", "Edit", "Delete", "Comment", "Logout" };

    public static final int MAX_IP_NUMBER = 224;
    // regions used for the synthetic IPs
    public static String[] ADDRESS = { "北京", "天津", "上海", "廣東", "重慶", "河北", "山東", "河南", "雲南", "山西", "甘肅", "安徽", "福建", "黑龍江", "海南", "四川", "貴州", "寧夏", "新疆", "湖北", "湖南", "山西", "遼寧", "吉林", "江蘇", "浙江", "青海", "江西", "西藏", "內蒙", "廣西", "香港", "澳門", "臺灣", };

    // Flume conf
    public static final String FLUME_HOST_NAME = "node2";
    public static final int FLUME_PORT = 41414;

}

 

6.6. Consume from Kafka (all_my_log)

-- on node3, start a Kafka console consumer
cd /home/kafka-2.10/bin
./kafka-console-consumer.sh --zookeeper node1,node2,node3 --from-beginning --topic all_my_log

 

Sample output:

168.208.193.207    安徽    2018-12-20    1545287646527    5462770148222682599    www.hongten.com    Login
103.143.79.127    新疆    2018-12-20    1545287646529    3389475301916412717    www.hongten.com    Login
111.208.80.39    山東    2018-12-20    1545287646531    535601622597096753    www.hongten.com    Shopping_Car
105.30.86.46    四川    2018-12-20    1545287646532    7825340079790811845    www.hongten.com    Login
205.55.33.74    新疆    2018-12-20    1545287646533    4228838365367235561    www.hongten.com    Logout
34.44.60.134    安徽    2018-12-20    1545287646536    702584874247456732    www.hongten.com    Double_Click
154.169.15.145    廣東    2018-12-20    1545287646537    1683351753576425036    www.hongten.com    View
126.28.192.28    湖南    2018-12-20    1545287646538    8319814684518483148    www.hongten.com    Edit
5.140.156.73    臺灣    2018-12-20    1545287646539    7432409906375230025    www.hongten.com    Logout
72.175.210.95    西藏    2018-12-20    1545287646540    5233707593244910849    www.hongten.com    View
121.25.190.25    廣西    2018-12-20    1545287646541    268200251881841673    www.hongten.com    Buy

 

6.7. Create a Kafka Topic

-- on node1, create a topic named filtered_log
-- with 3 partitions
-- and replication-factor 3
./kafka-topics.sh --zookeeper node1,node2,node3 --create --topic filtered_log --partitions 3 --replication-factor 3

 

6.8. Filter the Data with Storm

  • Storm consumes the data from Kafka
  • Storm filters it, keeping only Buy (completed purchase) and Shopping_Car (potential purchase) records
  • Storm writes the filtered data back to Kafka
package com.b510.big.data.storm.process;

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.StringScheme;
import storm.kafka.ZkHosts;
import storm.kafka.bolt.KafkaBolt;
import storm.kafka.bolt.mapper.FieldNameBasedTupleToKafkaMapper;
import storm.kafka.bolt.selector.DefaultTopicSelector;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class LogFilterTopology {

    public static void main(String[] args) {

        ZkHosts zkHosts = new ZkHosts(Common.ZOOKEEPER_QUORUM);
        // the spout consumes from the 'all_my_log' topic
        SpoutConfig spoutConfig = new SpoutConfig(zkHosts, Common.ALL_MY_LOG_TOPIC, Common.ZOOKEEPER_ROOT, Common.ZOOKEEPER_ID);
        List<String> zkServers = new ArrayList<>();
        for (String host : zkHosts.brokerZkStr.split(",")) {
            zkServers.add(host.split(":")[0]);
        }

        spoutConfig.zkServers = zkServers;
        spoutConfig.zkPort = Common.ZOOKEEPER_PORT;
        spoutConfig.forceFromStart = true;
        spoutConfig.socketTimeoutMs = 60 * 60 * 1000;
        spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());

        // create the KafkaSpout
        KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);

        TopologyBuilder builder = new TopologyBuilder();
        // Storm consumes data from Kafka
        builder.setSpout(Common.KAFKA_SPOUT, kafkaSpout, 3);
        // filter the data (Buy = completed purchase, Shopping_Car = potential purchase)
        builder.setBolt(Common.FILTER_BOLT, new FilterBolt(), 8).shuffleGrouping(Common.KAFKA_SPOUT);

        // create the KafkaBolt
        @SuppressWarnings({ "unchecked", "rawtypes" })
        KafkaBolt kafkaBolt = new KafkaBolt().withTopicSelector(new DefaultTopicSelector(Common.FILTERED_LOG_TOPIC)).withTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper());

        // write the filtered data back to Kafka
        builder.setBolt(Common.KAFKA_BOLT, kafkaBolt, 2).shuffleGrouping(Common.FILTER_BOLT);

        Properties props = new Properties();
        props.put("metadata.broker.list", Common.STORM_METADATA_BROKER_LIST);
        props.put("request.required.acks", Common.STORM_REQUEST_REQUIRED_ACKS);
        props.put("serializer.class", Common.STORM_SERILIZER_CLASS);

        Config conf = new Config();
        conf.put("kafka.broker.properties", props);

        conf.put(Config.STORM_ZOOKEEPER_SERVERS, zkServers);

        if (args == null || args.length == 0) {
            // run in local mode
            LocalCluster localCluster = new LocalCluster();
            localCluster.submitTopology("storm-kafka-topology", conf, builder.createTopology());
        } else {
            // run on the cluster
            conf.setNumWorkers(3);
            try {
                StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
            } catch (AlreadyAliveException | InvalidTopologyException e) {
                System.out.println("error : " + e);
            }
        }
    }
}

class FilterBolt extends BaseBasicBolt {

    private static final long serialVersionUID = 1L;

    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        String logStr = input.getString(0);
        // filter on the keywords we are interested in:
        // keep only log records containing 'Buy' or 'Shopping_Car'
        if (logStr.contains(Common.KEY_WORD_BUY) || logStr.contains(Common.KEY_WORD_SHOPPING_CAR)) {
            collector.emit(new Values(logStr));
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields(FieldNameBasedTupleToKafkaMapper.BOLT_MESSAGE));
    }
}

class Common {
    public static final String ALL_MY_LOG_TOPIC = "all_my_log";
    public static final String FILTERED_LOG_TOPIC = "filtered_log";
    
    public static final String DATE_FORMAT_YYYYDDMMHHMMSS = "yyyyMMddHHmmss";
    public static final String DATE_FORMAT_HHMMSS = "HHmmss";
    public static final String DATE_FORMAT_HHMMSS_DEFAULT_VALUE = "000001";

    public static final String HBASE_ZOOKEEPER_LIST = "node1:2888,node2:2888,node3:2888";
    public static final int ZOOKEEPER_PORT = 2181;
    public static final String ZOOKEEPER_QUORUM = "node1:" + ZOOKEEPER_PORT + ",node2:" + ZOOKEEPER_PORT + ",node3:" + ZOOKEEPER_PORT + "";
    public static final String ZOOKEEPER_ROOT = "/MyKafka";
    public static final String ZOOKEEPER_ID = "MyTrack";

    public static final String KAFKA_SPOUT = "kafkaSpout";
    public static final String FILTER_BOLT = "filterBolt";
    public static final String PROCESS_BOLT = "processBolt";
    public static final String HBASE_BOLT = "hbaseBolt";
    public static final String KAFKA_BOLT = "kafkaBolt";

    // Storm Conf
    public static final String STORM_METADATA_BROKER_LIST = "node1:9092,node2:9092,node3:9092";
    public static final String STORM_REQUEST_REQUIRED_ACKS = "1";
    public static final String STORM_SERILIZER_CLASS = "kafka.serializer.StringEncoder";

    // key word
    public static final String KEY_WORD_BUY = "Buy";
    public static final String KEY_WORD_SHOPPING_CAR = "Shopping_Car";
    
    //hbase
    public static final String TABLE_USER_ACTION = "t_user_actions";
    public static final String COLUMN_FAMILY = "cf";
    // how many seconds between writes to HBase
    public static final int WRITE_RECORD_TO_TABLE_PER_SECOND = 1;
    public static final int TABLE_MAX_VERSION = (60/WRITE_RECORD_TO_TABLE_PER_SECOND) * 60 * 24;
}

 

6.9. Consume from Kafka (filtered_log)

-- on node3, start a Kafka console consumer
cd /home/kafka-2.10/bin
./kafka-console-consumer.sh --zookeeper node1,node2,node3 --from-beginning --topic filtered_log

 

Sample output:

87.26.135.185    黑龍江    2018-12-20    1545290594658    7290881731606227972    www.hongten.com    Shopping_Car
60.96.96.38    青海    2018-12-20    1545290594687    6935901257286057015    www.hongten.com    Shopping_Car
43.159.110.193    江蘇    2018-12-20    1545290594727    7096698224110515553    www.hongten.com    Shopping_Car
21.103.139.11    山西    2018-12-20    1545290594693    7805867078876194442    www.hongten.com    Shopping_Car
139.51.213.184    廣東    2018-12-20    1545290594729    8048796865619113514    www.hongten.com    Buy
58.213.148.89    河北    2018-12-20    1545290594708    5176551342435592748    www.hongten.com    Buy
36.205.221.116    湖南    2018-12-20    1545290594715    4484717918039766421    www.hongten.com    Shopping_Car
135.194.103.53    北京    2018-12-20    1545290594769    4833011508087432349    www.hongten.com    Shopping_Car
180.21.100.66    貴州    2018-12-20    1545290594752    5270357330431599426    www.hongten.com    Buy
167.71.65.70    山西    2018-12-20    1545290594790    275898530145861990    www.hongten.com    Buy
125.51.21.199    寧夏    2018-12-20    1545290594814    3613499600574777198    www.hongten.com    Buy

 

 

6.10. Process the Filtered Data with Storm and Save the Counts to HBase

  • Storm consumes the filtered data from Kafka
  • Storm aggregates it (Buy = number of purchases, Shopping_Car = number of potential purchases)
  • Storm writes the counts to HBase
package com.b510.big.data.storm.process;

import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Put;

import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.StringScheme;
import storm.kafka.ZkHosts;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.IBasicBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class LogProcessTopology {

    public static void main(String[] args) {

        ZkHosts zkHosts = new ZkHosts(Common.ZOOKEEPER_QUORUM);
        // the spout consumes from the 'filtered_log' topic
        SpoutConfig spoutConfig = new SpoutConfig(zkHosts, Common.FILTERED_LOG_TOPIC, Common.ZOOKEEPER_ROOT, Common.ZOOKEEPER_ID);
        List<String> zkServers = new ArrayList<>();
        for (String host : zkHosts.brokerZkStr.split(",")) {
            zkServers.add(host.split(":")[0]);
        }

        spoutConfig.zkServers = zkServers;
        spoutConfig.zkPort = Common.ZOOKEEPER_PORT;
        spoutConfig.forceFromStart = true;
        spoutConfig.socketTimeoutMs = 60 * 60 * 1000;
        spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());

        // create the KafkaSpout
        KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);

        TopologyBuilder builder = new TopologyBuilder();
        // Storm consumes the filtered data from Kafka
        builder.setSpout(Common.KAFKA_SPOUT, kafkaSpout, 3);
        // count the records (Buy = number of purchases, Shopping_Car = number of potential purchases)
        builder.setBolt(Common.PROCESS_BOLT, new ProcessBolt(), 3).shuffleGrouping(Common.KAFKA_SPOUT);
        // write the counts to HBase
        builder.setBolt(Common.HBASE_BOLT, new HbaseBolt(), 3).shuffleGrouping(Common.PROCESS_BOLT);

        Properties props = new Properties();
        props.put("metadata.broker.list", Common.STORM_METADATA_BROKER_LIST);
        props.put("request.required.acks", Common.STORM_REQUEST_REQUIRED_ACKS);
        props.put("serializer.class", Common.STORM_SERILIZER_CLASS);

        Config conf = new Config();
        conf.put("kafka.broker.properties", props);

        conf.put(Config.STORM_ZOOKEEPER_SERVERS, zkServers);

        if (args == null || args.length == 0) {
            // run in local mode
            LocalCluster localCluster = new LocalCluster();
            localCluster.submitTopology("storm-kafka-topology", conf, builder.createTopology());
        } else {
            // run on the cluster
            conf.setNumWorkers(3);
            try {
                StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
            } catch (AlreadyAliveException | InvalidTopologyException e) {
                System.out.println("error : " + e);
            }
        }
        
    }
}

class ProcessBolt extends BaseBasicBolt {

    private static final long serialVersionUID = 1L;

    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        String logStr = input.getString(0);
        if (logStr != null) {
            String infos[] = logStr.split("\\t");
            //180.21.100.66    貴州    2018-12-20    1545290594752    5270357330431599426    www.hongten.com    Buy
            collector.emit(new Values(infos[2], infos[6]));
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("date", "user_action"));
    }
}

class HbaseBolt implements IBasicBolt {
    private static final long serialVersionUID = 1L;

    HBaseDAO hBaseDAO = null;
    
    SimpleDateFormat simpleDateFormat = null;
    SimpleDateFormat simpleDateFormatHHMMSS = null;
    
    int userBuyCount = 0;
    int userShoopingCarCount = 0;
    
    // avoid writing to HBase too frequently
    int writeToHbaseMaxNum = Common.WRITE_RECORD_TO_TABLE_PER_SECOND * 1000;
    long begin = System.currentTimeMillis();
    long end = 0;
    
    @SuppressWarnings("rawtypes")
    @Override
    public void prepare(Map map, TopologyContext context) {
        hBaseDAO = new HBaseDAOImpl();
        simpleDateFormat = new SimpleDateFormat(Common.DATE_FORMAT_YYYYDDMMHHMMSS);
        simpleDateFormatHHMMSS = new SimpleDateFormat(Common.DATE_FORMAT_HHMMSS);
        hBaseDAO.createTable(Common.TABLE_USER_ACTION, new String[]{Common.COLUMN_FAMILY}, Common.TABLE_MAX_VERSION);
    }

    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        // if it is the first second of a new day, reset the counters
        // (this check is not entirely accurate: at that moment some of the
        // previous day's data may not have been processed yet)
        if (simpleDateFormatHHMMSS.format(new Date()).equals(Common.DATE_FORMAT_HHMMSS_DEFAULT_VALUE)) {
            userBuyCount = 0;
            userShoopingCarCount = 0;
        }
        
        if (input != null) {
            // based on ProcessBolt.declareOutputFields()
            String date = input.getString(0);
            String userAction = input.getString(1);

            if (userAction.equals(Common.KEY_WORD_BUY)) {
                // the same user may repeat the 'Buy' action within one day
                userBuyCount++;
            }

            if (userAction.equals(Common.KEY_WORD_SHOPPING_CAR)) {
                userShoopingCarCount++;
            }

            end = System.currentTimeMillis();
            if ((end - begin) > writeToHbaseMaxNum) {
                System.out.println("hbase_key: " + Common.KEY_WORD_BUY + "_" + date + " , userBuyCount: " + userBuyCount + ", userShoopingCarCount :" + userShoopingCarCount);
                
                // write the counters to HBase
                String quailifer = simpleDateFormat.format(new Date());
                hBaseDAO.insert(Common.TABLE_USER_ACTION , 
                        Common.KEY_WORD_BUY + "_" + date, 
                        Common.COLUMN_FAMILY, 
                        new String[] { quailifer },
                        new String[] { "{user_buy_count:" + userBuyCount + "}" }
                        );
                hBaseDAO.insert(Common.TABLE_USER_ACTION , 
                        Common.KEY_WORD_SHOPPING_CAR + "_" + date, 
                        Common.COLUMN_FAMILY, 
                        new String[] { quailifer },
                        new String[] { "{user_shopping_car_count:" + userShoopingCarCount + "}" }
                        );
                begin = System.currentTimeMillis();
            }
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {

    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }

    @Override
    public void cleanup() {

    }
}

interface HBaseDAO {
    public void createTable(String tableName, String[] columnFamilys, int maxVersion);
    public void insert(String tableName, String rowKey, String family, String quailifer[], String value[]);
}

class HBaseDAOImpl implements HBaseDAO {

    HConnection hConnection = null;
    static Configuration conf = null;

    public HBaseDAOImpl() {
        conf = new Configuration();
        conf.set("hbase.zookeeper.quorum", Common.HBASE_ZOOKEEPER_LIST);
        try {
            hConnection = HConnectionManager.createConnection(conf);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
    
    public void createTable(String tableName, String[] columnFamilys, int maxVersion) {
        try {
            HBaseAdmin admin = new HBaseAdmin(conf);
            if (admin.tableExists(tableName)) {
                System.err.println("table existing in hbase.");
            } else {
                HTableDescriptor tableDesc = new HTableDescriptor(TableName.valueOf(tableName));
                for (String columnFamily : columnFamilys) {
                    HColumnDescriptor hColumnDescriptor = new HColumnDescriptor(columnFamily);
                    hColumnDescriptor.setMaxVersions(maxVersion);
                    tableDesc.addFamily(hColumnDescriptor);
                }

                admin.createTable(tableDesc);
                System.err.println("table is created.");
            }
            admin.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    
    @Override
    public void insert(String tableName, String rowKey, String family, String quailifer[], String value[]) {
        HTableInterface table = null;
        try {
            table = hConnection.getTable(tableName);
            Put put = new Put(rowKey.getBytes());
            for (int i = 0; i < quailifer.length; i++) {
                String col = quailifer[i];
                String val = value[i];
                put.add(family.getBytes(), col.getBytes(), val.getBytes());
            }
            table.put(put);
            System.err.println("save record successfuly.");
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                if (table != null) {
                    table.close();
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

}
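
To check the table and the DAO outside of Storm, a small stand-alone test like the one below can be used (the class name is hypothetical; it must sit in the same package as Common and HBaseDAOImpl, and the written cell can then be inspected in the hbase shell):

class HBaseDAOSmokeTest {
    public static void main(String[] args) {
        HBaseDAO dao = new HBaseDAOImpl();
        // create t_user_actions with the 'cf' family if it does not exist yet
        dao.createTable(Common.TABLE_USER_ACTION, new String[] { Common.COLUMN_FAMILY }, Common.TABLE_MAX_VERSION);
        // write one sample counter value under row key 'Buy_2018-12-20'
        dao.insert(Common.TABLE_USER_ACTION,
                Common.KEY_WORD_BUY + "_2018-12-20",
                Common.COLUMN_FAMILY,
                new String[] { "20181220120000" },
                new String[] { "{user_buy_count:1}" });
    }
}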

 

Storm processing logic:

1. Write the counters to HBase once per second

2. Reset the counters at the start of each new day

If we keep the program above running, the system keeps writing data into HBase, which gives us the data we need to generate the reports.
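
Since HbaseBolt stores one cumulative count per second (qualifier format yyyyMMddHHmmss), coarser reports can be derived from the same data by bucketing those qualifiers. Below is a minimal sketch of that idea (illustrative class and method names, assuming the counters have already been read back from HBase as qualifier/value pairs): keeping the last value seen in each hour yields an hourly series, and the same trick works for days or months.

import java.util.Map;
import java.util.TreeMap;

public class HourlyRollupDemo {
    // input:  per-second counters, key = yyyyMMddHHmmss, value = cumulative count at that second
    // output: one entry per hour,  key = yyyyMMddHH,     value = cumulative count at the end of that hour
    public static Map<String, String> rollupByHour(Map<String, String> perSecond) {
        Map<String, String> perHour = new TreeMap<>();
        // TreeMap iterates in key (i.e. chronological) order, so later seconds overwrite earlier ones
        for (Map.Entry<String, String> e : new TreeMap<>(perSecond).entrySet()) {
            perHour.put(e.getKey().substring(0, 10), e.getValue());
        }
        return perHour;
    }
}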

The report implementation follows.

 

6.11. Read the HBase Data and Generate an Excel Report with POI

  • Read the data from HBase
  • Generate an Excel report with POI
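
Below is a minimal sketch of this step (illustrative class and output file names; it uses the same older HBase client API as the code above, plus Apache POI's XSSFWorkbook): read back one day's 'Buy' counter row from t_user_actions and dump the (timestamp, value) pairs into an .xlsx sheet.

package com.b510.big.data.poi;

import java.io.FileOutputStream;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Result;
import org.apache.poi.ss.usermodel.Row;
import org.apache.poi.ss.usermodel.Sheet;
import org.apache.poi.xssf.usermodel.XSSFWorkbook;

public class UserActionReport {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("hbase.zookeeper.quorum", "node1,node2,node3");

        HConnection connection = HConnectionManager.createConnection(conf);
        HTableInterface table = connection.getTable("t_user_actions");

        // row key layout follows HbaseBolt: "<action>_<yyyy-MM-dd>",
        // one column per write, qualifier = yyyyMMddHHmmss
        String day = "2018-12-20";
        Get get = new Get(("Buy_" + day).getBytes());
        get.setMaxVersions(); // fetch every stored version, not just the latest

        Result result = table.get(get);

        // write (timestamp, value) pairs into an Excel sheet
        XSSFWorkbook workbook = new XSSFWorkbook();
        Sheet sheet = workbook.createSheet("Buy_" + day);
        int rowIndex = 0;
        Row header = sheet.createRow(rowIndex++);
        header.createCell(0).setCellValue("time (yyyyMMddHHmmss)");
        header.createCell(1).setCellValue("user_buy_count");

        for (Cell cell : result.rawCells()) {
            Row row = sheet.createRow(rowIndex++);
            row.createCell(0).setCellValue(new String(CellUtil.cloneQualifier(cell)));
            row.createCell(1).setCellValue(new String(CellUtil.cloneValue(cell)));
        }

        FileOutputStream out = new FileOutputStream("user_action_report_" + day + ".xlsx");
        workbook.write(out);
        out.close();

        table.close();
        connection.close();
    }
}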
