【深入淺出 Yarn 架構與實現】4-6 RM 行為探究 - 申請與分配 Container

来源:https://www.cnblogs.com/shuofxz/archive/2023/03/01/17169563.html
-Advertisement-
Play Games

本小節介紹應用程式的 ApplicationMaster 在 NodeManager 成功啟動並向 ResourceManager 註冊後,向 ResourceManager 請求資源(Container)到獲取到資源的整個過程,以及 ResourceManager 內部涉及的主要工作流程。 ...


本小節介紹應用程式的 ApplicationMaster 在 NodeManager 成功啟動並向 ResourceManager 註冊後,向 ResourceManager 請求資源(Container)到獲取到資源的整個過程,以及 ResourceManager 內部涉及的主要工作流程。

一、整體流程

整個過程可看做以下兩個階段的送代迴圈:

  • 階段1 ApplicationMaster 彙報資源需求並領取已經分配到的資源;
  • 階段2 NodeManager 向 ResourceManager 彙報各個 Container 運行狀態,如果 ResourceManager 發現它上面有空閑的資源,則進行一次資源分配,並將分配的資源保存到對應的 應用程式數據結構中,等待下次 ApplicationMaster 發送心跳信息時獲取(即階段1)。

image.png

一)AM 彙報心跳

1、ApplicationMaster 通過 RPC 函數 ApplicationMasterProtocol#allocate 向 ResourceManager 彙報資源需求(由於該函數被周期性調用,我們通常也稱之為“心跳”),包括新的資源需求描述、待釋放的 Container 列表、請求加入黑名單的節點列表、請求移除黑名單的節點列表等。

public AllocateResponse allocate(AllocateRequest request) {
	// Send the status update to the appAttempt.
    // 發送 RMAppAttemptEventType.STATUS_UPDATE 事件
	this.rmContext.getDispatcher().getEventHandler().handle(
	    new RMAppAttemptStatusupdateEvent(appAttemptId, request.getProgress()));
    
    // 從 am 心跳 AllocateRequest 中取出新的資源需求描述、待釋放的 Container 列表、黑名單列表
    List<ResourceRequest> ask = request.getAskList();
    List<ContainerId> release = request.getReleaseList();
    ResourceBlacklistRequest blacklistRequest = request.getResourceBlacklistRequest();

	// 接下來會做一些檢查(資源申請量、label、blacklist 等)

	// 將資源申請分割(動態調整 container 資源量)
    // Split Update Resource Requests into increase and decrease.
    // No Exceptions are thrown here. All update errors are aggregated
    // and returned to the AM.
    List<UpdateContainerRequest> increaseResourceReqs = new ArrayList<>();
    List<UpdateContainerRequest> decreaseResourceReqs = new ArrayList<>();
    List<UpdateContainerError> updateContainerErrors =
        RMServerUtils.validateAndSplitUpdateResourceRequests(rmContext,
            request, maximumCapacity, increaseResourceReqs,
            decreaseResourceReqs);

	// 調用 ResourceScheduler#allocate 函數,將該 AM 資源需求彙報給 ResourceScheduler
    // (實際是 Capacity、Fair、Fifo 等實際指定的 Scheduler 處理)
    allocation =
        this.rScheduler.allocate(appAttemptId, ask, release,
            blacklistAdditions, blacklistRemovals,
            increaseResourceReqs, decreaseResourceReqs);
}

2、ResourceManager 中的 ApplicationMasterService#allocate 負責處理來自 AM 的心跳請求,收到該請求後,會發送一個 RMAppAttemptEventType.STATUS_UPDATE 事件,RMAppAttemptImpl 收到該事件後,將更新應用程式執行進度和 AMLivenessMonitor 中記錄的應用程式最近更新時間。
3、調用 ResourceScheduler#allocate 函數,將該 AM 資源需求彙報給 ResourceScheduler,實際是 Capacity、Fair、Fifo 等實際指定的 Scheduler 處理。
CapacityScheduler#allocate 實現為例:

// CapacityScheduler#allocate
public Allocation allocate(ApplicationAttemptId applicationAttemptId,
    List<ResourceRequest> ask, List<ContainerId> release,
    List<String> blacklistAdditions, List<String> blacklistRemovals,
    List<UpdateContainerRequest> increaseRequests,
    List<UpdateContainerRequest> decreaseRequests) {

    // Release containers
	// 發送 RMContainerEventType.RELEASED
    releaseContainers(release, application);

    // update increase requests
    LeafQueue updateDemandForQueue =
        updateIncreaseRequests(increaseRequests, application);

    // Decrease containers
    decreaseContainers(decreaseRequests, application);

    // Sanity check for new allocation requests
    // 會將資源請求進行規範化,限制到最小和最大區間內,並且規範到最小增長量上
    SchedulerUtils.normalizeRequests(
        ask, getResourceCalculator(), getClusterResource(),
        getMinimumResourceCapability(), getMaximumResourceCapability());

    // Update application requests
    // 將新的資源需求更新到對應的數據結構中
    if (application.updateResourceRequests(ask)
        && (updateDemandForQueue == null)) {
      updateDemandForQueue = (LeafQueue) application.getQueue();
    }

    // 獲取已經為該應用程式分配的資源
    allocation = application.getAllocation(getResourceCalculator(),
                   clusterResource, getMinimumResourceCapability());
        
    return allocation;
}

4、ResourceScheduler 首先讀取待釋放 Container 列表,向對應的 RMContainerImpl 發送 RMContainerEventType.RELEASED 類型事件,殺死正在運行的 Container;然後將新的資源需求更新到對應的數據結構中,之後獲取已經為該應用程式分配的資源,並返回給 ApplicationMasterService。

二)NM 彙報心跳

1、NodeManager 將當前節點各種信息(container 狀況、節點利用率、健康情況等)封裝到 nodeStatus 中,再將標識節點的信息一起封裝到 request 中,之後通過RPC 函數 ResourceTracker#nodeHeartbeat 向 ResourceManager 彙報這些狀態。

// NodeStatusUpdaterImpl#startStatusUpdater
  protected void startStatusUpdater() {

    statusUpdaterRunnable = new Runnable() {
      @Override
      @SuppressWarnings("unchecked")
      public void run() {
        // ...
        Set<NodeLabel> nodeLabelsForHeartbeat =
                nodeLabelsHandler.getNodeLabelsForHeartbeat();
        NodeStatus nodeStatus = getNodeStatus(lastHeartbeatID);

        NodeHeartbeatRequest request =
            NodeHeartbeatRequest.newInstance(nodeStatus,
                NodeStatusUpdaterImpl.this.context
                    .getContainerTokenSecretManager().getCurrentKey(),
                NodeStatusUpdaterImpl.this.context
                    .getNMTokenSecretManager().getCurrentKey(),
                nodeLabelsForHeartbeat);
          
        // 發送 nm 的心跳
        response = resourceTracker.nodeHeartbeat(request);

2、ResourceManager 中的 ResourceTrackerService 負責處理來自 NodeManager 的請 求,一旦收到該請求,會向 RMNodeImpl 發送一個 RMNodeEventType.STATUS_UPDATE 類型事件,而 RMNodelmpl 收到該事件後,將更新各個 Container 的運行狀態,併進一步向 ResoutceScheduler 發送一個 SchedulerEventType.NODE_UPDATE 類型事件。

// ResourceTrackerService#nodeHeartbeat
  public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
      throws YarnException, IOException {

    NodeStatus remoteNodeStatus = request.getNodeStatus();
    /**
     * Here is the node heartbeat sequence...
     * 1. Check if it's a valid (i.e. not excluded) node
     * 2. Check if it's a registered node
     * 3. Check if it's a 'fresh' heartbeat i.e. not duplicate heartbeat
     * 4. Send healthStatus to RMNode
     * 5. Update node's labels if distributed Node Labels configuration is enabled
     */
      
    // 前 3 步都是各種檢查,後面才是重點的邏輯
    // Heartbeat response
    NodeHeartbeatResponse nodeHeartBeatResponse =
        YarnServerBuilderUtils.newNodeHeartbeatResponse(
            getNextResponseId(lastNodeHeartbeatResponse.getResponseId()),
            NodeAction.NORMAL, null, null, null, null, nextHeartBeatInterval);
    // 這裡會 set 待釋放的 container、application 列表
    // 思考:為何只有待釋放的列表呢?分配的資源不返回麽? - 分配的資源是和 AM 進行交互的
    rmNode.setAndUpdateNodeHeartbeatResponse(nodeHeartBeatResponse);

    populateKeys(request, nodeHeartBeatResponse);

    ConcurrentMap<ApplicationId, ByteBuffer> systemCredentials =
        rmContext.getSystemCredentialsForApps();
    if (!systemCredentials.isEmpty()) {
      nodeHeartBeatResponse.setSystemCredentialsForApps(systemCredentials);
    }

    // 4. Send status to RMNode, saving the latest response.
    // 發送 RMNodeEventType.STATUS_UPDATE 事件
    RMNodeStatusEvent nodeStatusEvent =
        new RMNodeStatusEvent(nodeId, remoteNodeStatus);
    if (request.getLogAggregationReportsForApps() != null
        && !request.getLogAggregationReportsForApps().isEmpty()) {
      nodeStatusEvent.setLogAggregationReportsForApps(request
        .getLogAggregationReportsForApps());
    }
    this.rmContext.getDispatcher().getEventHandler().handle(nodeStatusEvent);

3、ResourceScheduler 收到事件後,如果該節點上有可分配的空閑資源,則會將這些資源分配給各個應用程式,而分配後的資源僅是記錄到對應的數據結構中,等待 ApplicationMaster 下次通過心跳機制來領取。(資源分配的具體邏輯,將在後面介紹 Scheduler 的文章中詳細講解)。

三、總結

本篇分析了申請與分配 Container 的流程,主要分為兩個階段。
第一階段由 AM 發起,通過心跳向 RM 發起資源請求。
第二階段由 NM 發起,通過心跳向 RM 彙報資源使用情況。
之後就是,RM 根據 AM 資源請求以及 NM 剩餘資源進行一次資源分配(具體分配邏輯將在後續文章中介紹),並將分配的資源通過下一次 AM 心跳返回給 AM。


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • RxEditor是一款開源企業級可視化低代碼前端,目標是可以編輯所有 HTML 基礎的組件。比如支持 React、VUE、小程式等,目前僅實現了 React 版。 RxEditor運行快照: 項目地址:https://github.com/rxdrag/rxeditor 演示地址( Vercel 部 ...
  • 如何畫好一張架構圖,要做好這件事情首先要回答的就是什麼是架構圖。我們日常工作中經常能看到各種各樣的架構圖,而且經常會發現大家對架構圖的理解各有側重。深入追究到這個問題,可能一下子還很難有一個具象的定義,如果我們把這個問題進行拆分,理解起來就會容易一點。 ...
  • 1. JPA 1.1. 性能直接受底層JDBC驅動性能的影響 1.2. 性能提升是通過改變實體類的位元組碼來實現的 1.2.1. 在類載入到JAR文件或者由JVM運行之前增強位元組碼的方法 1.2.1.1. 在編譯過程中完成的 1.2.1.2. 在實體類編譯完成後,它們會被傳遞給一個特定實現的後置處理器 ...
  • 一、前戲 在之前我們已經學會使用 pytest-html 插件生成 html 格式的測試報告: 1 # 第一步,安裝插件 2 pip install pytest-html 3 ​ 4 # 第二步,執行用例時使用 --html 參數 5 ## main 函數中執行 6 if __name__ == ...
  • MyBatis的關聯映射02 3.一對多 3.1基本介紹 mybatis – MyBatis 3 | XML 映射器 多對一關係也是一個基本的映射關係,多對一,也可以理解為一對多。例如: User--Pet:一個用戶可以有多只寵物 Dep--Emp:一個部門有多個員工 雙向的多對一關係:通過User ...
  • 背景 公司目前主要聚焦於視頻這個領域,利用視頻為媒體、文旅、會議等行業進行賦能。 既然聚焦於視頻領域,那麼視頻轉碼則是繞不開的話題。 為了降低成本,以及保證產品的核心能力,公司自建了一套轉碼系統。 轉碼服務除了儘可能多的相容業界的視頻格式外,轉碼的速度是另一個非常重要的指標。 因為視頻轉碼對用戶來說 ...
  • 一、固件使用背景 在執行測試用例時,我們常常需要在測試用例執行的前後去完成一些額外的操作。例如針對於 Web 測試,在用例執行前需要打開瀏覽器,完成用戶登錄等一系列前置操作;在用例執行完成後,要清除瀏覽器緩存,關閉瀏覽器...... Pytest 框架提供的固件機制(又稱為夾具)可以幫我們實現一系列 ...
  • 約定 口 = 1 Byte,用於具象化,方便眼睛對比長度; void = 空類型; C語言中,short、long、singned、unsigned都為說明符,可以改變標識符存儲大小; C語言中,在聲明整型變數時,如果聲明中已經有一個其他的(同等)說明符,那麼關鍵字int可以省略; C語言中,預設使 ...
一周排行
    -Advertisement-
    Play Games
  • .Net8.0 Blazor Hybird 桌面端 (WPF/Winform) 實測可以完整運行在 win7sp1/win10/win11. 如果用其他工具打包,還可以運行在mac/linux下, 傳送門BlazorHybrid 發佈為無依賴包方式 安裝 WebView2Runtime 1.57 M ...
  • 目錄前言PostgreSql安裝測試額外Nuget安裝Person.cs模擬運行Navicate連postgresql解決方案Garnet為什麼要選擇Garnet而不是RedisRedis不再開源Windows版的Redis是由微軟維護的Windows Redis版本老舊,後續可能不再更新Garne ...
  • C#TMS系統代碼-聯表報表學習 領導被裁了之後很快就有人上任了,幾乎是無縫銜接,很難讓我不想到這早就決定好了。我的職責沒有任何變化。感受下來這個系統封裝程度很高,我只要會調用方法就行。這個系統交付之後不會有太多問題,更多應該是做小需求,有大的開發任務應該也是第二期的事,嗯?怎麼感覺我變成運維了?而 ...
  • 我在隨筆《EAV模型(實體-屬性-值)的設計和低代碼的處理方案(1)》中介紹了一些基本的EAV模型設計知識和基於Winform場景下低代碼(或者說無代碼)的一些實現思路,在本篇隨筆中,我們來分析一下這種針對通用業務,且只需定義就能構建業務模塊存儲和界面的解決方案,其中的數據查詢處理的操作。 ...
  • 對某個遠程伺服器啟用和設置NTP服務(Windows系統) 打開註冊表 HKEY_LOCAL_MACHINE\SYSTEM\CurrentControlSet\Services\W32Time\TimeProviders\NtpServer 將 Enabled 的值設置為 1,這將啟用NTP伺服器功 ...
  • title: Django信號與擴展:深入理解與實踐 date: 2024/5/15 22:40:52 updated: 2024/5/15 22:40:52 categories: 後端開發 tags: Django 信號 松耦合 觀察者 擴展 安全 性能 第一部分:Django信號基礎 Djan ...
  • 使用xadmin2遇到的問題&解決 環境配置: 使用的模塊版本: 關聯的包 Django 3.2.15 mysqlclient 2.2.4 xadmin 2.0.1 django-crispy-forms >= 1.6.0 django-import-export >= 0.5.1 django-r ...
  • 今天我打算整點兒不一樣的內容,通過之前學習的TransformerMap和LazyMap鏈,想搞點不一樣的,所以我關註了另外一條鏈DefaultedMap鏈,主要調用鏈為: 調用鏈詳細描述: ObjectInputStream.readObject() DefaultedMap.readObject ...
  • 後端應用級開發者該如何擁抱 AI GC?就是在這樣的一個大的浪潮下,我們的傳統的應用級開發者。我們該如何選擇職業或者是如何去快速轉型,跟上這樣的一個行業的一個浪潮? 0 AI金字塔模型 越往上它的整個難度就是職業機會也好,或者說是整個的這個運作也好,它的難度會越大,然後越往下機會就會越多,所以這是一 ...
  • @Autowired是Spring框架提供的註解,@Resource是Java EE 5規範提供的註解。 @Autowired預設按照類型自動裝配,而@Resource預設按照名稱自動裝配。 @Autowired支持@Qualifier註解來指定裝配哪一個具有相同類型的bean,而@Resourc... ...