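This article builds on Hadoop2.7.6_01_部署 (deployment).
Official Flume documentation: FlumeUserGuide
Common problems: notes on problems encountered while deploying Flume and their solutions (continuously updated)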
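1. Preface
In a complete big-data processing system, the core analysis stack is formed by HDFS + MapReduce + Hive, but auxiliary systems for data collection, exporting result data and task scheduling are just as indispensable. The Hadoop ecosystem offers convenient open-source frameworks for each of these auxiliary tasks, as shown in the figure: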
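2. About Flume
2.1. Overview
- Flume is a distributed, reliable and highly available system for collecting, aggregating and moving large volumes of log data.
- Flume can collect source data in many forms, such as files and socket packets, and can write the collected data to many external storage systems such as HDFS, HBase, Hive and Kafka.
- Ordinary collection requirements can be met with a simple Flume configuration.
- For special scenarios Flume also offers good custom extension points, so it fits most day-to-day data collection scenarios.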
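2.2. How It Works
1. The core role in a Flume distributed system is the agent; a Flume collection system is built by connecting agents together.
2. Each agent acts as a data courier and has three internal components:
a) Source: the collection source, which connects to the data source to obtain data
b) Sink: the destination of the collected data, which passes data on to the next-level agent or to the final storage system
c) Channel: the data transfer channel inside the agent, which carries data from the source to the sink
Note: data moves from Source to Channel to Sink in the form of Events. An Event is the unit of the data stream and consists of optional headers (key/value pairs) and a byte-array body.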
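To make the three roles concrete, here is a minimal in-process Python model of one agent. It is only an illustration: the `Event`, `Channel`, `source` and `sink` names are hypothetical and are not Flume's Java API, and the channel here is a plain unbounded FIFO without transactions.

```python
from collections import deque
from dataclasses import dataclass, field


@dataclass
class Event:
    """One unit of the data stream: optional headers plus a byte body."""
    body: bytes
    headers: dict = field(default_factory=dict)


class Channel:
    """Buffer between source and sink (here: a plain in-memory FIFO)."""

    def __init__(self):
        self._queue = deque()

    def put(self, event):
        self._queue.append(event)

    def take(self):
        # Returns None when the channel is empty.
        return self._queue.popleft() if self._queue else None


def source(lines, channel):
    """Source: wrap each raw input line in an Event and hand it to the channel."""
    for line in lines:
        channel.put(Event(body=line.encode("utf-8")))


def sink(channel):
    """Sink: drain the channel and deliver the events (here: collect them)."""
    delivered = []
    while (event := channel.take()) is not None:
        delivered.append(event)
    return delivered


channel = Channel()
source(["111", "222"], channel)
print([e.body for e in sink(channel)])  # [b'111', b'222']
```

The source and sink never talk to each other directly; the channel decouples them, which is what lets a real agent keep buffering while the sink is slow.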
3. Flume Collection System Architecture
3.1. Simple Structure
A single agent collecting data
3.2. Complex Structure
Multiple agents chained in series
4. Installing and Deploying Flume
4.1. Software Deployment
```
[yun@mini01 software]$ pwd
/app/software
[yun@mini01 software]$ tar xf apache-flume-1.8.0-bin.tar.gz
[yun@mini01 software]$ mv apache-flume-1.8.0-bin /app/flume-1.8.0
[yun@mini01 software]$ cd /app/
[yun@mini01 ~]$ ln -s flume-1.8.0 flume   # create a symbolic link
[yun@mini01 ~]$ ll
total 28
lrwxrwxrwx 1 yun yun  11 Jul 25 21:54 flume -> flume-1.8.0
drwxrwxr-x 7 yun yun 187 Jul 25 21:53 flume-1.8.0
………………
```
4.2. Environment Variables
```
[root@mini01 profile.d]# pwd
/etc/profile.d
[root@mini01 profile.d]# cat flume.sh
export FLUME_HOME="/app/flume"
export PATH=$FLUME_HOME/bin:$PATH
[root@mini01 profile.d]# logout
[yun@mini01 ~]$ source /etc/profile   # make the environment variables take effect
```
5. Collection Examples
5.1. Simple example: receiving data from a network port and sinking it to logger
Configuration file
```
[yun@mini01 conf]$ pwd
/app/flume/conf
[yun@mini01 conf]$ ll
total 20
-rw-r--r-- 1 yun yun 1661 Sep 15 2017 flume-conf.properties.template
-rw-r--r-- 1 yun yun 1455 Sep 15 2017 flume-env.ps1.template
-rw-r--r-- 1 yun yun 1568 Sep 15 2017 flume-env.sh.template
-rw-r--r-- 1 yun yun 3107 Sep 15 2017 log4j.properties
-rw-rw-r-- 1 yun yun  741 Jul 25 22:12 netcat-logger.conf
[yun@mini01 conf]$ cat netcat-logger.conf
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
# bind = localhost: listen on the loopback interface only, so only local clients can connect

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
# Events move through the channel in batches (transactions), each made up of individual events. Channel parameters:
# capacity: maximum number of events the channel can hold
# transactionCapacity: maximum number of events taken from the source or handed to the sink in one transaction
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
```
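The difference between `capacity` and `transactionCapacity` can be sketched with a simplified Python model of a memory channel. This is an illustration under stated assumptions, not Flume's implementation: in the model a source commits a batch only if the batch fits within `transactionCapacity` and the channel has room for the whole batch, and a sink takes at most `transactionCapacity` events per transaction. The class and exception names are hypothetical.

```python
from collections import deque


class ChannelFullError(Exception):
    """Raised by the model when a batch cannot be committed."""


class MemoryChannel:
    """Simplified model of capacity vs. transactionCapacity (not Flume's code)."""

    def __init__(self, capacity=1000, transaction_capacity=100):
        if transaction_capacity > capacity:
            raise ValueError("transactionCapacity must not exceed capacity")
        self.capacity = capacity
        self.transaction_capacity = transaction_capacity
        self.queue = deque()

    def put_batch(self, events):
        """Source side: stage events in one transaction, then commit them together."""
        if len(events) > self.transaction_capacity:
            # More events than a single transaction may carry.
            raise ChannelFullError("batch exceeds transactionCapacity")
        if len(self.queue) + len(events) > self.capacity:
            # The commit needs room for the whole batch; otherwise nothing is stored.
            raise ChannelFullError("not enough free space in channel")
        self.queue.extend(events)

    def take_batch(self, max_events):
        """Sink side: take up to max_events, clamped to transactionCapacity."""
        n = min(max_events, self.transaction_capacity, len(self.queue))
        return [self.queue.popleft() for _ in range(n)]
```

In this model, with the values above (1000/100), a stalled sink lets at most 1000 events pile up before the source's commits start failing.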
Starting Flume
```
[yun@mini01 conf]$ pwd
/app/flume/conf
# --conf-file may be given as a relative or an absolute path
# --name must match the agent name used in the file (a1); -Dflume.root.logger=INFO,console sends the log to the console
[yun@mini01 conf]$ flume-ng agent --conf conf --conf-file netcat-logger.conf --name a1 -Dflume.root.logger=INFO,console
………………
18/07/25 22:19:23 INFO node.Application: Starting Channel c1
18/07/25 22:19:23 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: CHANNEL, name: c1: Successfully registered new MBean.
18/07/25 22:19:23 INFO instrumentation.MonitoredCounterGroup: Component type: CHANNEL, name: c1 started
18/07/25 22:19:23 INFO node.Application: Starting Sink k1
18/07/25 22:19:23 INFO node.Application: Starting Source r1
18/07/25 22:19:23 INFO source.NetcatSource: Source starting
18/07/25 22:19:23 INFO source.NetcatSource: Created serverSocket:sun.nio.ch.ServerSocketChannelImpl[/127.0.0.1:44444]
```
Telnet input on the source side
```
[yun@mini01 ~]$ telnet localhost 44444
Trying ::1...
telnet: connect to address ::1: Connection refused
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
111
OK
222
OK
334334634geg
OK
gwegweg
OK
^]
telnet> quit
Connection closed.
```
Output printed by the logger when lines are typed in Telnet
```
18/07/25 22:20:09 INFO sink.LoggerSink: Event: { headers:{} body: 31 31 31 0D 111. }
18/07/25 22:20:10 INFO sink.LoggerSink: Event: { headers:{} body: 32 32 32 0D 222. }
18/07/25 22:20:13 INFO sink.LoggerSink: Event: { headers:{} body: 33 33 34 33 33 34 36 33 34 67 65 67 0D 334334634geg. }
18/07/25 22:20:14 INFO sink.LoggerSink: Event: { headers:{} body: 67 77 65 67 77 65 67 0D gwegweg. }
```
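LoggerSink prints each body as a hex dump followed by a text rendering in which non-printable bytes appear as `.`. The trailing `0D` is a carriage return: telnet ends each line with CR LF, and the netcat source splits lines on LF, so the CR stays in the body. A small Python helper can recover the raw bytes; `decode_logger_body` is hypothetical and written only against the line format shown above.

```python
import string


def decode_logger_body(line):
    """Return the raw body bytes from a LoggerSink 'Event: {... body: ...}' line."""
    if "body:" not in line:
        raise ValueError("no body found in line")
    tokens = line.split("body:", 1)[1].split()
    hex_tokens = []
    for tok in tokens:
        # The hex dump is a run of two-character hex tokens; the text rendering
        # that follows ends it. (Ambiguous if the text itself starts with a
        # two-character hex-looking word.)
        if len(tok) == 2 and all(c in string.hexdigits for c in tok):
            hex_tokens.append(tok)
        else:
            break
    return bytes.fromhex(" ".join(hex_tokens))


line = "INFO sink.LoggerSink: Event: { headers:{} body: 31 31 31 0D 111. }"
print(decode_logger_body(line))  # b'111\r'
```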
5.2. Watching a directory and sinking to logger
Configuration file
```
[yun@mini01 conf]$ pwd
/app/flume/conf
[yun@mini01 conf]$ ll
total 20
-rw-r--r-- 1 yun yun 1661 Sep 15 2017 flume-conf.properties.template
-rw-r--r-- 1 yun yun 1455 Sep 15 2017 flume-env.ps1.template
-rw-r--r-- 1 yun yun 1568 Sep 15 2017 flume-env.sh.template
-rw-r--r-- 1 yun yun 3107 Sep 15 2017 log4j.properties
-rw-rw-r-- 1 yun yun  741 Jul 25 22:12 netcat-logger.conf
-rw-rw-r-- 1 yun yun  597 Jul 25 22:29 spooldir-logger.conf
[yun@mini01 conf]$ cat spooldir-logger.conf
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
# Watch a directory: spoolDir is the directory to watch; fileHeader = true adds a header holding the file's absolute path to each event
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /app/software/flume
a1.sources.r1.fileHeader = true

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
```
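The behaviour configured above can be modelled roughly in Python: each new file in the spool directory becomes one event per line (the default line deserializer), with a `file` header holding the absolute path when `fileHeader = true`, and the file is renamed with the `.COMPLETED` suffix once it has been consumed. `spool_once` is a hypothetical helper, not part of Flume; it also refuses to process a name whose `.COMPLETED` counterpart already exists, mirroring the rule that file names must not be reused.

```python
import os


def spool_once(spool_dir, file_header=True, suffix=".COMPLETED"):
    """Model one pass of a spooling-directory source over spool_dir.

    Every regular, not-yet-completed file becomes one (headers, body) event
    per line; the file is then renamed with `suffix`.
    """
    events = []
    for name in sorted(os.listdir(spool_dir)):
        path = os.path.join(spool_dir, name)
        # The model skips completed files, hidden files and subdirectories.
        if name.endswith(suffix) or name.startswith(".") or not os.path.isfile(path):
            continue
        done = path + suffix
        if os.path.exists(done):
            # A reused file name: the model refuses instead of overwriting.
            raise FileExistsError(f"{done} already exists")
        headers = {"file": os.path.abspath(path)} if file_header else {}
        with open(path, "rb") as f:
            for line in f:
                # One event per line, trailing newline stripped.
                events.append((dict(headers), line.rstrip(b"\n")))
        os.rename(path, done)
    return events
```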
Starting Flume
```
[yun@mini01 conf]$ pwd
/app/flume/conf
[yun@mini01 conf]$ flume-ng agent --conf conf --conf-file spooldir-logger.conf --name a1 -Dflume.root.logger=INFO,console
………………
18/07/25 23:01:04 INFO instrumentation.MonitoredCounterGroup: Component type: CHANNEL, name: c1 started
18/07/25 23:01:04 INFO node.Application: Starting Sink k1
18/07/25 23:01:04 INFO node.Application: Starting Source r1
18/07/25 23:01:04 INFO source.SpoolDirectorySource: SpoolDirectorySource source starting with directory: /app/software/flume
18/07/25 23:01:04 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SOURCE, name: r1: Successfully registered new MBean.
18/07/25 23:01:04 INFO instrumentation.MonitoredCounterGroup: Component type: SOURCE, name: r1 started
```
Adding files to the /app/software/flume directory
```
# Source directory
[yun@mini01 hive]$ pwd
/app/software/hive
[yun@mini01 hive]$ ll
total 48
-rw-rw-r-- 1 yun yun  71 Jul 12 21:53 t_sz01.dat
-rw-rw-r-- 1 yun yun  71 Jul 12 21:53 t_sz01.dat2
-rw-rw-r-- 1 yun yun  79 Jul 12 22:15 t_sz02_ext.dat
-rw-rw-r-- 1 yun yun  52 Jul 12 23:09 t_sz03_20180711.dat1
-rw-rw-r-- 1 yun yun  52 Jul 12 23:09 t_sz03_20180711.dat2
-rw-rw-r-- 1 yun yun  52 Jul 12 23:09 t_sz03_20180712.dat1
-rw-rw-r-- 1 yun yun  52 Jul 12 23:09 t_sz03_20180712.dat2
-rw-rw-r-- 1 yun yun 753 Jul 14 10:36 t_sz05_buck.dat
-rw-rw-r-- 1 yun yun 507 Jul 14 10:07 t_sz05_buck.dat.bak
[yun@mini01 hive]$ cp -a t_access_times.dat t_sz01.dat t_sz01.dat2 ../flume/
[yun@mini01 hive]$ cp -a t_sz05_buck.dat t_sz05_buck.dat2 ../flume/
############################################
# The corresponding Flume spool directory. Note: file names must not be reused, otherwise Flume reports an error; the entries must not be directories either
[yun@mini01 flume]$ pwd
/app/software/flume
[yun@mini01 flume]$ ll
total 20
-rw-rw-r-- 1 yun yun 288 Jul 18 14:20 t_access_times.dat.COMPLETED
-rw-rw-r-- 1 yun yun  71 Jul 12 21:53 t_sz01.dat2.COMPLETED
-rw-rw-r-- 1 yun yun  71 Jul 12 21:53 t_sz01.dat.COMPLETED
-rw-rw-r-- 1 yun yun 753 Jul 14 10:36 t_sz05_buck.dat
-rw-rw-r-- 1 yun yun 753 Jul 14 10:36 t_sz05_buck.dat.COMPLETED
```
Once Flume has finished reading a file, it renames it with the .COMPLETED suffix. t_sz05_buck.dat was left unrenamed, presumably because t_sz05_buck.dat.COMPLETED already existed from an earlier copy, which is the name-reuse case the note above warns about.
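Because the spooling directory source expects a file to be complete and unchanged once it appears, and file names must not be reused, a safer way to deliver files is to copy them into a staging directory on the same filesystem and then rename them into the spool directory in one step. A minimal Python sketch; `drop_into_spool` and the staging directory are hypothetical, not part of Flume.

```python
import os
import shutil


def drop_into_spool(src, spool_dir, staging_dir, suffix=".COMPLETED"):
    """Deliver src to spool_dir so Flume only ever sees a complete, unique file."""
    name = os.path.basename(src)
    final = os.path.join(spool_dir, name)
    # Reject names Flume has already seen (still pending or already completed).
    if os.path.exists(final) or os.path.exists(final + suffix):
        raise FileExistsError(f"{name} was already delivered to {spool_dir}")
    staged = os.path.join(staging_dir, name)
    shutil.copy2(src, staged)  # the slow copy happens outside the watched directory
    os.rename(staged, final)   # atomic only when both dirs are on the same filesystem
    return final
```

For this example the spool directory would be /app/software/flume, with the staging directory assumed to be any other directory on the same filesystem.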
With the fileHeader setting
5.3. Collecting data with the tail command and sinking it to HDFS
Configuration file
```
[yun@mini01 conf]$ pwd
/app/flume/conf
[yun@mini01 conf]$ ll
total 28
-rw-r--r-- 1 yun yun 1661 Sep 15 2017 flume-conf.properties.template
-rw-r--r-- 1 yun yun 1455 Sep 15 2017 flume-env.ps1.template
-rw-r--r-- 1 yun yun 1568 Sep 15 2017 flume-env.sh.template
-rw-r--r-- 1 yun yun 3107 Sep 15 2017 log4j.properties
-rw-rw-r-- 1 yun yun  741 Jul 25 22:12 netcat-logger.conf
-rw-rw-r-- 1 yun yun  593 Jul 25 22:30 spooldir-logger.conf
-rw-rw-r-- 1 yun yun 1275 Jul 25 23:29 tail-hdfs.conf
[yun@mini01 conf]$ cat tail-hdfs.conf
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /app/webservice/logs/access.log
a1.sources.r1.channels = c1

# Describe the sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H%M/
a1.sinks.k1.hdfs.filePrefix = events-
# The next 3 settings round the timestamp down to a multiple of 10 minutes, so a new directory is started every 10 minutes
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute
# Number of seconds to wait before rolling the current file
a1.sinks.k1.hdfs.rollInterval = 30
# File size, in bytes, that triggers a roll
a1.sinks.k1.hdfs.rollSize = 1024
# Number of events written to a file before it is rolled
a1.sinks.k1.hdfs.rollCount = 500
# Number of events written to a file before it is flushed to HDFS, i.e. batches of 100 events
a1.sinks.k1.hdfs.batchSize = 100
# Use the agent's local time (instead of a timestamp header) to resolve the escapes in hdfs.path
a1.sinks.k1.hdfs.useLocalTimeStamp = true
# Type of file generated: the default is SequenceFile; DataStream writes plain text
a1.sinks.k1.hdfs.fileType = DataStream

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
```
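How the escape sequences in `hdfs.path` combine with `round`, `roundValue` and `roundUnit` can be illustrated with a short Python function. It assumes `useLocalTimeStamp = true` (so the agent's clock supplies the time) and only handles minute rounding; `hdfs_bucket_path` is a hypothetical helper, and Python's `strftime` codes happen to match the `%y`, `%m`, `%d`, `%H` and `%M` escapes used here.

```python
from datetime import datetime


def hdfs_bucket_path(ts, round_minutes=10, template="/flume/events/%y-%m-%d/%H%M/"):
    """Resolve hdfs.path after rounding ts down (round = true, roundUnit = minute)."""
    rounded = ts.replace(minute=ts.minute - ts.minute % round_minutes,
                         second=0, microsecond=0)
    return rounded.strftime(template)


print(hdfs_bucket_path(datetime(2018, 7, 25, 23, 37, 12)))  # /flume/events/18-07-25/2330/
```

So every event that arrives between 23:30:00 and 23:39:59 lands in the same 2330 directory.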
Starting Flume
```
# -c, -f and -n are the short forms of --conf, --conf-file and --name
[yun@mini01 conf]$ flume-ng agent -c conf -f tail-hdfs.conf -n a1
```
Start the jar that prints logs
```
[yun@mini01 webservice]$ pwd
/app/webservice
[yun@mini01 webservice]$ java -jar testlog.jar &
```
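testlog.jar itself is not shown in this article. If it is not at hand, a stand-in that appends lines to the file tailed by the exec source can be sketched in Python; the log line format and the `write_fake_access_log` name are made up for illustration.

```python
import os
import time
from datetime import datetime


def write_fake_access_log(path, count=10, interval=1.0):
    """Append `count` made-up access-log lines to `path`, one every `interval` seconds."""
    os.makedirs(os.path.dirname(path) or ".", exist_ok=True)
    for i in range(count):
        line = f"{datetime.now():%Y-%m-%d %H:%M:%S} GET /index.html?req={i} 200\n"
        # Append and close each time; `tail -F` picks up every appended line.
        with open(path, "a", encoding="utf-8") as f:
            f.write(line)
        if i < count - 1:
            time.sleep(interval)
```

For example, `write_fake_access_log("/app/webservice/logs/access.log", count=100, interval=0.5)` appends 100 lines over roughly 50 seconds.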
See also: Hadoop2.7.6_02_HDFS常用操作 (common HDFS operations), section 3.3, web log simulation
View the data Flume has sunk to HDFS in the browser
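Besides the NameNode web UI, the same directories can be listed through the WebHDFS REST API (`op=LISTSTATUS`). A minimal Python sketch, assuming WebHDFS is enabled and the NameNode serves HTTP on port 50070 (the Hadoop 2.x default); the host name passed in, e.g. `mini01`, is an assumption about this cluster, and the function names are hypothetical.

```python
import json
from urllib.parse import quote
from urllib.request import urlopen


def liststatus_url(namenode, path, port=50070):
    """Build the WebHDFS LISTSTATUS URL for an HDFS path."""
    return f"http://{namenode}:{port}/webhdfs/v1{quote(path)}?op=LISTSTATUS"


def entry_names(response_text):
    """Return (name, type) for each entry in a LISTSTATUS JSON response."""
    statuses = json.loads(response_text)["FileStatuses"]["FileStatus"]
    return [(s["pathSuffix"], s["type"]) for s in statuses]


def list_hdfs_dir(namenode, path):
    """Fetch and parse a directory listing over HTTP."""
    with urlopen(liststatus_url(namenode, path), timeout=10) as resp:
        return entry_names(resp.read().decode("utf-8"))
```

For example, `list_hdfs_dir("mini01", "/flume/events")` should return one `DIRECTORY` entry per day directory.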
5.4. Cascading to HDFS
Flume on mini01 sends data to Flume on mini02, which then sinks it to HDFS.
The installation of Flume on mini02 is omitted.
Configuration file on mini01
```
[yun@mini01 conf]$ pwd
/app/flume/conf
[yun@mini01 conf]$ ll
total 32
-rw-r--r-- 1 yun yun 1661 Sep 15 2017 flume-conf.properties.template
-rw-r--r-- 1 yun yun 1455 Sep 15 2017 flume-env.ps1.template
-rw-r--r-- 1 yun yun 1568 Sep 15 2017 flume-env.sh.template
-rw-r--r-- 1 yun yun 3107 Sep 15 2017 log4j.properties
-rw-rw-r-- 1 yun yun  741 Jul 25 22:12 netcat-logger.conf
-rw-rw-r-- 1 yun yun  593 Jul 25 22:30 spooldir-logger.conf
-rw-rw-r-- 1 yun yun  789 Jul 26 22:31 tail-avro-avro-logger.conf
-rw-rw-r-- 1 yun yun 1283 Jul 25 23:41 tail-hdfs.conf
[yun@mini01 conf]$ cat tail-avro-avro-logger.conf
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /app/webservice/logs/access.log
a1.sources.r1.channels = c1

# Describe the sink
# This does not bind to the local machine; it points at a service on another host.
# The avro sink is the sending side (an Avro client) and sends to mini02
a1.sinks = k1
a1.sinks.k1.type = avro
a1.sinks.k1.channel = c1
a1.sinks.k1.hostname = mini02
a1.sinks.k1.port = 4141
a1.sinks.k1.batch-size = 2

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
```
Configuration file on mini02
```
[yun@mini02 conf]$ pwd
/app/flume/conf
[yun@mini02 conf]$ ll
total 20
-rw-rw-r-- 1 yun yun 1357 Jul 26 22:39 avro-hdfs.conf
-rw-r--r-- 1 yun yun 1661 Sep 15 2017 flume-conf.properties.template
-rw-r--r-- 1 yun yun 1455 Sep 15 2017 flume-env.ps1.template
-rw-r--r-- 1 yun yun 1568 Sep 15 2017 flume-env.sh.template
-rw-r--r-- 1 yun yun 3107 Sep 15 2017 log4j.properties
[yun@mini02 conf]$ cat avro-hdfs.conf
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
# The avro source is the receiving service and binds on this machine (0.0.0.0 = all interfaces)
a1.sources.r1.type = avro
a1.sources.r1.channels = c1
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 4141

# Describe the sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.path = /flume/new-events/%y-%m-%d/%H%M/
a1.sinks.k1.hdfs.filePrefix = events-
# The next 3 settings round the timestamp down to a multiple of 10 minutes, so a new directory is started every 10 minutes
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute
# Number of seconds to wait before rolling the current file
a1.sinks.k1.hdfs.rollInterval = 30
# File size, in bytes, that triggers a roll
a1.sinks.k1.hdfs.rollSize = 204800
# Number of events written to a file before it is rolled
a1.sinks.k1.hdfs.rollCount = 500
# Number of events written to a file before it is flushed to HDFS, i.e. the maximum number of events per batch
a1.sinks.k1.hdfs.batchSize = 100
# Use the agent's local time (instead of a timestamp header) to resolve the escapes in hdfs.path
a1.sinks.k1.hdfs.useLocalTimeStamp = true
# Type of file generated: the default is SequenceFile; DataStream writes plain text
a1.sinks.k1.hdfs.fileType = DataStream

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
```
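The three roll settings act as independent triggers: the HDFS sink closes the current file and starts a new one as soon as any of them is reached, and setting a value to 0 disables that trigger. A simplified Python model using mini02's values as defaults; `should_roll` is a hypothetical name, not Flume's API.

```python
def should_roll(elapsed_s, bytes_written, events_written,
                roll_interval=30, roll_size=204800, roll_count=500):
    """True once any enabled trigger fires; a value of 0 disables that trigger."""
    if roll_interval > 0 and elapsed_s >= roll_interval:
        return True
    if roll_size > 0 and bytes_written >= roll_size:
        return True
    if roll_count > 0 and events_written >= roll_count:
        return True
    return False
```

With the 1024-byte `rollSize` used in 5.3, files roll after roughly 1 KB, which tends to produce many small files in HDFS; the 204800-byte (200 KB) value on mini02 is less aggressive.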
Starting Flume
```
# Start Flume on mini02


# Start Flume on mini01
```
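mini02 is started first because its avro source is the server that mini01's avro sink connects to; if mini01 starts first, its sink will typically log connection errors until mini02 is up. Before starting mini01, a quick TCP check in Python can confirm that port 4141 on mini02 is listening. This is a sketch; `port_open` is a hypothetical helper, and the avro source may log the short-lived connection.

```python
import socket


def port_open(host, port, timeout=3.0):
    """Return True if a TCP connection to host:port succeeds."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Refused, unreachable, unresolvable host or timed out.
        return False
```

For example, `port_open("mini02", 4141)` should return True once the mini02 agent has started its avro source.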
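Start the jar that prints logs
```
[yun@mini01 webservice]$ pwd
/app/webservice
[yun@mini01 webservice]$ java -jar testlog.jar &
```
See also: Hadoop2.7.6_02_HDFS常用操作 (common HDFS operations), section 3.3, web log simulation
View the data Flume has sunk to HDFS in the browser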
6. More Source and Sink Components
Flume supports many source and sink types; see the official documentation (FlumeUserGuide) for the full reference.