Flink 作業提交流程

来源:https://www.cnblogs.com/cxyxz/archive/2022/07/12/16469037.html
-Advertisement-
Play Games

大家好,我是小寒~ 今天給大家帶來一篇 flink 作業提交相關的文章。 我們都知道,在開發完一個 flink 應用程式後,打包成 jar 包,然後通過 FLink CLI 或者 Web UI 提交作業到 FLink 集群。其實,Flink 的 jar 文件並不是 FLink 集群的可執行文件,需要 ...


大家好,我是小寒~

今天給大家帶來一篇 flink 作業提交相關的文章。

我們都知道,在開發完一個 flink 應用程式後,打包成 jar 包,然後通過 FLink CLI 或者 Web UI 提交作業到 FLink 集群。其實,Flink 的 jar 文件並不是 FLink 集群的可執行文件,需要經過轉換之後提交給集群。其轉換過程分為兩個大的步驟。

  1. 在 FLink Client 中通過反射啟動 Jar 中的 main 函數,生成 Flink StreamGraph、JobGraph,將 JobGraph 提交給 Flink 集群。
  2. FLink 集群收到 JobGraph 之後,將 JobGraph 翻譯成 ExecutionGraph,然後開始調度執行,啟動成功之後開始消費數據。

總的來說,對用戶API的調用,可以轉換為 StreamGraph -> JobGraph -> ExecutionGraph -> 物理執行拓撲(Task DAG)

提交流程

FLink 作業在開發完畢之後,需要提交到 FLink 集群執行。ClientFrontend 是入口,觸發用戶開發的 Flink 應用 jar 文件中的 main 方法,然後交給 PipelineExecutor#execue 方法,最終會觸發一個具體的 PipelineExecutor 執行,如下圖所示。

作業執行可以選擇 Session 和 Per-Job 模式兩種集群。

  • Session 模式的集群,一個集群中運行多個作業。
  • Per-Job 模式的集群,一個集群中只運行一個作業,作業執行完畢則集群銷毀。

流水線執行器 PipelineExecutor

流水線執行器在 FLink 中叫作 PipelineExecutor,是 FLink Client 生成 JobGraph 之後,將作業提交給集群的重要環節。

集群有 Session 和 Per-Job 兩種模式。在這兩種模式下,集群的啟動時機、提交作業的方式不同,所以在生產環境中有兩種 PipelineExecutor。Session 模式對應於 AbstractSessionClusterExecutor,Per-Job 模式對應於 AbstractJobClusterExecutor。

  1. Session 模式

該模式下,作業共用集群資源,作業通過 Http 協議進行提交。

在 Flink 1.10 版本中提供了三種會話模式:Yarn 會話模式、K8s 會話模式、Standalone。Standalone 模式比較特別,Flink 安裝在物理機上,不能像在資源集群上一樣,可以隨時啟動一個新集群,所有的作業共用 Standalone 集群。

在 Session 模式下, Yarn 作業提交使用 yarn-session.sh 腳本, K8s 作業提交使用 kubernetes-session.sh 腳本。兩者的具體實現不同 ,但邏輯是類似的 ,在啟動腳本的時候就會檢查是否存在已經啟動好的 Flink Session 模式集群,如果沒有,則啟動一個 Flink Session 模式集群,然後在 PipelineExecutor 中,通過 Dispatcher 提供的 Rest 介面提交 JobGraph,Dispatcher 為每一個作業啟動一個 JobMaster,然後進入作業執行階段。

  1. Per-Job 模式

該模式下,一個作業一個集群,作業之間相互隔離。

在 FLink 1.10 版本中,只有 Yarn 上實現了 Per-Job 模式。

Per-Job 模式下,因為不需要共用集群,所以在 PipelineExecutor 中執行作業提交的時候,可以創建集群並將 JobGraph 以及所需要的文件等一同交給 Yarn 集群,Yarn 集群在容器中啟動 JobManager 進程,進行一系列的初始化動作,初始化完畢之後,從文件系統中獲取 JobGraph ,交給 Dispatcher。 之後的執行流程與 Session 模式下的執行流程相同。

yarn session 的提交流程

從總體上來說,在 Yarn 集群上使用 Session 模式提交 Flink 作業的過程分為 3 個階段。首先在 Yarn 上啟動 Flink Session 模式的集群;其次通過 Flink Client 提交作業 ,最後進行作業的調度執行。

  1. 啟動集群

(1) 使用 yarn-session.sh 提交會話模式的作業

如果提交到已經存在的集群, 則獲取 Yarn 集群信息、應用 ID,並準備提交作業。

如果是啟動新的 Yarn Session 集群,則進入到步驟 (2)。

(2)Yarn 啟動新的 Flink 集群

如果沒有集群,則創建一個新的 Session 模式的集群。首先,將應用的配置文件(flink-conf.yaml、logback.xml、log4j.properties)和相關文件(Flink jar、用戶 jar 文件、JobGraph 對象等)上傳至分散式存儲(如 HDFS)的應用暫存目錄。

然後通過 Yarn Client 向 Yarn 提交 Flink 創建集群的申請,Yarn 分配資源,在申請的 Yarn Container 中初始化並啟動 FLink JobManager 進程,在 JobManager 進程中運行 YarnSessionClusterEntrypoint 作為集群啟動的入口(不同的集群部署模式有不同的 ClusterEntrypoint 的實現),初始化 Dispatcher、ResourceManager。啟動相關的 RPC 服務,等待 Client 通過 Rest 介面提交作業。

2、作業提交

Yarn 集群準備好後,開始作業提交。

(1)Flink Client 通過 Rest 向 Dispatcher 提交 JobGraph。

(2)Dispatcher 是 Rest 介面,不負責實際的調度、執行方面的工作,當收到 JobGraph 後,為作業創建一個 JobMaster,將工作交給 JobMaster(負責作業調度、管理作業和 Task 的生命周期 ),構建 ExecutionGraph(Job Graph的並行化版本)

  1. 作業調度執行

(1)JobMaster 向 YarnResourceManager 申請資源,開始調度 ExecutionGraph 的執行;初次提交作業,集群尚沒有 TaskManager,此時資源不足,開始申請資源。

(2)YarnResourceManager 收到 JobMaster 的資源請求,如果當前有空閑的 Slot,則將 Slot 分配給 JobMaster,否則 YarnResourceManager 將向 Yarn Master(Yarn 集群的 ResourceManager) 請求創建 TaskManager。

(3)YarnResourceManager 將資源請求加入等待請求隊列,並通過心跳向 YARN RM 申請新的 Container 資源來啟動 TaskManager 進程;Yarn 分配新的 Container 給 TaskManager。

(4)YarnResourceManager 從 HDFS 載入 Jar 文件等所需的相關資源,在容器中啟動 TaskManager。

(5)TaskManager 啟動之後,向 YarnResourceManager 進行註冊,並把自己的 Slot 資源情況彙報給 YarnResourceManager 。

(6)YarnResourceManager 從等待隊列中取出 Slot 請求,向 TaskManager 確認資源可用情況,並告知 TaskManager 將 Slot 分配給了哪個 JobMaster。

(7)TaskManager 向 JobMaster 提供 Slot,JobMaster 調度 Task 到 TaskManager 的此 Slot 上執行。

至此,作業進入執行階段。

Yarn Per-Job 提交流程

Yarn Per-Job 模式提交作業與 Yarn-Session 模式提交作業基本類似。Per-Job 模式下,JobGraph 和集群資源請求一起提交給 Yarn。

  1. 啟動集群

    (1)使用 flink run -m yarn-cluster 提交 Per-Job 模式的作業。

    (2)Yarn 啟動 Flink 集群。該模式下 Flink 集群的啟動入口是 YarnJobClusterEntryPoint,其它與 Yarn-Session 模式啟動類似。

  2. 作業提交

    該步驟與 Session 模式下的不同之處在於,Client 並不會通過 Rest 向 Dispacher 提交 JobGraph,由 Dispacher 從本地文件系統獲取 JobGraph,其後的步驟與 Session 模式一樣。

  3. 作業調度執行

    與 Yarn-Session 模式下一致。

流處理的轉換過程

StreamGraph

使用 DataStream API 開發的應用程式,首先被轉換為 Transformation,然後被映射為 StreamGraph。

我們以熟知的 WordCount 程式為例,它的 StreamGraph 如下圖所示。

從圖中我們可以看到,StreamGraph 是由 StreamNode 和 StreamEdge 構成。

  • StreamNode

    StreamNode 是 StreamGraph 中的節點,從 Transformation 轉換而來,可以簡單理解為一個 StreamNode 表示為一個運算元;從邏輯上來說,StreamNode 在 StreamGraph 中存在實體和虛擬的 StreamNode。StreamNode 可以有多個輸入,也可以有多個輸出。

    實體的 StreamNode 會最終變為物理的運算元。虛擬的 StreamNode 會附著在 StreamEdge 上。

  • StreamEdge

    StreamEdge 是 StreamGraph 中的邊, 用來連接兩個 StreamNode,一個 StreamNode 可以有多個出邊、入邊。 StreamEdge 中包含了盤路輸出、分區器、欄位篩選輸出等的信息。

作業圖

JobGraph 可以由流計算的 StreamGraph 轉換而來。
流計算中,在 StreamGraph 的基礎上進行了一些優化,如通過 OperatorChain 機制將運算元合併起來,在執行時,調度在同一個 Task 線程上,避免數據的跨線程、跨網路的傳遞。

從 JobGraph 的圖裡可以看到,數據從上一個運算元流到下一個運算元的過程中,上游作為生產者提供了中間數據集(IntermediateDateSet),而下游作為消費者需要 JobEdge。JobEdge 是一個通信管道,連接了上游生產的中間數據集和 JobVertex 節點。

JobGraph 的核心對象是 JobVertex、JobEdge 和 IntermediateDateSet。

  • JobVertex

    經過運算元融合優化後符合條件的多個 StreamNode 可能會融合在一起生成一個 JobVertex,即一個 JobVertex 包含一個或多個運算元,JobVertex 的輸入是 JobEdge,輸出是 IntermediateDateSet。

  • JobEdge

    JobEdge 是 JobGraph 中連接 IntermediateDateSet 和 JobVertex 的邊,表示 JobGraph 中的一個數據流轉通道,其上游數據源是 IntermediateDateSet,下游消費者是 JobVertex ,即數據通過 JobEdge 由 IntermediateDateSet 傳遞給目標 JobVertex 。

    JobEdge 中的數據分發模式會直接影響執行時 Task 之間的數據連接關係,是點對點連接還是全連接。

  • IntermediateDateSet

    中間數據集 IntermediateDataSet 是一種邏輯結構,用來表示 JobVertex 的輸出,即該 JobVertex 中包含的運算元會產生的數據集。不同的執行模式下,其對應的結果分區類型不同,決定了在執行時刻數據交換的模式。

    IntermediateDataSet 的個數與該 JobVertex 對應的 StreamNode 的出邊數量相同,可以是一個或者多個。

執行圖

ExecutionGraph 是調度 Flink 作業執行的核心數據結構,包含了作業中所有並行執行的 Task 的信息、Task 之間的關聯關係、數據流轉關係等。

StreamGraph、JobGraph 在 Flink 客戶端中生成,然後提交給 Flink 集群。JobGraph 到 ExecutionGraph 的轉換在 JobMaster 中完成。在轉化過程中,有如下重要變化。

  • 加入了並行度的概念,成為真正可調度的圖結構。
  • 生成了與 JobVertex 對應的 ExecutionJobVertex 和 ExecutionVertex,與IntermediateDataSet 對應的 IntermediateResult 和 IntermediateResultPartition 等。

生成的圖如下圖所示。

ExecutionGraph 的核心對象有 ExecutionJobVertex 、ExecutionVertex、IntermediateResult 、IntermediateResultPartition、ExecutionEdge 和 Execution。

  • ExecutionJobVertex

    該對象和 JobGraph 中的 JobVertex 一一對應。該對象還包含一組 ExecutionVertex,數量與該 JobVertex 中所包含的 StreamNode 的並行度一致,假設 StreamNode 的並行度為3,那麼 ExecutionJobVertex 也會包含 3個 ExecutionVertex。

  • ExecutionVertex

    ExecutionJobVertex 中會對作業進行並行化處理,構造可以並行執行的實例,每一個並行執行的實例就是 ExecutionVertex。

    構造 ExecutionVertex 的同時,也會構建 ExecutionVertex 的輸出 IntermediateResult。

  • IntermediateResult

    IntermediateResult 又叫中間結果集,該對象是個邏輯概念,表示 ExecutionJobVertex 的輸出,和 JobVertex 中的 IntermediateDataSet 一一對應,同樣,一個ExecutionJobVertex 可以有多個中間結果,取決於當前 JobVertex 有幾個出邊(JobEdge)

    一個中間結果包含多個中間結果分區 IntermediateResultPartition,其個數等於該 JobVertex 的併發度,或者叫作運算元的並行度。

  • IntermediateResultPartition

    IntermediateResultPartition 又叫作中間結果分區,表示一個 ExecutionVertex 的輸出結果,與 ExecutionEdge 相關聯。

  • ExecutionEdge

    表示 ExecutionVertex 的輸入,連接到上游產生的 IntermediateResultPartition 。

  • Execution

    ExecutionVertex 相當於每個 Task 的模板,在真正執行的時候,會將 ExecutionVertex 中的信息包裝為一個 Execution,執行一個 ExecutionVertex 的一次嘗試。JobManager 和 TaskManager 之間關於 Task 的部署和 Task 的執行狀態的更新都是通過 ExecutionAttemptID 來標識實例的。在發生故障或者數據需要重算的情況下,ExecutionVertex 可能會有多個ExecutionAttemptID 。一個 Execution 通過 ExecutionAttemptID 來唯一標識。

總結

Flink 作業執行前需要提交 Flink 集群, Flink 集群可以與不同的資源框架(Yarn、K8s、Mesos 等)進行集成,可以按照不同的模式(Session 模式和 Per-Job模式)運行,所以在 Flink 作業提交過程中,可能在資源框架上啟動Flink集群。Flink 就緒之後,進入作業提交階段,在Flink客戶端進行StreamGraph、JobGraph的轉換,提交 JobGraph 到 Flink 集群,然後 Flink 集群負責將 JobGraph 轉換為 ExecutionGraph,之後進入調度執行階段。


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • ClickHouse集群的搭建和部署和單機的部署是類似的,主要在於配置的不一致,如果需要瞭解ClickHouse單機的安裝設部署,可以看看這篇文章,ClickHouse(03)ClickHouse怎麼安裝和部署。 ClickHouse集群部署流程大概如下: 環境準備 在每台機器上安裝單機版Click ...
  • MySQL 高級 1、約束 1.1、約束介紹 什麼是約束 對錶中的數據進行限定,保證數據的正確性、有效性、完整性 約束類型 | 約束 | 說明 | | : : | : : | | PRIMARY KEY | 主鍵約束 | | UNIQUE | 唯一約束 | | NOT NULL | 非空約束 | | ...
  • 原文鏈接:批流一體數據集成工具ChunJun同步Hive事務表原理詳解及實戰分享 課件獲取:關註公眾號__ “數棧研習社”,後臺私信 “ChengYing”__ 獲得直播課件 視頻回放:點擊這裡 ChengYing 開源項目地址:github 丨 gitee 喜歡我們的項目給我們點個__ STAR! ...
  • dolphinscheduler單機化改造 轉載請註明出處: https://www.cnblogs.com/funnyzpc/p/16466920.html 前面 其實如果單機的話直接走standalone-server就可以了,簡單又快捷,如果更改的話 後期要升級可能又需要維護個版本才可,所以簡 ...
  • SQL Server 中的事務是什麼? 事務是應該作為一個單元執行的一組 SQL 語句。這意味著事務確保所有命令都成功或都不成功。如果事務中的命令之一失敗,則所有命令都失敗,並且在資料庫中修改的任何數據都將回滾。 比如您在做一個銀行轉賬操作,這涉及了2個操作, 扣款和收款, 必須保證這2個操作都成功 ...
  • 前言 本文章做了把mysql表從壓縮表過渡到普通表的實驗過程,看看壓縮表變成普通表會發生什麼?本文針對mysql5.7和mysql8分別進行了實驗。 1、什麼是表壓縮 在介紹壓縮表變成普通表前,首先給大家普及下,什麼是表壓縮。 表壓縮,意思是使表中的數據以壓縮格式存儲,壓縮能夠顯著提高處理速度和壓縮 ...
  • 曾幾何時,“併發高就分庫,數據大就分表”已經成了處理 MySQL 數據增長問題的聖經。 面試官喜歡問,博主喜歡寫,候選人也喜歡背,似乎已經形成了一個閉環。 但你有沒有思考過,分庫分表真的適合你的系統嗎? 分表 在業務剛剛發展起來的時候,流量全部打到了一個 MySQL 上,用戶信息全落到了 user ...
  • 註:筆記旨在記錄 四、MySQL 聚合函數 | \ | 函 數 名 稱 | 作 用 | 完 成 | |: :|: |: |: :| | 1 | MAX | 求最大值 | 勾 | | 2 | MIN | 求最小值 | 勾 | | 3 | COUNT | 求數量 | 勾 | | 4 | BIT_COUN ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...