大數據技術之Flume 第3章 Flume進階

来源:https://www.cnblogs.com/niuniu2022/archive/2022/06/07/16350388.html
-Advertisement-
Play Games

**前言:**本文將以 Ubuntu Server 22.04 LTS 為例,說明在 VMware 虛擬機中的安裝和配置 Linux 操作系統的步驟。 一、VMWare 安裝配置 1、VMware 下載地址:VMware Workstation Pro 16.x(需要登錄),安裝和配置步驟略。 二、 ...


第3章 Flume進階

3.1 Flume事務

image-20220213050229161

3.2 Flume Agent內部原理

image-20220213050259680

重要組件:

1)ChannelSelector

ChannelSelector的作用就是選出Event將要被髮往哪個Channel。其共有兩種類型,分別是Replicating(複製)和Multiplexing(多路復用)。

ReplicatingSelector會將同一個Event發往所有的Channel,Multiplexing會根據相應的原則,將不同的Event發往不同的Channel。

2)SinkProcessor

SinkProcessor共有三種類型,分別是DefaultSinkProcessor、LoadBalancingSinkProcessor和FailoverSinkProcessor

DefaultSinkProcessor對應的是單個的Sink,LoadBalancingSinkProcessor和FailoverSinkProcessor對應的是Sink Group,LoadBalancingSinkProcessor可以實現負載均衡的功能,FailoverSinkProcessor可以錯誤恢復的功能。

3.3 Flume拓撲結構

3.3.1 簡單串聯

image-20220213050429267

這種模式是將多個flume順序連接起來了,從最初的source開始到最終sink傳送的目的存儲系統。此模式不建議橋接過多的flume數量, flume數量過多不僅會影響傳輸速率,而且一旦傳輸過程中某個節點flume宕機,會影響整個傳輸系統。

3.3.2 複製和多路復用

image-20220213050521110

Flume支持將事件流向一個或者多個目的地。這種模式可以將相同數據複製到多個channel中,或者將不同數據分發到不同的channel中,sink可以選擇傳送到不同的目的地。

3.3.3 負載均衡和故障轉移

image-20220213050559380

Flume支持使用將多個sink邏輯上分到一個sink組,sink組配合不同的SinkProcessor可以實現負載均衡和錯誤恢復的功能。

3.3.4 聚合

image-20220213050642471

這種模式是我們最常見的,也非常實用,日常web應用通常分佈在上百個伺服器,大者甚至上千個、上萬個伺服器。產生的日誌,處理起來也非常麻煩。用flume的這種組合方式能很好的解決這一問題,每台伺服器部署一個flume採集日誌,傳送到一個集中收集日誌的flume,再由此flume上傳到hdfs、hive、hbase等,進行日誌分析。

3.4 Flume企業開發案例

3.4.1 複製和多路復用

1)案例需求

使用Flume-1監控文件變動,Flume-1將變動內容傳遞給Flume-2,Flume-2負責存儲到HDFS。同時Flume-1將變動內容傳遞給Flume-3,Flume-3負責輸出到Local FileSystem。

2)需求分析:

單數據源多出口複製channel

3)實現步驟:

(1)準備工作

在/opt/module/flume-1.9.0/job/目錄下創建replicating文件夾

[wolffy@hadoop102 job]$ mkdir replicating

在/opt/module/flume-1.9.0/job/目錄下創建file_roll文件夾

[wolffy@hadoop102 job]$ mkdir file_roll

(2)創建a1.conf

配置1個接收日誌文件的source和兩個channel、兩個sink,分別輸送給a2和a3。

編輯配置文件

[wolffy@hadoop102 group1]$ vim a1.conf

添加如下內容

# Name
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2

# source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /opt/module/hive-3.1.2/logs/hive.log

# sink1
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop102
a1.sinks.k1.port = 5555

# sink2
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = hadoop102
a1.sinks.k2.port = 6666

# Channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Channel
a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100

# Channel Selector
a1.sources.r1.selector.type = replicating

# bind
a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2

(3)創建a2.conf

配置上級Flume輸出的Source,輸出是到HDFS的Sink。

編輯配置文件

[wolffy@hadoop102 group1]$ vim a2.conf

添加如下內容

# Name
a2.sources = r1
a2.sinks = k1
a2.channels = c1

# source
a2.sources.r1.type = avro
a2.sources.r1.bind = hadoop102
a2.sources.r1.port = 5555


# sink
a2.sinks.k1.type = hdfs
a2.sinks.k1.hdfs.path = hdfs://hadoop102:9820/flume/%Y%m%d/%H
#上傳文件的首碼
a2.sinks.k1.hdfs.filePrefix = logs-
#是否按照時間滾動文件夾
a2.sinks.k1.hdfs.round = true
#多少時間單位創建一個新的文件夾
a2.sinks.k1.hdfs.roundValue = 1
#重新定義時間單位
a2.sinks.k1.hdfs.roundUnit = hour
#是否使用本地時間戳
a2.sinks.k1.hdfs.useLocalTimeStamp = true
#積攢多少個Event才flush到HDFS一次
a2.sinks.k1.hdfs.batchSize = 100
#設置文件類型,可支持壓縮
a2.sinks.k1.hdfs.fileType = DataStream
#多久生成一個新的文件
a2.sinks.k1.hdfs.rollInterval = 30
#設置每個文件的滾動大小
a2.sinks.k1.hdfs.rollSize = 134217700
#文件的滾動與Event數量無關
a2.sinks.k1.hdfs.rollCount = 0


# Channel
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100

# bind
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1

(4)創建a3.conf

配置上級Flume輸出的Source,輸出是到本地目錄的Sink。

編輯配置文件

[wolffy@hadoop102 group1]$ vim flume-flume-dir.conf

添加如下內容

# Name
a3.sources = r1
a3.sinks = k1
a3.channels = c1

# source
a3.sources.r1.type = avro
a3.sources.r1.bind = hadoop102
a3.sources.r1.port = 6666

# sink
a3.sinks.k1.type = file_roll
a3.sinks.k1.sink.directory = /opt/module/flume-1.9.0/job/file_roll


# Channel
a3.channels.c1.type = memory
a3.channels.c1.capacity = 1000
a3.channels.c1.transactionCapacity = 100

# bind
a3.sources.r1.channels = c1
a3.sinks.k1.channel = c1

提示:輸出的本地目錄必須是已經存在的目錄,如果該目錄不存在,並不會創建新的目錄。

(5)執行配置文件

分別啟動對應的flume進程:a3.conf,a2.conf,a1.conf。

[wolffy@hadoop102 flume-1.9.0]$ bin/flume-ng agent --conf conf/ --name a3 --conf-file job/replicating/a3.conf
[wolffy@hadoop102 flume-1.9.0]$ bin/flume-ng agent --conf conf/ --name a2 --conf-file job/replicating/a2.conf
[wolffy@hadoop102 flume-1.9.0]$ bin/flume-ng agent --conf conf/ --name a1 --conf-file job/replicating/a1.conf

(6)啟動Hadoop和Hive

[wolffy@hadoop102 hadoop-2.7.2]$ sbin/start-dfs.sh
[wolffy@hadoop103 hadoop-2.7.2]$ sbin/start-yarn.sh
[wolffy@hadoop102 hive]$ bin/hive
hive (default)>

(7)檢查HDFS上數據

image-20220213054715024

(8)檢查/opt/module/datas/flume3目錄中數據

[wolffy@hadoop102 flume3]$ ll
總用量 8
-rw-rw-r--. 1 wolffy wolffy 5942 5月 22 00:09 1526918887550-3

3.4.2 負載均衡和故障轉移

1)案例需求

使用Flume1監控一個埠,其sink組中的sink分別對接Flume2和Flume3,採用FailoverSinkProcessor,實現故障轉移的功能。

2)需求分析

負載均衡和故障轉移

3)實現步驟

(1)準備工作

在/opt/module/flume-1.9.0/job/目錄下創建failover文件夾

[wolffy@hadoop102 job]$ mkdir failover/
[wolffy@hadoop102 job]$ cd failover/

(2)創建a1.conf

配置1個netcat source和1個channel、1個sink group(2個sink),分別輸送給a2.conf和a3.conf。

編輯配置文件

[wolffy@hadoop102 group2]$ vim a1.conf

添加如下內容

# Name
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1

# source
a1.sources.r1.type = netcat
a1.sources.r1.bind = hadoop102
a1.sources.r1.port = 4444
# sink1
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop102
a1.sinks.k1.port = 5555

# sink2
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = hadoop102
a1.sinks.k2.port = 6666

# sink processor
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = failover
a1.sinkgroups.g1.processor.priority.k1 = 5
a1.sinkgroups.g1.processor.priority.k2 = 10
a1.sinkgroups.g1.processor.maxpenalty = 10000

# Channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# bind
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c1

(3)創建a2.conf

配置上級Flume輸出的Source,輸出是到本地控制台。

編輯配置文件

[wolffy@hadoop102 group2]$ vim a2.conf

添加如下內容

# Name
a2.sources = r1
a2.sinks = k1
a2.channels = c1

# source
a2.sources.r1.type = avro
a2.sources.r1.bind = hadoop102
a2.sources.r1.port = 5555

# sink
a2.sinks.k1.type = logger

# Channel
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100

# bind
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1

(4)創建a3.conf

配置上級Flume輸出的Source,輸出是到本地控制台。

編輯配置文件

[wolffy@hadoop102 group2]$ vim a3.conf

添加如下內容

# Name
a3.sources = r1
a3.sinks = k1
a3.channels = c1

# source
a3.sources.r1.type = avro
a3.sources.r1.bind = hadoop102
a3.sources.r1.port = 6666

# sink
a3.sinks.k1.type = logger

# Channel
a3.channels.c1.type = memory
a3.channels.c1.capacity = 1000
a3.channels.c1.transactionCapacity = 100

# bind
a3.sources.r1.channels = c1
a3.sinks.k1.channel = c1

(5)執行配置文件

分別開啟對應配置文件:a3.conf,a2.conf,a1.conf。

[wolffy@hadoop102 lume-1.9.0]$ bin/flume-ng agent --conf conf/ --name a3 --conf-file job/failover/a3.conf -Dflume.wolffy.logger=INFO,console

[wolffy@hadoop102 flulume-1.9.0me]$ bin/flume-ng agent --conf conf/ --name a2 --conf-file job/failover/a3.conf -Dflume.root.logger=INFO,console

[wolffy@hadoop102 lume-1.9.0]$ bin/flume-ng agent --conf conf/ --name a1 --conf-file job/failover/a1.conf

(6)使用netcat工具向本機的44444埠發送內容

[wolffy@hadoop102 failover]$ nc hadoop102 4444

(7)查看Flume2及Flume3的控制台列印日誌

(8)將Flume2 kill,觀察Flume3的控制台列印情況。

 2022-02-13 06:40:14,235 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 68 65 6C 6C 6F                                  hello }

註:使用jps -ml查看Flume進程。

3.4.3 負載均衡

a1.conf

# Name
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1

# source
a1.sources.r1.type = netcat
a1.sources.r1.bind = hadoop102
a1.sources.r1.port = 4444
# sink1
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop102
a1.sinks.k1.port = 5555

# sink2
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = hadoop102
a1.sinks.k2.port = 6666

# sink processor

a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = load_balance
a1.sinkgroups.g1.processor.selector = round_robin
# 退避
a1.sinkgroups.g1.processor.backoff = true


# Channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100


# bind
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c1

a2.conf

# Name
a2.sources = r1
a2.sinks = k1
a2.channels = c1

# source
a2.sources.r1.type = avro
a2.sources.r1.bind = hadoop102
a2.sources.r1.port = 5555


# sink
a2.sinks.k1.type = logger

# Channel
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100

# bind
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1

a3.conf

# Name
a3.sources = r1
a3.sinks = k1
a3.channels = c1

# source
a3.sources.r1.type = avro
a3.sources.r1.bind = hadoop102
a3.sources.r1.port = 6666


# sink
a3.sinks.k1.type = logger


# Channel
a3.channels.c1.type = memory
a3.channels.c1.capacity = 1000
a3.channels.c1.transactionCapacity = 100

# bind
a3.sources.r1.channels = c1
a3.sinks.k1.channel = c1

3.4.4 聚合

1)案例需求:

hadoop102上的Flume-1監控文件/opt/module/group.log,

hadoop103上的Flume-2監控某一個埠的數據流,

Flume-1與Flume-2將數據發送給hadoop104上的Flume-3,Flume-3將最終數據列印到控制台。

2)需求分析

聚合

3)實現步驟:

(1)準備工作

分發Flume

[wolffy@hadoop102 module]$ xsync flume-1.9.0

在hadoop102、hadoop103以及hadoop104的/opt/module/flume1.9.0/job目錄下創建一個aggre文件夾。

[wolffy@hadoop102 job]$ mkdir aggre
[wolffy@hadoop103 job]$ mkdir aggre
[wolffy@hadoop104 job]$ mkdir aggre

(2)創建flume1-logger-flume.conf

配置Source用於監控hive.log文件,配置Sink輸出數據到下一級Flume。

在hadoop102上編輯配置文件

[wolffy@hadoop102 group3]$ vim a1.conf 

添加如下內容

[wolffy@hadoop102 aggre]$ cat a1.txt 
# Name
a1.sources = r1
a1.sinks = k1 
a1.channels = c1

# source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /opt/module/hive-3.1.2/logs/hive.log

# sink1
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop104
a1.sinks.k1.port = 8888

# Channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# bind
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

(3)創建a2conf

配置Source監控埠44444數據流,配置Sink數據到下一級Flume:

在hadoop103上編輯配置文件

[wolffy@hadoop102 group3]$ vim a2.conf

添加如下內容

# Name
a2.sources = r1
a2.sinks = k1
a2.channels = c1

# source
a2.sources.r1.type = netcat
a2.sources.r1.bind = hadoop103
a2.sources.r1.port = 4444

# sink
a2.sinks.k1.type = avro
a2.sinks.k1.hostname = hadoop104
a2.sinks.k1.port = 8888

# Channel
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100

# bind
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1

(4)創建flume3-flume-logger.conf

配置source用於接收flume1與flume2發送過來的數據流,最終合併後sink到控制台。

在hadoop104上編輯配置文件

[wolffy@hadoop104 group3]$ touch a3.conf
[wolffy@hadoop104 group3]$ vim a3.conf

添加如下內容

# Name
a3.sources = r1
a3.sinks = k1
a3.channels = c1

# source
a3.sources.r1.type = avro
a3.sources.r1.bind = hadoop104
a3.sources.r1.port = 8888

# sink
a3.sinks.k1.type = logger

# Channel
a3.channels.c1.type = memory
a3.channels.c1.capacity = 1000
a3.channels.c1.transactionCapacity = 100

# bind
a3.sources.r1.channels = c1
a3.sinks.k1.channel = c1

(5)執行配置文件

分別開啟對應配置文件:a3.conf,a2.conf,a1.conf。

[root@hadoop104 flume]$ bin/flume-ng agent --conf conf/ --name a3 --conf-file job/aggre/a3.conf -Dflume.root.logger=INFO,console
[root@hadoop102 flume]$ bin/flume-ng agent --conf conf/ --name a1 --conf-file job/aggre/a1.conf
[root@hadoop103 flume]$ bin/flume-ng agent --conf conf/ --name a2 --conf-file job/aggre/a2.conf

(6)在hadoop102上向啟動bin/hive

[root@hadoop102 hive-3.1.2]$ bin/hive
Hive Session ID = 1a14279f-53b3-47e9-9504-534fb461235f
Logging initialized using configuration in file:/opt/module/hive-3.1.2/conf/hive-log4j2.properties Async: true
Hive Session ID = a54853cc-e857-43ee-b305-dc644db44b84
hive (default)>

(7)在hadoop103上向4444埠發送數據

[root@hadoop103 flume]$ nc hadoop103 4444

(8)檢查hadoop104上數據

2022-02-13 07:31:44,850 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 32 30 32 32 2D 30 32 2D 31 33 54 30 37 3A 33 31 2022-02-13T07:31 }
2022-02-13 07:31:44,850 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 32 30 32 32 2D 30 32 2D 31 33 54 30 37 3A 33 31 2022-02-13T07:31 }
2022-02-13 07:31:45,854 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 32 30 32 32 2D 30 32 2D 31 33 54 30 37 3A 33 31 2022-02-13T07:31 }                              hello }

3.5 自定義Interceptor

1)案例需求

使用Flume採集伺服器本地日誌,需要按照日誌類型的不同,將不同種類的日誌發往不同的分析系統。

2)需求分析

在實際的開發中,一臺伺服器產生的日誌類型可能有很多種,不同類型的日誌可能需要發送到不同的分析系統。此時會用到Flume拓撲結構中的Multiplexing結構,Multiplexing的原理是,根據event中Header的某個key的值,將不同的event發送到不同的Channel中,所以我們需要自定義一個Interceptor,為不同類型的event的Header中的key賦予不同的值。

在該案例中,我們以埠數據模擬日誌,以數字(單個)和字母(單個)模擬不同類型的日誌,我們需要自定義interceptor區分數字和字母,將其分別發往不同的分析系統(Channel)。

image-20220213091646549

3)實現步驟

(1)創建一個maven項目,並引入以下依賴。

<dependency>
   <groupId>org.apache.flume</groupId>
   <artifactId>flume-ng-core</artifactId>
   <version>1.9.0</version>
 </dependency>

(2)定義CustomInterceptor類並實現Interceptor介面。

package com.bdreasercher.bigdata;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.util.List;

public class CustomInterceptor implements Interceptor {
    @Override
    public void initialize() {

    }

    @Override
    public Event intercept(Event event) {
        byte[] body = event.getBody();
        if (body[0] < 'z' && body[0] > 'a') {
            event.getHeaders().put("type", "letter");
        } else if (body[0] > '0' && body[0] < '9') {
            event.getHeaders().put("type", "number");
        }
        return event;

    }

    @Override
    public List<Event> intercept(List<Event> events) {
        for (Event event : events) {
            intercept(event);
        }
        return events;

    }

    @Override
    public void close() {

    }

    public static class Builder implements Interceptor.Builder {

        @Override
        public Interceptor build() {
            return new CustomInterceptor();
        }

        @Override
        public void configure(Context context) {
        }
    }

}

打包,把編譯好的Jar包房到lib文件夾下。

(3)編輯flume配置文件

為hadoop102上的Flume1配置1個netcat source,1個sink group(2個avro sink),並配置相應的ChannelSelector和interceptor。

# Name the components on this agent
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2

# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 44444

a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.bdreasercher.bigdata.CustomInterceptor$Builder
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = type
a1.sources.r1.selector.mapping.letter = c1
a1.sources.r1.selector.mapping.number = c2
# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop103
a1.sinks.k1.port = 4141

a1.sinks.k2.type=avro
a1.sinks.k2.hostname = hadoop104
a1.sinks.k2.port = 4242

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Use a channel which buffers events in memory
a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100


# Bind the source and sink to the channel
a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2

為hadoop103上的Flume4配置一個avro source和一個logger sink。

a1.sources = r1
a1.sinks = k1
a1.channels = c1

a1.sources.r1.type = avro
a1.sources.r1.bind = hadoop103
a1.sources.r1.port = 4141

a1.sinks.k1.type = logger


a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.sinks.k1.channel = c1
a1.sources.r1.channels = c1

為hadoop104上的Flume3配置一個avro source和一個logger sink。

a1.sources = r1
a1.sinks = k1
a1.channels = c1

a1.sources.r1.type = avro
a1.sources.r1.bind = hadoop104
a1.sources.r1.port = 4242

a1.sinks.k1.type = logger

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100


a1.sinks.k1.channel = c1
a1.sources.r1.channels = c1

(4)分別在hadoop102,hadoop103,hadoop104上啟動flume進程,註意先後順序。

(5)在hadoop102使用netcat向localhost:44444發送字母和數字。

(6)觀察hadoop103和hadoop104列印的日誌。

3.6 自定義Source

1)介紹

Source是負責接收數據到Flume Agent的組件。Source組件可以處理各種類型、各種格式的日誌數據,包括avro、thrift、exec、jms、spooling directory、netcat、sequence generator、syslog、http、legacy。官方提供的source類型已經很多,但是有時候並不能滿足實際開發當中的需求,此時我們就需要根據實際需求自定義某些source。

官方也提供了自定義source的介面:https://flume.apache.org/FlumeDeveloperGuide.html#source根據官方說明自定義MySource需要繼承AbstractSource類並實現Configurable和PollableSource介面。

實現相應方法:

getBackOffSleepIncrement() //backoff 步長

getMaxBackOffSleepInterval()//backoff 最長時間

configure(Context context)//初始化context(讀取配置文件內容)

process()//獲取數據封裝成event並寫入channel,這個方法將被迴圈調用。

使用場景:讀取MySQL數據或者其他文件系統。

2)需求

使用flume接收數據,並給每條數據添加首碼,輸出到控制台。首碼可從flume配置文件中配置。

image-20220213091804669

image-20220213091823718

4)編碼

(1)導入pom依賴

<dependencies>
    <dependency>
        <groupId>org.apache.flume</groupId>
        <artifactId>flume-ng-core</artifactId>
        <version>1.9.0</version>
</dependency>

(2)編寫代碼

package com.bdreasercher.bigdata;

import org.apache.flume.Context;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.PollableSource;
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.SimpleEvent;
import org.apache.flume.source.AbstractSource;

import java.util.HashMap;

public class MySource extends AbstractSource implements Configurable, PollableSource {

    //定義配置文件將來要讀取的欄位
    private Long delay;
    private String field;

    //初始化配置信息
    @Override
    public void configure(Context context) {
        delay = context.getLong("delay");
        field = context.getString("field", "Hello!");
    }

    @Override
    public Status process() throws EventDeliveryException {

        try {
            //創建事件頭信息
            HashMap<String, String> hearderMap = new HashMap<>();
            //創建事件
            SimpleEvent event = new SimpleEvent();
            //迴圈封裝事件
            for (int i = 0; i < 5; i++) {
                //給事件設置頭信息
                event.setHeaders(hearderMap);
                //給事件設置內容
                event.setBody((field + i).getBytes());
                //將事件寫入channel
                getChannelProcessor().processEvent(event);
                Thread.sleep(delay);
            }
        } catch (Exception e) {
            e.printStackTrace();
            return Status.BACKOFF;
        }
        return Status.READY;
    }

    @Override
    public long getBackOffSleepIncrement() {
        return 0;
    }

    @Override
    public long getMaxBackOffSleepInterval() {
        return 0;
    }
}

5)測試

(1)打包

將寫好的代碼打包,並放到flume的lib目錄(/opt/module/flume-1.9.0/lib)下。

(2)配置文件

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = com.bdreasercher.bigdata.MySource
a1.sources.r1.delay = 1000
#a1.sources.r1.field = root

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

(3)開啟任務

[root@hadoop102 flume-1.9.0]$ bin/flume-ng agent -c conf/ -f job/mysource.conf -n a1 -Dflume.root.logger=INFO,console

(4)結果展示

image-20220213204602544

3.7 自定義Sink

1)介紹

Sink不斷地輪詢Channel中的事件且批量地移除它們,並將這些事件批量寫入到存儲或索引系統、或者被髮送到另一個Flume Agent。

Sink是完全事務性的。在從Channel批量刪除數據之前,每個Sink用Channel啟動一個事務。批量事件一旦成功寫出到存儲系統或下一個Flume Agent,Sink就利用Channel提交事務。事務一旦被提交,該Channel從自己的內部緩衝區刪除事件。

Sink組件目的地包括hdfs、logger、avro、thrift、ipc、file、null、HBase、solr、自定義。官方提供的Sink類型已經很多,但是有時候並不能滿足實際開發當中的需求,此時我們就需要根據實際需求自定義某些Sink。

官方也提供了自定義sink的介面:https://flume.apache.org/FlumeDeveloperGuide.html#sink根據官方說明自定義MySink需要繼承AbstractSink類並實現Configurable介面。

實現相應方法:

configure(Context context)//初始化context(讀取配置文件內容)

process()//從Channel讀取獲取數據(event),這個方法將被迴圈調用。

使用場景:讀取Channel數據寫入MySQL或者其他文件系統。

2)需求

使用flume接收數據,併在Sink端給每條數據添加首碼和尾碼,輸出到控制台。前尾碼可在flume任務配置文件中配置。

流程分析:

image-20220213092122746

3)編碼

package com.bdreasercher.bigdata;
import org.apache.flume.*;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MySink extends AbstractSink implements Configurable {

    //創建Logger對象
    private static final Logger LOG = LoggerFactory.getLogger(AbstractSink.class);

    private String prefix;
    private String suffix;

    @Override
    public Status process() throws EventDeliveryException {

        //聲明返回值狀態信息
        Status status;

        //獲取當前Sink綁定的Channel
        Channel ch = getChannel();

        //獲取事務
        Transaction txn = ch.getTransaction();

        //聲明事件
        Event event;

        //開啟事務
        txn.begin();

        //讀取Channel中的事件,直到讀取到事件結束迴圈
        while (true) {
            event = ch.take();
            if (event != null) {
                break;
            }
        }
        try {
            //處理事件(列印)
            LOG.info(prefix + new String(event.getBody()) + suffix);

            //事務提交
            txn.commit();
            status = Status.READY;
        } catch (Exception e) {

            //遇到異常,事務回滾
            txn.rollback();
            status = Status.BACKOFF;
        } finally {

            //關閉事務
            txn.close();
        }
        return status;
    }

    @Override
    public void configure(Context context) {

        //讀取配置文件內容,有預設值
        prefix = context.getString("prefix", "hello:");

        //讀取配置文件內容,無預設值
        suffix = context.getString("suffix");
    }
}

4)測試

(1)打包

將寫好的代碼打包,並放到flume的lib目錄(/opt/module/flume-1.9.0/lib)下。

(2)配置文件

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

# Describe the sink
a1.sinks.k1.type = com.bdreasercher.bigdata.MySink
#a1.sinks.k1.prefix = root:
a1.sinks.k1.suffix = :root

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

(3)開啟任務

[root@hadoop102 flume-1.9.0]$ bin/flume-ng agent -c conf/ -f job/mysink.conf -n a1 -Dflume.root.logger=INFO,console
[root@hadoop102 ~]$ nc localhost 44444
hello
OK
root
OK

(4)結果展示

3.8 Flume數據流監控

3.8.1 Ganglia的安裝與部署

Ganglia由gmond、gmetad、gweb三部分組成。

  • gmond(Ganglia Monitoring Daemon)是一種輕量級服務,安裝在每台需要收集指標數據的節點主機上。使用gmond,你可以很容易收集很多系統指標數據,如CPU、記憶體、磁碟、網路和活躍進程的數據等。

  • gmetad(Ganglia Meta Daemon)整合所有信息,並將其以RRD格式存儲至磁碟的服務。

  • gweb(Ganglia Web)Ganglia可視化工具,gweb是一種利用瀏覽器顯示gmetad所存儲數據的PHP前端。在Web界面中以圖表方式展現集群的運行狀態下收集的多種不同指標數據。

1)安裝ganglia

(1)規劃

hadoop102:   web gmetad gmod 
hadoop103:   gmod
hadoop104:   gmod

(2)在102 103 104分別安裝epel-release

[root@hadoop102 flume-1.9.0]$ sudo yum -y install epel-release
[root@hadoop103 flume-1.9.0]$ sudo yum -y install epel-release
[root@hadoop104 flume-1.9.0]$ sudo yum -y install epel-release

(3)在102 安裝 web gmetad gmod

[root@hadoop102 flume-1.9.0]$ sudo yum -y install ganglia-gmetad 
[root@hadoop102 flume-1.9.0]$ sudo yum -y install ganglia-web
[root@hadoop102 flume-1.9.0]$ sudo yum -y install ganglia-gmond

(4)在103 和 104 安裝 gmod

[root@hadoop103 flume-1.9.0]$ sudo yum -y install ganglia-gmond
[root@hadoop104 flume-1.9.0]$ sudo yum -y install ganglia-gmond

2)在102修改配置文件 /etc/httpd/conf.d/ganglia.conf

[root@hadoop102 flume-1.9.0]$ sudo vim /etc/httpd/conf.d/ganglia.conf

修改為紅顏色的配置:

# Ganglia monitoring system php web frontend
Alias /ganglia /usr/share/ganglia
<Location /ganglia>
  # Require ip 10.1.2.3
  # Require host example.org
  Require all granted 
</Location>

5)在102修改配置文件/etc/ganglia/gmetad.conf

[root@hadoop102 flume-1.9.0]$ sudo vim /etc/ganglia/gmetad.conf

修改為:

data_source "my cluster" hadoop102

6)在102 103 104分別修改配置文件/etc/ganglia/gmond.conf

[root@hadoop102 flume-1.9.0]$ sudo vim /etc/ganglia/gmond.conf 

修改為:

cluster {
  name = "my cluster"
  owner = "unspecified"
  latlong = "unspecified"
  url = "unspecified"
}
udp_send_channel {
  #bind_hostname = yes # Highly recommended, soon to be default.
                       # This option tells gmond to use a source address
                       # that resolves to the machine's hostname.  Without
                       # this, the metrics may appear to come from any
                       # interface and the DNS names associated with
                       # those IPs will be used to create the RRDs.
  # mcast_join = 239.2.11.71
  # 數據發送給hadoop102
  host = hadoop102
  port = 8649
  ttl = 1
}
udp_recv_channel {
  # mcast_join = 239.2.11.71
  port = 8649
  # 接收來自任意連接的數據
  bind = 0.0.0.0
  retry_bind = true
  # Size of the UDP buffer. If you are handling lots of metrics you really
  # should bump it up to e.g. 10MB or even higher.
  # buffer = 10485760
}

7)在102修改配置文件 /etc/selinux/config

[root@hadoop102 flume-1.9.0]$ sudo vim /etc/selinux/config

修改為:

# This file controls the state of SELinux on the system.
# SELINUX= can take one of these three values:
#     enforcing - SELinux security policy is enforced.
#     permissive - SELinux prints warnings instead of enforcing.
#     disabled - No SELinux policy is loaded.
SELINUX=disabled
# SELINUXTYPE= can take one of these two values:
#     targeted - Targeted processes are protected,
#     mls - Multi Level Security protection.
SELINUXTYPE=targeted

尖叫提示:selinux本次生效關閉必須重啟,如果此時不想重啟,可以臨時生效之:

[root@hadoop102 flume-1.9.0]$ sudo setenforce 0

8)啟動ganglia

(1)在102 103 104 分別啟動

[root@hadoop102 flume-1.9.0]$ sudo systemctl start gmond
[root@hadoop104 flume-1.9.0]$ sudo systemctl status gmond
● gmond.service - Ganglia Monitoring Daemon
   Loaded: loaded (/usr/lib/systemd/system/gmond.service; disabled; vendor preset: disabled)
   Active: active (running) since 二 2022-01-25 20:32:00 CST; 2 weeks 4 days ago
  Process: 43115 ExecStart=/usr/sbin/gmond (code=exited, status=0/SUCCESS)
 Main PID: 43116 (gmond)
   CGroup: /system.slice/gmond.service
           └─43116 /usr/sbin/gmond

1月 25 20:32:00 hadoop104 systemd[1]: Starting Ganglia Monitoring Daemon...
1月 25 20:32:00 hadoop104 systemd[1]: Started Ganglia Monitoring Daemon.

(2)在102 啟動

[root@hadoop102 flume-1.9.0]$ sudo systemctl start httpd
[root@hadoop102 flume-1.9.0]$ sudo systemctl status httpd
● httpd.service - The Apache HTTP Server
   Loaded: loaded (/usr/lib/systemd/system/httpd.service; disabled; vendor preset: disabled)
   Active: active (running) since 二 2022-01-25 20:41:19 CST; 2 weeks 4 days ago
     Docs: man:httpd(8)
           man:apachectl(8)
  Process: 86976 ExecStop=/bin/kill -WINCH ${MAINPID} (code=exited, status=0/SUCCESS)
  Process: 3268 ExecReload=/usr/sbin/httpd $OPTIONS -k graceful (code=exited, status=0/SUCCESS)
 Main PID: 86981 (httpd)
   Status: "Total requests: 0; Current requests/sec: 0; Current traffic:   0 B/sec"
   CGroup: /system.slice/httpd.service
           ├─ 3273 /usr/sbin/httpd -DFOREGROUND
           ├─ 3274 /usr/sbin/httpd -DFOREGROUND
           ├─ 3275 /usr/sbin/httpd -DFOREGROUND
           ├─ 3276 /usr/sbin/httpd -DFOREGROUND
           ├─ 3277 /usr/sbin/httpd -DFOREGROUND
           └─86981 /usr/sbin/httpd -DFOREGROUND

[root@hadoop102 flume-1.9.0]$ sudo systemctl start gmetad
[root@hadoop102 flume-1.9.0]$ sudo systemctl status gmetad
● gmetad.service - Ganglia Meta Daemon
   Loaded: loaded (/usr/lib/systemd/system/gmetad.service; disabled; vendor preset: disabled)
   Active: active (running) since 二 2022-01-25 20:32:17 CST; 2 weeks 4 days ago
 Main PID: 86918 (gmetad)
   CGroup: /system.slice/gmetad.service
           └─86918 /usr/sbin/gmetad -d 1

9)打開網頁瀏覽ganglia頁面

http://hadoop102/ganglia

image-20220213175325374

尖叫提示:如果完成以上操作依然出現許可權不足錯誤,請修改/var/lib/ganglia目錄的許可權。

[root@hadoop102 flume-1.9.0]$ sudo chmod -R 777 /var/lib/ganglia

參考:https://www.cnblogs.com/cosmos-wong/p/11980500.html

3.8.2 操作Flume測試監控

1)啟動Flume任務

[root@hadoop102 flume-1.9.0]$ bin/flume-ng agent \
-c conf/ \
-n a1 \
-f job/netcat-flume-logger.conf \
-Dflume.root.logger=INFO,console \
-Dflume.monitoring.type=ganglia \
-Dflume.monitoring.hosts=hadoop102:8649

2)發送數據觀察ganglia監測圖

[root@hadoop102 flume-1.9.0]$ nc localhost 6666

樣式如圖:

image-20220213175934123

image-20220213180047298

圖例說明:

欄位(圖表名稱) 欄位含義
EventPutAttemptCount source嘗試寫入channel的事件總數量
EventPutSuccessCount 成功寫入channel且提交的事件總數量
EventTakeAttemptCount sink嘗試從channel拉取事件的總數量。
EventTakeSuccessCount sink成功讀取的事件的總數量
StartTime channel啟動的時間(毫秒)
StopTime channel停止的時間(毫秒)
ChannelSize 目前channel中事件的總數量
ChannelFillPercentage channel占用百分比
ChannelCapacity channel的容量

IT學習網站

牛牛IT網站

Spring Cloud Alibaba學習資源推薦

Spring Cloud Alibaba 微服務架構實戰
鏈接: https://pan.baidu.com/s/1nJpfbjPyf3n339INybjOQQ?pwd=8t11 提取碼: 8t11
--來自百度網盤超級會員v4的分享
失效加V:x923713


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • #結構 ##順序結構 JAVA的基本結構就是順序結構,除非特意指明,否則就按照順序,一句一句執行。 順序結構是最簡單的演算法結構 語句與語句之間,框與框之間按照從上到下的順序執行,它是任何演算法都離不開的基本演算法結構 ##選擇結構 if單選擇結構 if雙選擇結構 if多選擇結構 嵌套的if結構 swit ...
  • 項目環境docker及docker-compose文檔 1、Linux環境介紹 centos7.6 16G以上記憶體空間(至少8G) 2、靜態IP設置 1、找到配置文件 cd /etc/sysconfig/network-scripts/ vi ifcfg-ens33 TYPE=EthernetPRO ...
  • 前面我們分析了攜程的 apollo(見 詳解apollo的設計與使用),現在再來看看阿裡的 nacos。和 apollo 一樣,nacos 也是一款配置中心,同樣可以實現配置的集中管理、分環境管理、即時生效等等。不過,nacos 還具備了服務發現的功能。這篇博客將重點分析 nacos 和 apoll... ...
  • 關註「WeiyiGeek」點我,點我 設為「特別關註」,每天帶你在B站玩轉網路安全運維、應用開發、物聯網IOT學習! 希望各位看友【關註、點贊、評論、收藏、投幣】,助力每一個夢想。 文章目錄 0x00 快速瞭解 EasyOCR 介紹 EasyOCR 參考來源 0x01 安裝部署 環境依賴 環境安裝 ...
  • Markdown語法學習筆記 {#Top} 寫在前面:本篇筆記僅記錄學習後的總結,以供日後快速回顧使用。 更多擴展語法參考Markdown官網 標題上設置了錨點鏈接 # Markdown語法學習筆記 {#Top} 緊接著下方設置了生成索引目錄 1. 標題 Markdown 語法 HTML 預覽效果 ...
  • 目錄 一.簡介 二.效果演示 三.源碼下載 四.猜你喜歡 零基礎 OpenGL (ES) 學習路線推薦 : OpenGL (ES) 學習目錄 >> OpenGL ES 基礎 零基礎 OpenGL (ES) 學習路線推薦 : OpenGL (ES) 學習目錄 >> OpenGL ES 轉場 零基礎 O ...
  • 最近通過WPF開發項目,為了對WPF知識點進行總結,所以利用業餘時間,開發一個學生信息管理系統【Student Information Management System】。前四篇文章進行了框架搭建和模塊劃分,後臺WebApi介面編寫,以及課程管理模塊,班級管理模塊,學生管理模塊的開發,本文在前四篇... ...
  • 文章內容1 解決 User.findOrCreate({ where: {name: '小明'}, defaults: {age: 5} }) .spread((user, created) ⇒ { if(created false) { user.update({......}) } }) 分析 ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...