Flume + Kafka + Storm Integration
1. Requirements:
A client (Client) produces log messages. We use Flume to collect these messages and push them into a Kafka topic named flume-to-kafka.
Storm then reads from the topic flume-to-kafka and processes the log messages (the processing logic here is a filter, i.e., it filters the log lines). After processing, Storm writes the processed messages into another Kafka topic named storm-to-kafka.
2. Component layout
I set up three nodes in total: node1, node2, node3.
ZooKeeper is installed on node1, node2, node3.
Flume is installed on node2.
Kafka is installed on node1, node2, node3.
Storm is installed on node1, node2, node3.
3. JDK installation
--install the JDK on node1, node2 and node3
--reference: http://blog.51cto.com/vvxyz/1642258 (three ways to install the JDK on Linux)
--install from the RPM package
rpm -ivh your-package.rpm
--edit the environment variables
vi /etc/profile
JAVA_HOME=/usr/java/jdk1.7.0_67
JRE_HOME=/usr/java/jdk1.7.0_67/jre
CLASS_PATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar:$JRE_HOME/lib
PATH=$PATH:$JAVA_HOME/bin:$JRE_HOME/bin
export JAVA_HOME JRE_HOME CLASS_PATH PATH
:wq
--make the configuration take effect
source /etc/profile
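To confirm that the new environment variables are picked up on each node, a quick check is:
--verify the JDK version
java -version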
4. ZooKeeper installation
--unpack and install ZooKeeper
tar -zxvf zookeeper-3.4.6.tar.gz
--create a soft link for zookeeper
ln -sf /root/zookeeper-3.4.6 /home/zk
--configure zookeeper
cd /home/zk/conf/
--copy the zoo_sample.cfg file to zoo.cfg
cp zoo_sample.cfg zoo.cfg
--edit the zoo.cfg configuration file
vi zoo.cfg
--set the zookeeper data directory
--find dataDir=/tmp/zookeeper and change it to the value below
dataDir=/opt/zookeeper
--define the zookeeper ensemble
server.1=node1:2888:3888
server.2=node2:2888:3888
server.3=node3:2888:3888
:wq
--create the /opt/zookeeper directory
mkdir /opt/zookeeper
--enter the /opt/zookeeper directory
cd /opt/zookeeper
--create a file named myid
vi myid
--enter 1
1
:wq
--likewise, on node2 and node3 the values are 2 and 3 respectively
--copy the zookeeper directory to /opt/ on node2 and node3
cd ..
scp -r zookeeper/ root@node2:/opt/
scp -r zookeeper/ root@node3:/opt/
--on node2 and node3, edit /opt/zookeeper/myid and set the values to 2 and 3 respectively
--with the above done, copy the zookeeper installation from node1 to node2 and node3
cd /root
scp -r zookeeper-3.4.6 root@node2:~/
scp -r zookeeper-3.4.6 root@node3:~/
--on node2 and node3, create the soft link
ln -sf /root/zookeeper-3.4.6/ /home/zk
--configure the zookeeper environment variables
cd /home/zk/bin
--edit /etc/profile and add the zookeeper bin directory to the PATH
vi /etc/profile
PATH=$PATH:$JAVA_HOME/bin:$JRE_HOME/bin:/home/zk/bin
--make the configuration take effect
source /etc/profile
--on node2 and node3, edit /etc/profile in the same way and add the zookeeper bin directory
--once the environment variables are configured, zookeeper can be started
--start zookeeper on node1, node2 and node3
zkServer.sh start
--check whether the start succeeded
jps
--look for a QuorumPeerMain process
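Besides jps, each node can also report its role in the ensemble; a quick check, assuming the bin directory is on the PATH as configured above:
--check the quorum state on each node
zkServer.sh status
--one node should report "Mode: leader" and the other two "Mode: follower"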
5. Flume installation
--install Flume
--upload the installation package to node2
cd
tar -zxvf apache-flume-1.6.0-bin.tar.gz
--create a soft link
ln -s /root/apache-flume-1.6.0-bin /home/flume
--configure flume
cd /root/apache-flume-1.6.0-bin/conf
cp flume-env.sh.template flume-env.sh
vi flume-env.sh
--set the JDK
export JAVA_HOME=/usr/java/jdk1.7.0_67
:wq
--add Flume to the system environment variables
vi /etc/profile
export FLUME_HOME=/root/apache-flume-1.6.0-bin
export PATH=$PATH:$FLUME_HOME/bin
:wq
source /etc/profile
--verify the installation
flume-ng version
Flume 1.6.0
Source code repository: https://git-wip-us.apache.org/repos/asf/flume.git
Revision: 2561a23240a71ba20bf288c7c2cda88f443c2080
Compiled by hshreedharan on Mon May 11 11:15:44 PDT 2015
From source with checksum b29e416802ce9ece3269d34233baf43f
6. Kafka installation
--install Kafka on node1, node2 and node3
--start on node1
mkdir /root/kafka
--unpack the archive into /root/kafka
tar -zxvf kafka_2.10-0.8.2.2.tgz -C /root/kafka
--create a soft link
ln -s /root/kafka/kafka_2.10-0.8.2.2 /home/kafka-2.10
--configure
cd /root/kafka/kafka_2.10-0.8.2.2/config
vi server.properties
--broker.id: node1=0, node2=1, node3=2
broker.id=0
log.dirs=/opt/kafka-log
zookeeper.connect=node1:2181,node2:2181,node3:2181
:wq
--create a small script for convenient startup
cd /root/kafka/kafka_2.10-0.8.2.2
vi start-kafka.sh
nohup bin/kafka-server-start.sh config/server.properties > kafka.log 2>&1 &
:wq
chmod 755 start-kafka.sh
--once configured, distribute to node2 and node3
--(make sure /root/kafka exists on node2 and node3 first: mkdir /root/kafka)
cd /root/kafka/
scp -r kafka_2.10-0.8.2.2/ root@node2:/root/kafka
scp -r kafka_2.10-0.8.2.2/ root@node3:/root/kafka
--on node2
cd /root/kafka/kafka_2.10-0.8.2.2/config
vi server.properties
broker.id=1
:wq
--on node3
cd /root/kafka/kafka_2.10-0.8.2.2/config
vi server.properties
broker.id=2
:wq
--before starting Kafka, make sure ZooKeeper is running
zkServer.sh start
--then, on node1, node2 and node3
cd /root/kafka/kafka_2.10-0.8.2.2
./start-kafka.sh
--check whether the start succeeded
jps
--look for a Kafka process
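As an optional sanity check (a sketch, assuming the paths configured above), you can confirm that all three brokers have registered themselves in ZooKeeper:
--open the ZooKeeper CLI (interactive) and list the registered broker ids
zkCli.sh -server node1:2181
ls /brokers/ids
--expected output: [0, 1, 2]
quit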
7. Storm installation
--Storm distributed installation
--deployed on node1, node2 and node3
--on node1
cd /root/apache-storm-0.10.0/conf
vi storm.yaml
--configuration:
storm.zookeeper.servers:
    - "node1"
    - "node2"
    - "node3"
nimbus.host: "node1"
storm.local.dir: "/opt/storm"
supervisor.slots.ports:
    - 6700
    - 6701
    - 6702
    - 6703
:wq
--distribute from node1 to node2 and node3
cd /root
scp -r apache-storm-0.10.0 root@node2:/root/
scp -r apache-storm-0.10.0 root@node3:/root/
--on node2 and node3, create the soft link
ln -s /root/apache-storm-0.10.0 /home/storm-0.10
--on node1, node2 and node3, create a quick-start script
cd /root/apache-storm-0.10.0
vi start-storm.sh
nohup bin/storm nimbus >> logs/nimbus.out 2>&1 &
nohup bin/storm supervisor >> logs/supervisor.out 2>&1 &
--the next two lines are only configured on node1; node2 and node3 do not need the UI
nohup bin/storm ui >> logs/ui.out 2>&1 &
nohup bin/storm drpc >> logs/drpc.out 2>&1 &
:wq
--on node1, node2 and node3, create a quick-stop script
vi stop-storm.sh
--the "core" (UI) line is only needed on node1; node2 and node3 do not need the UI
kill -9 `jps | grep core | awk '{print $1}'`
kill -9 `jps | grep supervisor | awk '{print $1}'`
kill -9 `jps | grep nimbus | awk '{print $1}'`
kill -9 `jps | grep worker | awk '{print $1}'`
kill -9 `jps | grep LogWriter | awk '{print $1}'`
:wq
chmod 755 start-storm.sh
chmod 755 stop-storm.sh
--start the ZooKeeper service on node1, node2 and node3
zkServer.sh start
--start Storm on node1, node2 and node3
cd /root/apache-storm-0.10.0
./start-storm.sh
--upload the storm_wc.jar file and submit it as a quick test
bin/storm jar /root/storm_wc.jar storm.wordcount.Test wordcount
--Storm DRPC configuration
--on node1
cd /root/apache-storm-0.10.0/conf
vi storm.yaml
drpc.servers:
    - "node1"
:wq
--distribute storm.yaml from node1 to node2 and node3
cd /root/apache-storm-0.10.0/conf/
scp storm.yaml root@node2:/root/apache-storm-0.10.0/conf/
scp storm.yaml root@node3:/root/apache-storm-0.10.0/conf/
--after configuring, on node1, node2 and node3
cd /root/apache-storm-0.10.0
./start-storm.sh &
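After start-storm.sh has run, a quick way to verify the daemons is jps (the same processes the stop script greps for); if the UI was started on node1, it listens on port 8080 by default.
--on node1 you should see nimbus, supervisor and core (the UI process); on node2 and node3 at least supervisor
jps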
8. Flume + Kafka + Storm Integration
8.1. Configure Flume
--on node2
cd /root/flumedir
vi flume_to_kafka
--node2 configuration:
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = avro
a1.sources.r1.bind = node2
a1.sources.r1.port = 41414

# Describe the sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.topic = flume-to-kafka
a1.sinks.k1.brokerList = node1:9092,node2:9092,node3:9092
a1.sinks.k1.requiredAcks = 1
a1.sinks.k1.batchSize = 20

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000000
a1.channels.c1.transactionCapacity = 10000

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
:wq
8.2. Start ZooKeeper
--on node1, node2 and node3
--turn off the firewall
service iptables stop
--start ZooKeeper
zkServer.sh start
8.3. Start Kafka
--on node1, node2 and node3
cd /root/kafka/kafka_2.10-0.8.2.2
./start-kafka.sh
8.4. Start Flume
--on node2, start the Flume agent
cd /root/flumedir
flume-ng agent -n a1 -c conf -f flume_to_kafka -Dflume.root.logger=DEBUG,console
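Before running the Java client, you can optionally smoke-test the agent with Flume's built-in Avro client; the test file path below is just a placeholder.
--send the contents of a test file to the avro source (file path is a placeholder)
flume-ng avro-client -H node2 -p 41414 -F /root/test.log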
8.5. Start the client
Start the client to generate log messages.
You can refer to the code from the "RPC clients - Avro and Thrift" section of the Flume developer guide:
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.api.RpcClient;
import org.apache.flume.api.RpcClientFactory;
import org.apache.flume.event.EventBuilder;

import java.nio.charset.Charset;

public class MyApp {
    public static void main(String[] args) {
        MyRpcClientFacade1 client = new MyRpcClientFacade1();
        // Initialize client with the remote Flume agent's host and port
        client.init("node2", 41414);

        // Send 5 events to the remote Flume agent. That agent should be
        // configured to listen with an AvroSource.
        String sampleData = "Hello ERROR ! ------ Test";
        for (int i = 500; i < 505; i++) {
            client.sendDataToFlume(sampleData + " " + i);
            System.out.println(sampleData + " " + i);
        }

        client.cleanUp();
    }
}

class MyRpcClientFacade1 {
    private RpcClient client;
    private String hostname;
    private int port;

    public void init(String hostname, int port) {
        // Setup the RPC connection
        this.hostname = hostname;
        this.port = port;
        this.client = RpcClientFactory.getDefaultInstance(hostname, port);
        // Use the following method to create a thrift client (instead of the
        // above line):
        // this.client = RpcClientFactory.getThriftInstance(hostname, port);
    }

    public void sendDataToFlume(String data) {
        // Create a Flume Event object that encapsulates the sample data
        Event event = EventBuilder.withBody(data, Charset.forName("UTF-8"));

        // Send the event
        try {
            client.append(event);
        } catch (EventDeliveryException e) {
            // clean up and recreate the client
            client.close();
            client = null;
            client = RpcClientFactory.getDefaultInstance(hostname, port);
            // Use the following method to create a thrift client (instead of
            // the above line):
            // this.client = RpcClientFactory.getThriftInstance(hostname, port);
        }
    }

    public void cleanUp() {
        // Close the RPC connection
        client.close();
    }
}
The output in the Eclipse console is:
[ WARN ] - [ org.apache.flume.api.NettyAvroRpcClient.configure(NettyAvroRpcClient.java:505) ] Invalid value for batchSize: 0; Using default value.
[ WARN ] - [ org.apache.flume.api.NettyAvroRpcClient.configure(NettyAvroRpcClient.java:634) ] Using default maxIOWorkers
Hello ERROR ! ------ Test 500
Hello ERROR ! ------ Test 501
Hello ERROR ! ------ Test 502
Hello ERROR ! ------ Test 503
Hello ERROR ! ------ Test 504
8.6. View the Kafka topics
--on node3, list the Kafka topics
cd /home/kafka-2.10/bin
./kafka-topics.sh --zookeeper node1,node2,node3 --list
You can see that, as a result of running the client code, the topic flume-to-kafka has been created automatically in Kafka.
8.7. Start a Kafka consumer: flume-to-kafka
Here we consume the topic flume-to-kafka to check the messages it has received.
--on node3, start a Kafka console consumer
cd /home/kafka-2.10/bin
./kafka-console-consumer.sh --zookeeper node1,node2,node3 --from-beginning --topic flume-to-kafka
Console output:
Hello ERROR ! ------ Test 500
Hello ERROR ! ------ Test 501
Hello ERROR ! ------ Test 502
Hello ERROR ! ------ Test 503
Hello ERROR ! ------ Test 504
8.8. Create the topic: storm-to-kafka
Create another topic in Kafka:
--on node1, create a topic named storm-to-kafka
--with 3 partitions and a replication factor of 3
cd /home/kafka-2.10/bin
./kafka-topics.sh --zookeeper node1,node2,node3 --create --topic storm-to-kafka --partitions 3 --replication-factor 3
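To double-check the partition and replica assignment, the new topic can be described:
--describe the new topic (should show 3 partitions, each with 3 replicas)
./kafka-topics.sh --zookeeper node1,node2,node3 --describe --topic storm-to-kafka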
8.9. Run the Storm code
package storm.logfilter;

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.StringScheme;
import storm.kafka.ZkHosts;
import storm.kafka.bolt.KafkaBolt;
import storm.kafka.bolt.mapper.FieldNameBasedTupleToKafkaMapper;
import storm.kafka.bolt.selector.DefaultTopicSelector;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class LogFilterTopology {

    public static class FilterBolt extends BaseBasicBolt {
        private static final long serialVersionUID = 1L;

        @Override
        public void execute(Tuple tuple, BasicOutputCollector collector) {
            String line = tuple.getString(0);
            // Keep only the lines that contain "ERROR"
            if (line.contains("ERROR")) {
                System.err.println("Filter: " + line + " ~ filtered.");
                collector.emit(new Values(line + " ~ filtered."));
            }
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            // Declare the field "message", which FieldNameBasedTupleToKafkaMapper uses later
            declarer.declare(new Fields("message"));
        }
    }

    public static void main(String[] args) throws Exception {
        TopologyBuilder builder = new TopologyBuilder();

        // https://github.com/apache/storm/tree/master/external/storm-kafka
        // Configure the Kafka spout: the topic to read from
        String topic = "flume-to-kafka";
        ZkHosts zkHosts = new ZkHosts("node1:2181,node2:2181,node3:2181");
        // "/MyKafka" is the ZooKeeper root path under which the spout records its consumed offsets
        SpoutConfig spoutConfig = new SpoutConfig(zkHosts, topic, "/MyKafka", "MyTrack");
        List<String> zkServers = new ArrayList<String>();
        for (String host : zkHosts.brokerZkStr.split(",")) {
            zkServers.add(host.split(":")[0]);
        }
        spoutConfig.zkServers = zkServers;
        spoutConfig.zkPort = 2181;
        // Whether to start consuming from the beginning of the topic
        spoutConfig.forceFromStart = true;
        spoutConfig.socketTimeoutMs = 60 * 1000;
        // StringScheme decodes the raw byte stream into strings
        spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
        KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);

        // set kafka spout
        builder.setSpout("kafkaSpout", kafkaSpout, 3);

        // set bolt
        builder.setBolt("filterBolt", new FilterBolt(), 8).shuffleGrouping("kafkaSpout");

        // Write the filtered data back out to Kafka
        // set kafka bolt
        // withTopicSelector: use the default selector to write to the topic storm-to-kafka
        // withTupleToKafkaMapper: map the tuple to a Kafka key/message
        KafkaBolt kafka_bolt = new KafkaBolt().withTopicSelector(new DefaultTopicSelector("storm-to-kafka"))
                .withTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper());
        builder.setBolt("kafkaBolt", kafka_bolt, 2).shuffleGrouping("filterBolt");

        Config conf = new Config();
        // set producer properties
        Properties props = new Properties();
        props.put("metadata.broker.list", "node1:9092,node2:9092,node3:9092");
        /**
         * Kafka producer ACK settings:
         *  0 : the producer does not wait for any acknowledgement from the broker
         *  1 : the producer waits until the leader has acknowledged the message
         * -1 : the producer waits until the follower replicas have acknowledged the message
         */
        props.put("request.required.acks", "1");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        conf.put("kafka.broker.properties", props);
        conf.put(Config.STORM_ZOOKEEPER_SERVERS, zkServers);

        if (args == null || args.length == 0) {
            // Run in local mode
            LocalCluster localCluster = new LocalCluster();
            localCluster.submitTopology("storm-kafka-topology", conf, builder.createTopology());
        } else {
            // Run on the cluster
            conf.setNumWorkers(3);
            StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
        }
    }
}
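To run the topology on the cluster rather than through the LocalCluster branch, package it into a jar and submit it with the storm command, in the same way as the word-count test earlier; the jar path and topology name below are placeholders.
--on node1 (jar path and topology name are placeholders)
cd /root/apache-storm-0.10.0
bin/storm jar /root/logfilter.jar storm.logfilter.LogFilterTopology logfilter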
8.10. Start a Kafka consumer: storm-to-kafka
Here we consume the topic storm-to-kafka to check the messages it has received.
--on node1, start a Kafka console consumer
cd /home/kafka-2.10/bin
./kafka-console-consumer.sh --zookeeper node1,node2,node3 --from-beginning --topic storm-to-kafka
Console output:
Hello ERROR ! ------ Test 504 ~ filtered.
Hello ERROR ! ------ Test 500 ~ filtered.
Hello ERROR ! ------ Test 501 ~ filtered.
Hello ERROR ! ------ Test 503 ~ filtered.
Hello ERROR ! ------ Test 502 ~ filtered.
========================================================
I'm Hongten
E | [email protected] B | http://www.cnblogs.com/hongten
========================================================