Apache Flink是一個面向分散式數據流處理和批量數據處理的開源計算平臺,它能夠基於同一個Flink運行時,提供支持流處理和批處理兩種類型應用的功能。 現有的開源計算方案,會把流處理和批處理作為兩種不同的應用類型,因為它們所提供的SLA(Service Level Aggreement)是完全 ...
Apache Flink是一個面向分散式數據流處理和批量數據處理的開源計算平臺,它能夠基於同一個Flink運行時,提供支持流處理和批處理兩種類型應用的功能。
現有的開源計算方案,會把流處理和批處理作為兩種不同的應用類型,因為它們所提供的SLA(Service-Level-Aggreement)是完全不相同的:流處理一般需要支持低延遲、Exactly-once保證,而批處理需要支持高吞吐、高效處理。
Flink從另一個視角看待流處理和批處理,將二者統一起來:Flink是完全支持流處理,也就是說作為流處理看待時輸入數據流是無界的;批處理被作為一種特殊的流處理,只是它的輸入數據流被定義為有界的。
Flink流處理特性:
- 支持高吞吐、低延遲、高性能的流處理
- 支持帶有事件時間的視窗(Window)操作
- 支持有狀態計算的Exactly-once語義
- 支持高度靈活的視窗(Window)操作,支持基於time、count、session,以及data-driven的視窗操作
- 支持具有Backpressure功能的持續流模型
- 支持基於輕量級分散式快照(Snapshot)實現的容錯
- 一個運行時同時支持Batch on Streaming處理和Streaming處理
- Flink在JVM內部實現了自己的記憶體管理
- 支持迭代計算
- 支持程式自動優化:避免特定情況下Shuffle、排序等昂貴操作,中間結果有必要進行緩存
一、架構
Flink以層級式系統形式組件其軟體棧,不同層的棧建立在其下層基礎上,並且各層接受程式不同層的抽象形式。
- 運行時層以JobGraph形式接收程式。JobGraph即為一個一般化的並行數據流圖(data flow),它擁有任意數量的Task來接收和產生data stream。
- DataStream API和DataSet API都會使用單獨編譯的處理方式生成JobGraph。DataSet API使用optimizer來決定針對程式的優化方法,而DataStream API則使用stream builder來完成該任務。
- 在執行JobGraph時,Flink提供了多種候選部署方案(如local,remote,YARN等)。
- Flink附隨了一些產生DataSet或DataStream API程式的的類庫和API:處理邏輯表查詢的Table,機器學習的FlinkML,圖像處理的Gelly,複雜事件處理的CEP。
二、原理
1. 流、轉換、操作符
Flink程式是由Stream和Transformation這兩個基本構建塊組成,其中Stream是一個中間結果數據,而Transformation是一個操作,它對一個或多個輸入Stream進行計算處理,輸出一個或多個結果Stream。
Flink程式被執行的時候,它會被映射為Streaming Dataflow。一個Streaming Dataflow是由一組Stream和Transformation Operator組成,它類似於一個DAG圖,在啟動的時候從一個或多個Source Operator開始,結束於一個或多個Sink Operator。
2. 並行數據流
一個Stream可以被分成多個Stream分區(Stream Partitions),一個Operator可以被分成多個Operator Subtask,每一個Operator Subtask是在不同的線程中獨立執行的。一個Operator的並行度,等於Operator Subtask的個數,一個Stream的並行度總是等於生成它的Operator的並行度。
One-to-one模式
比如從Source[1]到map()[1],它保持了Source的分區特性(Partitioning)和分區內元素處理的有序性,也就是說map()[1]的Subtask看到數據流中記錄的順序,與Source[1]中看到的記錄順序是一致的。
Redistribution模式
這種模式改變了輸入數據流的分區,比如從map()[1]、map()[2]到keyBy()/window()/apply()[1]、keyBy()/window()/apply()[2],上游的Subtask向下游的多個不同的Subtask發送數據,改變了數據流的分區,這與實際應用所選擇的Operator有關係。
3. 任務、操作符鏈
Flink分散式執行環境中,會將多個Operator Subtask串起來組成一個Operator Chain,實際上就是一個執行鏈,每個執行鏈會在TaskManager上一個獨立的線程中執行。
4. 時間
處理Stream中的記錄時,記錄中通常會包含各種典型的時間欄位:
- Event Time:表示事件創建時間
- Ingestion Time:表示事件進入到Flink Dataflow的時間
- Processing Time:表示某個Operator對事件進行處理的本地系統時間
Flink使用WaterMark衡量時間的時間,WaterMark攜帶時間戳t,並被插入到stream中。
- WaterMark的含義是所有時間t'< t的事件都已經發生。
- 針對亂序的的流,WaterMark至關重要,這樣可以允許一些事件到達延遲,而不至於過於影響window視窗的計算。
- 並行數據流中,當Operator有多個輸入流時,Operator的event time以最小流event time為準。
5. 視窗
Flink支持基於時間視窗操作,也支持基於數據的視窗操作:
視窗分類:
- 按分割標準劃分:timeWindow、countWindow
- 按視窗行為劃分:Tumbling Window、Sliding Window、自定義視窗
Tumbling/Sliding Time Window
// Stream of (sensorId, carCnt)
val vehicleCnts: DataStream[(Int, Int)] = ...
val tumblingCnts: DataStream[(Int, Int)] = vehicleCnts
// key stream by sensorId
.keyBy(0)
// tumbling time window of 1 minute length
.timeWindow(Time.minutes(1))
// compute sum over carCnt
.sum(1)
val slidingCnts: DataStream[(Int, Int)] = vehicleCnts
.keyBy(0)
// sliding time window of 1 minute length and 30 secs trigger interval
.timeWindow(Time.minutes(1), Time.seconds(30))
.sum(1)
Tumbling/Sliding Count Window
// Stream of (sensorId, carCnt)
val vehicleCnts: DataStream[(Int, Int)] = ...
val tumblingCnts: DataStream[(Int, Int)] = vehicleCnts
// key stream by sensorId
.keyBy(0)
// tumbling count window of 100 elements size
.countWindow(100)
// compute the carCnt sum
.sum(1)
val slidingCnts: DataStream[(Int, Int)] = vehicleCnts
.keyBy(0)
// sliding count window of 100 elements size and 10 elements trigger interval
.countWindow(100, 10)
.sum(1)
自定義視窗
基本操作:
- window:創建自定義視窗
- trigger:自定義觸發器
- evictor:自定義evictor
- apply:自定義window function
6. 容錯
Barrier機制:
- 出現一個Barrier,在該Barrier之前出現的記錄都屬於該Barrier對應的Snapshot,在該Barrier之後出現的記錄屬於下一個Snapshot。
- 來自不同Snapshot多個Barrier可能同時出現在數據流中,也就是說同一個時刻可能併發生成多個Snapshot。
- 當一個中間(Intermediate)Operator接收到一個Barrier後,它會發送Barrier到屬於該Barrier的Snapshot的數據流中,等到Sink Operator接收到該Barrier後會向Checkpoint Coordinator確認該Snapshot,直到所有的Sink Operator都確認了該Snapshot,才被認為完成了該Snapshot。
對齊:
當Operator接收到多個輸入的數據流時,需要在Snapshot Barrier中對數據流進行排列對齊:
- Operator從一個incoming Stream接收到Snapshot Barrier n,然後暫停處理,直到其它的incoming Stream的Barrier n(否則屬於2個Snapshot的記錄就混在一起了)到達該Operator
- 接收到Barrier n的Stream被臨時擱置,來自這些Stream的記錄不會被處理,而是被放在一個Buffer中。
- 一旦最後一個Stream接收到Barrier n,Operator會emit所有暫存在Buffer中的記錄,然後向Checkpoint Coordinator發送Snapshot n。
- 繼續處理來自多個Stream的記錄
基於Stream Aligning操作能夠實現Exactly Once語義,但是也會給流處理應用帶來延遲,因為為了排列對齊Barrier,會暫時緩存一部分Stream的記錄到Buffer中,尤其是在數據流並行度很高的場景下可能更加明顯,通常以最遲對齊Barrier的一個Stream為處理Buffer中緩存記錄的時刻點。在Flink中,提供了一個開關,選擇是否使用Stream Aligning,如果關掉則Exactly Once會變成At least once。
CheckPoint:
Snapshot並不僅僅是對數據流做了一個狀態的Checkpoint,它也包含了一個Operator內部所持有的狀態,這樣才能夠在保證在流處理系統失敗時能夠正確地恢複數據流處理。狀態包含兩種:
- 系統狀態:一個Operator進行計算處理的時候需要對數據進行緩衝,所以數據緩衝區的狀態是與Operator相關聯的。以視窗操作的緩衝區為例,Flink系統會收集或聚合記錄數據並放到緩衝區中,直到該緩衝區中的數據被處理完成。
- 一種是用戶自定義狀態(狀態可以通過轉換函數進行創建和修改),它可以是函數中的Java對象這樣的簡單變數,也可以是與函數相關的Key/Value狀態。
7. 調度
在JobManager端,會接收到Client提交的JobGraph形式的Flink Job,JobManager會將一個JobGraph轉換映射為一個ExecutionGraph,ExecutionGraph是JobGraph的並行表示,也就是實際JobManager調度一個Job在TaskManager上運行的邏輯視圖。
物理上進行調度,基於資源的分配與使用的一個例子:
- 左上子圖:有2個TaskManager,每個TaskManager有3個Task Slot
- 左下子圖:一個Flink Job,邏輯上包含了1個data source、1個MapFunction、1個ReduceFunction,對應一個JobGraph
- 左下子圖:用戶提交的Flink Job對各個Operator進行的配置——data source的並行度設置為4,MapFunction的並行度也為4,ReduceFunction的並行度為3,在JobManager端對應於ExecutionGraph
- 右上子圖:TaskManager 1上,有2個並行的ExecutionVertex組成的DAG圖,它們各占用一個Task Slot
- 右下子圖:TaskManager 2上,也有2個並行的ExecutionVertex組成的DAG圖,它們也各占用一個Task Slot
- 在2個TaskManager上運行的4個Execution是並行執行的
8. 迭代
機器學習和圖計算應用,都會使用到迭代計算,Flink通過在迭代Operator中定義Step函數來實現迭代演算法,這種迭代演算法包括Iterate和Delta Iterate兩種類型。
Iterate
Iterate Operator是一種簡單的迭代形式:每一輪迭代,Step函數的輸入或者是輸入的整個數據集,或者是上一輪迭代的結果,通過該輪迭代計算出下一輪計算所需要的輸入(也稱為Next Partial Solution),滿足迭代的終止條件後,會輸出最終迭代結果。
流程偽代碼:
IterationState state = getInitialState();
while (!terminationCriterion()) {
state = step(state);
}
setFinalState(state);
Delta Iterate
Delta Iterate Operator實現了增量迭代。
流程偽代碼:
IterationState workset = getInitialState();
IterationState solution = getInitialSolution();
while (!terminationCriterion()) {
(delta, workset) = step(workset, solution);
solution.update(delta)
}
setFinalState(solution);
最小值傳播:
9. Back Pressure監控
流處理系統中,當下游Operator處理速度跟不上的情況,如果下游Operator能夠將自己處理狀態傳播給上游Operator,使得上游Operator處理速度慢下來就會緩解上述問題,比如通過告警的方式通知現有流處理系統存在的問題。
Flink Web界面上提供了對運行Job的Backpressure行為的監控,它通過使用Sampling線程對正在運行的Task進行堆棧跟蹤採樣來實現。
預設情況下,JobManager會每間隔50ms觸發對一個Job的每個Task依次進行100次堆棧跟蹤調用,過計算得到一個比值,例如,radio=0.01,表示100次中僅有1次方法調用阻塞。Flink目前定義瞭如下Backpressure狀態:
OK: 0 <= Ratio <= 0.10
LOW: 0.10 < Ratio <= 0.5
HIGH: 0.5 < Ratio <= 1
三、庫
1. Table
Flink的Table API實現了使用類SQL進行流和批處理。
詳情參考:https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/table_api.html
2. CEP
Flink的CEP(Complex Event Processing)支持在流中發現複雜的事件模式,快速篩選用戶感興趣的數據。
3. Gelly
Gelly是Flink提供的圖計算API,提供了簡化開發和構建圖計算分析應用的介面。
詳情參考:https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/libs/gelly/index.html
4. FlinkML
FlinkML是Flink提供的機器學習庫,提供了可擴展的機器學習演算法、簡潔的API和工具簡化機器學習系統的開發。
詳情參考:https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/libs/ml/index.html
四、部署
當Flink系統啟動時,首先啟動JobManager和一至多個TaskManager。JobManager負責協調Flink系統,TaskManager則是執行並行程式的worker。當系統以本地形式啟動時,一個JobManager和一個TaskManager會啟動在同一個JVM中。
當一個程式被提交後,系統會創建一個Client來進行預處理,將程式轉變成一個並行數據流的形式,交給JobManager和TaskManager執行。
1. 啟動測試
編譯flink,本地啟動。
$ java -version
java version "1.8.0_111"
$ git clone https://github.com/apache/flink.git
$ git checkout release-1.1.4 -b release-1.1.4
$ cd flink
$ mvn clean package -DskipTests
$ cd flink-dist/target/flink-1.1.4-bin/flink-1.1.4
$ ./bin/start-local.sh
編寫本地流處理demo。
SocketWindowWordCount.java
public class SocketWindowWordCount {
public static void main(String[] args) throws Exception {
// the port to connect to
final int port;
try {
final ParameterTool params = ParameterTool.fromArgs(args);
port = params.getInt("port");
} catch (Exception e) {
System.err.println("No port specified. Please run 'SocketWindowWordCount --port <port>'");
return;
}
// get the execution environment
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// get input data by connecting to the socket
DataStream<String> text = env.socketTextStream("localhost", port, "\n");
// parse the data, group it, window it, and aggregate the counts
DataStream<WordWithCount> windowCounts = text
.flatMap(new FlatMapFunction<String, WordWithCount>() {
public void flatMap(String value, Collector<WordWithCount> out) {
for (String word : value.split("\\s")) {
out.collect(new WordWithCount(word, 1L));
}
}
})
.keyBy("word")
.timeWindow(Time.seconds(5), Time.seconds(1))
.reduce(new ReduceFunction<WordWithCount>() {
public WordWithCount reduce(WordWithCount a, WordWithCount b) {
return new WordWithCount(a.word, a.count + b.count);
}
});
// print the results with a single thread, rather than in parallel
windowCounts.print().setParallelism(1);
env.execute("Socket Window WordCount");
}
// Data type for words with count
public static class WordWithCount {
public String word;
public long count;
public WordWithCount() {}
public WordWithCount(String word, long count) {
this.word = word;
this.count = count;
}
@Override
public String toString() {
return word + " : " + count;
}
}
}
pom.xml
<!-- Use this dependency if you are using the DataStream API -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.10</artifactId>
<version>1.1.4</version>
</dependency>
<!-- Use this dependency if you are using the DataSet API -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>1.1.4</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.10</artifactId>
<version>1.1.4</version>
</dependency>
執行mvn構建。
$ mvn clean install
$ ls target/flink-demo-1.0-SNAPSHOT.jar
開啟9000埠,用於輸入數據:
$ nc -l 9000
提交flink任務:
$ ./bin/flink run -c com.demo.florian.WordCount $DEMO_DIR/target/flink-demo-1.0-SNAPSHOT.jar --port 9000
在nc里輸入數據後,查看執行結果:
$ tail -f log/flink-*-jobmanager-*.out
查看flink web頁面:localhost:8081
2. 代碼結構
Flink系統核心可分為多個子項目。分割項目旨在減少開發Flink程式需要的依賴數量,並對測試和開發小組件提供便捷。
Flink當前還包括以下子項目:
- Flink-dist:distribution項目。它定義瞭如何將編譯後的代碼、腳本和其他資源整合到最終可用的目錄結構中。
- Flink-quick-start:有關quickstart和教程的腳本、maven原型和示常式序
- flink-contrib:一系列有用戶開發的早起版本和有用的工具的項目。後期的代碼主要由外部貢獻者繼續維護,被flink-contirb接受的代碼的要求低於其他項目的要求。
3. Flink On YARN
Flink在YARN集群上運行時:Flink YARN Client負責與YARN RM通信協商資源請求,Flink JobManager和Flink TaskManager分別申請到Container去運行各自的進程。
YARN AM與Flink JobManager在同一個Container中,這樣AM可以知道Flink JobManager的地址,從而AM可以申請Container去啟動Flink TaskManager。待Flink成功運行在YARN集群上,Flink YARN Client就可以提交Flink Job到Flink JobManager,併進行後續的映射、調度和計算處理。
- 設置Hadoop環境變數
$ export HADOOP_CONF_DIR=/etc/hadoop/conf
- 以集群模式提交任務,每次都會新建flink集群
$ ./bin/flink run -m yarn-cluster -c com.demo.florian.WordCount $DEMO_DIR/target/flink-demo-1.0-SNAPSHOT.jar
- 啟動共用flink集群,提交任務
$ ./bin/yarn-session.sh -n 4 -jm 1024 -tm 4096 -d
$ ./bin/flink run -c com.demo.florian.WordCount $DEMO_DIR/target/flink-demo-1.0.SNAPSHOT.jar
參考資料
http://shiyanjun.cn/archives/1508.html
https://ci.apache.org/projects/flink/flink-docs-release-1.2/index.html