從5分鐘到60秒,袋鼠雲數棧在熱重啟技術上的提效探索之路

来源:https://www.cnblogs.com/DTinsight/archive/2023/03/09/17198877.html
-Advertisement-
Play Games

更好地提高效率一直以來是袋鼠雲數棧產品的主要目標之一。當前數棧客戶的實時任務都是基於 Per-Job 模式運行的,客戶在進行一些任務參數的修改之後,只能先取消當前任務,再選擇 CheckPoint 恢復或者重新運行,整個過程需要3-5分鐘,比較浪費時間。為了達到提高效率的目的,我們針對 Per-Jo ...


更好地提高效率一直以來是袋鼠雲數棧產品的主要目標之一。當前數棧客戶的實時任務都是基於 Per-Job 模式運行的,客戶在進行一些任務參數的修改之後,只能先取消當前任務,再選擇 CheckPoint 恢復或者重新運行,整個過程需要3-5分鐘,比較浪費時間。為了達到提高效率的目的,我們針對 Per-Job 任務的整體流程分析,進行了相關探索。

下文和大家聊聊數棧在熱重啟技術方面的探索之路。

熱重啟是什麼?

熱重啟技術旨在復用當前 Per-Job 集群的相關資源,減少重新創建集群以及申請資源的耗時,同時通過 CheckPoint 機制保障數據的正確性

Flink 的 Per-Job 模式是指每個任務都會對應一個獨立的 Flink 集群。在任務提交的時候,會創建一個 Flink 集群進行任務的運行,整個集群只為這一個任務進行服務。同時 Flink 集群不允許繼續提交任務,導致任務修改之後,只能 Cancel 當前任務。重新提交修改後的任務,創建一個新的 Flink 集群進行運行。

經過分析,耗時主要是由於以下兩部分原因造成:

• Client 需要在 Yarn 上啟動一個 Flink 集群,這一部分是客戶端耗時最多的部分,因為這一部分包括上傳 jar,上傳文件到Hdfs 上,申請資源啟動 Flink 集群,都是比較耗時的步驟

• 集群運行的時候需要申請資源等操作也十分耗時

我們思考如果僅僅是一些任務參數或者 Sql 邏輯的修改,而不涉及代碼上的修改,那麼 PerJob 任務是否可以類似 Session 模式進行改造,支持 JobGraph 的重新提交,解決 Client 需要啟動一個 Flink 集群的耗時問題,大大提高提交效率。

同時復用了整個 Flink 集群的資源,如果並行度改變,只需要申請新增加的資源,已有的資源不需要再重覆向 Yarn 的 Resourcemanager 申請。

熱重啟改造後的流程

Flink 中 Per-Job 任務運行的整體流程大概如下所示:

客戶端流程

• Client 端創建 JobGraph

• 上傳 JobGraph 到 hdfs 里

• 通過 YarnClient 提交一個 YarnApplication,運行一個 Flink 任務

• 獲取結果

• 啟動 Flink 集群,啟動 WebMonitor,ResourceManager,Dispatcher 組件

• Client 端上傳到遠程文件服務里的 JobGraph 會被反序列出來由 DIspatcher 持有;

• DIspatcher 會根據此 JobGraph 創建 JobManagerRunner 對象進行運行;

• JobManagerRunner 會交由內部的 ScheduleNg 進行調度運行任務:

a.構建 ScheduleNg 時,會將 JobGraph 轉為ExecutionGraph

b. ScheduleNg 根據 ExecutionGraph 進行調度,運行任務

• 任務運行,等待任務運行結束,進行相應的回調處理

file

從上圖我們可以看出,一個 Per-Job 任務的運行主要包括兩部分:一部分是客戶端上傳文件 jar 等操作後,直接上傳任務到 Yarn 上進行 Flink 任務的啟動,第二部分是Flink集群的啟動,然後對客戶端上傳到遠程文件的 JobGraph 進行處理。因此為了優化 Per-Job 下的效率,我們對這兩部分進行了改造。

想法邏輯是,集群首先改造支持 JobGraph 的重新提交,然後 DIspatcher 處理 JobGraph 的時候,不會創建新的 JobMaster ,而是將當前現有的 JobGraph 里的一些信息填充到新的 JobGraph 里,比如當前任務的 CheckPoint 信息等。任務最終的調度運行是 JobMaster 里的 ScheduleNg 對象。因此我們認為只需要將 ScheduleNg 重新構建,其餘的組件都可以復用。

下圖即為我們熱重啟技術改造後的一個大致流程:

file

熱重啟技術改造後流程

• WebMonitor 支持任務的提交

• DIspatcher 將新的 JobGraph 緩存

• 取消當前任務,等待非同步回調

• 返回結果給客戶端

• 在任務取消的非同步回調里主要是熱重啟的重點改造部分:

a.判斷當前是否有新的 JobGraph 緩存,有的話進入熱重啟邏輯,無則走當前現有邏輯

b.獲取取消任務的 CheckPoint 信息,填充到新的 JobGraph 里

c.將 jobGrap 更新到 JobMaster 里,清理以前 JobGraph 的緩存信息

d.把 JobMaster 里 SlotPool 管理的資源釋放掉

e.JobMaster 重新創建 ScheduleNg 並調度運行,至此新的 JobGraph 就被成功調度運行了

熱重啟改造部分詳解

JobGraph 介紹

在上述流程中,JobGraph 是整體流轉的主要對象,後續的一切操作都是圍繞著 JobGraph 進行處理,所以這裡先對 JobGraph 進行介紹。

JobGraph 是 Flink 作業的內部表示,是一個有向無環圖(DAG),主要是將一些可以優化的運算元節點合併為一個節點。從下圖可知,一個完整的 JobGraph 圖包含了 Source Sink Transform 節點,以及節點的輸出 IntermrdiateDataset 和輸入邊 JobEdge 。在除了 Application 模式外,其餘的提交模式下,JobGraph 是在 Client 創建的,然後通過 Rest 請求提交給 Flink 集群進行處理。

file

看完 JobGraph 此類結構,可以得出以下這些信息:

· taskVertices:上圖中的每個頂點對應一個 jobVertex,taskVertices 維護了 jobGraph 圖裡的各個 jobVertex

· snapshotSettings:checkponit 相關的配置信息,如 CheckPoint 的間隔時間等

· savepointRestoreSettings:任務恢復的 checkpoint 文件信息,熱重啟中,新的 jobGraph 會將上一個任務的 checkPoint 位點信息填充到這個參數里,新的任務會在 CheckPoint 位點處進行恢復運行

· jobConfiguration:整個 job 的相關配置信息

· userJars & calsspath:任務運行過程中需要的一些 jar 以及 classpath 相關信息

其中 JobVertex 是 jobGraph 里非常重要的對象,再看下此類結構:JobVertex 主要存儲了JobEdge以及 IntermediateDataSet 和並行度等相關信息。對於一個 JobVertex 來說,IntermediateDataSet 是作為 JobVertex 的輸出,而 JobEdge 是其輸入。

file

WebMonitor 改造

WebMonitor 組件是 Flink 的 Web 端點,可以通過 Rest Api 進行 Flink 集群的狀態、任務、指標等信息的查詢,同時支持任務的提交、取消、觸發 SavePoint 等操作。

Per-Job 模式下 Flink 集群是不支持客戶端繼續提交任務運行的,因此需要對 WebMonitor 進行改造,類似 Session 下支持同一個 Flink 集群能繼續提交 JobGraph 並運行。

從下圖可以看出 WebMonitor 組件啟動時,其本質是 Netty 為核心的一個 Web 端點。啟動時的主要流程如下:

• 創建 Router,管理 http 請求和處理器 handler 的映射關係

• initializeHandlers 初始化所有的 handler,不同的集群對應的 WebMonitor 提供的 API 功能不同,所以 handlers 也是不同的

• 將 handlers 註冊到 router,完成 URL 以及請求方式(GET,POST,DELETE,PUT)和 Handler 的映射關係

• 創建一個 Netty 的 handler,包裝下 router,然後註冊到 Netty 的 pipeline 里

file

WebMonitor 支持的各種 Rest 請求其實最終是交給一個個的 handler 進行處理,通過 Router 對這些 handler 進行維護,其內部維護了一個 url 以及 Rest 請求方式與 handler 的映射關係。接收 Client 端的 Rest 請求之後,Router 找到對應的處理器 handler,交由 handler 進行最終的處理並返回結果。

因為 Per-job 集群是不支持 Client 端繼續提交任務的,所以其 initializeHandlers 方法初始化出的 handlers 不包含處理任務提交的 handler,導致 router 找不到對應的 handler 報錯,因此需要在 initializeHandlers 里將處理任務提交的 handler 註冊進去 。

file

JobSubmitHandler 處理請求的主要邏輯如下圖所示。核心是從 Rest 請求的 Body 里反序列化得到 JobGraph,反序列化獲取的 Jobgraph 通過 DIspatcherGateway 發送給 Dispatcher 進行後續提交處理。

file

這樣 Client 端只需要重新生成 JobGraph 然後提交即可,避免了重新上傳 jar 到 hdfs,以及避免浪費重新向 yarn 集群申請資源啟動 AppMaster 的時間。

Dispatcher 改造

DisPatcher 顧名思義是一個分發器,其主要功能是 Flink 集群接收到關於 Job 的提交、取消、觸發 SavePoint 等操作,分發到對應的各個 JobMaster 進行處理,或者創建新的 JobMaster 進行任務的運行。

DisPatcher 處理任務提交的核心流程是根據 JobGraph 創建一個 JobManagerRunner 對象並啟動,然後將其包裝成一個 DispatcherJob 緩存在內部。任務的具體調度執行交由創建的 JobManagerRunner 進行非同步處理。

JobManagerRunner 其內部的具體操作其實是 JobMasterService,主要實現類就是 JobMaster。JobMaster 內部有兩個主要對象分別是:

· ScheduleNg: 負責 JobGraph 轉為 ExecutionGraph,然後對 Job 進行調度運行

· SlotPool:負責 Slot 資源的申請以及管理

以上便是 Dispatcher 處理的主要流程。當前改造之後只是支持了任務的重新提交運行,但是新的任務仍然是對應一個新的 JobMaster,其實就是一個類似 Session 的處理,所以為了達到熱重啟的效果,需要進行以下的改造。

主流程的改造邏輯如下:

• 增加了一個 hotRestartJobGraph 欄位,將新的 JobGraph 對象賦予此欄位

• Dispatcher 將緩存的正在運行的任務 cancel,對非同步返回結果進行回調處理

• 直接返回 Client 結果

因為 Flink 整體是非同步處理的,源碼里充滿了大量的 CompletableFuture 回調的處理,主流程僅僅對提交的 JobGraph 進行了一個緩存處理,熱重啟的主要步驟在任務取消的回調里進行處理:

• 判斷 hotRestartJobGraph 是否為空,如果不為空則進行熱重啟處理,為空則用以前的邏輯,整個 Per-job 集群關閉

• 獲取取消任務的最後一個 CheckPoint 位點

• 將 CheckPoint 位點信息填充到新的 Jobgraph 里

• 反射將上一個 Jobgraph 生成的 JobManagerRunner 和 jobMaster 兩個對象的JobGraph 欄位用新的 JobGraph 替換掉

• jobMaster 對象根據 jobGraph 重新生成 scheduleNg 進行調度運行

• jobMaster 的 slotPool 在心跳周期內,會緩存已經釋放掉的 slot,需要把這部分緩存清空

• MiniDispatcher 的 close 方法修改下,如果 hotRestartJobGraph 不為空則不進行集群的關閉

• hotRestartJobGraph 置空

註意上述只是主要的一些改造地方,其餘一些邊緣的細節處理就不再進行贅述。

所以在熱重啟中,DIspatcher 是不會對每一個 JobGraph 創建新的 JobMaster 對象。通過將新的 JobGraph 更新到 JobMaster 里,內部僅僅 ScheduleNg 進行了重新構建,其餘的組件都進行了復用,比如 SlotPool。

ScheduleNg 之所以需要重新構建是因為 JobGraph 轉為 ExecutionGraph 是需要 ScheduleNg 在構建的時候創建的,因此需要重新構建一個 ScheduleNg 進行任務的調度執行,這樣達到了整個資源的復用性,大大提升了效率。

Slot 資源的復用

Flink 中對於資源的抽象主要是 Slot,其各個組件對 Slot 的管理是由不同的組件處理的:

· Flink 的 ResourceManager 里是 SlotManager 管理,主要是任務的資源申請以及管理

· JobMaster 里管理 Slot 是 SlotPool ,主要是對當前任務申請的 slot 進行管理

· TaskExecutor 里則是S lotTable 對 Slot 進行管理,維護 JobId 和 Slot 的關係

在熱重啟中,上一個任務取消之後,JobMaster 里 SlotPool 管理的 Slot 狀態由已分配改為可用。這樣在 JobMaster 通過新的 ScheduleNg 進行重新調度,會復用 SlotPool 里緩存的 Slot,這個時候其實是有問題的。在 TaskExecutor 接收到任務的時候會報錯,在其內部的 JobTable 里找不到新任務的 JobId,因為此時 TaskExecutor 維護的 Jobid 還是上一個任務的。

所以 JobMaster 的 SlotPool 需要釋放掉其內部緩存信息,註意只是清理內部緩存,此時 TaskManager 的 Slot 槽資源還沒被釋放,仍然被 Resourcemanager 的 SlotManager 管理著。這樣 SlotPool 發現內部沒可用的 Slot 槽就會和 ResourceManager 的 SlotManager 申請資源,SlotManager 就仍然復用了以前的 Slot 槽並且將新的 JobGraph 的 jobId 通過 rpc 請求註冊進了 TaskExecutor。從而達到了 slot 槽資源的復用,減少了 Flink 集群的 ResourceManager 重新向 Yarn 的 ResourceManager 申請資源。

總結

數棧在 Per-job 模式下,為了儘快看到任務修改後的效果,在業務允許情況下,通過熱重啟技術復用相關資源,減少了大量時間,極大地提高了效率。在開發驗證中,以前一個任務等待任務結束以及重新提交運行總流程超過4分鐘,但是在熱重啟情況下控制在1分鐘以內就已經可以進行調度執行。

未來我們將會把熱重啟的場景進一步豐富,支持更多場景下的熱重啟技術,如 jar 的代碼修改,如何更新環境里的 jar,支持 k8s 場景等。

袋鼠雲一直以來高度重視產品升級和用戶體驗,用誠心傾聽用戶需求,新的一年我們將繼續保持產品升級節奏,以提效為目標滿足不同行業用戶的更多需求。為了更好的產品,更佳的用戶體驗,數棧一直在路上。
《數據治理行業實踐白皮書》下載地址:https://fs80.cn/380a4b

想瞭解或咨詢更多有關袋鼠雲大數據產品、行業解決方案、客戶案例的朋友,瀏覽袋鼠雲官網:https://www.dtstack.com/?src=szbky

同時,歡迎對大數據開源項目有興趣的同學加入「袋鼠雲開源框架釘釘技術qun」,交流最新開源技術信息,qun號碼:30537511,項目地址:https://github.com/DTStack


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 前言 ASP.NET Core中有很多RateLimit組件,.NET 7甚至推出了官方版本。不過這些組件的主要目標是限制客戶端訪問服務的頻率,在HTTP伺服器崩潰前主動拒絕部分請求。如果請求沒有被拒絕服務會儘可能調用資源儘快處理。 現在有一個問題,有什麼辦法限制響應的發送速率嗎?這在一些需要長時間 ...
  • 腳註,是可以附在文章頁面的最底端的,對某些東西加以說明,印在書頁下端的註文。腳註和章節附註是對文本的補充說明。腳註一般位於頁面的底部,可以作為文檔某處內容的註釋。常用在一些說明書、標書、論文等正式文書用來引註的內容。這篇文章將為您展示如何通過C#/VB.NET代碼,以編程方式在Word中插入或刪除腳註。 ...
  • 自動化測試環境的搭建 :一、安裝 selenium: 安裝方式一: pip install -U selenium 安裝方式二: 手動安裝 selenium: 1、安裝python包,選擇全部組件(pip、安裝過程中配置環境變數)解壓selenium-4.8.2.tar.gz,然後用cmd進入解壓目 ...
  • 一、什麼是Vim Vim是一個類似於Vi的著名的功能強大、高度可定製的文本編輯器,在Vi的基礎上改進和增加了很多特性。代碼補全、編譯及錯誤跳轉等方便編程的功能特別豐富,在程式員中被廣泛使用。和Emacs併列成為類Unix系統用戶最喜歡的文本編輯器。 二、Vim鍵點陣圖 三、Vim的三種模式 Vim 有 ...
  • 一、引言 在之前圍繞STM32的GPIO的基本結構進行了介紹,圖1為STM32的5V容忍的GPIO口內部基本結構圖,圖2為GPIO的基本結構中各個模塊部分的概述。 閱讀GPIO基本結構的內容能夠對GPIO的工作模式有更深的瞭解。正是由於GPIO的結構中包含了多樣性的電路和模塊,因此進行合理的配置組合 ...
  • 1 文件目錄指令 1 pwd 顯示當前目錄的絕對路徑。 說明當前位置在/home/sora 2 cd 切換到指定目錄。 cd ~ 切換到當前用戶的家目錄:如果當前用戶為root,會切換到/root/,如果當前用戶為普通用戶名字叫做A,會切換到/home/A/。 cd .. 切換到上一級目錄 3 mk ...
  • 可以使用以下方法將Win32視窗設置為透明: 定義視窗類時,在WNDCLASSEX結構體中設置hbrBackground成員為NULL。 在視窗創建時,使用WS_EX_LAYERED風格和SetLayeredWindowAttributes函數將視窗設置為透明: HWND hwnd = Create ...
  • 1 系統運行級別 0:關機1:單用戶【找回丟失密碼】 2:多用戶狀態沒有網路服務3:多用戶狀態有網路服務 4:系統未使用保留給用戶5:圖形界面 6:系統重啟 其中,最常用的為3和5。 有關命令: (1)init :切換不同運行狀態 從 圖形界面 切換 為多用戶狀態有網路服務。 (2)systemct ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...