flink基本原理

来源:https://www.cnblogs.com/luxiaoxun/archive/2019/12/14/12040016.html
-Advertisement-
Play Games

一、簡介 開源流式處理系統在不斷地發展,從一開始只關註低延遲指標到現在兼顧延遲、吞吐與結果準確性,在發展過程中解決了很多問題,編程API的易用性也在不斷地提高。本文介紹一下 Flink 中的核心概念,這些概念是學習與使用 Flink 十分重要的基礎知識,在後續開發 Flink 程式過程中將會幫助開發 ...


一、簡介

開源流式處理系統在不斷地發展,從一開始只關註低延遲指標到現在兼顧延遲、吞吐與結果準確性,在發展過程中解決了很多問題,編程API的易用性也在不斷地提高。本文介紹一下 Flink 中的核心概念,這些概念是學習與使用 Flink 十分重要的基礎知識,在後續開發 Flink 程式過程中將會幫助開發人員更好地理解 Flink 內部的行為和機制。

這裡引用一張圖來對常用的實時計算框架做個對比:

Flink 是有狀態的和容錯的,可以在維護一次應用程式狀態的同時無縫地從故障中恢復。它支持大規模計算能力,能夠在數千個節點上併發運行。它具有很好的吞吐量和延遲特性。同時,Flink 提供了多種靈活的視窗函數。Flink 在流式計算里屬於真正意義上的單條處理,每一條數據都觸發計算,而不是像 Spark 一樣的 Mini Batch 作為流式處理的妥協。Flink的容錯機制較為輕量,對吞吐量影響較小,而且擁有圖和調度上的一些優化,使得 Flink 可以達到很高的吞吐量。而 Strom 的容錯機制需要對每條數據進行ack,因此其吞吐量瓶頸也是備受詬病。

二、工作原理

Flink基本工作原理如下圖:

JobClient:負責接收程式,解析和優化程式的執行計劃,然後提交執行計划到JobManager。這裡執行的程式優化是將相鄰的Operator融合,形成Operator Chain,Operator的融合可以減少task的數量,提高TaskManager的資源利用率。

JobManagers:負責申請資源,協調以及控制整個job的執行過程,具體包括,調度任務、處理checkpoint、容錯等等。

TaskManager:TaskManager運行在不同節點上的JVM進程,負責接收並執行JobManager發送的task,並且與JobManager通信,反饋任務狀態信息,如果說JobManager是master的話,那麼TaskManager就是worker用於執行任務。每個TaskManager像是一個容器,包含一個或者多個Slot。

Slot:Slot是TaskManager資源粒度的劃分,每個Slot都有自己獨立的記憶體。所有Slot平均分配TaskManager的記憶體,值得註意的是,Slot僅劃分記憶體,不涉及CPU的劃分,即CPU是共用使用。每個Slot可以運行多個task。Slot的個數就代表了一個程式的最高並行度。

Task:Task是在operators的subtask進行鏈化之後形成的,具體Flink job中有多少task和operator的並行度和鏈化的策略有關。

SubTask:因為Flink是分散式部署的,程式中的每個運算元,在實際執行中被分隔為一個或者多個subtask,運算符子任務(subtask)的數量是該特定運算符的並行度。數據流在運算元之間流動,就對應到SubTask之間的數據傳輸。Flink允許同一個job中來自不同task的subtask可以共用同一個slot。每個slot可以執行一個並行的pipeline。可以將pipeline看作是多個subtask的組成的。

三、核心概念

1、Time(時間語義)

Flink 中的 Time 分為三種:事件時間、達到時間與處理時間。

1)事件時間:是事件真實發生的時間。

2)達到時間:是系統接收到事件的時間,即服務端接收到事件的時間。

3)處理時間:是系統開始處理到達事件的時間。

在某些場景下,處理時間等於達到時間。因為處理時間沒有亂序的問題,所以服務端做基於處理時間的計算是比較簡單的,無遲到與亂序數據。

Flink 中只需要通過 env 環境變數即可設置Time:

//創建環境上下文
val env = StreamExecutionEnvironment.getExecutionEnvironment
// 設置在當前程式中使用 ProcessingTime
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);

2、Window(視窗)

視窗本質就是將無限數據集沿著時間(或者數量)的邊界切分成有限數據集。

1)Time Window:基於時間的,分為Tumbling Window和Sliding Window 。

2)Count Window:基於數量的,分為Tumbling Window和Sliding Window。 

3)Session Window:基於會話的,一個session window關閉通常是由於一段時間沒有收到元素。

4)Global Window:全局視窗。

在實際操作中,window又分為兩大類型的視窗:Keyed Window 和 Non-keyed Window,兩種類型的視窗操作API有細微的差別。

 3、Trigger

1)自定義觸發器

觸發器決定了視窗何時會被觸發計算,Flink 中開發人員需要在 window 類型的操作之後才能調用 trigger 方法傳入觸發器定義。Flink 中的觸發器定義需要繼承並實現 Trigger 介面,該介面有以下方法:

  • onElement(): 每個被添加到視窗中的元素都會被調用
  • onEventTime(): 當事件時間定時器觸發時會被調用,比如watermark到達
  • onProcessingTime(): 當處理時間定時器觸發時會被調用,比如時間周期觸發
  • onMerge(): 當兩個視窗合併時兩個視窗的觸發器狀態將會被調動併合並
  • clear(): 執行需要清除相關視窗的事件

以上方法會返回決定如何觸發執行的 TriggerResult:

  • CONTINUE: 什麼都不做
  • FIRE: 觸發計算
  • PURGE: 清除視窗中的數據
  • FIRE_AND_PURGE: 觸發計算後清除視窗中的數據

2)預定義觸發器

如果開發人員未指定觸發器,則 Flink 會自動根據場景使用預設的預定義好的觸發器。在基於事件時間的視窗中使用 EventTimeTrigger,該觸發器會在watermark通過視窗邊界後立即觸發(即watermark出現關閉改視窗時)。在全局視窗(GlobalWindow)中使用 NeverTrigger,該觸發器永遠不會觸發,所以在使用全局視窗時用戶需要自定義觸發器。

4、State

Managed State 是由flink runtime管理來管理的,自動存儲、自動恢復,在記憶體管理上有優化機制。且Managed State 支持常見的多種數據結構,如value、list、map等,在大多數業務場景中都有適用之處。總體來說是對開發人員來說是比較友好的,因此 Managed State 是 Flink 中最常用的狀態。Managed State 又分為 Keyed State 和 Operator State 兩種。

Raw State 由用戶自己管理,需要序列化,只能使用位元組數組的數據結構。Raw State 的使用和維度都比 Managed State 要複雜,建議在自定義的Operator場景中酌情使用。

5、狀態存儲

Flink中狀態的實現有三種:MemoryState、FsState、RocksDBState。三種狀態存儲方式與使用場景各不相同,詳細介紹如下:

1)MemoryStateBackend

構造函數:MemoryStateBackend(int maxStateSize, boolean asyncSnapshot)

存儲方式:State存儲於各個 TaskManager記憶體中,Checkpoint存儲於 JobManager記憶體

容量限制:單個State最大5M、maxStateSize<=akka.framesize(10M)、總大小不超過JobManager記憶體

使用場景:無狀態或者JobManager掛掉不影響的測試環境等,不建議在生產環境使用

2)FsStateBackend

構造函數:FsStateBackend(URI checkpointUri, boolean asyncSnapshot)

存儲方式:State存儲於 TaskManager記憶體,Checkpoint存儲於 外部文件系統(本次磁碟 or HDFS)

容量限制:State總量不超過TaskManager記憶體、Checkpoint總大小不超過外部存儲空間

使用場景:常規使用狀態的作業,分鐘級的視窗聚合等,可在生產環境使用

3)RocksDBStateBackend

構造函數:RocksDBStateBackend(URI checkpointUri, boolean enableincrementCheckpoint)

存儲方式:State存儲於 TaskManager上的kv資料庫(記憶體+磁碟),Checkpoint存儲於 外部文件系統(本次磁碟 or HDFS)

容量限制:State總量不超過TaskManager記憶體+磁碟、單key最大2g、Checkpoint總大小不超過外部存儲空間

使用場景:超大狀態的作業,天級的視窗聚合等,對讀寫性能要求不高的場景,可在生產環境使用

根據業務場景需要用戶選擇最合適的 StateBackend ,代碼中只需在相應的 env 環境中設置即可:

// flink 上下文環境變數
val env = StreamExecutionEnvironment.getExecutionEnvironment
// 設置狀態後端為 FsStateBackend,數據存儲到 hdfs /tmp/flink/checkpoint/test 中
env.setStateBackend(new FsStateBackend("hdfs://ns1/tmp/flink/checkpoint/test", false))

6、Checkpoint

Checkpoint 是分散式全域一致的,數據會被寫入hdfs等共用存儲中。且其產生是非同步的,在不中斷、不影響運算的前提下產生。

用戶只需在相應的 env 環境中設置即可:

// 1000毫秒進行一次 Checkpoint 操作
env.enableCheckpointing(1000)
// 模式為準確一次
env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
// 兩次 Checkpoint 之間最少間隔 500毫秒
env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500)
// Checkpoint 過程超時時間為 60000毫秒,即1分鐘視為超時失敗
env.getCheckpointConfig.setCheckpointTimeout(60000)
// 同一時間只允許1個Checkpoint的操作在執行
env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)

7、Watermark

Flink 程式並 不能自動提取數據源中哪個欄位/標識為數據的事件時間,從而也就無法自己定義 Watermark 。

開發人員需要通過 Flink 提供的 API 來 提取和定義 Timestamp/Watermark,可以在 數據源或者數據流中 定義。

1)自定義數據源設置 Timestamp/Watermark

自定義的數據源類需要繼承並實現 SourceFunction[T] 介面,其中 run 方法是定義數據生產的地方:

//自定義的數據源為自定義類型MyType
class MySource extends SourceFunction[MyType]{
    //重寫run方法,定義數據生產的邏輯
    override def run(ctx: SourceContext[MyType]): Unit = {
        while (/* condition */) {
            val next: MyType = getNext()
            //設置timestamp從MyType的哪個欄位獲取(eventTimestamp)
            ctx.collectWithTimestamp(next, next.eventTimestamp)

            if (next.hasWatermarkTime) {
                //設置watermark從MyType的那個方法獲取(getWatermarkTime)
                ctx.emitWatermark(new Watermark(next.getWatermarkTime))
            }
        }
    }
}

2)在數據流中設置 Timestamp/Watermark

在數據流中,可以設置 stream 的 Timestamp Assigner ,該 Assigner 將會接收一個 stream,並生產一個帶 Timestamp和Watermark 的新 stream。

val withTimestampsAndWatermarks: DataStream[MyEvent] = stream
        .assignTimestampsAndWatermarks(new MyTimestampsAndWatermarks())

8、廣播狀態(Broadcast State

和 Spark 中的廣播變數一樣,Flink 也支持在各個節點中各存一份小數據集,所在的計算節點實例可在本地記憶體中直接讀取被廣播的數據,可以避免Shuffle提高並行效率。

廣播狀態(Broadcast State)的引入是為了支持一些來自一個流的數據需要廣播到所有下游任務的情況,它存儲在本地,用於處理其他流上的所有傳入元素。

// key the shapes by color
KeyedStream<Item, Color> colorPartitionedStream = shapeStream.keyBy(new KeySelector<Shape, Color>(){...});

// a map descriptor to store the name of the rule (string) and the rule itself.
MapStateDescriptor<String, Rule> ruleStateDescriptor = new MapStateDescriptor<>("RulesBroadcastState",BasicTypeInfo.STRING_TYPE_INFO, TypeInformation.of(new TypeHint<Rule>() {}));
        
// broadcast the rules and create the broadcast state
BroadcastStream<Rule> ruleBroadcastStream = ruleStream.broadcast(ruleStateDescriptor);

DataStream<Match> output = colorPartitionedStream.connect(ruleBroadcastStream).process(new KeyedBroadcastProcessFunction<Color, Item, Rule, String>(){...});

9、Operator Chain

Flink作業中,可以指定相關的chain將相關性非常強的轉換操作(operator)綁定在一起,使得上下游的Task在同一個Pipeline中執行,避免因為數據在網路或者線程之間傳輸導致的開銷。

一般情況下Flink在Map類型的操作中預設開啟 Operator Chain 以提高整體性能,開發人員也可以根據需要創建或者禁止 Operator Chain 對任務進行細粒度的鏈條控制。

//創建 chain
dataStream.filter(...).map(...).startNewChain().map(...)
//禁止 chain
dataStream.map(...).disableChaining()

創建的鏈條只對當前的操作符和之後的操作符有效,不不影響其他操作,如上代碼只針對兩個map操作進行鏈條綁定,對前面的filter操作無效,如果需要可以在filter和map之間使用 startNewChain方法即可。

 

參考:

https://zhuanlan.zhihu.com/p/93507000

https://ci.apache.org/projects/flink/flink-docs-release-1.9/

 


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • LVM LVM是Linux環境中對磁碟分區進行管理的一種機制,是建立在硬碟和分區之上、文件系統之下的一個邏輯層,可提高磁碟分區管理的靈活性 物理捲(PV:Physical Volume):物理捲是底層真正提供容量,存放數據的設備,它可以是整個硬碟、硬碟上的分區等。 捲組(VG:Volume Grou ...
  • Linux下各種不同環境變數相關文件的作用: 1. /etc/environment 設置整個系統的環境,系統啟動時,該文件被執行。 2. /etc/profile 設置所有用戶的環境,當用戶第一次登錄時,該文件被執行,並從/etc/profile.d目錄的配置文件中搜集shell的設置。 3. / ...
  • Nginx核心流程及模塊介紹 1. Nginx簡介以及特點 Nginx簡介: Nginx (engine x) 是一個高性能的web伺服器和反向代理伺服器,也是一個IMAP/POP3/SMTP伺服器 俄羅斯程式員Igor Sysoev於2002年開始 Nginx是增長最快的Web伺服器,市場份額已達 ...
  • 使用環境:阿裡雲ecs Ubuntu1604生產環境下,編譯安裝mariadb10-2.26 1、先安裝一些初試環境所需要的工具軟體包 apt install -y iproute2 ntpdate tcpdump telnet traceroute nfs-kernel-server nfs-co ...
  • 下麵是我整理(抄襲)的一些Oracle資料庫相關概念對象的理解,如有疏漏,歡迎指正。至於整理這篇文章的目的:主要是網上的內容太散了,這樣整理一遍可以加深理解,也便於後續查閱。就我的理解:下述內容應該可對10g,11g,12c都適用。更新的版本沒用過。 Oracle DataBase是一款關係型資料庫 ...
  • MySQL面試總結 # MySQL的存儲引擎 `MyISAM`(預設表類型):非事務的存儲引擎,基於傳統的`ISAM`(有索引的順序訪問方法)類型,是存儲記錄和文件的標準方法,不是事務安全,不支持外鍵,適用於頻繁的查詢。表鎖,不會出現死鎖,適合小數據和小併發。 - 為什麼不會出死鎖?(沒有事務就不會 ...
  • 約束與索引 概念 1、數據完整性(Data Integrity)是指數據的精確性(Accuracy)和可靠性(Reliability)。 實體完整性(Entity Integrity):例如,同一個表中,不能存在兩條完全相同無法區分的記錄 域完整性(Domain Integrity):例如:年齡範圍 ...
  • #!/usr/bin/env python # -*- coding:utf-8 -*- import sqlite3,os,time import traceback class Sqlite(): db_file = None # 資料庫文件 connection = None # 資料庫連接對 ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...