【深入淺出 Yarn 架構與實現】4-6 RM 行為探究 - 申請與分配 Container

来源:https://www.cnblogs.com/shuofxz/archive/2023/03/01/17169563.html
-Advertisement-
Play Games

本小節介紹應用程式的 ApplicationMaster 在 NodeManager 成功啟動並向 ResourceManager 註冊後,向 ResourceManager 請求資源(Container)到獲取到資源的整個過程,以及 ResourceManager 內部涉及的主要工作流程。 ...


本小節介紹應用程式的 ApplicationMaster 在 NodeManager 成功啟動並向 ResourceManager 註冊後,向 ResourceManager 請求資源(Container)到獲取到資源的整個過程,以及 ResourceManager 內部涉及的主要工作流程。

一、整體流程

整個過程可看做以下兩個階段的送代迴圈:

  • 階段1 ApplicationMaster 彙報資源需求並領取已經分配到的資源;
  • 階段2 NodeManager 向 ResourceManager 彙報各個 Container 運行狀態,如果 ResourceManager 發現它上面有空閑的資源,則進行一次資源分配,並將分配的資源保存到對應的 應用程式數據結構中,等待下次 ApplicationMaster 發送心跳信息時獲取(即階段1)。

image.png

一)AM 彙報心跳

1、ApplicationMaster 通過 RPC 函數 ApplicationMasterProtocol#allocate 向 ResourceManager 彙報資源需求(由於該函數被周期性調用,我們通常也稱之為“心跳”),包括新的資源需求描述、待釋放的 Container 列表、請求加入黑名單的節點列表、請求移除黑名單的節點列表等。

public AllocateResponse allocate(AllocateRequest request) {
	// Send the status update to the appAttempt.
    // 發送 RMAppAttemptEventType.STATUS_UPDATE 事件
	this.rmContext.getDispatcher().getEventHandler().handle(
	    new RMAppAttemptStatusupdateEvent(appAttemptId, request.getProgress()));
    
    // 從 am 心跳 AllocateRequest 中取出新的資源需求描述、待釋放的 Container 列表、黑名單列表
    List<ResourceRequest> ask = request.getAskList();
    List<ContainerId> release = request.getReleaseList();
    ResourceBlacklistRequest blacklistRequest = request.getResourceBlacklistRequest();

	// 接下來會做一些檢查(資源申請量、label、blacklist 等)

	// 將資源申請分割(動態調整 container 資源量)
    // Split Update Resource Requests into increase and decrease.
    // No Exceptions are thrown here. All update errors are aggregated
    // and returned to the AM.
    List<UpdateContainerRequest> increaseResourceReqs = new ArrayList<>();
    List<UpdateContainerRequest> decreaseResourceReqs = new ArrayList<>();
    List<UpdateContainerError> updateContainerErrors =
        RMServerUtils.validateAndSplitUpdateResourceRequests(rmContext,
            request, maximumCapacity, increaseResourceReqs,
            decreaseResourceReqs);

	// 調用 ResourceScheduler#allocate 函數,將該 AM 資源需求彙報給 ResourceScheduler
    // (實際是 Capacity、Fair、Fifo 等實際指定的 Scheduler 處理)
    allocation =
        this.rScheduler.allocate(appAttemptId, ask, release,
            blacklistAdditions, blacklistRemovals,
            increaseResourceReqs, decreaseResourceReqs);
}

2、ResourceManager 中的 ApplicationMasterService#allocate 負責處理來自 AM 的心跳請求,收到該請求後,會發送一個 RMAppAttemptEventType.STATUS_UPDATE 事件,RMAppAttemptImpl 收到該事件後,將更新應用程式執行進度和 AMLivenessMonitor 中記錄的應用程式最近更新時間。
3、調用 ResourceScheduler#allocate 函數,將該 AM 資源需求彙報給 ResourceScheduler,實際是 Capacity、Fair、Fifo 等實際指定的 Scheduler 處理。
CapacityScheduler#allocate 實現為例:

// CapacityScheduler#allocate
public Allocation allocate(ApplicationAttemptId applicationAttemptId,
    List<ResourceRequest> ask, List<ContainerId> release,
    List<String> blacklistAdditions, List<String> blacklistRemovals,
    List<UpdateContainerRequest> increaseRequests,
    List<UpdateContainerRequest> decreaseRequests) {

    // Release containers
	// 發送 RMContainerEventType.RELEASED
    releaseContainers(release, application);

    // update increase requests
    LeafQueue updateDemandForQueue =
        updateIncreaseRequests(increaseRequests, application);

    // Decrease containers
    decreaseContainers(decreaseRequests, application);

    // Sanity check for new allocation requests
    // 會將資源請求進行規範化,限制到最小和最大區間內,並且規範到最小增長量上
    SchedulerUtils.normalizeRequests(
        ask, getResourceCalculator(), getClusterResource(),
        getMinimumResourceCapability(), getMaximumResourceCapability());

    // Update application requests
    // 將新的資源需求更新到對應的數據結構中
    if (application.updateResourceRequests(ask)
        && (updateDemandForQueue == null)) {
      updateDemandForQueue = (LeafQueue) application.getQueue();
    }

    // 獲取已經為該應用程式分配的資源
    allocation = application.getAllocation(getResourceCalculator(),
                   clusterResource, getMinimumResourceCapability());
        
    return allocation;
}

4、ResourceScheduler 首先讀取待釋放 Container 列表,向對應的 RMContainerImpl 發送 RMContainerEventType.RELEASED 類型事件,殺死正在運行的 Container;然後將新的資源需求更新到對應的數據結構中,之後獲取已經為該應用程式分配的資源,並返回給 ApplicationMasterService。

二)NM 彙報心跳

1、NodeManager 將當前節點各種信息(container 狀況、節點利用率、健康情況等)封裝到 nodeStatus 中,再將標識節點的信息一起封裝到 request 中,之後通過RPC 函數 ResourceTracker#nodeHeartbeat 向 ResourceManager 彙報這些狀態。

// NodeStatusUpdaterImpl#startStatusUpdater
  protected void startStatusUpdater() {

    statusUpdaterRunnable = new Runnable() {
      @Override
      @SuppressWarnings("unchecked")
      public void run() {
        // ...
        Set<NodeLabel> nodeLabelsForHeartbeat =
                nodeLabelsHandler.getNodeLabelsForHeartbeat();
        NodeStatus nodeStatus = getNodeStatus(lastHeartbeatID);

        NodeHeartbeatRequest request =
            NodeHeartbeatRequest.newInstance(nodeStatus,
                NodeStatusUpdaterImpl.this.context
                    .getContainerTokenSecretManager().getCurrentKey(),
                NodeStatusUpdaterImpl.this.context
                    .getNMTokenSecretManager().getCurrentKey(),
                nodeLabelsForHeartbeat);
          
        // 發送 nm 的心跳
        response = resourceTracker.nodeHeartbeat(request);

2、ResourceManager 中的 ResourceTrackerService 負責處理來自 NodeManager 的請 求,一旦收到該請求,會向 RMNodeImpl 發送一個 RMNodeEventType.STATUS_UPDATE 類型事件,而 RMNodelmpl 收到該事件後,將更新各個 Container 的運行狀態,併進一步向 ResoutceScheduler 發送一個 SchedulerEventType.NODE_UPDATE 類型事件。

// ResourceTrackerService#nodeHeartbeat
  public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
      throws YarnException, IOException {

    NodeStatus remoteNodeStatus = request.getNodeStatus();
    /**
     * Here is the node heartbeat sequence...
     * 1. Check if it's a valid (i.e. not excluded) node
     * 2. Check if it's a registered node
     * 3. Check if it's a 'fresh' heartbeat i.e. not duplicate heartbeat
     * 4. Send healthStatus to RMNode
     * 5. Update node's labels if distributed Node Labels configuration is enabled
     */
      
    // 前 3 步都是各種檢查,後面才是重點的邏輯
    // Heartbeat response
    NodeHeartbeatResponse nodeHeartBeatResponse =
        YarnServerBuilderUtils.newNodeHeartbeatResponse(
            getNextResponseId(lastNodeHeartbeatResponse.getResponseId()),
            NodeAction.NORMAL, null, null, null, null, nextHeartBeatInterval);
    // 這裡會 set 待釋放的 container、application 列表
    // 思考:為何只有待釋放的列表呢?分配的資源不返回麽? - 分配的資源是和 AM 進行交互的
    rmNode.setAndUpdateNodeHeartbeatResponse(nodeHeartBeatResponse);

    populateKeys(request, nodeHeartBeatResponse);

    ConcurrentMap<ApplicationId, ByteBuffer> systemCredentials =
        rmContext.getSystemCredentialsForApps();
    if (!systemCredentials.isEmpty()) {
      nodeHeartBeatResponse.setSystemCredentialsForApps(systemCredentials);
    }

    // 4. Send status to RMNode, saving the latest response.
    // 發送 RMNodeEventType.STATUS_UPDATE 事件
    RMNodeStatusEvent nodeStatusEvent =
        new RMNodeStatusEvent(nodeId, remoteNodeStatus);
    if (request.getLogAggregationReportsForApps() != null
        && !request.getLogAggregationReportsForApps().isEmpty()) {
      nodeStatusEvent.setLogAggregationReportsForApps(request
        .getLogAggregationReportsForApps());
    }
    this.rmContext.getDispatcher().getEventHandler().handle(nodeStatusEvent);

3、ResourceScheduler 收到事件後,如果該節點上有可分配的空閑資源,則會將這些資源分配給各個應用程式,而分配後的資源僅是記錄到對應的數據結構中,等待 ApplicationMaster 下次通過心跳機制來領取。(資源分配的具體邏輯,將在後面介紹 Scheduler 的文章中詳細講解)。

三、總結

本篇分析了申請與分配 Container 的流程,主要分為兩個階段。
第一階段由 AM 發起,通過心跳向 RM 發起資源請求。
第二階段由 NM 發起,通過心跳向 RM 彙報資源使用情況。
之後就是,RM 根據 AM 資源請求以及 NM 剩餘資源進行一次資源分配(具體分配邏輯將在後續文章中介紹),並將分配的資源通過下一次 AM 心跳返回給 AM。


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • RxEditor是一款開源企業級可視化低代碼前端,目標是可以編輯所有 HTML 基礎的組件。比如支持 React、VUE、小程式等,目前僅實現了 React 版。 RxEditor運行快照: 項目地址:https://github.com/rxdrag/rxeditor 演示地址( Vercel 部 ...
  • 如何畫好一張架構圖,要做好這件事情首先要回答的就是什麼是架構圖。我們日常工作中經常能看到各種各樣的架構圖,而且經常會發現大家對架構圖的理解各有側重。深入追究到這個問題,可能一下子還很難有一個具象的定義,如果我們把這個問題進行拆分,理解起來就會容易一點。 ...
  • 1. JPA 1.1. 性能直接受底層JDBC驅動性能的影響 1.2. 性能提升是通過改變實體類的位元組碼來實現的 1.2.1. 在類載入到JAR文件或者由JVM運行之前增強位元組碼的方法 1.2.1.1. 在編譯過程中完成的 1.2.1.2. 在實體類編譯完成後,它們會被傳遞給一個特定實現的後置處理器 ...
  • 一、前戲 在之前我們已經學會使用 pytest-html 插件生成 html 格式的測試報告: 1 # 第一步,安裝插件 2 pip install pytest-html 3 ​ 4 # 第二步,執行用例時使用 --html 參數 5 ## main 函數中執行 6 if __name__ == ...
  • MyBatis的關聯映射02 3.一對多 3.1基本介紹 mybatis – MyBatis 3 | XML 映射器 多對一關係也是一個基本的映射關係,多對一,也可以理解為一對多。例如: User--Pet:一個用戶可以有多只寵物 Dep--Emp:一個部門有多個員工 雙向的多對一關係:通過User ...
  • 背景 公司目前主要聚焦於視頻這個領域,利用視頻為媒體、文旅、會議等行業進行賦能。 既然聚焦於視頻領域,那麼視頻轉碼則是繞不開的話題。 為了降低成本,以及保證產品的核心能力,公司自建了一套轉碼系統。 轉碼服務除了儘可能多的相容業界的視頻格式外,轉碼的速度是另一個非常重要的指標。 因為視頻轉碼對用戶來說 ...
  • 一、固件使用背景 在執行測試用例時,我們常常需要在測試用例執行的前後去完成一些額外的操作。例如針對於 Web 測試,在用例執行前需要打開瀏覽器,完成用戶登錄等一系列前置操作;在用例執行完成後,要清除瀏覽器緩存,關閉瀏覽器...... Pytest 框架提供的固件機制(又稱為夾具)可以幫我們實現一系列 ...
  • 約定 口 = 1 Byte,用於具象化,方便眼睛對比長度; void = 空類型; C語言中,short、long、singned、unsigned都為說明符,可以改變標識符存儲大小; C語言中,在聲明整型變數時,如果聲明中已經有一個其他的(同等)說明符,那麼關鍵字int可以省略; C語言中,預設使 ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...