從5分鐘到60秒,袋鼠雲數棧在熱重啟技術上的提效探索之路

来源:https://www.cnblogs.com/DTinsight/archive/2023/03/09/17198877.html
-Advertisement-
Play Games

更好地提高效率一直以來是袋鼠雲數棧產品的主要目標之一。當前數棧客戶的實時任務都是基於 Per-Job 模式運行的,客戶在進行一些任務參數的修改之後,只能先取消當前任務,再選擇 CheckPoint 恢復或者重新運行,整個過程需要3-5分鐘,比較浪費時間。為了達到提高效率的目的,我們針對 Per-Jo ...


更好地提高效率一直以來是袋鼠雲數棧產品的主要目標之一。當前數棧客戶的實時任務都是基於 Per-Job 模式運行的,客戶在進行一些任務參數的修改之後,只能先取消當前任務,再選擇 CheckPoint 恢復或者重新運行,整個過程需要3-5分鐘,比較浪費時間。為了達到提高效率的目的,我們針對 Per-Job 任務的整體流程分析,進行了相關探索。

下文和大家聊聊數棧在熱重啟技術方面的探索之路。

熱重啟是什麼?

熱重啟技術旨在復用當前 Per-Job 集群的相關資源,減少重新創建集群以及申請資源的耗時,同時通過 CheckPoint 機制保障數據的正確性

Flink 的 Per-Job 模式是指每個任務都會對應一個獨立的 Flink 集群。在任務提交的時候,會創建一個 Flink 集群進行任務的運行,整個集群只為這一個任務進行服務。同時 Flink 集群不允許繼續提交任務,導致任務修改之後,只能 Cancel 當前任務。重新提交修改後的任務,創建一個新的 Flink 集群進行運行。

經過分析,耗時主要是由於以下兩部分原因造成:

• Client 需要在 Yarn 上啟動一個 Flink 集群,這一部分是客戶端耗時最多的部分,因為這一部分包括上傳 jar,上傳文件到Hdfs 上,申請資源啟動 Flink 集群,都是比較耗時的步驟

• 集群運行的時候需要申請資源等操作也十分耗時

我們思考如果僅僅是一些任務參數或者 Sql 邏輯的修改,而不涉及代碼上的修改,那麼 PerJob 任務是否可以類似 Session 模式進行改造,支持 JobGraph 的重新提交,解決 Client 需要啟動一個 Flink 集群的耗時問題,大大提高提交效率。

同時復用了整個 Flink 集群的資源,如果並行度改變,只需要申請新增加的資源,已有的資源不需要再重覆向 Yarn 的 Resourcemanager 申請。

熱重啟改造後的流程

Flink 中 Per-Job 任務運行的整體流程大概如下所示:

客戶端流程

• Client 端創建 JobGraph

• 上傳 JobGraph 到 hdfs 里

• 通過 YarnClient 提交一個 YarnApplication,運行一個 Flink 任務

• 獲取結果

• 啟動 Flink 集群,啟動 WebMonitor,ResourceManager,Dispatcher 組件

• Client 端上傳到遠程文件服務里的 JobGraph 會被反序列出來由 DIspatcher 持有;

• DIspatcher 會根據此 JobGraph 創建 JobManagerRunner 對象進行運行;

• JobManagerRunner 會交由內部的 ScheduleNg 進行調度運行任務:

a.構建 ScheduleNg 時,會將 JobGraph 轉為ExecutionGraph

b. ScheduleNg 根據 ExecutionGraph 進行調度,運行任務

• 任務運行,等待任務運行結束,進行相應的回調處理

file

從上圖我們可以看出,一個 Per-Job 任務的運行主要包括兩部分:一部分是客戶端上傳文件 jar 等操作後,直接上傳任務到 Yarn 上進行 Flink 任務的啟動,第二部分是Flink集群的啟動,然後對客戶端上傳到遠程文件的 JobGraph 進行處理。因此為了優化 Per-Job 下的效率,我們對這兩部分進行了改造。

想法邏輯是,集群首先改造支持 JobGraph 的重新提交,然後 DIspatcher 處理 JobGraph 的時候,不會創建新的 JobMaster ,而是將當前現有的 JobGraph 里的一些信息填充到新的 JobGraph 里,比如當前任務的 CheckPoint 信息等。任務最終的調度運行是 JobMaster 里的 ScheduleNg 對象。因此我們認為只需要將 ScheduleNg 重新構建,其餘的組件都可以復用。

下圖即為我們熱重啟技術改造後的一個大致流程:

file

熱重啟技術改造後流程

• WebMonitor 支持任務的提交

• DIspatcher 將新的 JobGraph 緩存

• 取消當前任務,等待非同步回調

• 返回結果給客戶端

• 在任務取消的非同步回調里主要是熱重啟的重點改造部分:

a.判斷當前是否有新的 JobGraph 緩存,有的話進入熱重啟邏輯,無則走當前現有邏輯

b.獲取取消任務的 CheckPoint 信息,填充到新的 JobGraph 里

c.將 jobGrap 更新到 JobMaster 里,清理以前 JobGraph 的緩存信息

d.把 JobMaster 里 SlotPool 管理的資源釋放掉

e.JobMaster 重新創建 ScheduleNg 並調度運行,至此新的 JobGraph 就被成功調度運行了

熱重啟改造部分詳解

JobGraph 介紹

在上述流程中,JobGraph 是整體流轉的主要對象,後續的一切操作都是圍繞著 JobGraph 進行處理,所以這裡先對 JobGraph 進行介紹。

JobGraph 是 Flink 作業的內部表示,是一個有向無環圖(DAG),主要是將一些可以優化的運算元節點合併為一個節點。從下圖可知,一個完整的 JobGraph 圖包含了 Source Sink Transform 節點,以及節點的輸出 IntermrdiateDataset 和輸入邊 JobEdge 。在除了 Application 模式外,其餘的提交模式下,JobGraph 是在 Client 創建的,然後通過 Rest 請求提交給 Flink 集群進行處理。

file

看完 JobGraph 此類結構,可以得出以下這些信息:

· taskVertices:上圖中的每個頂點對應一個 jobVertex,taskVertices 維護了 jobGraph 圖裡的各個 jobVertex

· snapshotSettings:checkponit 相關的配置信息,如 CheckPoint 的間隔時間等

· savepointRestoreSettings:任務恢復的 checkpoint 文件信息,熱重啟中,新的 jobGraph 會將上一個任務的 checkPoint 位點信息填充到這個參數里,新的任務會在 CheckPoint 位點處進行恢復運行

· jobConfiguration:整個 job 的相關配置信息

· userJars & calsspath:任務運行過程中需要的一些 jar 以及 classpath 相關信息

其中 JobVertex 是 jobGraph 里非常重要的對象,再看下此類結構:JobVertex 主要存儲了JobEdge以及 IntermediateDataSet 和並行度等相關信息。對於一個 JobVertex 來說,IntermediateDataSet 是作為 JobVertex 的輸出,而 JobEdge 是其輸入。

file

WebMonitor 改造

WebMonitor 組件是 Flink 的 Web 端點,可以通過 Rest Api 進行 Flink 集群的狀態、任務、指標等信息的查詢,同時支持任務的提交、取消、觸發 SavePoint 等操作。

Per-Job 模式下 Flink 集群是不支持客戶端繼續提交任務運行的,因此需要對 WebMonitor 進行改造,類似 Session 下支持同一個 Flink 集群能繼續提交 JobGraph 並運行。

從下圖可以看出 WebMonitor 組件啟動時,其本質是 Netty 為核心的一個 Web 端點。啟動時的主要流程如下:

• 創建 Router,管理 http 請求和處理器 handler 的映射關係

• initializeHandlers 初始化所有的 handler,不同的集群對應的 WebMonitor 提供的 API 功能不同,所以 handlers 也是不同的

• 將 handlers 註冊到 router,完成 URL 以及請求方式(GET,POST,DELETE,PUT)和 Handler 的映射關係

• 創建一個 Netty 的 handler,包裝下 router,然後註冊到 Netty 的 pipeline 里

file

WebMonitor 支持的各種 Rest 請求其實最終是交給一個個的 handler 進行處理,通過 Router 對這些 handler 進行維護,其內部維護了一個 url 以及 Rest 請求方式與 handler 的映射關係。接收 Client 端的 Rest 請求之後,Router 找到對應的處理器 handler,交由 handler 進行最終的處理並返回結果。

因為 Per-job 集群是不支持 Client 端繼續提交任務的,所以其 initializeHandlers 方法初始化出的 handlers 不包含處理任務提交的 handler,導致 router 找不到對應的 handler 報錯,因此需要在 initializeHandlers 里將處理任務提交的 handler 註冊進去 。

file

JobSubmitHandler 處理請求的主要邏輯如下圖所示。核心是從 Rest 請求的 Body 里反序列化得到 JobGraph,反序列化獲取的 Jobgraph 通過 DIspatcherGateway 發送給 Dispatcher 進行後續提交處理。

file

這樣 Client 端只需要重新生成 JobGraph 然後提交即可,避免了重新上傳 jar 到 hdfs,以及避免浪費重新向 yarn 集群申請資源啟動 AppMaster 的時間。

Dispatcher 改造

DisPatcher 顧名思義是一個分發器,其主要功能是 Flink 集群接收到關於 Job 的提交、取消、觸發 SavePoint 等操作,分發到對應的各個 JobMaster 進行處理,或者創建新的 JobMaster 進行任務的運行。

DisPatcher 處理任務提交的核心流程是根據 JobGraph 創建一個 JobManagerRunner 對象並啟動,然後將其包裝成一個 DispatcherJob 緩存在內部。任務的具體調度執行交由創建的 JobManagerRunner 進行非同步處理。

JobManagerRunner 其內部的具體操作其實是 JobMasterService,主要實現類就是 JobMaster。JobMaster 內部有兩個主要對象分別是:

· ScheduleNg: 負責 JobGraph 轉為 ExecutionGraph,然後對 Job 進行調度運行

· SlotPool:負責 Slot 資源的申請以及管理

以上便是 Dispatcher 處理的主要流程。當前改造之後只是支持了任務的重新提交運行,但是新的任務仍然是對應一個新的 JobMaster,其實就是一個類似 Session 的處理,所以為了達到熱重啟的效果,需要進行以下的改造。

主流程的改造邏輯如下:

• 增加了一個 hotRestartJobGraph 欄位,將新的 JobGraph 對象賦予此欄位

• Dispatcher 將緩存的正在運行的任務 cancel,對非同步返回結果進行回調處理

• 直接返回 Client 結果

因為 Flink 整體是非同步處理的,源碼里充滿了大量的 CompletableFuture 回調的處理,主流程僅僅對提交的 JobGraph 進行了一個緩存處理,熱重啟的主要步驟在任務取消的回調里進行處理:

• 判斷 hotRestartJobGraph 是否為空,如果不為空則進行熱重啟處理,為空則用以前的邏輯,整個 Per-job 集群關閉

• 獲取取消任務的最後一個 CheckPoint 位點

• 將 CheckPoint 位點信息填充到新的 Jobgraph 里

• 反射將上一個 Jobgraph 生成的 JobManagerRunner 和 jobMaster 兩個對象的JobGraph 欄位用新的 JobGraph 替換掉

• jobMaster 對象根據 jobGraph 重新生成 scheduleNg 進行調度運行

• jobMaster 的 slotPool 在心跳周期內,會緩存已經釋放掉的 slot,需要把這部分緩存清空

• MiniDispatcher 的 close 方法修改下,如果 hotRestartJobGraph 不為空則不進行集群的關閉

• hotRestartJobGraph 置空

註意上述只是主要的一些改造地方,其餘一些邊緣的細節處理就不再進行贅述。

所以在熱重啟中,DIspatcher 是不會對每一個 JobGraph 創建新的 JobMaster 對象。通過將新的 JobGraph 更新到 JobMaster 里,內部僅僅 ScheduleNg 進行了重新構建,其餘的組件都進行了復用,比如 SlotPool。

ScheduleNg 之所以需要重新構建是因為 JobGraph 轉為 ExecutionGraph 是需要 ScheduleNg 在構建的時候創建的,因此需要重新構建一個 ScheduleNg 進行任務的調度執行,這樣達到了整個資源的復用性,大大提升了效率。

Slot 資源的復用

Flink 中對於資源的抽象主要是 Slot,其各個組件對 Slot 的管理是由不同的組件處理的:

· Flink 的 ResourceManager 里是 SlotManager 管理,主要是任務的資源申請以及管理

· JobMaster 里管理 Slot 是 SlotPool ,主要是對當前任務申請的 slot 進行管理

· TaskExecutor 里則是S lotTable 對 Slot 進行管理,維護 JobId 和 Slot 的關係

在熱重啟中,上一個任務取消之後,JobMaster 里 SlotPool 管理的 Slot 狀態由已分配改為可用。這樣在 JobMaster 通過新的 ScheduleNg 進行重新調度,會復用 SlotPool 里緩存的 Slot,這個時候其實是有問題的。在 TaskExecutor 接收到任務的時候會報錯,在其內部的 JobTable 里找不到新任務的 JobId,因為此時 TaskExecutor 維護的 Jobid 還是上一個任務的。

所以 JobMaster 的 SlotPool 需要釋放掉其內部緩存信息,註意只是清理內部緩存,此時 TaskManager 的 Slot 槽資源還沒被釋放,仍然被 Resourcemanager 的 SlotManager 管理著。這樣 SlotPool 發現內部沒可用的 Slot 槽就會和 ResourceManager 的 SlotManager 申請資源,SlotManager 就仍然復用了以前的 Slot 槽並且將新的 JobGraph 的 jobId 通過 rpc 請求註冊進了 TaskExecutor。從而達到了 slot 槽資源的復用,減少了 Flink 集群的 ResourceManager 重新向 Yarn 的 ResourceManager 申請資源。

總結

數棧在 Per-job 模式下,為了儘快看到任務修改後的效果,在業務允許情況下,通過熱重啟技術復用相關資源,減少了大量時間,極大地提高了效率。在開發驗證中,以前一個任務等待任務結束以及重新提交運行總流程超過4分鐘,但是在熱重啟情況下控制在1分鐘以內就已經可以進行調度執行。

未來我們將會把熱重啟的場景進一步豐富,支持更多場景下的熱重啟技術,如 jar 的代碼修改,如何更新環境里的 jar,支持 k8s 場景等。

袋鼠雲一直以來高度重視產品升級和用戶體驗,用誠心傾聽用戶需求,新的一年我們將繼續保持產品升級節奏,以提效為目標滿足不同行業用戶的更多需求。為了更好的產品,更佳的用戶體驗,數棧一直在路上。
《數據治理行業實踐白皮書》下載地址:https://fs80.cn/380a4b

想瞭解或咨詢更多有關袋鼠雲大數據產品、行業解決方案、客戶案例的朋友,瀏覽袋鼠雲官網:https://www.dtstack.com/?src=szbky

同時,歡迎對大數據開源項目有興趣的同學加入「袋鼠雲開源框架釘釘技術qun」,交流最新開源技術信息,qun號碼:30537511,項目地址:https://github.com/DTStack


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 前言 ASP.NET Core中有很多RateLimit組件,.NET 7甚至推出了官方版本。不過這些組件的主要目標是限制客戶端訪問服務的頻率,在HTTP伺服器崩潰前主動拒絕部分請求。如果請求沒有被拒絕服務會儘可能調用資源儘快處理。 現在有一個問題,有什麼辦法限制響應的發送速率嗎?這在一些需要長時間 ...
  • 腳註,是可以附在文章頁面的最底端的,對某些東西加以說明,印在書頁下端的註文。腳註和章節附註是對文本的補充說明。腳註一般位於頁面的底部,可以作為文檔某處內容的註釋。常用在一些說明書、標書、論文等正式文書用來引註的內容。這篇文章將為您展示如何通過C#/VB.NET代碼,以編程方式在Word中插入或刪除腳註。 ...
  • 自動化測試環境的搭建 :一、安裝 selenium: 安裝方式一: pip install -U selenium 安裝方式二: 手動安裝 selenium: 1、安裝python包,選擇全部組件(pip、安裝過程中配置環境變數)解壓selenium-4.8.2.tar.gz,然後用cmd進入解壓目 ...
  • 一、什麼是Vim Vim是一個類似於Vi的著名的功能強大、高度可定製的文本編輯器,在Vi的基礎上改進和增加了很多特性。代碼補全、編譯及錯誤跳轉等方便編程的功能特別豐富,在程式員中被廣泛使用。和Emacs併列成為類Unix系統用戶最喜歡的文本編輯器。 二、Vim鍵點陣圖 三、Vim的三種模式 Vim 有 ...
  • 一、引言 在之前圍繞STM32的GPIO的基本結構進行了介紹,圖1為STM32的5V容忍的GPIO口內部基本結構圖,圖2為GPIO的基本結構中各個模塊部分的概述。 閱讀GPIO基本結構的內容能夠對GPIO的工作模式有更深的瞭解。正是由於GPIO的結構中包含了多樣性的電路和模塊,因此進行合理的配置組合 ...
  • 1 文件目錄指令 1 pwd 顯示當前目錄的絕對路徑。 說明當前位置在/home/sora 2 cd 切換到指定目錄。 cd ~ 切換到當前用戶的家目錄:如果當前用戶為root,會切換到/root/,如果當前用戶為普通用戶名字叫做A,會切換到/home/A/。 cd .. 切換到上一級目錄 3 mk ...
  • 可以使用以下方法將Win32視窗設置為透明: 定義視窗類時,在WNDCLASSEX結構體中設置hbrBackground成員為NULL。 在視窗創建時,使用WS_EX_LAYERED風格和SetLayeredWindowAttributes函數將視窗設置為透明: HWND hwnd = Create ...
  • 1 系統運行級別 0:關機1:單用戶【找回丟失密碼】 2:多用戶狀態沒有網路服務3:多用戶狀態有網路服務 4:系統未使用保留給用戶5:圖形界面 6:系統重啟 其中,最常用的為3和5。 有關命令: (1)init :切換不同運行狀態 從 圖形界面 切換 為多用戶狀態有網路服務。 (2)systemct ...
一周排行
    -Advertisement-
    Play Games
  • 概述:在C#中,++i和i++都是自增運算符,其中++i先增加值再返回,而i++先返回值再增加。應用場景根據需求選擇,首碼適合先增後用,尾碼適合先用後增。詳細示例提供清晰的代碼演示這兩者的操作時機和實際應用。 在C#中,++i 和 i++ 都是自增運算符,但它們在操作上有細微的差異,主要體現在操作的 ...
  • 上次發佈了:Taurus.MVC 性能壓力測試(ap 壓測 和 linux 下wrk 壓測):.NET Core 版本,今天計劃準備壓測一下 .NET 版本,來測試並記錄一下 Taurus.MVC 框架在 .NET 版本的性能,以便後續持續優化改進。 為了方便對比,本文章的電腦環境和測試思路,儘量和... ...
  • .NET WebAPI作為一種構建RESTful服務的強大工具,為開發者提供了便捷的方式來定義、處理HTTP請求並返迴響應。在設計API介面時,正確地接收和解析客戶端發送的數據至關重要。.NET WebAPI提供了一系列特性,如[FromRoute]、[FromQuery]和[FromBody],用 ...
  • 原因:我之所以想做這個項目,是因為在之前查找關於C#/WPF相關資料時,我發現講解圖像濾鏡的資源非常稀缺。此外,我註意到許多現有的開源庫主要基於CPU進行圖像渲染。這種方式在處理大量圖像時,會導致CPU的渲染負擔過重。因此,我將在下文中介紹如何通過GPU渲染來有效實現圖像的各種濾鏡效果。 生成的效果 ...
  • 引言 上一章我們介紹了在xUnit單元測試中用xUnit.DependencyInject來使用依賴註入,上一章我們的Sample.Repository倉儲層有一個批量註入的介面沒有做單元測試,今天用這個示例來演示一下如何用Bogus創建模擬數據 ,和 EFCore 的種子數據生成 Bogus 的優 ...
  • 一、前言 在自己的項目中,涉及到實時心率曲線的繪製,項目上的曲線繪製,一般很難找到能直接用的第三方庫,而且有些還是定製化的功能,所以還是自己繪製比較方便。很多人一聽到自己畫就害怕,感覺很難,今天就分享一個完整的實時心率數據繪製心率曲線圖的例子;之前的博客也分享給DrawingVisual繪製曲線的方 ...
  • 如果你在自定義的 Main 方法中直接使用 App 類並啟動應用程式,但發現 App.xaml 中定義的資源沒有被正確載入,那麼問題可能在於如何正確配置 App.xaml 與你的 App 類的交互。 確保 App.xaml 文件中的 x:Class 屬性正確指向你的 App 類。這樣,當你創建 Ap ...
  • 一:背景 1. 講故事 上個月有個朋友在微信上找到我,說他們的軟體在客戶那邊隔幾天就要崩潰一次,一直都沒有找到原因,讓我幫忙看下怎麼回事,確實工控類的軟體環境複雜難搞,朋友手上有一個崩潰的dump,剛好丟給我來分析一下。 二:WinDbg分析 1. 程式為什麼會崩潰 windbg 有一個厲害之處在於 ...
  • 前言 .NET生態中有許多依賴註入容器。在大多數情況下,微軟提供的內置容器在易用性和性能方面都非常優秀。外加ASP.NET Core預設使用內置容器,使用很方便。 但是筆者在使用中一直有一個頭疼的問題:服務工廠無法提供請求的服務類型相關的信息。這在一般情況下並沒有影響,但是內置容器支持註冊開放泛型服 ...
  • 一、前言 在項目開發過程中,DataGrid是經常使用到的一個數據展示控制項,而通常表格的最後一列是作為操作列存在,比如會有編輯、刪除等功能按鈕。但WPF的原始DataGrid中,預設只支持固定左側列,這跟大家習慣性操作列放最後不符,今天就來介紹一種簡單的方式實現固定右側列。(這裡的實現方式參考的大佬 ...