Chapter 3 Advanced Flume
3.1 Flume Transactions
3.2 Flume Agent Internals
Key components:
1)ChannelSelector
A ChannelSelector decides which Channel(s) an Event will be sent to. There are two types: Replicating and Multiplexing.
The ReplicatingSelector sends each Event to every configured Channel, while the Multiplexing selector routes different Events to different Channels according to the configured rules.
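A minimal configuration sketch (the agent, source, and channel names here are illustrative; complete working examples appear in sections 3.4.1 and 3.5). The selector is configured on the source:
# replicating: every event goes to both c1 and c2
a1.sources.r1.selector.type = replicating
a1.sources.r1.channels = c1 c2
# multiplexing alternative: route by the value of an event header (header name and mappings are examples)
# a1.sources.r1.selector.type = multiplexing
# a1.sources.r1.selector.header = type
# a1.sources.r1.selector.mapping.letter = c1
# a1.sources.r1.selector.mapping.number = c2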
2)SinkProcessor
There are three types of SinkProcessor: DefaultSinkProcessor, LoadBalancingSinkProcessor, and FailoverSinkProcessor.
DefaultSinkProcessor works with a single Sink, whereas LoadBalancingSinkProcessor and FailoverSinkProcessor work with a Sink Group: LoadBalancingSinkProcessor provides load balancing across the sinks in the group, and FailoverSinkProcessor provides failover (error recovery).
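A minimal sink-group sketch (agent and sink names are illustrative; full working examples follow in 3.4.2 and 3.4.3):
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
# failover: events go to the available sink with the highest priority
a1.sinkgroups.g1.processor.type = failover
a1.sinkgroups.g1.processor.priority.k1 = 5
a1.sinkgroups.g1.processor.priority.k2 = 10
# load-balancing alternative: distribute events across the sinks
# a1.sinkgroups.g1.processor.type = load_balance
# a1.sinkgroups.g1.processor.selector = round_robin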
3.3 Flume Topologies
3.3.1 Simple Chain
In this mode, multiple Flume agents are connected in sequence, from the initial source all the way to the storage system that the final sink writes to. It is not advisable to chain too many Flume agents: an excessive number not only hurts throughput, but also means that if any agent in the chain goes down, the whole pipeline is affected. A one-hop sketch is shown below.
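A minimal sketch of a single hop in such a chain (the hostname and port 4141 are illustrative): the upstream agent's avro sink points at the downstream agent's avro source.
# upstream agent a1: avro sink sending to the next hop
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop103
a1.sinks.k1.port = 4141
# downstream agent a2: avro source listening for the previous hop
a2.sources.r1.type = avro
a2.sources.r1.bind = hadoop103
a2.sources.r1.port = 4141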
3.3.2 Replicating and Multiplexing
Flume supports sending an event stream to one or more destinations. In this mode the same data can be replicated into multiple channels, or different data can be dispatched to different channels, and each sink can then deliver to a different destination.
3.3.3 Load Balancing and Failover
Flume supports logically grouping multiple sinks into a sink group; combined with the appropriate SinkProcessor, the group can provide load balancing or failover.
3.3.4 Aggregation
This is the most common and most practical mode. Web applications are typically spread across hundreds, sometimes thousands or tens of thousands of servers, and the logs they produce are painful to handle. This Flume composition solves the problem nicely: each server runs one Flume agent to collect its logs and send them to a central log-collecting Flume agent, which then uploads to HDFS, Hive, HBase, and so on for log analysis.
3.4 Flume Development Cases
3.4.1 Replicating and Multiplexing
1) Case requirements
Use Flume-1 to monitor file changes. Flume-1 passes the changes to Flume-2, and Flume-2 stores them in HDFS. At the same time, Flume-1 passes the changes to Flume-3, and Flume-3 writes them to the local file system.
2) Requirements analysis:
3) Implementation steps:
(1) Preparation
Create a replicating folder under /opt/module/flume-1.9.0/job/
[wolffy@hadoop102 job]$ mkdir replicating
Create a file_roll folder under /opt/module/flume-1.9.0/job/
[wolffy@hadoop102 job]$ mkdir file_roll
(2) Create a1.conf
Configure one source that receives the log file, two channels, and two sinks that send to a2 and a3 respectively.
Edit the configuration file
[wolffy@hadoop102 replicating]$ vim a1.conf
Add the following content
# Name
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2
# source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /opt/module/hive-3.1.2/logs/hive.log
# sink1
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop102
a1.sinks.k1.port = 5555
# sink2
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = hadoop102
a1.sinks.k2.port = 6666
# Channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Channel
a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100
# Channel Selector
a1.sources.r1.selector.type = replicating
# bind
a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2
(3) Create a2.conf
Configure a source that receives the output of the upstream Flume and a sink that writes to HDFS.
Edit the configuration file
[wolffy@hadoop102 replicating]$ vim a2.conf
Add the following content
# Name
a2.sources = r1
a2.sinks = k1
a2.channels = c1
# source
a2.sources.r1.type = avro
a2.sources.r1.bind = hadoop102
a2.sources.r1.port = 5555
# sink
a2.sinks.k1.type = hdfs
a2.sinks.k1.hdfs.path = hdfs://hadoop102:9820/flume/%Y%m%d/%H
#Prefix for uploaded files
a2.sinks.k1.hdfs.filePrefix = logs-
#Whether to roll folders based on time
a2.sinks.k1.hdfs.round = true
#How many time units before a new folder is created
a2.sinks.k1.hdfs.roundValue = 1
#Redefine the time unit
a2.sinks.k1.hdfs.roundUnit = hour
#Whether to use the local timestamp
a2.sinks.k1.hdfs.useLocalTimeStamp = true
#Number of Events to accumulate before flushing to HDFS
a2.sinks.k1.hdfs.batchSize = 100
#File type; compression is supported
a2.sinks.k1.hdfs.fileType = DataStream
#How often to roll a new file (seconds)
a2.sinks.k1.hdfs.rollInterval = 30
#Roll size of each file (bytes)
a2.sinks.k1.hdfs.rollSize = 134217700
#File rolling is independent of the number of Events
a2.sinks.k1.hdfs.rollCount = 0
# Channel
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100
# bind
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1
(4) Create a3.conf
Configure a source that receives the output of the upstream Flume and a sink that writes to a local directory.
Edit the configuration file
[wolffy@hadoop102 replicating]$ vim a3.conf
Add the following content
# Name
a3.sources = r1
a3.sinks = k1
a3.channels = c1
# source
a3.sources.r1.type = avro
a3.sources.r1.bind = hadoop102
a3.sources.r1.port = 6666
# sink
a3.sinks.k1.type = file_roll
a3.sinks.k1.sink.directory = /opt/module/flume-1.9.0/job/file_roll
# Channel
a3.channels.c1.type = memory
a3.channels.c1.capacity = 1000
a3.channels.c1.transactionCapacity = 100
# bind
a3.sources.r1.channels = c1
a3.sinks.k1.channel = c1
Note: the local output directory must already exist; if it does not, the sink will not create it.
(5) Run the configuration files
Start the corresponding Flume agents, in order: a3.conf, a2.conf, a1.conf.
[wolffy@hadoop102 flume-1.9.0]$ bin/flume-ng agent --conf conf/ --name a3 --conf-file job/replicating/a3.conf
[wolffy@hadoop102 flume-1.9.0]$ bin/flume-ng agent --conf conf/ --name a2 --conf-file job/replicating/a2.conf
[wolffy@hadoop102 flume-1.9.0]$ bin/flume-ng agent --conf conf/ --name a1 --conf-file job/replicating/a1.conf
(6) Start Hadoop and Hive
[wolffy@hadoop102 hadoop-2.7.2]$ sbin/start-dfs.sh
[wolffy@hadoop103 hadoop-2.7.2]$ sbin/start-yarn.sh
[wolffy@hadoop102 hive]$ bin/hive
hive (default)>
(7) Check the data on HDFS
(8) Check the data in the /opt/module/flume-1.9.0/job/file_roll directory
[wolffy@hadoop102 file_roll]$ ll
total 8
-rw-rw-r--. 1 wolffy wolffy 5942 May 22 00:09 1526918887550-3
3.4.2 Load Balancing and Failover
1) Case requirements
Use Flume1 to monitor a port. The sinks in its sink group connect to Flume2 and Flume3 respectively, and a FailoverSinkProcessor is used to implement failover.
2) Requirements analysis
3) Implementation steps
(1) Preparation
Create a failover folder under /opt/module/flume-1.9.0/job/
[wolffy@hadoop102 job]$ mkdir failover/
[wolffy@hadoop102 job]$ cd failover/
(2) Create a1.conf
Configure one netcat source, one channel, and one sink group (two sinks) sending to a2.conf and a3.conf respectively.
Edit the configuration file
[wolffy@hadoop102 failover]$ vim a1.conf
Add the following content
# Name
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1
# source
a1.sources.r1.type = netcat
a1.sources.r1.bind = hadoop102
a1.sources.r1.port = 4444
# sink1
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop102
a1.sinks.k1.port = 5555
# sink2
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = hadoop102
a1.sinks.k2.port = 6666
# sink processor
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = failover
a1.sinkgroups.g1.processor.priority.k1 = 5
a1.sinkgroups.g1.processor.priority.k2 = 10
a1.sinkgroups.g1.processor.maxpenalty = 10000
# Channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# bind
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c1
(3) Create a2.conf
Configure a source that receives the output of the upstream Flume and a sink that logs to the local console.
Edit the configuration file
[wolffy@hadoop102 failover]$ vim a2.conf
Add the following content
# Name
a2.sources = r1
a2.sinks = k1
a2.channels = c1
# source
a2.sources.r1.type = avro
a2.sources.r1.bind = hadoop102
a2.sources.r1.port = 5555
# sink
a2.sinks.k1.type = logger
# Channel
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100
# bind
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1
(4) Create a3.conf
Configure a source that receives the output of the upstream Flume and a sink that logs to the local console.
Edit the configuration file
[wolffy@hadoop102 failover]$ vim a3.conf
Add the following content
# Name
a3.sources = r1
a3.sinks = k1
a3.channels = c1
# source
a3.sources.r1.type = avro
a3.sources.r1.bind = hadoop102
a3.sources.r1.port = 6666
# sink
a3.sinks.k1.type = logger
# Channel
a3.channels.c1.type = memory
a3.channels.c1.capacity = 1000
a3.channels.c1.transactionCapacity = 100
# bind
a3.sources.r1.channels = c1
a3.sinks.k1.channel = c1
(5) Run the configuration files
Start the agents for the configuration files, in order: a3.conf, a2.conf, a1.conf.
[wolffy@hadoop102 flume-1.9.0]$ bin/flume-ng agent --conf conf/ --name a3 --conf-file job/failover/a3.conf -Dflume.root.logger=INFO,console
[wolffy@hadoop102 flume-1.9.0]$ bin/flume-ng agent --conf conf/ --name a2 --conf-file job/failover/a2.conf -Dflume.root.logger=INFO,console
[wolffy@hadoop102 flume-1.9.0]$ bin/flume-ng agent --conf conf/ --name a1 --conf-file job/failover/a1.conf
(6) Use the netcat tool to send content to port 4444 on the local machine
[wolffy@hadoop102 failover]$ nc hadoop102 4444
(7) Check the console logs printed by Flume2 and Flume3
(8) Kill Flume2 and observe what Flume3 prints to its console.
2022-02-13 06:40:14,235 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 68 65 6C 6C 6F hello }
Note: use jps -ml to view the Flume processes.
3.4.3 Load Balancing
This case reuses the topology from 3.4.2, but the sink group is configured with a load_balance processor and round-robin selection instead of failover.
a1.conf
# Name
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1
# source
a1.sources.r1.type = netcat
a1.sources.r1.bind = hadoop102
a1.sources.r1.port = 4444
# sink1
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop102
a1.sinks.k1.port = 5555
# sink2
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = hadoop102
a1.sinks.k2.port = 6666
# sink processor
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = load_balance
a1.sinkgroups.g1.processor.selector = round_robin
# backoff
a1.sinkgroups.g1.processor.backoff = true
# Channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# bind
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c1
a2.conf
# Name
a2.sources = r1
a2.sinks = k1
a2.channels = c1
# source
a2.sources.r1.type = avro
a2.sources.r1.bind = hadoop102
a2.sources.r1.port = 5555
# sink
a2.sinks.k1.type = logger
# Channel
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100
# bind
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1
a3.conf
# Name
a3.sources = r1
a3.sinks = k1
a3.channels = c1
# source
a3.sources.r1.type = avro
a3.sources.r1.bind = hadoop102
a3.sources.r1.port = 6666
# sink
a3.sinks.k1.type = logger
# Channel
a3.channels.c1.type = memory
a3.channels.c1.capacity = 1000
a3.channels.c1.transactionCapacity = 100
# bind
a3.sources.r1.channels = c1
a3.sinks.k1.channel = c1
3.4.4 Aggregation
1) Case requirements:
Flume-1 on hadoop102 monitors the file /opt/module/group.log,
Flume-2 on hadoop103 monitors a data stream on a port,
Flume-1 and Flume-2 send their data to Flume-3 on hadoop104, and Flume-3 prints the final data to the console.
2) Requirements analysis
3) Implementation steps:
(1) Preparation
Distribute Flume
[wolffy@hadoop102 module]$ xsync flume-1.9.0
Create an aggre folder under /opt/module/flume-1.9.0/job on hadoop102, hadoop103, and hadoop104.
[wolffy@hadoop102 job]$ mkdir aggre
[wolffy@hadoop103 job]$ mkdir aggre
[wolffy@hadoop104 job]$ mkdir aggre
(2) Create a1.conf
Configure a source that monitors the hive.log file and a sink that sends the data to the next-level Flume.
Edit the configuration file on hadoop102
[wolffy@hadoop102 aggre]$ vim a1.conf
Add the following content
# Name
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /opt/module/hive-3.1.2/logs/hive.log
# sink1
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop104
a1.sinks.k1.port = 8888
# Channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# bind
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
(3) Create a2.conf
Configure a source that monitors the data stream on port 4444 and a sink that sends the data to the next-level Flume:
Edit the configuration file on hadoop103
[wolffy@hadoop103 aggre]$ vim a2.conf
Add the following content
# Name
a2.sources = r1
a2.sinks = k1
a2.channels = c1
# source
a2.sources.r1.type = netcat
a2.sources.r1.bind = hadoop103
a2.sources.r1.port = 4444
# sink
a2.sinks.k1.type = avro
a2.sinks.k1.hostname = hadoop104
a2.sinks.k1.port = 8888
# Channel
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100
# bind
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1
(4) Create a3.conf
Configure a source that receives the data streams sent by flume1 and flume2, merges them, and sinks the result to the console.
Edit the configuration file on hadoop104
[wolffy@hadoop104 aggre]$ touch a3.conf
[wolffy@hadoop104 aggre]$ vim a3.conf
Add the following content
# Name
a3.sources = r1
a3.sinks = k1
a3.channels = c1
# source
a3.sources.r1.type = avro
a3.sources.r1.bind = hadoop104
a3.sources.r1.port = 8888
# sink
a3.sinks.k1.type = logger
# Channel
a3.channels.c1.type = memory
a3.channels.c1.capacity = 1000
a3.channels.c1.transactionCapacity = 100
# bind
a3.sources.r1.channels = c1
a3.sinks.k1.channel = c1
(5) Run the configuration files
Start the agents for the configuration files, in order: a3.conf, a2.conf, a1.conf.
[root@hadoop104 flume]$ bin/flume-ng agent --conf conf/ --name a3 --conf-file job/aggre/a3.conf -Dflume.root.logger=INFO,console
[root@hadoop102 flume]$ bin/flume-ng agent --conf conf/ --name a1 --conf-file job/aggre/a1.conf
[root@hadoop103 flume]$ bin/flume-ng agent --conf conf/ --name a2 --conf-file job/aggre/a2.conf
(6) Start hive on hadoop102
[root@hadoop102 hive-3.1.2]$ bin/hive
Hive Session ID = 1a14279f-53b3-47e9-9504-534fb461235f
Logging initialized using configuration in file:/opt/module/hive-3.1.2/conf/hive-log4j2.properties Async: true
Hive Session ID = a54853cc-e857-43ee-b305-dc644db44b84
hive (default)>
(7) Send data to port 4444 on hadoop103
[root@hadoop103 flume]$ nc hadoop103 4444
(8) Check the data on hadoop104
2022-02-13 07:31:44,850 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 32 30 32 32 2D 30 32 2D 31 33 54 30 37 3A 33 31 2022-02-13T07:31 }
2022-02-13 07:31:44,850 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 32 30 32 32 2D 30 32 2D 31 33 54 30 37 3A 33 31 2022-02-13T07:31 }
2022-02-13 07:31:45,854 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 32 30 32 32 2D 30 32 2D 31 33 54 30 37 3A 33 31 2022-02-13T07:31 }
3.5 Custom Interceptor
1) Case requirements
Use Flume to collect local server logs; depending on the log type, different kinds of logs need to be sent to different analysis systems.
2) Requirements analysis
In real development, a single server may produce many types of logs, and different types may need to go to different analysis systems. This is where the Multiplexing structure in the Flume topology comes in. Multiplexing works by routing events to different Channels based on the value of a certain key in each event's header, so we need a custom Interceptor that assigns different values to that header key for different types of events.
In this case we simulate logs with port data, using single digits and single letters to represent different log types. We need a custom interceptor that distinguishes digits from letters and sends them to different analysis systems (Channels).
3) Implementation steps
(1) Create a Maven project and add the following dependency.
<dependency>
<groupId>org.apache.flume</groupId>
<artifactId>flume-ng-core</artifactId>
<version>1.9.0</version>
</dependency>
(2) Define a CustomInterceptor class that implements the Interceptor interface.
package com.bdreasercher.bigdata;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
import java.util.List;
public class CustomInterceptor implements Interceptor {
@Override
public void initialize() {
}
@Override
public Event intercept(Event event) {
byte[] body = event.getBody();
if (body[0] >= 'a' && body[0] <= 'z') {
event.getHeaders().put("type", "letter");
} else if (body[0] >= '0' && body[0] <= '9') {
event.getHeaders().put("type", "number");
}
return event;
}
@Override
public List<Event> intercept(List<Event> events) {
for (Event event : events) {
intercept(event);
}
return events;
}
@Override
public void close() {
}
public static class Builder implements Interceptor.Builder {
@Override
public Interceptor build() {
return new CustomInterceptor();
}
@Override
public void configure(Context context) {
}
}
}
Package the project and put the compiled jar into Flume's lib folder. A packaging sketch is shown below.
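For example (a sketch assuming a standard Maven build; the jar file name depends on your project's artifactId and version):
mvn clean package
cp target/<your-artifact>.jar /opt/module/flume-1.9.0/lib/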
(3) Edit the Flume configuration files
On hadoop102, configure Flume1 with one netcat source, two avro sinks, two channels, and the corresponding ChannelSelector and interceptor.
# Name the components on this agent
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2
# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 44444
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.bdreasercher.bigdata.CustomInterceptor$Builder
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = type
a1.sources.r1.selector.mapping.letter = c1
a1.sources.r1.selector.mapping.number = c2
# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop103
a1.sinks.k1.port = 4141
a1.sinks.k2.type=avro
a1.sinks.k2.hostname = hadoop104
a1.sinks.k2.port = 4242
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Use a channel which buffers events in memory
a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2
On hadoop103, configure Flume2 with an avro source and a logger sink.
a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = avro
a1.sources.r1.bind = hadoop103
a1.sources.r1.port = 4141
a1.sinks.k1.type = logger
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sinks.k1.channel = c1
a1.sources.r1.channels = c1
On hadoop104, configure Flume3 with an avro source and a logger sink.
a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = avro
a1.sources.r1.bind = hadoop104
a1.sources.r1.port = 4242
a1.sinks.k1.type = logger
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sinks.k1.channel = c1
a1.sources.r1.channels = c1
(4) Start the Flume agents on hadoop102, hadoop103, and hadoop104; pay attention to the start order (start the downstream agents on hadoop103 and hadoop104 first).
(5) On hadoop102, use netcat to send letters and digits to localhost:44444.
(6) Observe the logs printed on hadoop103 and hadoop104.
3.6 Custom Source
1) Introduction
A Source is the component responsible for receiving data into the Flume Agent. Source components can handle log data of various types and formats, including avro, thrift, exec, jms, spooling directory, netcat, sequence generator, syslog, http, and legacy. The officially provided source types already cover a lot, but sometimes they cannot meet the needs of real development, and then we need to define custom sources for our actual requirements.
The official documentation also describes the interface for custom sources: https://flume.apache.org/FlumeDeveloperGuide.html#source. According to it, a custom MySource needs to extend the AbstractSource class and implement the Configurable and PollableSource interfaces.
Implement the following methods:
getBackOffSleepIncrement() // backoff step size
getMaxBackOffSleepInterval() // maximum backoff time
configure(Context context) // initialize the context (read the configuration file)
process() // fetch data, wrap it into events, and write them to the channel; this method is called in a loop.
Typical use case: reading data from MySQL or other file systems.
2) Requirements
Use Flume to receive data, add a prefix to each record, and output it to the console. The prefix can be configured in the Flume configuration file.
3) Coding
(1) Add the pom dependency
<dependencies>
<dependency>
<groupId>org.apache.flume</groupId>
<artifactId>flume-ng-core</artifactId>
<version>1.9.0</version>
</dependency>
</dependencies>
(2) Write the code
package com.bdreasercher.bigdata;
import org.apache.flume.Context;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.PollableSource;
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.SimpleEvent;
import org.apache.flume.source.AbstractSource;
import java.util.HashMap;
public class MySource extends AbstractSource implements Configurable, PollableSource {
//Fields to be read from the configuration file
private Long delay;
private String field;
//Initialize the configuration
@Override
public void configure(Context context) {
delay = context.getLong("delay");
field = context.getString("field", "Hello!");
}
@Override
public Status process() throws EventDeliveryException {
try {
//Create the event header map
HashMap<String, String> headerMap = new HashMap<>();
//Create the event
SimpleEvent event = new SimpleEvent();
//Build and send events in a loop
for (int i = 0; i < 5; i++) {
//Set the event headers
event.setHeaders(headerMap);
//Set the event body
event.setBody((field + i).getBytes());
//Write the event to the channel
getChannelProcessor().processEvent(event);
Thread.sleep(delay);
}
} catch (Exception e) {
e.printStackTrace();
return Status.BACKOFF;
}
return Status.READY;
}
@Override
public long getBackOffSleepIncrement() {
return 0;
}
@Override
public long getMaxBackOffSleepInterval() {
return 0;
}
}
4) Testing
(1) Package
Package the code and put the jar into Flume's lib directory (/opt/module/flume-1.9.0/lib).
(2) Configuration file
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = com.bdreasercher.bigdata.MySource
a1.sources.r1.delay = 1000
#a1.sources.r1.field = root
# Describe the sink
a1.sinks.k1.type = logger
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
(3) Start the job
[root@hadoop102 flume-1.9.0]$ bin/flume-ng agent -c conf/ -f job/mysource.conf -n a1 -Dflume.root.logger=INFO,console
(4) Results
3.7 Custom Sink
1) Introduction
A Sink continuously polls the Channel for events, removes them in batches, and writes those batches to a storage or indexing system, or sends them to another Flume Agent.
A Sink is fully transactional. Before removing a batch of data from the Channel, each Sink starts a transaction with the Channel. Once the batch of events has been successfully written to the storage system or to the next Flume Agent, the Sink commits the transaction through the Channel. Only after the transaction is committed does the Channel delete those events from its internal buffer.
Sink destinations include hdfs, logger, avro, thrift, ipc, file, null, HBase, solr, and custom sinks. The officially provided Sink types already cover a lot, but sometimes they cannot meet the needs of real development, and then we need to define custom Sinks for our actual requirements.
The official documentation also describes the interface for custom sinks: https://flume.apache.org/FlumeDeveloperGuide.html#sink. According to it, a custom MySink needs to extend the AbstractSink class and implement the Configurable interface.
Implement the following methods:
configure(Context context) // initialize the context (read the configuration file)
process() // read data (events) from the Channel; this method is called in a loop.
Typical use case: reading data from the Channel and writing it to MySQL or other file systems.
2) Requirements
Use Flume to receive data and, in the Sink, add a prefix and a suffix to each record before outputting it to the console. The prefix and suffix can be configured in the Flume job configuration file.
Flow analysis:
3) Coding
package com.bdreasercher.bigdata;
import org.apache.flume.*;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class MySink extends AbstractSink implements Configurable {
//Create the Logger object
private static final Logger LOG = LoggerFactory.getLogger(AbstractSink.class);
private String prefix;
private String suffix;
@Override
public Status process() throws EventDeliveryException {
//Declare the status to return
Status status;
//Get the Channel bound to this Sink
Channel ch = getChannel();
//Get a transaction
Transaction txn = ch.getTransaction();
//Declare the event
Event event;
//Start the transaction
txn.begin();
//Read events from the Channel; loop until an event is read
while (true) {
event = ch.take();
if (event != null) {
break;
}
}
try {
//Process the event (print it)
LOG.info(prefix + new String(event.getBody()) + suffix);
//Commit the transaction
txn.commit();
status = Status.READY;
} catch (Exception e) {
//On exception, roll back the transaction
txn.rollback();
status = Status.BACKOFF;
} finally {
//Close the transaction
txn.close();
}
return status;
}
@Override
public void configure(Context context) {
//Read from the configuration file; has a default value
prefix = context.getString("prefix", "hello:");
//Read from the configuration file; no default value
suffix = context.getString("suffix");
}
}
4) Testing
(1) Package
Package the code and put the jar into Flume's lib directory (/opt/module/flume-1.9.0/lib).
(2) Configuration file
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
# Describe the sink
a1.sinks.k1.type = com.bdreasercher.bigdata.MySink
#a1.sinks.k1.prefix = root:
a1.sinks.k1.suffix = :root
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
(3) Start the job
[root@hadoop102 flume-1.9.0]$ bin/flume-ng agent -c conf/ -f job/mysink.conf -n a1 -Dflume.root.logger=INFO,console
[root@hadoop102 ~]$ nc localhost 44444
hello
OK
root
OK
(4) Results
3.8 Flume Data Flow Monitoring
3.8.1 Installing and Deploying Ganglia
Ganglia consists of three parts: gmond, gmetad, and gweb.
- gmond (Ganglia Monitoring Daemon) is a lightweight service installed on every node host whose metrics you want to collect. With gmond you can easily collect many system metrics, such as CPU, memory, disk, network, and active-process data.
- gmetad (Ganglia Meta Daemon) is the service that aggregates all the information and stores it on disk in RRD format.
- gweb (Ganglia Web) is Ganglia's visualization tool, a PHP front end that displays in a browser the data stored by gmetad. The web interface presents the various metrics collected from the running cluster as charts.
1) Install Ganglia
(1) Plan
hadoop102: web, gmetad, gmond
hadoop103: gmond
hadoop104: gmond
(2) Install epel-release on hadoop102, hadoop103, and hadoop104
[root@hadoop102 flume-1.9.0]$ sudo yum -y install epel-release
[root@hadoop103 flume-1.9.0]$ sudo yum -y install epel-release
[root@hadoop104 flume-1.9.0]$ sudo yum -y install epel-release
(3) Install web, gmetad, and gmond on hadoop102
[root@hadoop102 flume-1.9.0]$ sudo yum -y install ganglia-gmetad
[root@hadoop102 flume-1.9.0]$ sudo yum -y install ganglia-web
[root@hadoop102 flume-1.9.0]$ sudo yum -y install ganglia-gmond
(4) Install gmond on hadoop103 and hadoop104
[root@hadoop103 flume-1.9.0]$ sudo yum -y install ganglia-gmond
[root@hadoop104 flume-1.9.0]$ sudo yum -y install ganglia-gmond
2) On hadoop102, modify the configuration file /etc/httpd/conf.d/ganglia.conf
[root@hadoop102 flume-1.9.0]$ sudo vim /etc/httpd/conf.d/ganglia.conf
Modify it as follows:
# Ganglia monitoring system php web frontend
Alias /ganglia /usr/share/ganglia
<Location /ganglia>
# Require ip 10.1.2.3
# Require host example.org
Require all granted
</Location>
3) On hadoop102, modify the configuration file /etc/ganglia/gmetad.conf
[root@hadoop102 flume-1.9.0]$ sudo vim /etc/ganglia/gmetad.conf
Modify it to:
data_source "my cluster" hadoop102
4) On hadoop102, hadoop103, and hadoop104, modify the configuration file /etc/ganglia/gmond.conf
[root@hadoop102 flume-1.9.0]$ sudo vim /etc/ganglia/gmond.conf
Modify it to:
cluster {
name = "my cluster"
owner = "unspecified"
latlong = "unspecified"
url = "unspecified"
}
udp_send_channel {
#bind_hostname = yes # Highly recommended, soon to be default.
# This option tells gmond to use a source address
# that resolves to the machine's hostname. Without
# this, the metrics may appear to come from any
# interface and the DNS names associated with
# those IPs will be used to create the RRDs.
# mcast_join = 239.2.11.71
# Send the data to hadoop102
host = hadoop102
port = 8649
ttl = 1
}
udp_recv_channel {
# mcast_join = 239.2.11.71
port = 8649
# Accept data from any address
bind = 0.0.0.0
retry_bind = true
# Size of the UDP buffer. If you are handling lots of metrics you really
# should bump it up to e.g. 10MB or even higher.
# buffer = 10485760
}
5) On hadoop102, modify the configuration file /etc/selinux/config
[root@hadoop102 flume-1.9.0]$ sudo vim /etc/selinux/config
Modify it to:
# This file controls the state of SELinux on the system.
# SELINUX= can take one of these three values:
# enforcing - SELinux security policy is enforced.
# permissive - SELinux prints warnings instead of enforcing.
# disabled - No SELinux policy is loaded.
SELINUX=disabled
# SELINUXTYPE= can take one of these two values:
# targeted - Targeted processes are protected,
# mls - Multi Level Security protection.
SELINUXTYPE=targeted
Note: disabling SELinux only takes effect after a reboot; if you do not want to reboot right now, you can make it take effect temporarily:
[root@hadoop102 flume-1.9.0]$ sudo setenforce 0
6) Start Ganglia
(1) Start gmond on hadoop102, hadoop103, and hadoop104
[root@hadoop102 flume-1.9.0]$ sudo systemctl start gmond
[root@hadoop104 flume-1.9.0]$ sudo systemctl status gmond
● gmond.service - Ganglia Monitoring Daemon
Loaded: loaded (/usr/lib/systemd/system/gmond.service; disabled; vendor preset: disabled)
Active: active (running) since 二 2022-01-25 20:32:00 CST; 2 weeks 4 days ago
Process: 43115 ExecStart=/usr/sbin/gmond (code=exited, status=0/SUCCESS)
Main PID: 43116 (gmond)
CGroup: /system.slice/gmond.service
└─43116 /usr/sbin/gmond
1月 25 20:32:00 hadoop104 systemd[1]: Starting Ganglia Monitoring Daemon...
1月 25 20:32:00 hadoop104 systemd[1]: Started Ganglia Monitoring Daemon.
(2) Start httpd and gmetad on hadoop102
[root@hadoop102 flume-1.9.0]$ sudo systemctl start httpd
[root@hadoop102 flume-1.9.0]$ sudo systemctl status httpd
● httpd.service - The Apache HTTP Server
Loaded: loaded (/usr/lib/systemd/system/httpd.service; disabled; vendor preset: disabled)
Active: active (running) since 二 2022-01-25 20:41:19 CST; 2 weeks 4 days ago
Docs: man:httpd(8)
man:apachectl(8)
Process: 86976 ExecStop=/bin/kill -WINCH ${MAINPID} (code=exited, status=0/SUCCESS)
Process: 3268 ExecReload=/usr/sbin/httpd $OPTIONS -k graceful (code=exited, status=0/SUCCESS)
Main PID: 86981 (httpd)
Status: "Total requests: 0; Current requests/sec: 0; Current traffic: 0 B/sec"
CGroup: /system.slice/httpd.service
├─ 3273 /usr/sbin/httpd -DFOREGROUND
├─ 3274 /usr/sbin/httpd -DFOREGROUND
├─ 3275 /usr/sbin/httpd -DFOREGROUND
├─ 3276 /usr/sbin/httpd -DFOREGROUND
├─ 3277 /usr/sbin/httpd -DFOREGROUND
└─86981 /usr/sbin/httpd -DFOREGROUND
[root@hadoop102 flume-1.9.0]$ sudo systemctl start gmetad
[root@hadoop102 flume-1.9.0]$ sudo systemctl status gmetad
● gmetad.service - Ganglia Meta Daemon
Loaded: loaded (/usr/lib/systemd/system/gmetad.service; disabled; vendor preset: disabled)
Active: active (running) since 二 2022-01-25 20:32:17 CST; 2 weeks 4 days ago
Main PID: 86918 (gmetad)
CGroup: /system.slice/gmetad.service
└─86918 /usr/sbin/gmetad -d 1
7) Open the Ganglia page in a web browser
Note: if you still get a permission error after completing the steps above, change the permissions of the /var/lib/ganglia directory.
[root@hadoop102 flume-1.9.0]$ sudo chmod -R 777 /var/lib/ganglia
Reference: https://www.cnblogs.com/cosmos-wong/p/11980500.html
3.8.2 Testing Flume Monitoring
1) Start a Flume job
[root@hadoop102 flume-1.9.0]$ bin/flume-ng agent \
-c conf/ \
-n a1 \
-f job/netcat-flume-logger.conf \
-Dflume.root.logger=INFO,console \
-Dflume.monitoring.type=ganglia \
-Dflume.monitoring.hosts=hadoop102:8649
2) Send data and watch the Ganglia monitoring charts
[root@hadoop102 flume-1.9.0]$ nc localhost 6666
Sample charts:
Legend:
| Field (chart name) | Meaning |
|---|---|
| EventPutAttemptCount | Total number of events the source attempted to put into the channel |
| EventPutSuccessCount | Total number of events successfully written to the channel and committed |
| EventTakeAttemptCount | Total number of times the sink attempted to take events from the channel |
| EventTakeSuccessCount | Total number of events the sink successfully read |
| StartTime | Time (in milliseconds) when the channel started |
| StopTime | Time (in milliseconds) when the channel stopped |
| ChannelSize | Current total number of events in the channel |
| ChannelFillPercentage | Percentage of the channel that is occupied |
| ChannelCapacity | Capacity of the channel |