我打算以 flink 官方的 例子 <<Monitoring the Wikipedia Edit Stream>> 作為示例,進行 flink 流計算任務 的源碼解析說明. 其中任務的源碼如下,其中中文註釋 來自 http://flink-china.org/ 後續我會對這個拓撲任務代碼進行逐行的 ...
我打算以 flink 官方的 例子 <<Monitoring the Wikipedia Edit Stream>> 作為示例,進行 flink 流計算任務 的源碼解析說明. 其中任務的源碼如下,其中中文註釋 來自 http://flink-china.org/ 後續我會對這個拓撲任務代碼進行逐行的深入分析,以達到深入瞭解flink代碼運行機制的目的.
public class WikipediaAnalysis { public static void main(String[] args) throws Exception { //首先,構建一個StreamExecutionEnvironment //用來設置運行參數 //當從外部系統讀取數據的時候,也被用來創建源(sources) StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment(); //讀取 Wikipedia IRC 日誌的源(sources) DataStream<WikipediaEditEvent> edits = see.addSource(new WikipediaEditsSource()); //在本案例中,我們關心的是每個用戶在一個特定時間視窗內(比如說5秒鐘)增加或者刪除內容的位元組數。 //為了實現這個目標,我們需要指定用戶名作為數據流的 key 欄位,也就是說在這個數據流上的操作應該考慮到用戶名。 //在我們的案例中需要對時間視窗中每個唯一用戶的編輯位元組數求和。為了使數據流包含 key,我們需要提供一個KeySelector.ng KeyedStream<WikipediaEditEvent,String> keyedEdits = edits.keyBy(new KeySelector<WikipediaEditEvent, String>() { @Override public String getKey(WikipediaEditEvent event) { return event.getUser(); } }); //它創建了一個WikipediaEditEvent流,以用戶名作為String類型的 key。 //現在我們可以在這個流上指定視窗並且基於這些視窗內的數據計算出結果。 //一個視窗指定了要執行計算的數據流的一個分片。當需要在一個無邊界的數據流上執行聚合計算時,視窗是必不可少的。 //在我們的案例中,我們想要做的就是每5秒鐘一個視窗對編輯位元組數做聚合計算 DataStream<Tuple2<String, Long>> result = keyedEdits //指定了我們需要一個大小為5秒鐘的滾動視窗(非重疊視窗) .timeWindow(Time.seconds(5)) //調用的第二個方法指定了對每個視窗分片中每個唯一的key做 Fold transformation 轉換。 .fold( new Tuple2<>("", 0L), //初始值 new FoldFunction<WikipediaEditEvent, Tuple2<String, Long>>() { @Override public Tuple2<String, Long> fold(Tuple2<String, Long> acc, WikipediaEditEvent event) { acc.f0 = event.getUser(); acc.f1 += event.getByteDiff(); return acc; } }); //將結果流在終端輸出 result.print(); //將結果輸出到Kafka //result.addSink(new FlinkKafkaProducer010<>("localhost:9092", "wiki-result", new SimpleStringSchema())); //開始執行計算 //像前面的創建數據源,轉換和 Sinks 操作僅僅是構建了一個內部操作圖。 //只有當execute()被調用的時候,這個操作圖才會被扔在集群或者在你的本地機器運行。 see.execute(); } }