Flink的API分層、架構與組件原理、並行度、任務執行計劃、chain

来源:https://www.cnblogs.com/lazy-ccx/archive/2022/11/24/16922928.html
-Advertisement-
Play Games

摘要:DWS的負載管理分為兩層,第一層為cn的全局併發控制,第二層為資源池級別的併發控制。 本文分享自華為雲社區《GaussDB(DWS) 併發管控&記憶體管控》,作者: fighttingman。 1背景 這裡將併發管控和記憶體管控寫在一起,是因為記憶體管控實際是通過限制語句的併發達到記憶體管控的目的的。 ...


Flink的API分層

註:越底層API越靈活,越上層的API越輕便
Stateful Stream Processing
• 位於最底層, 是core API 的底層實現
• processFunction
• 利用低階,構建一些新的組件或者運算元
• 靈活性高,但開發比較複雜
Core API
• DataSet - 批處理 API
• DataStream –流處理 API
Table API & SQL

• SQL 構建在Table 之上,都需要構建Table 環境
• 不同的類型的Table 構建不同的Table 環境
• Table 可以與DataStream或者DataSet進行相互轉換
• Streaming SQL不同於存儲的SQL,最終會轉化為流式執行計劃

Flink架構

當 Flink 集群啟動後,首先會啟動一個 JobManger 和一個或多個的 TaskManager。由 Client 提交任務給 JobManager,JobManager 再調度任務到各個 TaskManager 去執行,然後 TaskManager 將心跳和統計信息彙報給 JobManager。TaskManager 之間以流的形式進行數據的傳輸。上述三者均為獨立的 JVM 進程。
• Client 為提交 Job 的客戶端,可以是運行在任何機器上(與 JobManager 環境連通即可)。提交 Job 後,Client 可以結束進程(Streaming的任務),也可以不結束並等待結果返回。
• JobManager 主要負責從 Client 處接收到 Job 和 JAR 包等資源後,會生成優化後的執行計劃,並以 Task 的單元調度到各個 TaskManager 去執行。
• TaskManager 在啟動的時候就設置好了槽位數(Slot),每個 slot 能啟動一個 Task,Task 為線程。從 JobManager 處接收需要部署的 Task,部署啟動後,與自己的上游建立 Netty 連接,接收數據並處理。
• flnik架構中的角色間的通信使用Akka,數據的傳輸使用Netty
Task Slot
在上圖中我們介紹了 TaskManager 是一個 JVM 進程,並會以獨立的線程來執行一個task或多個subtask。為了控制一個 TaskManager 能接受多少個 task,Flink 提出了 Task Slot 的概念。
Flink 中的計算資源通過 Task Slot 來定義。每個 task slot 代表了 TaskManager 的一個固定大小的資源子集。例如,一個擁有3個slot的 TaskManager,會將其管理的記憶體平均分成三分分給各個 slot。將資源 slot 化意味著來自不同job的task不會為了記憶體而競爭,而是每個task都擁有一定數量的記憶體儲備。需要註意的是,這裡不會涉及到CPU的隔離,slot目前僅僅用來隔離task的記憶體。
通過調整 task slot 的數量,用戶可以定義task之間是如何相互隔離的。每個 TaskManager 有一個slot,也就意味著每個task運行在獨立的 JVM 中。每個 TaskManager 有多個slot的話,也就是說多個task運行在同一個JVM中。而在同一個JVM進程中的task,可以共用TCP連接(基於多路復用)和心跳消息,可以減少數據的網路傳輸。也能共用一些數據結構,一定程度上減少了每個task的消耗。
task的並行度

通過job的webUI界面查看任務的並行度

任務執行計劃

生成個json字元串然後粘貼在這裡 https://flink.apache.org/visualizer/會看到任務執行圖

但這並不是最終在 Flink 中運行的執行圖,只是一個表示拓撲節點關係的計劃圖,在 Flink 中對應了 SteramGraph。另外,提交拓撲後(併發度設為2)還能在 UI 中看到另一張執行計劃圖,如下所示,該圖對應了 Flink 中的 JobGraph。

其實Flink 中的執行圖可以分成四層:StreamGraph -> JobGraph -> ExecutionGraph -> 物理執行圖
• StreamGraph:是根據用戶通過 Stream API 編寫的代碼生成的最初的圖。用來表示程式的拓撲結構。
• JobGraph:StreamGraph經過優化後生成了 JobGraph,提交給 JobManager 的數據結構。主要的優化為,將多個符合條件的節點 chain 在一起作為一個節點,這樣可以減少數據在節點之間流動所需要的序列化/反序列化/傳輸消耗。
• ExecutionGraph:JobManager 根據 JobGraph 生成ExecutionGraph。ExecutionGraph是JobGraph的並行化版本,是調度層最核心的數據結構。
• 物理執行圖:JobManager 根據 ExecutionGraph 對 Job 進行調度後,在各個TaskManager 上部署 Task 後形成的“圖”,並不是一個具體的數據結構。
例如上文中的2個併發度(Source為1個併發度)的 SocketTextStreamWordCount 四層執行圖的演變過程如下圖所示:


那麼 Flink 為什麼要設計這4張圖呢,其目的是什麼呢?Spark 中也有多張圖,數據依賴圖以及物理執行的DAG。其目的都是一樣的,就是解耦,每張圖各司其職,每張圖對應了 Job 不同的階段,更方便做該階段的事情。我們給出更完整的 Flink Graph 的層次圖。

首先我們看到,JobGraph 之上除了 StreamGraph 還有 OptimizedPlan。OptimizedPlan 是由 Batch API 轉換而來的。StreamGraph 是由 Stream API 轉換而來的。為什麼 API 不直接轉換成 JobGraph?因為,Batch 和 Stream 的圖結構和優化方法有很大的區別,比如 Batch 有很多執行前的預分析用來優化圖的執行,而這種優化並不普適於 Stream,所以通過 OptimizedPlan 來做 Batch 的優化會更方便和清晰,也不會影響 Stream。JobGraph 的責任就是統一 Batch 和 Stream 的圖,用來描述清楚一個拓撲圖的結構,並且做了 chaining 的優化,chaining 是普適於 Batch 和 Stream 的,所以在這一層做掉。ExecutionGraph 的責任是方便調度和各個 tasks 狀態的監控和跟蹤,所以 ExecutionGraph 是並行化的 JobGraph。而“物理執行圖”就是最終分散式在各個機器上運行著的tasks了。所以可以看到,這種解耦方式極大地方便了我們在各個層所做的工作,各個層之間是相互隔離的。

8.Operator Chains
為了更高效地分散式執行,Flink會儘可能地將operator的subtask鏈接(chain)在一起形成task。每個task在一個線程中執行。將operators鏈接成task是非常有效的優化:它能減少線程之間的切換,減少消息的序列化/反序列化,減少數據在緩衝區的交換,減少了延遲的同時提高整體的吞吐量。
我們仍以上面的 WordCount 為例,下麵這幅圖,展示了Source並行度為1,FlatMap、KeyAggregation、Sink並行度均為2,最終以5個並行的線程來執行的優化過程。

上圖中將KeyAggregation和Sink兩個operator進行了合併,因為這兩個合併後並不會改變整體的拓撲結構。但是,並不是任意兩個 operator 就能 chain 一起的。其條件還是很苛刻的:
1. 上下游的並行度一致
2. 下游節點的入度為1 (也就是說下游節點沒有來自其他節點的輸入)
3. 上下游節點都在同一個 slot group 中(下麵會解釋 slot group)

4. 下游節點的 chain 策略為 ALWAYS(可以與上下游鏈接,map、flatmap、filter等預設是ALWAYS)
5. 上游節點的 chain 策略為 ALWAYS 或 HEAD(只能與下游鏈接,不能與上游鏈接,Source預設是HEAD)
6. ![](https://img2022.cnblogs.com/blog/3026492/202211/3026492-20221124190055850-65109011.png)

7. 上下游運算元之間沒有數據shuffle (數據分區方式是 forward)
8. 用戶沒有禁用 chain

Operator chain的行為可以通過編程API中進行指定。可以通過在DataStream的operator後面(如someStream.map(..))調用startNewChain()來指示從該operator開始一個新的chain(與前面截斷,不會被chain到前面)。或者調用disableChaining()來指示該operator不參與chaining(不會與前後的operator chain一起)。在底層,這兩個方法都是通過調整operator的 chain 策略(HEAD、NEVER)來實現的。另外,也可以通過調用StreamExecutionEnvironment.disableOperatorChaining()來全局禁用chaining。
代碼驗證:
• operator禁用chaining

• 全局禁用chaining

• 查看job的graph圖

OperatorChain的優缺點:
那麼 Flink 是如何將多個 operators chain在一起的呢?chain在一起的operators是如何作為一個整體被執行的呢?它們之間的數據流又是如何避免了序列化/反序列化以及網路傳輸的呢?下圖展示了operators chain的內部實現:

如上圖所示,Flink內部是通過OperatorChain這個類來將多個operator鏈在一起形成一個新的operator。OperatorChain形成的框框就像一個黑盒,Flink 無需知道黑盒中有多少個ChainOperator、數據在chain內部是怎麼流動的,只需要將input數據交給 HeadOperator 就可以了,這就使得OperatorChain在行為上與普通的operator無差別,上面的OperaotrChain就可以看做是一個入度為1,出度為2的operator。所以在實現中,對外可見的只有HeadOperator,以及與外部連通的實線輸出,這些輸出對應了JobGraph中的JobEdge,在底層通過RecordWriterOutput來實現。另外,框中的虛線是operator chain內部的數據流,這個流內的數據不會經過序列化/反序列化、網路傳輸,而是直接將消息對象傳遞給下游的 ChainOperator 處理,這是性能提升的關鍵點,在底層是通過 ChainingOutput 實現的

OperatorChain的優點總結:
• 減少線程切換
• 減少序列化與反序列化
• 減少數據在緩衝區的交換
• 減少延遲並且提高吞吐能力
OperatorChain的缺點總結:
• 可能會讓N個比較複雜的業務跑在一個slot中,本來一個業務就慢,這發生這種情況就更慢了,所以可以通過startNewChain()/disableChaining()或全局禁用disableOperatorChaining()給分開

SlotSharingGroup 與 CoLocationGroup
每一個 TaskManager 會擁有一個或多個的 task slot,每個 slot 都能跑由多個連續 task 組成的一個 pipeline,比如 MapFunction 的第n個並行實例和 ReduceFunction 的第n個並行實例可以組成一個 pipeline。
如上文所述的 WordCount 例子,5個Task沒有solt共用的時候在TaskManager的slots中如下圖分佈,2個TaskManager,每個有3個slot:

預設情況下,Flink 允許subtasks共用slot,條件是它們都來自同一個Job的不同task的subtask。結果可能一個slot持有該job的整個pipeline。允許slot共用有以下兩點好處:
1. Flink 集群所需的task slots數與job中最高的並行度一致。
2. 更容易獲得更充分的資源利用。如果沒有slot共用,那麼非密集型操作source/flatmap就會占用同密集型操作 keyAggregation/sink 一樣多的資源。如果有slot共用,將基線的2個並行度增加到6個,能充分利用slot資源,同時保證每個TaskManager能平均分配到相同數量的subtasks。

我們將 WordCount 的並行度從之前的2個增加到6個(Source並行度仍為1),並開啟slot共用(所有operator都在default共用組),將得到如上圖所示的slot分佈圖。該任務最終會占用6個slots(最高並行度為6)。其次,我們可以看到密集型操作 keyAggregation/sink 被平均地分配到各個 TaskManager。
SlotSharingGroup:
• SlotSharingGroup是Flink中用來實現slot共用的類,它儘可能地讓subtasks共用一個slot。
• 保證同一個group的並行度相同的sub-tasks 共用同一個slots
• 運算元的預設group為default(即預設一個job下的subtask都可以共用一個slot)
• 為了防止不合理的共用,用戶也能通過API來強制指定operator的共用組,比如:someStream.filter(...).slotSharingGroup("group1");就強制指定了filter的slot共用組為group1。

• 怎麼確定一個未做SlotSharingGroup設置的運算元的Group是什麼呢(根據上游運算元的 group 和自身是否設置group共同確定)
• 適當設置可以減少每個slot運行的線程數,從而整體上減少機器的負載 

CoLocationGroup(強制):
• 保證所有的並行度相同的sub-tasks運行在同一個slot
• 主要用於迭代流(訓練機器學習模型)

代碼驗證:
• 設置本地開發環境tm的slot數量

• 設置最後的operator使用新的group

• 由於不和前面的operator在一個group,無法進行slot的共用,所以最後的operator占用了其它slot

• 為什麼占用了兩個呢?
	○ 因為不同組,與上面的default不能共用slot,組間互斥
	○ 同組中的同一個operator的subtask不能在一個slot中,由於operator的並行度是2,所以占用了兩個槽位,subtask組內互斥

原理與實現
那麼多個tasks(或者說operators)是如何共用slot的呢?
關於Flink調度,有兩個非常重要的原則我們必須知道:
1. 同一個operator的各個subtask是不能呆在同一個SharedSlot中的,例如FlatMap[1]和FlatMap[2]是不能在同一個SharedSlot中的。
2. Flink是按照拓撲順序從Source一個個調度到Sink的。例如WordCount(Source並行度為1,其他並行度為2),那麼調度的順序依次是:Source -> FlatMap[1] -> FlatMap[2] -> KeyAgg->Sink[1] -> KeyAgg->Sink[2]。假設現在有2個TaskManager,每個只有1個slot(為簡化問題),那麼分配slot的過程如圖所示:

註:圖中 SharedSlot 與 SimpleSlot 後帶的括弧中的數字代表槽位號(slotNumber)
1. 為Source分配slot。首先,我們從TaskManager1中分配出一個SharedSlot。並從SharedSlot中為Source分配出一個SimpleSlot。如上圖中的①和②。
2. 為FlatMap[1]分配slot。目前已經有一個SharedSlot,則從該SharedSlot中分配出一個SimpleSlot用來部署FlatMap[1]。如上圖中的③。
3. 為FlatMap[2]分配slot。由於TaskManager1的SharedSlot中已經有同operator的FlatMap[1]了,我們只能分配到其他SharedSlot中去。從TaskManager2中分配出一個SharedSlot,並從該SharedSlot中為FlatMap[2]分配出一個SimpleSlot。如上圖的④和⑤。
4. 為Key->Sink[1]分配slot。目前兩個SharedSlot都符合條件,從TaskManager1的SharedSlot中分配出一個SimpleSlot用來部署Key->Sink[1]。如上圖中的⑥。
5. 為Key->Sink[2]分配slot。TaskManager1的SharedSlot中已經有同operator的Key->Sink[1]了,則只能選擇另一個SharedSlot中分配出一個SimpleSlot用來部署Key->Sink[2]。如上圖中的⑦。
最後Source、FlatMap[1]、Key->Sink[1]這些subtask都會部署到TaskManager1的唯一一個slot中,並啟動對應的線程。FlatMap[2]、Key->Sink[2]這些subtask都會被部署到TaskManager2的唯一一個slot中,並啟動對應的線程。從而實現了slot共用。

Flink中計算資源的相關概念以及原理實現。最核心的是 Task Slot,每個slot能運行一個或多個task。為了拓撲更高效地運行,Flink提出了Chaining,儘可能地將operators chain在一起作為一個task來處理。為了資源更充分的利用,Flink又提出了SlotSharingGroup,儘可能地讓多個task共用一個slot。

如何計算一個應用需要多少slot
• 不設置SlotSharingGroup,就是不設置新的組大家都為default組。(應用的最大並行度)
• 設置SlotSharingGroup ,就是設置了新的組,比如下圖有兩個組default和test組(所有SlotSharingGroup中的最大並行度之和)

由於source和map之後的operator不屬於同一個group,所以source和它們不能在一個solt中運行,而這裡的source的default組的並行度是10,test組的並行度是20,所以所需槽位一共是30


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 這個問題是最近更新.NET 7 進行資料庫遷移發現的,操作步驟很簡單,先看一下SQL Server中的解決方法: 錯誤信息: 解決方案: 在登錄時,更改選項的連接屬性,解決方案、信任伺服器證書選項都選擇或者都不選擇,不能只選一個 這是我們SQL Server的寫法,在我們資料庫遷移中.NET 7 也 ...
  • 環境 伺服器:centos6.5 客戶端:Windows 前言 項目中有一個exe,安裝在客戶端,其中有一個功能是將本地產生的文件上傳至伺服器,這個功能是以服務的方式安裝在客戶端上。之前一切好使,文件能正常上傳到伺服器。但最近發現產生的文件沒有被上傳到伺服器,查看文件上傳服務的事件日誌提示密碼錯誤。 ...
  • ASP.NET Core是啥 ASP.NET Core 是一個跨平臺的高性能開源框架,用於生成啟用雲且連接 Internet 的新式應用。 ASP.NET Core 可以幹啥 生成 Web 應用和服務、物聯網 (IoT) 應用和移動後端。 在 Windows、macOS 和 Linux 上使用喜愛的 ...
  • 一:背景 1.講故事 在B站,公眾號上發了一篇 AOT 的文章後,沒想到反響還是挺大的,都稱贊這個東西能抗反編譯,可以讓破解難度極大提高,可能有很多朋友對逆向不瞭解,以為用 ILSpy,Reflector,DnSpy 這些工具打不開就覺得很安全,其實不然,在 OllyDbg,IDA,WinDBG 這 ...
  • CentOS7 配置本地yum源軟體倉庫 先連接虛擬光碟機,對應的光碟機iso文件 沒有的去下載對應鏡像 Download (centos.org) https://www.centos.org/download/ 進入虛擬機,把光碟掛載到 指定目錄 下 [root@localhost ~]# mkdi ...
  • 在使用 Linux 系統的過程中,我們經常需要查看系統、資源、網路、進程、用戶等方面的信息,查看這些信息的常用命令值得瞭解和熟悉。 ...
  • 痞子衡嵌入式半月刊: 第 67 期 這裡分享嵌入式領域有用有趣的項目/工具以及一些熱點新聞,農曆年分二十四節氣,希望在每個交節之日準時發佈一期。 本期刊是開源項目(GitHub: JayHeng/pzh-mcu-bi-weekly),歡迎提交 issue,投稿或推薦你知道的嵌入式那些事兒。 上期回顧 ...
  • Python基礎之MySQL資料庫 一、約束概述 1、為什麼要約束 ​ 為了防止資料庫中存在不符合語義規定的數據和防止錯誤信息的輸入、輸出造成無效的操作而提出的 ​ 為了保證數據的完整性,SQL規範以約束的方式對錶數據進行額外的條件限制,從以下四個方面考慮 實體完整性:例如一個表中不能存在兩條相同的 ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...