1. Log collection: receive data from a network port and sink it to the logger
File netcat-logger.conf:
# Name the components on this agent
# (give the three components, source, sink, and channel, a name each)
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
# type = netcat receives data from a network port. The agent runs on this
# machine, so it binds to localhost. (type = spooldir would instead watch a
# directory and collect whatever files appear in it.)
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
# Data is handed to the sink in batches; each item in a batch is one event.
# Channel parameters:
# capacity: the maximum number of events the channel can hold
# transactionCapacity: the maximum number of events taken from the source
# or delivered to the sink in one transaction
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
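Start command:
# Tell Flume to start one agent with the given configuration: --conf is the configuration directory, --conf-file is the agent's properties file, and --name is the agent's name (it must match the a1 prefix used in the file)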
flume-ng agent --conf conf --conf-file conf/netcat-logger.conf --name a1 -Dflume.root.logger=INFO,console
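A frequent startup mistake is passing a --name that does not match the prefix used in the properties file; the agent then typically starts but loads no components. The sketch below is a small pre-flight check for that case. The function name agent_defined is hypothetical and not part of Flume; it only looks for a "<name>.sources =" line in the file.

```shell
#!/usr/bin/env bash
# Hypothetical helper (not part of Flume): succeed only if the properties
# file declares sources for the given agent name, e.g. "a1.sources = r1".
agent_defined() {
    local conf_file=$1 agent=$2
    # Return 2 when the file itself is missing, so callers can tell the
    # two failure cases apart.
    [ -f "$conf_file" ] || return 2
    grep -Eq "^[[:space:]]*${agent}\.sources[[:space:]]*=" "$conf_file"
}
```

It could be chained in front of the start command, for example: agent_defined conf/netcat-logger.conf a1 && flume-ng agent --conf conf --conf-file conf/netcat-logger.conf --name a1 -Dflume.root.logger=INFO,console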
Send data:
[root@mini03 ~]# telnet localhost 44444
Trying ::1...
telnet: connect to address ::1: Connection refused
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
hello world!^H^H^H^H^H^H^H^H^H^H^H^H^H^H
OK
tianjun2012!
OK
Data shown on the console:
2017-05-08 13:41:35,766 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{} body: 68 65 6C 6C 6F 20 77 6F 72 6C 64 21 08 08 08 08 hello world!.... }
2017-05-08 13:41:40,153 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{} body: 74 69 61 6E 6A 75 6E 32 30 31 32 21 0D tianjun2012!. }
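In the two events above, the logger sink prints the body as hex bytes followed by a text rendering in which non-printable bytes appear as dots. The 08 bytes are the backspaces (^H) typed in the telnet session, and 0D is the carriage return telnet sends at the end of a line. The first event also stops after 16 bytes, which is the logger sink's default display limit, so not every backspace is shown. A minimal sketch for reading such a body is given below; describe_body is a hypothetical helper, not a Flume tool.

```shell
#!/usr/bin/env bash
# Hypothetical helper: print the characters for a list of hex bytes copied
# from a logger-sink line, naming the common control characters.
describe_body() {
    local byte
    for byte in "$@"; do
        case "$byte" in
            08) printf '<BS>' ;;
            0A) printf '<LF>' ;;
            0D) printf '<CR>' ;;
            # Convert hex to octal, then let %b expand the \0ddd escape.
            *)  printf '%b' "\\0$(printf '%o' "0x$byte")" ;;
        esac
    done
    printf '\n'
}
```

Calling describe_body 74 69 61 6E 6A 75 6E 32 30 31 32 21 0D prints tianjun2012!<CR>.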
2. Monitoring a directory
Start command:
bin/flume-ng agent -c ./conf -f ./conf/spooldir-logger.conf -n a1 -Dflume.root.logger=INFO,console
Test: move files into /home/hadoop/flumespool (mv ././xxxFile /home/hadoop/flumespool), but do not create or write files inside that directory directly. The spooling directory source expects a file to be complete and unchanged once it appears there.
File spooldir-logger.conf:
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
# Watch a directory: spoolDir names the directory, and fileHeader controls
# whether each event gets a header holding the path of the file it came from
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /home/hadoop/flumespool
a1.sources.r1.fileHeader = true

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
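The advice to move files into the spool directory rather than create them there can be sketched as a small helper. The idea is to stage a copy elsewhere and rename it into place, so the source never sees a half-written file. The name spool_put and the separate staging directory are assumptions made for illustration; the sketch also assumes the staging and spool directories are on the same filesystem, so that mv is a rename rather than a copy.

```shell
#!/usr/bin/env bash
# Hypothetical helper: stage a copy outside the spool directory, then rename
# it into place so Flume never reads a partially written file.
# Assumes staging_dir and spool_dir live on the same filesystem.
spool_put() {
    local src=$1 spool_dir=$2 staging_dir=$3
    local name
    name=$(basename "$src")
    # The spooling source treats a reused file name as an error.
    # .COMPLETED is the default suffix Flume adds to files it has finished.
    if [ -e "$spool_dir/$name" ] || [ -e "$spool_dir/$name.COMPLETED" ]; then
        echo "spool_put: $name was already used in $spool_dir" >&2
        return 1
    fi
    cp "$src" "$staging_dir/$name" && mv "$staging_dir/$name" "$spool_dir/$name"
}
```

A call might look like spool_put ./xxxFile /home/hadoop/flumespool /home/hadoop/flumestage, where the staging path is made up for this example.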
3. Use the tail command to collect data and sink it to HDFS
File tail-hdfs.conf:
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /home/hadoop/log/test.log
a1.sources.r1.channels = c1

# Describe the sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.path = hdfs://mini01:9000/flume/events/%y-%m-%d/%H%M/
a1.sinks.k1.hdfs.filePrefix = events-
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute
a1.sinks.k1.hdfs.rollInterval = 3
a1.sinks.k1.hdfs.rollSize = 20
a1.sinks.k1.hdfs.rollCount = 5
a1.sinks.k1.hdfs.batchSize = 1
a1.sinks.k1.hdfs.useLocalTimeStamp = true
# Type of file to generate: the default is SequenceFile; DataStream writes plain text
a1.sinks.k1.hdfs.fileType = DataStream

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
Start command:
flume-ng agent -c conf -f conf/tail-hdfs.conf -n a1
Simulate log writes:
[root@mini03 log]# i=1
while (( i <= 500000 )); do
    echo $i >> /home/hadoop/log/test.log
    sleep 0.5
    let 'i++'
done
View the file contents on HDFS:
[root@mini01 ~]# hdfs dfs -cat /flume/events/17-05-08/1530/*
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
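The 1530 directory in the path above follows from the rounding settings: with hdfs.round = true, hdfs.roundValue = 10 and hdfs.roundUnit = minute, the timestamp is rounded down to a multiple of 10 minutes before %H%M is filled in, so every event written between 15:30 and 15:39 lands in the same directory. The sketch below reproduces only that arithmetic; rounded_hhmm is a hypothetical helper, not something Flume provides.

```shell
#!/usr/bin/env bash
# Hypothetical helper: mimic how %H%M resolves when hdfs.round = true and
# hdfs.roundUnit = minute. Arguments: hour, minute, roundValue.
rounded_hhmm() {
    local hour=$1 minute=$2 round_value=$3
    # Drop one leading zero so "08" or "09" is not parsed as octal.
    hour=${hour#0};     hour=${hour:-0}
    minute=${minute#0}; minute=${minute:-0}
    # Integer division discards the remainder, i.e. rounds down.
    printf '%02d%02d\n' "$hour" "$(( minute / round_value * round_value ))"
}
```

For example, rounded_hhmm 15 37 10 prints 1530, matching the directory listed above.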
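Note: to show results quickly, this example sets the following values very low; a real deployment needs to raise them. rollInterval is in seconds, rollSize is in bytes, and rollCount and batchSize are counted in events.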
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.rollInterval = 3
a1.sinks.k1.hdfs.rollSize = 20
a1.sinks.k1.hdfs.rollCount = 5
a1.sinks.k1.hdfs.batchSize = 1
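To get a feel for why these values need raising, it helps to count files. Each roll closes one HDFS file, so with a steady stream of data the time trigger alone can produce up to 86400 / rollInterval files per day, and the size and count triggers can only add to that. The rough estimate below is an illustration under that assumption; max_files_per_day is a hypothetical helper.

```shell
#!/usr/bin/env bash
# Hypothetical helper: upper bound on files closed per day by time-based
# rolling alone, for a sink that receives data continuously.
# A rollInterval of 0 disables time-based rolling, so report 0 for it.
max_files_per_day() {
    local roll_interval=$1
    if [ "$roll_interval" -le 0 ]; then
        echo 0
    else
        echo $(( 86400 / roll_interval ))
    fi
}
```

With the example's rollInterval = 3 this gives 28800 files per day, while a one-hour interval (3600) gives 24.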