( 自己寫的插件,數據序列化,格式化拋出的異常都會導致flume停止,不能繼續取數據,異常可以自己處理 ) 最近在用Flume做數據的收集。用到了裡面的Spooldir的源在使用中有如下的問題: 如果文件的某一行有亂碼,不符合指定的編碼規範,那麼flume會拋出一個exception,然後就停在那兒 ...
(
自己寫的插件,數據序列化,格式化拋出的異常都會導致flume停止,不能繼續取數據,異常可以自己處理
)
最近在用Flume做數據的收集。用到了裡面的Spooldir的源在使用中有如下的問題:
- 如果文件的某一行有亂碼,不符合指定的編碼規範,那麼flume會拋出一個exception,然後就停在那兒了。
- spooldir指定的文件夾中的文件一旦被修改,flume就會拋出一個exception,然後停在那兒了。
其實,flume的最大問題就是不夠魯棒。一旦出現問題,不能跳過,只能死在那兒。不知道flume為什麼要這麼設計。理論上,它應該允許我們在配置文件中指定在遇到錯誤的行時,是停止還是跳過,不過它目前並不支持這個。所以,我們只能寫一個自己的flume的插件了。
https://github.com/xlvector/flume
https://github.com/ponyma/flume
這個插件主要修複了前面提到的兩個問題:
- 如果某一行有亂碼,flume會忽略這一行
- flume只會check最近N分鐘沒有修改過的文件
具體修改方法如下。首先,我們繼承了SpoolDirectorySource,實現了一個叫做RobustSpoolDirectorySource的類。這個類的代碼基本是拷貝了SpoolDirectorySource的代碼。但做瞭如下的修改。
在getNextFile()的函數中,我們發現了一個filter,做瞭如下的修改
FileFilter filter = new FileFilter() {
public boolean accept(File candidate) {
String fileName = candidate.getName();
if ((candidate.isDirectory()) ||
(fileName.endsWith(completedSuffix)) ||
(fileName.startsWith(".")) ||
ignorePattern.matcher(fileName).matches() ||
(System.currentTimeMillis() - candidate.lastModified() < 600000)) {
return false;
}
return true;
}
};
這裡,我們加入了一個條件
(System.currentTimeMillis() - candidate.lastModified() < 600000)
也就是說10分鐘之內修改過的文件我們不會處理。
第二個修改是關於編碼的,你可以在ReliableSpoolingFileEventReader.java的代碼中找到如下的代碼:
ResettableInputStream in =
new ResettableFileInputStream(nextFile, tracker,
ResettableFileInputStream.DEFAULT_BUF_SIZE, inputCharset,
DecodeErrorPolicy.FAIL);
這裡,我們只需要將DecodeErrorPolicy 改成 DecodeErrorPolicy.IGNORE 即可。