Flume – 初識flume、source和sink 目錄基本概念常用源 Source常用sink 基本概念 什麼叫flume? 分散式,可靠的大量日誌收集、聚合和移動工具。 events 事件,是一行數據的位元組數據,是flume發送文件的基本單位。 flume配置文件 重命名flum ...
Flume – 初識flume、source和sink
目錄
基本概念
常用源 Source
常用sink
基本概念
什麼叫flume?
分散式,可靠的大量日誌收集、聚合和移動工具。
events
事件,是一行數據的位元組數據,是flume發送文件的基本單位。
flume配置文件
重命名flume-env.sh.template為flume-env.sh,並添加[export JAVA_HOME=/soft/jdk]
flume的Agent
source //從哪兒讀數據。 負責監控並收集數據。相對於channel是生產者。
channel //數據通道。 通道,相當於數據緩衝區。
sink //將數據傳送往哪兒。 沉槽,負責將數據放置在指定位置。相對於channel是消費者。
flume如何使用
在flume的conf文件下,創建conf尾碼的文件,使用flume命令啟動
flume命令
啟動:flume-ng agent -f /soft/flume/conf/example.conf -n a1
常用源 Source
執行源:Exec Sour //通過linux命令作為source。缺點:失敗後數據會丟失,不能保證數據的完整性。
#定義源:exec
a1.source.r1.type = exec
a1.source.r1.command = tail -F /home/centos/1.txt
滾動目錄源:Spooling Directory Source //監控目錄,如果目錄下有新文件產生,機會將其消費
#定義源:spoodir
a1.source.r1.type = spooldir
#指定監控目錄
a1.source.r1.spoolDir = /home/centos/log
指定類型的文件:tailDir source #監控目錄中指定類型的文件,並監控其消費偏移量;
通過~/.flume/taildir_position.json監控並實時記錄文件偏移量,可通過a1.sources.r1.positionFile配置進行修改
#定義源:TAILDIR
a1.source.r1.type = TAILDIR
#指定監控文件組
a1.source.r1.filegroups = g1
#指定g1組中包含的文件
a1.source.r1.filegroups.g1 = /home/centos/log/.*log
順序數字源:Sequence Generator Source //產生順序數字的源,用作測試
#定義源:seq
a1.source.r1.type = seq
#定義一次RPC產生的批次數量
a1.source.r1.batchSize = 1024
壓力源:Stress Source //測試集群壓力,用作負載測試
#定義源:stress
a1.source.r1.type = org.apache.flume.source.StressSource
#一個event產生的數據量
a1.source.r1.size = 1073741824
常用sink
日誌&控制台:logger sink
a1.sinks.k1.type = logger
存儲在本地文件:File Roll Sink
#設置滾動文件sink
a1.sinks.k1.type = file_roll
#指定文件位置。若文件不存在會報錯
a1.sinks.k1.directory = /home/centos/log2
#設置滾動周期間隔,0即不滾動;預設30s。
a1.sinks.k1.sink.rollInterval = 0
寫入到hdfsL:HDFS Sink //預設SequenceFile,可以通過hdfs.fileType指定(SequenceFile, DataStream or CompressedStream)
#指定類型
a1.sinks.k1.type = hdfs
#指定路徑,不用單獨創建文件夾
a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H
#時間相關的配置,必須指定時間戳
a1.sinks.k1.hdfs.useLocalTimeStamp = true
#實例化文件的首碼
a1.sinks.k1.hdfs.filePrefix = events-
#滾動間隔,0為不滾動
a1.sinks.k1.hdfs.rollInterval = 0
#滾動大小;預設1024
a1.sinks.k1.hdfs.rollSize = 1024
#指定數據類型;預設為 sequenceFile
a1.sinks.k1.hdfs.fileType = CompressedStream
#指定壓縮編解碼器
a1.sinks.k1.hdfs.codeC = gzip
寫入到Hbase:hbase sink //需要創建表,無法指定rowkey和col
#設置類型
a1.sinks.k1.type = hbase
a1.sinks.k1.table = ns1:flume
a1.sinks.k1.columnFaminly = f1
寫入到Hbase:regexhbase sink //需要創建表,可以手動指定rowKey和col
#設置正則hbase類型
a1.sinks.k1.type = hbase
a1.sinks.k1.serializer = org.apache.flume.sink.hbase.RegexHbaseEventSerializer
#手動指定rowkey和列,[ROW_KEY]必須些,且大寫
a1.sinks.k1.serializer.colNames = ROW_KEY,name,age
#指定正則,與col對應
a1.sinks.k1.serializer.regex = (.*),(.*),(.*)
#指定rowkey索引
a1.sinks.k1.serializer.rowKeyIndex = 0
a1.sinks.k1.table = ns1:flume
a1.sinks.k1.coluFamily = f1
寫入到Hive:hive sink //需要啟動hive的事務性
# 設置hive sink
a1.sinks.k1.type = hive
# 需要啟動hive的metastore:hive --service metastore //metastore源數據倉庫
a1.sinks.k1.hive.metastore = thrift://s101:9083
a1.sinks.k1.hive.database = default
# 需要創建事務表
a1.sinks.k1.hive.table = tx2
# 指定列和欄位的映射
a1.sinks.k1.serializer = DELIMITED
# 指定輸入的格式,必須是雙引號
a1.sinks.k1.serializer.delimiter = "\t"
# 指定hive存儲文件展現方式,必須是單引號
a1.sinks.k1.serializer.serdeSeparator = '\t'
a1.sinks.k1.serializer.fieldnames =id,name,age
寫入到hive補充
1、首先將/soft/hive/hcatalog/share/hcatalog中的所有jar拷貝到hive的lib庫中
cp /soft/hive/hcatalog/share/hcatalog/* /soft/hive/lib/
2、啟動hive的metastore
hive --service metastore
3、啟動hive並創建事務表
SET hive.support.concurrency = true;
SET hive.enforce.bucketing = true;
SET hive.exec.dynamic.partition.mode = nonstrict;
SET hive.txn.manager = org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;
SET hive.compactor.initiator.on = true;
SET hive.compactor.worker.threads = 1;
create table tx2(id int ,name string, age int ) clustered by (id) into 2 buckets stored as orc TBLPROPERTIES('transactional'='true');
4、啟動flume,並使用以上的配置文件
flume-ng agent -f k_hive.conf -n a1
5、輸入數據驗證
1 tom 18