基於Flume的美團日誌收集系統(二)改進和優化

来源:http://www.cnblogs.com/jirimutu01/archive/2016/01/11/5121901.html
-Advertisement-
Play Games

在《基於Flume的美團日誌收集系統(一)架構和設計》中,我們詳述了基於Flume的美團日誌收集系統的架構設計,以及為什麼做這樣的設計。在本節中,我們將會講述在實際部署和使用過程中遇到的問題,對Flume的功能改進和對系統做的優化。1 Flume的問題總結在Flume的使用過程中,遇到的主要問題如下...


在《基於Flume的美團日誌收集系統(一)架構和設計》中,我們詳述了基於Flume的美團日誌收集系統的架構設計,以及為什麼做這樣的設計。在本節中,我們將會講述在實際部署和使用過程中遇到的問題,對Flume的功能改進和對系統做的優化。

1 Flume的問題總結

在Flume的使用過程中,遇到的主要問題如下:

a. Channel“水土不服”:使用固定大小的MemoryChannel在日誌高峰時常報隊列大小不夠的異常;使用FileChannel又導致IO繁忙的問題;

b. HdfsSink的性能問題:使用HdfsSink向Hdfs寫日誌,在高峰時間速度較慢;

c. 系統的管理問題:配置升級,模塊重啟等;

2 Flume的功能改進和優化點

從上面的問題中可以看到,有一些需求是原生Flume無法滿足的,因此,基於開源的Flume我們增加了許多功能,修改了一些Bug,並且進行一些調優。下麵將對一些主要的方面做一些說明。

2.1 增加Zabbix monitor服務

一方面,Flume本身提供了http, ganglia的監控服務,而我們目前主要使用zabbix做監控。因此,我們為Flume添加了zabbix監控模塊,和sa的監控服務無縫融合。

另一方面,凈化Flume的metrics。只將我們需要的metrics發送給zabbix,避免 zabbix server造成壓力。目前我們最為關心的是Flume能否及時把應用端發送過來的日誌寫到Hdfs上, 對應關註的metrics為:

  • Source : 接收的event數和處理的event數
  • Channel : Channel中擁堵的event數
  • Sink : 已經處理的event數

2.2 為HdfsSink增加自動創建index功能

首先,我們的HdfsSink寫到hadoop的文件採用lzo壓縮存儲。 HdfsSink可以讀取hadoop配置文件中提供的編碼類列表,然後通過配置的方式獲取使用何種壓縮編碼,我們目前使用lzo壓縮數據。採用lzo壓縮而非bz2壓縮,是基於以下測試數據:

event大小(Byte)sink.batch-sizehdfs.batchSize壓縮格式總數據大小(G)耗時(s)平均events/s壓縮後大小(G)
544 300 10000 bz2 9.1 2448 6833 1.36
544 300 10000 lzo 9.1 612 27333 3.49

其次,我們的HdfsSink增加了創建lzo文件後自動創建index功能。Hadoop提供了對lzo創建索引,使得壓縮文件是可切分的,這樣Hadoop Job可以並行處理數據文件。HdfsSink本身lzo壓縮,但寫完lzo文件並不會建索引,我們在close文件之後添加了建索引功能。

 1 /**
 2    * Rename bucketPath file from .tmp to permanent location.
 3    */
 4   private void renameBucket() throws IOException, InterruptedException {
 5       if(bucketPath.equals(targetPath)) {
 6               return;
 7         }
 8 
 9         final Path srcPath = new Path(bucketPath);
10         final Path dstPath = new Path(targetPath);
11 
12         callWithTimeout(new CallRunner<Object>() {
13               @Override
14               public Object call() throws Exception {
15                 if(fileSystem.exists(srcPath)) { // could block
16                       LOG.info("Renaming " + srcPath + " to " + dstPath);
17                      fileSystem.rename(srcPath, dstPath); // could block
18 
19                       //index the dstPath lzo file
20                       if (codeC != null && ".lzo".equals(codeC.getDefaultExtension()) ) {
21                               LzoIndexer lzoIndexer = new LzoIndexer(new Configuration());
22                               lzoIndexer.index(dstPath);
23                       }
24                 }
25                 return null;
26               }
27     });
28 }

 

2.3 增加HdfsSink的開關

我們在HdfsSink和DualChannel中增加開關,當開關打開的情況下,HdfsSink不再往Hdfs上寫數據,並且數據只寫向DualChannel中的FileChannel。以此策略來防止Hdfs的正常停機維護。

2.4 增加DualChannel

Flume本身提供了MemoryChannel和FileChannel。MemoryChannel處理速度快,但緩存大小有限,且沒有持久化;FileChannel則剛好相反。我們希望利用兩者的優勢,在Sink處理速度夠快,Channel沒有緩存過多日誌的時候,就使用MemoryChannel,當Sink處理速度跟不上,又需要Channel能夠緩存下應用端發送過來的日誌時,就使用FileChannel,由此我們開發了DualChannel,能夠智能的在兩個Channel之間切換。

其具體的邏輯如下:

 1 /***
 2  * putToMemChannel indicate put event to memChannel or fileChannel
 3  * takeFromMemChannel indicate take event from memChannel or fileChannel
 4  * */
 5 private AtomicBoolean putToMemChannel = new AtomicBoolean(true);
 6 private AtomicBoolean takeFromMemChannel = new AtomicBoolean(true);
 7 
 8 void doPut(Event event) {
 9         if (switchon && putToMemChannel.get()) {
10               //往memChannel中寫數據
11               memTransaction.put(event);
12 
13               if ( memChannel.isFull() || fileChannel.getQueueSize() > 100) {
14                 putToMemChannel.set(false);
15               }
16         } else {
17               //往fileChannel中寫數據
18               fileTransaction.put(event);
19         }
20   }
21 
22 Event doTake() {
23     Event event = null;
24     if ( takeFromMemChannel.get() ) {
25         //從memChannel中取數據
26         event = memTransaction.take();
27         if (event == null) {
28             takeFromMemChannel.set(false);
29         } 
30     } else {
31         //從fileChannel中取數據
32         event = fileTransaction.take();
33         if (event == null) {
34             takeFromMemChannel.set(true);
35 
36             putToMemChannel.set(true);
37         } 
38     }
39     return event;
40 }

 

2.5 增加NullChannel

Flume提供了NullSink,可以把不需要的日誌通過NullSink直接丟棄,不進行存儲。然而,Source需要先將events存放到Channel中,NullSink再將events取出扔掉。為了提升性能,我們把這一步移到了Channel裡面做,所以開發了NullChannel。

2.6 增加KafkaSink

為支持向Storm提供實時數據流,我們增加了KafkaSink用來向Kafka寫實時數據流。其基本的邏輯如下:

 1 public class KafkaSink extends AbstractSink implements Configurable {
 2         private String zkConnect;
 3         private Integer zkTimeout;
 4         private Integer batchSize;
 5         private Integer queueSize;
 6         private String serializerClass;
 7         private String producerType;
 8         private String topicPrefix;
 9 
10         private Producer<String, String> producer;
11 
12         public void configure(Context context) {
13             //讀取配置,並檢查配置
14         }
15 
16         @Override
17         public synchronized void start() {
18             //初始化producer
19         }
20 
21         @Override
22         public synchronized void stop() {
23             //關閉producer
24         }
25 
26         @Override
27         public Status process() throws EventDeliveryException {
28 
29             Status status = Status.READY;
30 
31             Channel channel = getChannel();
32             Transaction tx = channel.getTransaction();
33             try {
34                     tx.begin();
35 
36                     //將日誌按category分隊列存放
37                     Map<String, List<String>> topic2EventList = new HashMap<String, List<String>>();
38 
39                     //從channel中取batchSize大小的日誌,從header中獲取category,生成topic,並存放於上述的Map中;
40 
41                     //將Map中的數據通過producer發送給kafka 
42 
43                    tx.commit();
44             } catch (Exception e) {
45                     tx.rollback();
46                     throw new EventDeliveryException(e);
47             } finally {
48                 tx.close();
49             }
50             return status;
51         }
52 }

 

2.7 修複和scribe的相容問題

Scribed在通過ScribeSource發送數據包給Flume時,大於4096位元組的包,會先發送一個Dummy包檢查伺服器的反應,而Flume的ScribeSource對於logentry.size()=0的包返回TRY_LATER,此時Scribed就認為出錯,斷開連接。這樣迴圈反覆嘗試,無法真正發送數據。現在在ScribeSource的Thrift介面中,對size為0的情況返回OK,保證後續正常發送數據。

3. Flume系統調優經驗總結

3.1 基礎參數調優經驗

  • HdfsSink中預設的serializer會每寫一行在行尾添加一個換行符,我們日誌本身帶有換行符,這樣會導致每條日誌後面多一個空行,修改配置不要自動添加換行符;
lc.sinks.sink_hdfs.serializer.appendNewline = false
  • 調大MemoryChannel的capacity,儘量利用MemoryChannel快速的處理能力;

  • 調大HdfsSink的batchSize,增加吞吐量,減少hdfs的flush次數;

  • 適當調大HdfsSink的callTimeout,避免不必要的超時錯誤;

3.2 HdfsSink獲取Filename的優化

HdfsSink的path參數指明瞭日誌被寫到Hdfs的位置,該參數中可以引用格式化的參數,將日誌寫到一個動態的目錄中。這方便了日誌的管理。例如我們可以將日誌寫到category分類的目錄,並且按天和按小時存放:

lc.sinks.sink_hdfs.hdfs.path = /user/hive/work/orglog.db/%{category}/dt=%Y%m%d/hour=%H

HdfsS ink中處理每條event時,都要根據配置獲取此event應該寫入的Hdfs path和filename,預設的獲取方法是通過正則表達式替換配置中的變數,獲取真實的path和filename。因為此過程是每條event都要做的操作,耗時很長。通過我們的測試,20萬條日誌,這個操作要耗時6-8s左右。

由於我們目前的path和filename有固定的模式,可以通過字元串拼接獲得。而後者比正則匹配快幾十倍。拼接定符串的方式,20萬條日誌的操作只需要幾百毫秒。

3.3 HdfsSink的b/m/s優化

在我們初始的設計中,所有的日誌都通過一個Channel和一個HdfsSink寫到Hdfs上。我們來看一看這樣做有什麼問題。

首先,我們來看一下HdfsSink在發送數據的邏輯:

 1 //從Channel中取batchSize大小的events
 2 for (txnEventCount = 0; txnEventCount < batchSize; txnEventCount++) {
 3     //對每條日誌根據category append到相應的bucketWriter上;
 4     bucketWriter.append(event);
 5  6 
 7 for (BucketWriter bucketWriter : writers) {
 8     //然後對每一個bucketWriter調用相應的flush方法將數據flush到Hdfs上
 9     bucketWriter.flush();
10

 

假設我們的系統中有100個category,batchSize大小設置為20萬。則每20萬條數據,就需要對100個文件進行append或者flush操作。

其次,對於我們的日誌來說,基本符合80/20原則。即20%的category產生了系統80%的日誌量。這樣對大部分日誌來說,每20萬條可能只包含幾條日誌,也需要往Hdfs上flush一次。

上述的情況會導致HdfsSink寫Hdfs的效率極差。下圖是單Channel的情況下每小時的發送量和寫hdfs的時間趨勢圖。

 美團日誌收集系統架構

鑒於這種實際應用場景,我們把日誌進行了大小歸類,分為big, middle和small三類,這樣可以有效的避免小日誌跟著大日誌一起頻繁的flush,提升效果明顯。下圖是分隊列後big隊列的每小時的發送量和寫hdfs的時間趨勢圖。

 美團日誌收集系統架構

4 未來發展

目前,Flume日誌收集系統提供了一個高可用,高可靠,可擴展的分散式服務,已經有效地支持了美團的日誌數據收集工作。

後續,我們將在如下方面繼續研究:

  • 日誌管理系統:圖形化的展示和控制日誌收集系統;

  • 跟進社區發展:跟進Flume 1.5的進展,同時回饋社區;


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 安裝svn需要依賴apr和apr-util這兩個軟體,所以先安裝這兩個軟體下載安裝APRwget http://apache.fayea.com//apr/apr-1.5.2.tar.gztar -zxvf apr-1.5.2.tar.gzcd apr-1.5.2./configure --pref...
  • Linux視頻教程集:序列之一:Linux基本操作和系統管理序列之二:Linux_C編程(IO+多線程+進程管理+進程通信+網路)序列之三:C++精講精煉(語法+STL+IO編程)序列之四:Qt編程式列之五:ARM嵌入式Linux開發(ARM硬體體系+指令集彙編+Linux開發) 鏈接序列1鏈接: ...
  • 概述 SQLite提供了一系列介面供用戶訪問資料庫,主要包括連接資料庫,處理SQL,迭代查詢結果等。本文會針對我們使用SQLite的主要場景,列出核心的API,詳細介紹API的用法並給出代碼用例。1.打開關閉資料庫sqlite3_open_v2原型:int sqlite3_open_v2(cons....
  • 本文轉至:http://database.51cto.com/art/201503/469510_all.htm(只作轉載, 不代表本站和博主同意文中觀點或證實文中信息)Olery成立於2010年,總部位於阿姆斯特丹。該初創公司為酒店行業提供聲譽管理與媒體監控工具,幫助酒店將網路評論和社交媒體反饋轉...
  • 本文轉載自:http://www.innomysql.net/article/23959.html(只作轉載, 不代表本站和博主同意文中觀點或證實文中信息)工作10餘年,沒有一個版本能像MySQL 5.7那樣令我激動與期盼,10月MySQL 5.7 GA版本的發佈,意味著MySQL資料庫終於有能力在...
  • ---------------原創內容,轉載請註明出處。------------一、概述RRDtool(round-robin database tool),即輪詢式資料庫工具(註:並不等同於電腦中的輪詢調度演算法),採用固定大小的空間來存儲數據,設定一個指針,隨數據的讀寫移動,指向最後更新的數據的...
  • 本文轉載自:http://www.oschina.net/translate/why-you-should-never-use-mongodb(只作轉載, 不代表本站和博主同意文中觀點或證實文中信息)免責聲明:我不構建資料庫引擎,但搭建Web應用。每年我大約跑4-6個不同項目,所以我搭建了不少Web...
  • 本文轉載自:http://www.innomysql.net/article/15612.html(只作轉載, 不代表本站和博主同意文中觀點或證實文中信息)前言上周參加了2015年的中國資料庫大會,差不多從第二屆開始就每年都會北京參會,從最早的嘉賓到這次的會場主持人,也算見證了中國資料庫大會的發展吧...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...