Flume + Solr + log4j搭建web日誌採集系統 ...
前言
很多web應用會選擇ELK來做日誌採集系統,這裡選用Flume,一方面是因為熟悉整個Hadoop框架,另一方面,Flume也有很多的優點。
關於Apache Hadoop Ecosystem 請點擊這裡。
Cloudera 官方的教程也是基於這個例子開始的,get-started-with-hadoop-tutorial
並且假設我們已經瞭解Flume(agent, Source, Channel, Sink) , Morphline (ETL), Solr (全文檢索),如果都沒有瞭解,請自行百度。
Scenario (需求)
首先我們有多個web 應用,每個web應用每天都有不斷的日誌產生,這些日誌文件現在以文件的形式存儲在伺服器當中,我們需要收集這些日誌,並對日誌進行查詢。
所以整個流程就是,Flume agent 收集日誌 -> Morphline 進行過濾 -> 對結果進行索引然後在Solr中進行搜索。
Flume 收集日誌
1、使用 Spooling Directory Source
就是監視指定目錄是否有新文件移入,如果有,就會讀取這些Event, 但是文件一旦被移到該目錄之後,就不應該被寫入,目錄 下的文件名也不可重覆,這樣的情況就是需要定期將文件移動到指定的目錄,不能實現實時的讀取。
2、使用 Exec Source
就是通過下麵的命令行產生的結果作為源,在agent 死亡或或者機器重啟的過程可能會存在數據丟失
agent.sources.execSrc.type = exec
agent.sources.execSrc.shell=/bin/bash -c
agent.sources.execSrc.command= tail -F /var/log/flume/flume.log | grep "error: "
1、使用消息中間件JMS或者KAFKA
請參考: 基於Flume+Log4j+Kafka的日誌採集架構方案
客戶端直接發送至kafaka queue , 用 log4j KafkaAppender
2、使用Flume Appender
對於Java web 應用,我們就最簡單直接採取這種方式。 Flume Appender 我們這裡就直接採用log4j2 , 關於日誌框架的這些說明,請看另一片博客 spring boot use log4j log4j 關於flume Appender 的配置
The Flume Appender supports three modes of operation.
1、It can act as a remote Flume client which sends Flume events via Avro to a Flume Agent configured with an Avro Source.(同步,Avro協議)
2、It can act as an embedded Flume Agent where Flume events pass directly into Flume for processing.(非同步,需要維護客戶端 flume)
3、It can persist events to a local BerkeleyDB data store and then asynchronously send the events to Flume, similar to the embedded Flume Agent but without most of the Flume dependencies.(先寫資料庫,再非同步發送)
Usage as an embedded agent will cause the messages to be directly passed to the Flume Channel and then control will be immediately returned to the application. All interaction with remote agents will occur asynchronously. Setting the "type" attribute to "Embedded" will force the use of the embedded agent. In addition, configuring agent properties in the appender configuration will also cause the embedded agent to be used.
我們下麵就簡單的用第一種方式
客戶端配置
log4j.xml
<?xml version="1.0" encoding="UTF-8"?>
<Configuration status="warn" name="MyApp" packages="">
<Appenders>
<Flume name="eventLogger" compress="true">
<Agent host="192.168.10.101" port="8800"/>
<Agent host="192.168.10.102" port="8800"/>
<RFC5424Layout enterpriseNumber="18060" includeMDC="true" appName="MyApp"/>
</Flume>
</Appenders>
<Loggers>
<Root level="error">
<AppenderRef ref="eventLogger"/>
</Root>
</Loggers>
</Configuration>
服務端配置
參考:flume log4j appender config
下載 flume, 在conf 目錄下,配置example.conf :
# example.conf: A single-node Flume configuration
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = org.apache.flume.clients.log4jappender.Log4jAppender
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
# Describe the sink
a1.sinks.k1.type = logger
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
啟動 flume
bin/flume-ng agent --conf conf --conf-file example.conf --name a1 -Dflume.root.logger=INFO,console
查看日誌,是否成功。
Solr 配置
關於solr的介紹
這裡 solr的數據也是需要存儲到 hdfs中的,另外solr 是通過zookeeper 來管理的
以下配置,這裡用的cloudera manager 安裝,所以自動配好了,但是需要檢驗,如果是手動安裝也有對應的文檔可以直接查看, 另外這裡省略Solr Authentication。
配置 zookeeper service
$ cat /etc/solr/conf/solr-env.sh
export SOLR_ZK_ENSEMBLE=zk01.example.com:2181,zk02.example.com:2181,zk03.example.com:2181/solr
配置 solr use hdfs
$ cat /etc/default/solr
//地址nn01.example.com:8020 是hdfs name node的地址
SOLR_HDFS_HOME=hdfs://nn01.example.com:8020/solr
//To create the /solr directory in HDFS,需要創建/solr hdfs目錄:
$ sudo -u hdfs hdfs dfs -mkdir /solr
$ sudo -u hdfs hdfs dfs -chown solr /solr
initializing the ZooKeeper Namespace
$ sudo service solr-server restart
啟動solr
$ sudo service solr-server restart
solr collection 配置
solr 通過 collection 來組織邏輯數據,所以你需要創建collection,每個collection有自己的配置,文檔上已經講的比較清楚了,而且也不多,這裡不再贅述
Generating Collection Configuration
下麵是的collection是用來存儲上面收集到的日誌:
// 使用預設模版創建instancedir
$ solrctl instancedir --generate $HOME/weblogs_config
// upload instancedir to zookeeper,上傳配置
$ solrctl instancedir --create weblogs_config $HOME/weblogs_config
//verify instance
$ solrctl instancedir --list
// create collection -s shard_count, collection 和config 關聯
$ solrctl collection --create weblogs_collection -s 2 -c weblogs_config
A SolrCloud collection is the top-level object for indexing documents and providing a query interface. Each collection must be associated with an instance directory. Different collections can use the same instance directory. Each collection is typically replicated among several SolrCloud instances. Each replica is called a core and is assigned to an individual Solr service. The assignment process is managed automatically, although you can apply fine-grained control over each individual core using the solrctl core command . 這是 collection 和instance之間關係的介紹
成功創建之後如何修改和擴展,請參考這裡solectl usage
Morphline (ETL)
創建好 collection 之後,我們就需要將日誌解析存儲到solr里,方便檢索。Morphline 就是這個中間過程的ETL工具(extracting, transforming and loading data), Flume 提供了Morphlion Solr Sink, 從 log flume的source中讀取event,經過ETL導入到solr中。
配置flume
繼續上面的flume,中 example.conf 的配置
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = avro
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 4444
# Describe the sink
a1.sinks.k1.type = org.apache.flume.sink.solr.morphline.MorphlineSolrSink
a1.sinks.k1.morphlineFile=morphlines.conf
a1.sinks.k1.morphlineId = morphline_log4j2
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 1000
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
配置Morphline
我們的日誌格式文件如下:
[INFO ] 2017-07-14 11:40:51.556 [main] RequestMappingHandlerAdapter - Detected ResponseBodyAdvice bean in apiResponseAdvice
需要解析成:
level: INFO
create_time: 2017-07-14 11:40:51.556
thread: main
class: RequestMappingHandlerAdapter
- //這裡有個短橫線
message: Detected ResponseBodyAdvice bean in apiResponseAdvice
所以我們使用grok
線上調試工具online tools
# grok get data from unstructured line
{
grok {
dictionaryFiles : [grok-dictionary.conf]
expressions : {
message : """\[%{LOGLEVEL:level} \] %{SC_LOGDATETIME:create_time} \[%{DATA:thread}\] %{WORD:class} [-] %{GREEDYDATA:message}"""
}
}
}
# Consume the output record of the previous command and pipe another
# record downstream.
#
# convert timestamp field to native Solr timestamp format
# e.g. 2017-07-14 11:40:52.512 to 2012-09-06T07:14:34.000Z
{
convertTimestamp {
field : create_time
inputFormats : ["yyyy-MM-dd HH:mm:ss.SSS", "yyyy-MM-dd"]
inputTimezone : America/Los_Angeles
outputFormat : "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'"
outputTimezone : UTC
}
}
配置 schema.xml
在上一節,配置solr的時候,我們生成了預設的模版,我們需要根據實際的需求修改schema.xml,在$HOME/weblogs/conf 下
The schema.xml file contains all of the details about which fields your documents can contain, and how those fields should be dealt with when adding documents to the index, or when querying those fields.
schema.xml
solrconfig.xml
<field name="level" type="text_general" indexed="true" stored="true" multiValued="true"/>
<field name="create_time" type="date" indexed="true" stored="true"/>
<field name="thread" type="text_general" indexed="true" stored="true"/>
<field name="class" type="text_general" indexed="true" stored="true"/>
<field name="message" type="text_general" indexed="true" stored="true"/>
重新上傳配置到zookeeper
$ solrctl instancedir --update weblogs_config $HOME/weblogs_config
$ solrctl collection --reload weblogs_collection
總結
到此為止,我們完成了日誌的收集,解析,索引,你可以通過 Hue來進行搜索和查詢了,或者自己定義UI。這個教程比較基礎也相對簡單,但是可以完成基本的需求,也把日誌處理流程走了一遍,剩下的大家自定義即可。
文章來源:https://my.oschina.net/tigerlene/blog/1475239