在《基於Flume的美團日誌收集系統(一)架構和設計》中,我們詳述了基於Flume的美團日誌收集系統的架構設計,以及為什麼做這樣的設計。在本節中,我們將會講述在實際部署和使用過程中遇到的問題,對Flume的功能改進和對系統做的優化。1 Flume的問題總結在Flume的使用過程中,遇到的主要問題如下...
在《基於Flume的美團日誌收集系統(一)架構和設計》中,我們詳述了基於Flume的美團日誌收集系統的架構設計,以及為什麼做這樣的設計。在本節中,我們將會講述在實際部署和使用過程中遇到的問題,對Flume的功能改進和對系統做的優化。
1 Flume的問題總結
在Flume的使用過程中,遇到的主要問題如下:
a. Channel“水土不服”:使用固定大小的MemoryChannel在日誌高峰時常報隊列大小不夠的異常;使用FileChannel又導致IO繁忙的問題;
b. HdfsSink的性能問題:使用HdfsSink向Hdfs寫日誌,在高峰時間速度較慢;
c. 系統的管理問題:配置升級,模塊重啟等;
2 Flume的功能改進和優化點
從上面的問題中可以看到,有一些需求是原生Flume無法滿足的,因此,基於開源的Flume我們增加了許多功能,修改了一些Bug,並且進行一些調優。下麵將對一些主要的方面做一些說明。
2.1 增加Zabbix monitor服務
一方面,Flume本身提供了http, ganglia的監控服務,而我們目前主要使用zabbix做監控。因此,我們為Flume添加了zabbix監控模塊,和sa的監控服務無縫融合。
另一方面,凈化Flume的metrics。只將我們需要的metrics發送給zabbix,避免 zabbix server造成壓力。目前我們最為關心的是Flume能否及時把應用端發送過來的日誌寫到Hdfs上, 對應關註的metrics為:
- Source : 接收的event數和處理的event數
- Channel : Channel中擁堵的event數
- Sink : 已經處理的event數
2.2 為HdfsSink增加自動創建index功能
首先,我們的HdfsSink寫到hadoop的文件採用lzo壓縮存儲。 HdfsSink可以讀取hadoop配置文件中提供的編碼類列表,然後通過配置的方式獲取使用何種壓縮編碼,我們目前使用lzo壓縮數據。採用lzo壓縮而非bz2壓縮,是基於以下測試數據:
event大小(Byte) | sink.batch-size | hdfs.batchSize | 壓縮格式 | 總數據大小(G) | 耗時(s) | 平均events/s | 壓縮後大小(G) |
---|---|---|---|---|---|---|---|
544 | 300 | 10000 | bz2 | 9.1 | 2448 | 6833 | 1.36 |
544 | 300 | 10000 | lzo | 9.1 | 612 | 27333 | 3.49 |
其次,我們的HdfsSink增加了創建lzo文件後自動創建index功能。Hadoop提供了對lzo創建索引,使得壓縮文件是可切分的,這樣Hadoop Job可以並行處理數據文件。HdfsSink本身lzo壓縮,但寫完lzo文件並不會建索引,我們在close文件之後添加了建索引功能。
1 /** 2 * Rename bucketPath file from .tmp to permanent location. 3 */ 4 private void renameBucket() throws IOException, InterruptedException { 5 if(bucketPath.equals(targetPath)) { 6 return; 7 } 8 9 final Path srcPath = new Path(bucketPath); 10 final Path dstPath = new Path(targetPath); 11 12 callWithTimeout(new CallRunner<Object>() { 13 @Override 14 public Object call() throws Exception { 15 if(fileSystem.exists(srcPath)) { // could block 16 LOG.info("Renaming " + srcPath + " to " + dstPath); 17 fileSystem.rename(srcPath, dstPath); // could block 18 19 //index the dstPath lzo file 20 if (codeC != null && ".lzo".equals(codeC.getDefaultExtension()) ) { 21 LzoIndexer lzoIndexer = new LzoIndexer(new Configuration()); 22 lzoIndexer.index(dstPath); 23 } 24 } 25 return null; 26 } 27 }); 28 }
2.3 增加HdfsSink的開關
我們在HdfsSink和DualChannel中增加開關,當開關打開的情況下,HdfsSink不再往Hdfs上寫數據,並且數據只寫向DualChannel中的FileChannel。以此策略來防止Hdfs的正常停機維護。
2.4 增加DualChannel
Flume本身提供了MemoryChannel和FileChannel。MemoryChannel處理速度快,但緩存大小有限,且沒有持久化;FileChannel則剛好相反。我們希望利用兩者的優勢,在Sink處理速度夠快,Channel沒有緩存過多日誌的時候,就使用MemoryChannel,當Sink處理速度跟不上,又需要Channel能夠緩存下應用端發送過來的日誌時,就使用FileChannel,由此我們開發了DualChannel,能夠智能的在兩個Channel之間切換。
其具體的邏輯如下:
1 /*** 2 * putToMemChannel indicate put event to memChannel or fileChannel 3 * takeFromMemChannel indicate take event from memChannel or fileChannel 4 * */ 5 private AtomicBoolean putToMemChannel = new AtomicBoolean(true); 6 private AtomicBoolean takeFromMemChannel = new AtomicBoolean(true); 7 8 void doPut(Event event) { 9 if (switchon && putToMemChannel.get()) { 10 //往memChannel中寫數據 11 memTransaction.put(event); 12 13 if ( memChannel.isFull() || fileChannel.getQueueSize() > 100) { 14 putToMemChannel.set(false); 15 } 16 } else { 17 //往fileChannel中寫數據 18 fileTransaction.put(event); 19 } 20 } 21 22 Event doTake() { 23 Event event = null; 24 if ( takeFromMemChannel.get() ) { 25 //從memChannel中取數據 26 event = memTransaction.take(); 27 if (event == null) { 28 takeFromMemChannel.set(false); 29 } 30 } else { 31 //從fileChannel中取數據 32 event = fileTransaction.take(); 33 if (event == null) { 34 takeFromMemChannel.set(true); 35 36 putToMemChannel.set(true); 37 } 38 } 39 return event; 40 }
2.5 增加NullChannel
Flume提供了NullSink,可以把不需要的日誌通過NullSink直接丟棄,不進行存儲。然而,Source需要先將events存放到Channel中,NullSink再將events取出扔掉。為了提升性能,我們把這一步移到了Channel裡面做,所以開發了NullChannel。
2.6 增加KafkaSink
為支持向Storm提供實時數據流,我們增加了KafkaSink用來向Kafka寫實時數據流。其基本的邏輯如下:
1 public class KafkaSink extends AbstractSink implements Configurable { 2 private String zkConnect; 3 private Integer zkTimeout; 4 private Integer batchSize; 5 private Integer queueSize; 6 private String serializerClass; 7 private String producerType; 8 private String topicPrefix; 9 10 private Producer<String, String> producer; 11 12 public void configure(Context context) { 13 //讀取配置,並檢查配置 14 } 15 16 @Override 17 public synchronized void start() { 18 //初始化producer 19 } 20 21 @Override 22 public synchronized void stop() { 23 //關閉producer 24 } 25 26 @Override 27 public Status process() throws EventDeliveryException { 28 29 Status status = Status.READY; 30 31 Channel channel = getChannel(); 32 Transaction tx = channel.getTransaction(); 33 try { 34 tx.begin(); 35 36 //將日誌按category分隊列存放 37 Map<String, List<String>> topic2EventList = new HashMap<String, List<String>>(); 38 39 //從channel中取batchSize大小的日誌,從header中獲取category,生成topic,並存放於上述的Map中; 40 41 //將Map中的數據通過producer發送給kafka 42 43 tx.commit(); 44 } catch (Exception e) { 45 tx.rollback(); 46 throw new EventDeliveryException(e); 47 } finally { 48 tx.close(); 49 } 50 return status; 51 } 52 }
2.7 修複和scribe的相容問題
Scribed在通過ScribeSource發送數據包給Flume時,大於4096位元組的包,會先發送一個Dummy包檢查伺服器的反應,而Flume的ScribeSource對於logentry.size()=0的包返回TRY_LATER,此時Scribed就認為出錯,斷開連接。這樣迴圈反覆嘗試,無法真正發送數據。現在在ScribeSource的Thrift介面中,對size為0的情況返回OK,保證後續正常發送數據。
3. Flume系統調優經驗總結
3.1 基礎參數調優經驗
- HdfsSink中預設的serializer會每寫一行在行尾添加一個換行符,我們日誌本身帶有換行符,這樣會導致每條日誌後面多一個空行,修改配置不要自動添加換行符;
lc.sinks.sink_hdfs.serializer.appendNewline = false
-
調大MemoryChannel的capacity,儘量利用MemoryChannel快速的處理能力;
-
調大HdfsSink的batchSize,增加吞吐量,減少hdfs的flush次數;
-
適當調大HdfsSink的callTimeout,避免不必要的超時錯誤;
3.2 HdfsSink獲取Filename的優化
HdfsSink的path參數指明瞭日誌被寫到Hdfs的位置,該參數中可以引用格式化的參數,將日誌寫到一個動態的目錄中。這方便了日誌的管理。例如我們可以將日誌寫到category分類的目錄,並且按天和按小時存放:
lc.sinks.sink_hdfs.hdfs.path = /user/hive/work/orglog.db/%{category}/dt=%Y%m%d/hour=%H
HdfsS ink中處理每條event時,都要根據配置獲取此event應該寫入的Hdfs path和filename,預設的獲取方法是通過正則表達式替換配置中的變數,獲取真實的path和filename。因為此過程是每條event都要做的操作,耗時很長。通過我們的測試,20萬條日誌,這個操作要耗時6-8s左右。
由於我們目前的path和filename有固定的模式,可以通過字元串拼接獲得。而後者比正則匹配快幾十倍。拼接定符串的方式,20萬條日誌的操作只需要幾百毫秒。
3.3 HdfsSink的b/m/s優化
在我們初始的設計中,所有的日誌都通過一個Channel和一個HdfsSink寫到Hdfs上。我們來看一看這樣做有什麼問題。
首先,我們來看一下HdfsSink在發送數據的邏輯:
1 //從Channel中取batchSize大小的events 2 for (txnEventCount = 0; txnEventCount < batchSize; txnEventCount++) { 3 //對每條日誌根據category append到相應的bucketWriter上; 4 bucketWriter.append(event); 5 } 6 7 for (BucketWriter bucketWriter : writers) { 8 //然後對每一個bucketWriter調用相應的flush方法將數據flush到Hdfs上 9 bucketWriter.flush(); 10 }
假設我們的系統中有100個category,batchSize大小設置為20萬。則每20萬條數據,就需要對100個文件進行append或者flush操作。
其次,對於我們的日誌來說,基本符合80/20原則。即20%的category產生了系統80%的日誌量。這樣對大部分日誌來說,每20萬條可能只包含幾條日誌,也需要往Hdfs上flush一次。
上述的情況會導致HdfsSink寫Hdfs的效率極差。下圖是單Channel的情況下每小時的發送量和寫hdfs的時間趨勢圖。
鑒於這種實際應用場景,我們把日誌進行了大小歸類,分為big, middle和small三類,這樣可以有效的避免小日誌跟著大日誌一起頻繁的flush,提升效果明顯。下圖是分隊列後big隊列的每小時的發送量和寫hdfs的時間趨勢圖。
4 未來發展
目前,Flume日誌收集系統提供了一個高可用,高可靠,可擴展的分散式服務,已經有效地支持了美團的日誌數據收集工作。
後續,我們將在如下方面繼續研究:
-
日誌管理系統:圖形化的展示和控制日誌收集系統;
-
跟進社區發展:跟進Flume 1.5的進展,同時回饋社區;