摘要:DWS的負載管理分為兩層,第一層為cn的全局併發控制,第二層為資源池級別的併發控制。 本文分享自華為雲社區《GaussDB(DWS) 併發管控&記憶體管控》,作者: fighttingman。 1背景 這裡將併發管控和記憶體管控寫在一起,是因為記憶體管控實際是通過限制語句的併發達到記憶體管控的目的的。 ...
Flink的API分層
註:越底層API越靈活,越上層的API越輕便
Stateful Stream Processing
• 位於最底層, 是core API 的底層實現
• processFunction
• 利用低階,構建一些新的組件或者運算元
• 靈活性高,但開發比較複雜
Core API
• DataSet - 批處理 API
• DataStream –流處理 API
Table API & SQL
• SQL 構建在Table 之上,都需要構建Table 環境
• 不同的類型的Table 構建不同的Table 環境
• Table 可以與DataStream或者DataSet進行相互轉換
• Streaming SQL不同於存儲的SQL,最終會轉化為流式執行計劃
Flink架構
當 Flink 集群啟動後,首先會啟動一個 JobManger 和一個或多個的 TaskManager。由 Client 提交任務給 JobManager,JobManager 再調度任務到各個 TaskManager 去執行,然後 TaskManager 將心跳和統計信息彙報給 JobManager。TaskManager 之間以流的形式進行數據的傳輸。上述三者均為獨立的 JVM 進程。
• Client 為提交 Job 的客戶端,可以是運行在任何機器上(與 JobManager 環境連通即可)。提交 Job 後,Client 可以結束進程(Streaming的任務),也可以不結束並等待結果返回。
• JobManager 主要負責從 Client 處接收到 Job 和 JAR 包等資源後,會生成優化後的執行計劃,並以 Task 的單元調度到各個 TaskManager 去執行。
• TaskManager 在啟動的時候就設置好了槽位數(Slot),每個 slot 能啟動一個 Task,Task 為線程。從 JobManager 處接收需要部署的 Task,部署啟動後,與自己的上游建立 Netty 連接,接收數據並處理。
• flnik架構中的角色間的通信使用Akka,數據的傳輸使用Netty
Task Slot
在上圖中我們介紹了 TaskManager 是一個 JVM 進程,並會以獨立的線程來執行一個task或多個subtask。為了控制一個 TaskManager 能接受多少個 task,Flink 提出了 Task Slot 的概念。
Flink 中的計算資源通過 Task Slot 來定義。每個 task slot 代表了 TaskManager 的一個固定大小的資源子集。例如,一個擁有3個slot的 TaskManager,會將其管理的記憶體平均分成三分分給各個 slot。將資源 slot 化意味著來自不同job的task不會為了記憶體而競爭,而是每個task都擁有一定數量的記憶體儲備。需要註意的是,這裡不會涉及到CPU的隔離,slot目前僅僅用來隔離task的記憶體。
通過調整 task slot 的數量,用戶可以定義task之間是如何相互隔離的。每個 TaskManager 有一個slot,也就意味著每個task運行在獨立的 JVM 中。每個 TaskManager 有多個slot的話,也就是說多個task運行在同一個JVM中。而在同一個JVM進程中的task,可以共用TCP連接(基於多路復用)和心跳消息,可以減少數據的網路傳輸。也能共用一些數據結構,一定程度上減少了每個task的消耗。
task的並行度
通過job的webUI界面查看任務的並行度
任務執行計劃
生成個json字元串然後粘貼在這裡 https://flink.apache.org/visualizer/會看到任務執行圖
但這並不是最終在 Flink 中運行的執行圖,只是一個表示拓撲節點關係的計劃圖,在 Flink 中對應了 SteramGraph。另外,提交拓撲後(併發度設為2)還能在 UI 中看到另一張執行計劃圖,如下所示,該圖對應了 Flink 中的 JobGraph。
其實Flink 中的執行圖可以分成四層:StreamGraph -> JobGraph -> ExecutionGraph -> 物理執行圖
• StreamGraph:是根據用戶通過 Stream API 編寫的代碼生成的最初的圖。用來表示程式的拓撲結構。
• JobGraph:StreamGraph經過優化後生成了 JobGraph,提交給 JobManager 的數據結構。主要的優化為,將多個符合條件的節點 chain 在一起作為一個節點,這樣可以減少數據在節點之間流動所需要的序列化/反序列化/傳輸消耗。
• ExecutionGraph:JobManager 根據 JobGraph 生成ExecutionGraph。ExecutionGraph是JobGraph的並行化版本,是調度層最核心的數據結構。
• 物理執行圖:JobManager 根據 ExecutionGraph 對 Job 進行調度後,在各個TaskManager 上部署 Task 後形成的“圖”,並不是一個具體的數據結構。
例如上文中的2個併發度(Source為1個併發度)的 SocketTextStreamWordCount 四層執行圖的演變過程如下圖所示:
那麼 Flink 為什麼要設計這4張圖呢,其目的是什麼呢?Spark 中也有多張圖,數據依賴圖以及物理執行的DAG。其目的都是一樣的,就是解耦,每張圖各司其職,每張圖對應了 Job 不同的階段,更方便做該階段的事情。我們給出更完整的 Flink Graph 的層次圖。
首先我們看到,JobGraph 之上除了 StreamGraph 還有 OptimizedPlan。OptimizedPlan 是由 Batch API 轉換而來的。StreamGraph 是由 Stream API 轉換而來的。為什麼 API 不直接轉換成 JobGraph?因為,Batch 和 Stream 的圖結構和優化方法有很大的區別,比如 Batch 有很多執行前的預分析用來優化圖的執行,而這種優化並不普適於 Stream,所以通過 OptimizedPlan 來做 Batch 的優化會更方便和清晰,也不會影響 Stream。JobGraph 的責任就是統一 Batch 和 Stream 的圖,用來描述清楚一個拓撲圖的結構,並且做了 chaining 的優化,chaining 是普適於 Batch 和 Stream 的,所以在這一層做掉。ExecutionGraph 的責任是方便調度和各個 tasks 狀態的監控和跟蹤,所以 ExecutionGraph 是並行化的 JobGraph。而“物理執行圖”就是最終分散式在各個機器上運行著的tasks了。所以可以看到,這種解耦方式極大地方便了我們在各個層所做的工作,各個層之間是相互隔離的。
8.Operator Chains
為了更高效地分散式執行,Flink會儘可能地將operator的subtask鏈接(chain)在一起形成task。每個task在一個線程中執行。將operators鏈接成task是非常有效的優化:它能減少線程之間的切換,減少消息的序列化/反序列化,減少數據在緩衝區的交換,減少了延遲的同時提高整體的吞吐量。
我們仍以上面的 WordCount 為例,下麵這幅圖,展示了Source並行度為1,FlatMap、KeyAggregation、Sink並行度均為2,最終以5個並行的線程來執行的優化過程。
上圖中將KeyAggregation和Sink兩個operator進行了合併,因為這兩個合併後並不會改變整體的拓撲結構。但是,並不是任意兩個 operator 就能 chain 一起的。其條件還是很苛刻的:
1. 上下游的並行度一致
2. 下游節點的入度為1 (也就是說下游節點沒有來自其他節點的輸入)
3. 上下游節點都在同一個 slot group 中(下麵會解釋 slot group)
4. 下游節點的 chain 策略為 ALWAYS(可以與上下游鏈接,map、flatmap、filter等預設是ALWAYS)
5. 上游節點的 chain 策略為 ALWAYS 或 HEAD(只能與下游鏈接,不能與上游鏈接,Source預設是HEAD)
6. ![](https://img2022.cnblogs.com/blog/3026492/202211/3026492-20221124190055850-65109011.png)
7. 上下游運算元之間沒有數據shuffle (數據分區方式是 forward)
8. 用戶沒有禁用 chain
Operator chain的行為可以通過編程API中進行指定。可以通過在DataStream的operator後面(如someStream.map(..))調用startNewChain()來指示從該operator開始一個新的chain(與前面截斷,不會被chain到前面)。或者調用disableChaining()來指示該operator不參與chaining(不會與前後的operator chain一起)。在底層,這兩個方法都是通過調整operator的 chain 策略(HEAD、NEVER)來實現的。另外,也可以通過調用StreamExecutionEnvironment.disableOperatorChaining()來全局禁用chaining。
代碼驗證:
• operator禁用chaining
• 全局禁用chaining
• 查看job的graph圖
OperatorChain的優缺點:
那麼 Flink 是如何將多個 operators chain在一起的呢?chain在一起的operators是如何作為一個整體被執行的呢?它們之間的數據流又是如何避免了序列化/反序列化以及網路傳輸的呢?下圖展示了operators chain的內部實現:
如上圖所示,Flink內部是通過OperatorChain這個類來將多個operator鏈在一起形成一個新的operator。OperatorChain形成的框框就像一個黑盒,Flink 無需知道黑盒中有多少個ChainOperator、數據在chain內部是怎麼流動的,只需要將input數據交給 HeadOperator 就可以了,這就使得OperatorChain在行為上與普通的operator無差別,上面的OperaotrChain就可以看做是一個入度為1,出度為2的operator。所以在實現中,對外可見的只有HeadOperator,以及與外部連通的實線輸出,這些輸出對應了JobGraph中的JobEdge,在底層通過RecordWriterOutput來實現。另外,框中的虛線是operator chain內部的數據流,這個流內的數據不會經過序列化/反序列化、網路傳輸,而是直接將消息對象傳遞給下游的 ChainOperator 處理,這是性能提升的關鍵點,在底層是通過 ChainingOutput 實現的
OperatorChain的優點總結:
• 減少線程切換
• 減少序列化與反序列化
• 減少數據在緩衝區的交換
• 減少延遲並且提高吞吐能力
OperatorChain的缺點總結:
• 可能會讓N個比較複雜的業務跑在一個slot中,本來一個業務就慢,這發生這種情況就更慢了,所以可以通過startNewChain()/disableChaining()或全局禁用disableOperatorChaining()給分開
SlotSharingGroup 與 CoLocationGroup
每一個 TaskManager 會擁有一個或多個的 task slot,每個 slot 都能跑由多個連續 task 組成的一個 pipeline,比如 MapFunction 的第n個並行實例和 ReduceFunction 的第n個並行實例可以組成一個 pipeline。
如上文所述的 WordCount 例子,5個Task沒有solt共用的時候在TaskManager的slots中如下圖分佈,2個TaskManager,每個有3個slot:
預設情況下,Flink 允許subtasks共用slot,條件是它們都來自同一個Job的不同task的subtask。結果可能一個slot持有該job的整個pipeline。允許slot共用有以下兩點好處:
1. Flink 集群所需的task slots數與job中最高的並行度一致。
2. 更容易獲得更充分的資源利用。如果沒有slot共用,那麼非密集型操作source/flatmap就會占用同密集型操作 keyAggregation/sink 一樣多的資源。如果有slot共用,將基線的2個並行度增加到6個,能充分利用slot資源,同時保證每個TaskManager能平均分配到相同數量的subtasks。
我們將 WordCount 的並行度從之前的2個增加到6個(Source並行度仍為1),並開啟slot共用(所有operator都在default共用組),將得到如上圖所示的slot分佈圖。該任務最終會占用6個slots(最高並行度為6)。其次,我們可以看到密集型操作 keyAggregation/sink 被平均地分配到各個 TaskManager。
SlotSharingGroup:
• SlotSharingGroup是Flink中用來實現slot共用的類,它儘可能地讓subtasks共用一個slot。
• 保證同一個group的並行度相同的sub-tasks 共用同一個slots
• 運算元的預設group為default(即預設一個job下的subtask都可以共用一個slot)
• 為了防止不合理的共用,用戶也能通過API來強制指定operator的共用組,比如:someStream.filter(...).slotSharingGroup("group1");就強制指定了filter的slot共用組為group1。
• 怎麼確定一個未做SlotSharingGroup設置的運算元的Group是什麼呢(根據上游運算元的 group 和自身是否設置group共同確定)
• 適當設置可以減少每個slot運行的線程數,從而整體上減少機器的負載
CoLocationGroup(強制):
• 保證所有的並行度相同的sub-tasks運行在同一個slot
• 主要用於迭代流(訓練機器學習模型)
代碼驗證:
• 設置本地開發環境tm的slot數量
• 設置最後的operator使用新的group
• 由於不和前面的operator在一個group,無法進行slot的共用,所以最後的operator占用了其它slot
• 為什麼占用了兩個呢?
○ 因為不同組,與上面的default不能共用slot,組間互斥
○ 同組中的同一個operator的subtask不能在一個slot中,由於operator的並行度是2,所以占用了兩個槽位,subtask組內互斥
原理與實現
那麼多個tasks(或者說operators)是如何共用slot的呢?
關於Flink調度,有兩個非常重要的原則我們必須知道:
1. 同一個operator的各個subtask是不能呆在同一個SharedSlot中的,例如FlatMap[1]和FlatMap[2]是不能在同一個SharedSlot中的。
2. Flink是按照拓撲順序從Source一個個調度到Sink的。例如WordCount(Source並行度為1,其他並行度為2),那麼調度的順序依次是:Source -> FlatMap[1] -> FlatMap[2] -> KeyAgg->Sink[1] -> KeyAgg->Sink[2]。假設現在有2個TaskManager,每個只有1個slot(為簡化問題),那麼分配slot的過程如圖所示:
註:圖中 SharedSlot 與 SimpleSlot 後帶的括弧中的數字代表槽位號(slotNumber)
1. 為Source分配slot。首先,我們從TaskManager1中分配出一個SharedSlot。並從SharedSlot中為Source分配出一個SimpleSlot。如上圖中的①和②。
2. 為FlatMap[1]分配slot。目前已經有一個SharedSlot,則從該SharedSlot中分配出一個SimpleSlot用來部署FlatMap[1]。如上圖中的③。
3. 為FlatMap[2]分配slot。由於TaskManager1的SharedSlot中已經有同operator的FlatMap[1]了,我們只能分配到其他SharedSlot中去。從TaskManager2中分配出一個SharedSlot,並從該SharedSlot中為FlatMap[2]分配出一個SimpleSlot。如上圖的④和⑤。
4. 為Key->Sink[1]分配slot。目前兩個SharedSlot都符合條件,從TaskManager1的SharedSlot中分配出一個SimpleSlot用來部署Key->Sink[1]。如上圖中的⑥。
5. 為Key->Sink[2]分配slot。TaskManager1的SharedSlot中已經有同operator的Key->Sink[1]了,則只能選擇另一個SharedSlot中分配出一個SimpleSlot用來部署Key->Sink[2]。如上圖中的⑦。
最後Source、FlatMap[1]、Key->Sink[1]這些subtask都會部署到TaskManager1的唯一一個slot中,並啟動對應的線程。FlatMap[2]、Key->Sink[2]這些subtask都會被部署到TaskManager2的唯一一個slot中,並啟動對應的線程。從而實現了slot共用。
Flink中計算資源的相關概念以及原理實現。最核心的是 Task Slot,每個slot能運行一個或多個task。為了拓撲更高效地運行,Flink提出了Chaining,儘可能地將operators chain在一起作為一個task來處理。為了資源更充分的利用,Flink又提出了SlotSharingGroup,儘可能地讓多個task共用一個slot。
如何計算一個應用需要多少slot
• 不設置SlotSharingGroup,就是不設置新的組大家都為default組。(應用的最大並行度)
• 設置SlotSharingGroup ,就是設置了新的組,比如下圖有兩個組default和test組(所有SlotSharingGroup中的最大並行度之和)
由於source和map之後的operator不屬於同一個group,所以source和它們不能在一個solt中運行,而這裡的source的default組的並行度是10,test組的並行度是20,所以所需槽位一共是30