Flink架構、原理與部署測試

来源:http://www.cnblogs.com/fanzhidongyzby/archive/2017/01/18/6297723.html
-Advertisement-
Play Games

Apache Flink是一個面向分散式數據流處理和批量數據處理的開源計算平臺,它能夠基於同一個Flink運行時,提供支持流處理和批處理兩種類型應用的功能。 現有的開源計算方案,會把流處理和批處理作為兩種不同的應用類型,因為它們所提供的SLA(Service Level Aggreement)是完全 ...


Apache Flink是一個面向分散式數據流處理和批量數據處理的開源計算平臺,它能夠基於同一個Flink運行時,提供支持流處理和批處理兩種類型應用的功能。

現有的開源計算方案,會把流處理和批處理作為兩種不同的應用類型,因為它們所提供的SLA(Service-Level-Aggreement)是完全不相同的:流處理一般需要支持低延遲、Exactly-once保證,而批處理需要支持高吞吐、高效處理。

Flink從另一個視角看待流處理和批處理,將二者統一起來:Flink是完全支持流處理,也就是說作為流處理看待時輸入數據流是無界的;批處理被作為一種特殊的流處理,只是它的輸入數據流被定義為有界的

Flink流處理特性:

  1. 支持高吞吐、低延遲、高性能的流處理
  2. 支持帶有事件時間的視窗(Window)操作
  3. 支持有狀態計算的Exactly-once語義
  4. 支持高度靈活的視窗(Window)操作,支持基於time、count、session,以及data-driven的視窗操作
  5. 支持具有Backpressure功能的持續流模型
  6. 支持基於輕量級分散式快照(Snapshot)實現的容錯
  7. 一個運行時同時支持Batch on Streaming處理和Streaming處理
  8. Flink在JVM內部實現了自己的記憶體管理
  9. 支持迭代計算
  10. 支持程式自動優化:避免特定情況下Shuffle、排序等昂貴操作,中間結果有必要進行緩存

一、架構

Flink以層級式系統形式組件其軟體棧,不同層的棧建立在其下層基礎上,並且各層接受程式不同層的抽象形式。

  1. 運行時層以JobGraph形式接收程式。JobGraph即為一個一般化的並行數據流圖(data flow),它擁有任意數量的Task來接收和產生data stream。
  2. DataStream API和DataSet API都會使用單獨編譯的處理方式生成JobGraph。DataSet API使用optimizer來決定針對程式的優化方法,而DataStream API則使用stream builder來完成該任務。
  3. 在執行JobGraph時,Flink提供了多種候選部署方案(如local,remote,YARN等)。
  4. Flink附隨了一些產生DataSet或DataStream API程式的的類庫和API:處理邏輯表查詢的Table,機器學習的FlinkML,圖像處理的Gelly,複雜事件處理的CEP。

二、原理

1. 流、轉換、操作符

Flink程式是由Stream和Transformation這兩個基本構建塊組成,其中Stream是一個中間結果數據,而Transformation是一個操作,它對一個或多個輸入Stream進行計算處理,輸出一個或多個結果Stream。

Flink程式被執行的時候,它會被映射為Streaming Dataflow。一個Streaming Dataflow是由一組Stream和Transformation Operator組成,它類似於一個DAG圖,在啟動的時候從一個或多個Source Operator開始,結束於一個或多個Sink Operator。

2. 並行數據流

一個Stream可以被分成多個Stream分區(Stream Partitions),一個Operator可以被分成多個Operator Subtask,每一個Operator Subtask是在不同的線程中獨立執行的。一個Operator的並行度,等於Operator Subtask的個數,一個Stream的並行度總是等於生成它的Operator的並行度。

One-to-one模式
比如從Source[1]到map()[1],它保持了Source的分區特性(Partitioning)和分區內元素處理的有序性,也就是說map()[1]的Subtask看到數據流中記錄的順序,與Source[1]中看到的記錄順序是一致的。

Redistribution模式
這種模式改變了輸入數據流的分區,比如從map()[1]、map()[2]到keyBy()/window()/apply()[1]、keyBy()/window()/apply()[2],上游的Subtask向下游的多個不同的Subtask發送數據,改變了數據流的分區,這與實際應用所選擇的Operator有關係。

3. 任務、操作符鏈

Flink分散式執行環境中,會將多個Operator Subtask串起來組成一個Operator Chain,實際上就是一個執行鏈,每個執行鏈會在TaskManager上一個獨立的線程中執行。

4. 時間

處理Stream中的記錄時,記錄中通常會包含各種典型的時間欄位:

  1. Event Time:表示事件創建時間
  2. Ingestion Time:表示事件進入到Flink Dataflow的時間
  3. Processing Time:表示某個Operator對事件進行處理的本地系統時間

Flink使用WaterMark衡量時間的時間,WaterMark攜帶時間戳t,並被插入到stream中。

  1. WaterMark的含義是所有時間t'< t的事件都已經發生。
  2. 針對亂序的的流,WaterMark至關重要,這樣可以允許一些事件到達延遲,而不至於過於影響window視窗的計算。
  3. 並行數據流中,當Operator有多個輸入流時,Operator的event time以最小流event time為準。

5. 視窗

Flink支持基於時間視窗操作,也支持基於數據的視窗操作:

視窗分類:

  1. 按分割標準劃分:timeWindow、countWindow
  2. 按視窗行為劃分:Tumbling Window、Sliding Window、自定義視窗

Tumbling/Sliding Time Window

// Stream of (sensorId, carCnt)
val vehicleCnts: DataStream[(Int, Int)] = ...

val tumblingCnts: DataStream[(Int, Int)] = vehicleCnts
  // key stream by sensorId
  .keyBy(0) 
  // tumbling time window of 1 minute length
  .timeWindow(Time.minutes(1))
  // compute sum over carCnt
  .sum(1) 

val slidingCnts: DataStream[(Int, Int)] = vehicleCnts
  .keyBy(0) 
  // sliding time window of 1 minute length and 30 secs trigger interval
  .timeWindow(Time.minutes(1), Time.seconds(30))
  .sum(1)

Tumbling/Sliding Count Window

// Stream of (sensorId, carCnt)
val vehicleCnts: DataStream[(Int, Int)] = ...

val tumblingCnts: DataStream[(Int, Int)] = vehicleCnts
  // key stream by sensorId
  .keyBy(0)
  // tumbling count window of 100 elements size
  .countWindow(100)
  // compute the carCnt sum 
  .sum(1)

val slidingCnts: DataStream[(Int, Int)] = vehicleCnts
  .keyBy(0)
  // sliding count window of 100 elements size and 10 elements trigger interval
  .countWindow(100, 10)
  .sum(1)

自定義視窗

基本操作:

  1. window:創建自定義視窗
  2. trigger:自定義觸發器
  3. evictor:自定義evictor
  4. apply:自定義window function

6. 容錯

Barrier機制:

  1. 出現一個Barrier,在該Barrier之前出現的記錄都屬於該Barrier對應的Snapshot,在該Barrier之後出現的記錄屬於下一個Snapshot。
  2. 來自不同Snapshot多個Barrier可能同時出現在數據流中,也就是說同一個時刻可能併發生成多個Snapshot。
  3. 當一個中間(Intermediate)Operator接收到一個Barrier後,它會發送Barrier到屬於該Barrier的Snapshot的數據流中,等到Sink Operator接收到該Barrier後會向Checkpoint Coordinator確認該Snapshot,直到所有的Sink Operator都確認了該Snapshot,才被認為完成了該Snapshot。

對齊:

當Operator接收到多個輸入的數據流時,需要在Snapshot Barrier中對數據流進行排列對齊:

  1. Operator從一個incoming Stream接收到Snapshot Barrier n,然後暫停處理,直到其它的incoming Stream的Barrier n(否則屬於2個Snapshot的記錄就混在一起了)到達該Operator
  2. 接收到Barrier n的Stream被臨時擱置,來自這些Stream的記錄不會被處理,而是被放在一個Buffer中。
  3. 一旦最後一個Stream接收到Barrier n,Operator會emit所有暫存在Buffer中的記錄,然後向Checkpoint Coordinator發送Snapshot n。
  4. 繼續處理來自多個Stream的記錄

基於Stream Aligning操作能夠實現Exactly Once語義,但是也會給流處理應用帶來延遲,因為為了排列對齊Barrier,會暫時緩存一部分Stream的記錄到Buffer中,尤其是在數據流並行度很高的場景下可能更加明顯,通常以最遲對齊Barrier的一個Stream為處理Buffer中緩存記錄的時刻點。在Flink中,提供了一個開關,選擇是否使用Stream Aligning,如果關掉則Exactly Once會變成At least once。

CheckPoint:
Snapshot並不僅僅是對數據流做了一個狀態的Checkpoint,它也包含了一個Operator內部所持有的狀態,這樣才能夠在保證在流處理系統失敗時能夠正確地恢複數據流處理。狀態包含兩種:

  1. 系統狀態:一個Operator進行計算處理的時候需要對數據進行緩衝,所以數據緩衝區的狀態是與Operator相關聯的。以視窗操作的緩衝區為例,Flink系統會收集或聚合記錄數據並放到緩衝區中,直到該緩衝區中的數據被處理完成。
  2. 一種是用戶自定義狀態(狀態可以通過轉換函數進行創建和修改),它可以是函數中的Java對象這樣的簡單變數,也可以是與函數相關的Key/Value狀態。

7. 調度

在JobManager端,會接收到Client提交的JobGraph形式的Flink Job,JobManager會將一個JobGraph轉換映射為一個ExecutionGraph,ExecutionGraph是JobGraph的並行表示,也就是實際JobManager調度一個Job在TaskManager上運行的邏輯視圖。

物理上進行調度,基於資源的分配與使用的一個例子:

  1. 左上子圖:有2個TaskManager,每個TaskManager有3個Task Slot
  2. 左下子圖:一個Flink Job,邏輯上包含了1個data source、1個MapFunction、1個ReduceFunction,對應一個JobGraph
  3. 左下子圖:用戶提交的Flink Job對各個Operator進行的配置——data source的並行度設置為4,MapFunction的並行度也為4,ReduceFunction的並行度為3,在JobManager端對應於ExecutionGraph
  4. 右上子圖:TaskManager 1上,有2個並行的ExecutionVertex組成的DAG圖,它們各占用一個Task Slot
  5. 右下子圖:TaskManager 2上,也有2個並行的ExecutionVertex組成的DAG圖,它們也各占用一個Task Slot
  6. 在2個TaskManager上運行的4個Execution是並行執行的

8. 迭代

機器學習和圖計算應用,都會使用到迭代計算,Flink通過在迭代Operator中定義Step函數來實現迭代演算法,這種迭代演算法包括Iterate和Delta Iterate兩種類型。

Iterate

Iterate Operator是一種簡單的迭代形式:每一輪迭代,Step函數的輸入或者是輸入的整個數據集,或者是上一輪迭代的結果,通過該輪迭代計算出下一輪計算所需要的輸入(也稱為Next Partial Solution),滿足迭代的終止條件後,會輸出最終迭代結果。

流程偽代碼:

IterationState state = getInitialState();

while (!terminationCriterion()) {
    state = step(state);
}

setFinalState(state);

Delta Iterate

Delta Iterate Operator實現了增量迭代。

流程偽代碼:

IterationState workset = getInitialState();
IterationState solution = getInitialSolution();

while (!terminationCriterion()) {
    (delta, workset) = step(workset, solution);

    solution.update(delta)
}

setFinalState(solution);

最小值傳播:

9. Back Pressure監控

流處理系統中,當下游Operator處理速度跟不上的情況,如果下游Operator能夠將自己處理狀態傳播給上游Operator,使得上游Operator處理速度慢下來就會緩解上述問題,比如通過告警的方式通知現有流處理系統存在的問題。

Flink Web界面上提供了對運行Job的Backpressure行為的監控,它通過使用Sampling線程對正在運行的Task進行堆棧跟蹤採樣來實現。

預設情況下,JobManager會每間隔50ms觸發對一個Job的每個Task依次進行100次堆棧跟蹤調用,過計算得到一個比值,例如,radio=0.01,表示100次中僅有1次方法調用阻塞。Flink目前定義瞭如下Backpressure狀態:
OK: 0 <= Ratio <= 0.10
LOW: 0.10 < Ratio <= 0.5
HIGH: 0.5 < Ratio <= 1

三、庫

1. Table

Flink的Table API實現了使用類SQL進行流和批處理。

詳情參考:https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/table_api.html

2. CEP

Flink的CEP(Complex Event Processing)支持在流中發現複雜的事件模式,快速篩選用戶感興趣的數據。

詳情參考:https://ci.apache.org/projects/flink/flink-docs-release-1.2/concepts/programming-model.html#next-steps

3. Gelly

Gelly是Flink提供的圖計算API,提供了簡化開發和構建圖計算分析應用的介面。

詳情參考:https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/libs/gelly/index.html

4. FlinkML

FlinkML是Flink提供的機器學習庫,提供了可擴展的機器學習演算法、簡潔的API和工具簡化機器學習系統的開發。

詳情參考:https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/libs/ml/index.html

四、部署

當Flink系統啟動時,首先啟動JobManager和一至多個TaskManager。JobManager負責協調Flink系統,TaskManager則是執行並行程式的worker。當系統以本地形式啟動時,一個JobManager和一個TaskManager會啟動在同一個JVM中。
當一個程式被提交後,系統會創建一個Client來進行預處理,將程式轉變成一個並行數據流的形式,交給JobManager和TaskManager執行。

1. 啟動測試

編譯flink,本地啟動。

$ java -version
java version "1.8.0_111"
$ git clone https://github.com/apache/flink.git
$ git checkout release-1.1.4 -b release-1.1.4
$ cd flink
$ mvn clean package -DskipTests
$ cd flink-dist/target/flink-1.1.4-bin/flink-1.1.4
$ ./bin/start-local.sh

編寫本地流處理demo。

SocketWindowWordCount.java

public class SocketWindowWordCount {
    public static void main(String[] args) throws Exception {

        // the port to connect to
        final int port;
        try {
            final ParameterTool params = ParameterTool.fromArgs(args);
            port = params.getInt("port");
        } catch (Exception e) {
            System.err.println("No port specified. Please run 'SocketWindowWordCount --port <port>'");
            return;
        }

        // get the execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // get input data by connecting to the socket
        DataStream<String> text = env.socketTextStream("localhost", port, "\n");

        // parse the data, group it, window it, and aggregate the counts
        DataStream<WordWithCount> windowCounts = text
                .flatMap(new FlatMapFunction<String, WordWithCount>() {
                    public void flatMap(String value, Collector<WordWithCount> out) {
                        for (String word : value.split("\\s")) {
                            out.collect(new WordWithCount(word, 1L));
                        }
                    }
                })
                .keyBy("word")
                .timeWindow(Time.seconds(5), Time.seconds(1))
                .reduce(new ReduceFunction<WordWithCount>() {
                    public WordWithCount reduce(WordWithCount a, WordWithCount b) {
                        return new WordWithCount(a.word, a.count + b.count);
                    }
                });

        // print the results with a single thread, rather than in parallel
        windowCounts.print().setParallelism(1);

        env.execute("Socket Window WordCount");
    }

    // Data type for words with count
    public static class WordWithCount {

        public String word;
        public long count;

        public WordWithCount() {}

        public WordWithCount(String word, long count) {
            this.word = word;
            this.count = count;
        }

        @Override
        public String toString() {
            return word + " : " + count;
        }
    }
}

pom.xml

<!-- Use this dependency if you are using the DataStream API -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.10</artifactId>
    <version>1.1.4</version>
</dependency>
<!-- Use this dependency if you are using the DataSet API -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>1.1.4</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_2.10</artifactId>
    <version>1.1.4</version>
</dependency>

執行mvn構建。

$ mvn clean install
$ ls target/flink-demo-1.0-SNAPSHOT.jar

開啟9000埠,用於輸入數據:

$ nc -l 9000

提交flink任務:

$ ./bin/flink run -c com.demo.florian.WordCount  $DEMO_DIR/target/flink-demo-1.0-SNAPSHOT.jar --port 9000

在nc里輸入數據後,查看執行結果:

$ tail -f log/flink-*-jobmanager-*.out

查看flink web頁面:localhost:8081

2. 代碼結構

Flink系統核心可分為多個子項目。分割項目旨在減少開發Flink程式需要的依賴數量,並對測試和開發小組件提供便捷。

Flink當前還包括以下子項目:

  1. Flink-dist:distribution項目。它定義瞭如何將編譯後的代碼、腳本和其他資源整合到最終可用的目錄結構中。
  2. Flink-quick-start:有關quickstart和教程的腳本、maven原型和示常式序
  3. flink-contrib:一系列有用戶開發的早起版本和有用的工具的項目。後期的代碼主要由外部貢獻者繼續維護,被flink-contirb接受的代碼的要求低於其他項目的要求。

Flink在YARN集群上運行時:Flink YARN Client負責與YARN RM通信協商資源請求,Flink JobManager和Flink TaskManager分別申請到Container去運行各自的進程。

YARN AM與Flink JobManager在同一個Container中,這樣AM可以知道Flink JobManager的地址,從而AM可以申請Container去啟動Flink TaskManager。待Flink成功運行在YARN集群上,Flink YARN Client就可以提交Flink Job到Flink JobManager,併進行後續的映射、調度和計算處理。

  1. 設置Hadoop環境變數
$ export HADOOP_CONF_DIR=/etc/hadoop/conf
  1. 以集群模式提交任務,每次都會新建flink集群
$ ./bin/flink run -m yarn-cluster -c com.demo.florian.WordCount  $DEMO_DIR/target/flink-demo-1.0-SNAPSHOT.jar
  1. 啟動共用flink集群,提交任務
$ ./bin/yarn-session.sh -n 4 -jm 1024 -tm 4096 -d
$ ./bin/flink run -c com.demo.florian.WordCount $DEMO_DIR/target/flink-demo-1.0.SNAPSHOT.jar

參考資料

http://shiyanjun.cn/archives/1508.html
https://ci.apache.org/projects/flink/flink-docs-release-1.2/index.html



您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • Java IO流學習總結一:輸入輸出流 轉載請標明出處:http://blog.csdn.net/zhaoyanjun6/article/details/54292148 本文出自 "【趙彥軍的博客】" Java流類圖結構: 流的概念和作用 流是一組有順序的,有起點和終點的位元組集合,是對數據傳輸的總 ...
  • 有的 App 可能有切換語言的選項,結合系統自動切換最簡單的辦法: 代碼說明: 1、"Localizable" 指的是 Localizable.strings 2、"zh-Hans" 這裡強制指定中文 3、如果強制指定就用上面的代碼,否則可以直接用 NSLocalizedString(key, co ...
  • http://www.cnblogs.com/xusir/archive/2012/12/24/2830957.html ...
  • 概述 昨天下午突然看到,《爐石傳說》游戲資料庫發生宕機並引發數據丟失事故的新聞。剛看到時,滿滿的不可思議。暴雪啊,網易啊。 都是很牛叉的公司。他們出的游戲我都是很喜歡的。 當我看到,第一時間著手搶修,重啟伺服器,並嘗試數據恢復時,我的想法是他們的高可用方案呢?為什麼不馬上切換? 當我看到相關備份數據 ...
  • 介紹 經常會有人問profile工具該怎麼使用?有沒有方法獲取性能差的sql的問題。自從轉mysql我自己也差不多2年沒有使用profile,忽然profile變得有點生疏不得不重新熟悉一下。這篇文章主要對profile工具做一個詳細的介紹;包括工具的用途和使用方法等。profile是SQLServ ...
  • 今天公司編輯部有一妹紙,遇到問題,是需要處理資料庫中重覆的數據,於是想辦法幫忙解決,要求刪除重覆的數據,該表中只有一個欄位,假設為保存的公司名,這是經過多重過濾之後,最終留下的的數據,需要進行篩選,然後重覆的數據只保留一條,最上邊或最下邊的都可以,有且僅有一條,整張表的數據量大概在20W左右,重覆量 ...
  • ###11數據表中的數據類型 * A:MySQL中的我們常使用的數據類型如下 詳細的數據類型如下(不建議詳細閱讀!) 分類 類型名稱 說明 整數類型 tinyInt 很小的整數 smallint 小的整數 mediumint 中等大小的整數 int(integer) 普通大小的整數 小數類型 flo ...
  • 記得在自己學習資料庫知識的時候特別喜歡看案例,因為優化的手段是容易掌握的,但是整體的優化思想是很難學會的。這也是為什麼自己特別喜歡看案例,今天也開始分享自己做的優化案例。 最近一直很忙,博客產出也少的可憐,今天整理了一下自己做過優化或各種方案的客戶已經超過100家了,今天分享的案例算是在這些客戶中比 ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...