【深入淺出 Yarn 架構與實現】4-5 RM 行為探究 - 啟動 ApplicationMaster

来源:https://www.cnblogs.com/shuofxz/archive/2023/02/28/17165917.html
-Advertisement-
Play Games

本節開始,將對 ResourceManager 中一些常見行為進行分析探究,看某些具體關鍵的行為,在 RM 中是如何流轉的。本節將深入源碼探究「啟動 ApplicationMaster」的具體流程。 ...


本節開始,將對 ResourceManager 中一些常見行為進行分析探究,看某些具體關鍵的行為,在 RM 中是如何流轉的。本節將深入源碼探究「啟動 ApplicationMaster」的具體流程。

一、整體流程

本小節介紹從應用程式提交到啟動 ApplicationMaster 的整個過程,期間涉及 Client、RMService、 RMAppManager、RMApplmpl、RMAppAttemptImpl、RMNode、ResourceScheduler 等幾個主要組件。當客戶端調用 RPC 函數 ApplicationClientProtocol#submitApplication 後, ResourceManager 端的處理過程如下圖所示。
image.png

二、具體流程分析

接下來跟隨上面的流程圖,我們深入源碼具體分析每一步都是如何執行的:
最開始由客戶端發起任務提交 submitApplication(),經過 ClientRMServiceRMAppManager 發送 RMAppEventType.START 事件,之後交由 RMAppImpl 處理。

  protected void submitApplication(
      ApplicationSubmissionContext submissionContext, long submitTime,
      String user) throws YarnException {
    ApplicationId applicationId = submissionContext.getApplicationId();

    RMAppImpl application =
        createAndPopulateNewRMApp(submissionContext, submitTime, user, false);
    Credentials credentials = null;
    try {
      credentials = parseCredentials(submissionContext);
      if (UserGroupInformation.isSecurityEnabled()) {
        this.rmContext.getDelegationTokenRenewer()
            .addApplicationAsync(applicationId, credentials,
                submissionContext.getCancelTokensWhenComplete(),
                application.getUser());
      } else {
        // Dispatcher is not yet started at this time, so these START events
        // enqueued should be guaranteed to be first processed when dispatcher
        // gets started.
        // 這裡發送 RMAppEventType.START 事件
        this.rmContext.getDispatcher().getEventHandler()
            .handle(new RMAppEvent(applicationId, RMAppEventType.START));
      }

RMAppImpl 這東西是個狀態機,收到事件之後會自己轉換狀態並且處理相應的邏輯。
(狀態機還不熟悉的同學,可翻到我前面的文章進行學習《2-4 Yarn 基礎庫 - 狀態機庫》)
image.png

截取一部分狀態轉換代碼:

  private static final StateMachineFactory<RMAppImpl,
                                           RMAppState,
                                           RMAppEventType,
                                           RMAppEvent> stateMachineFactory
                               = new StateMachineFactory<RMAppImpl,
                                           RMAppState,
                                           RMAppEventType,
                                           RMAppEvent>(RMAppState.NEW)


     // Transitions from NEW state
    .addTransition(RMAppState.NEW, RMAppState.NEW,
        RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition())
     // 收到 RMAppEventType.START 事件
    .addTransition(RMAppState.NEW, RMAppState.NEW_SAVING,
        RMAppEventType.START, new RMAppNewlySavingTransition())
    .addTransition(RMAppState.NEW, EnumSet.of(RMAppState.SUBMITTED,
            RMAppState.ACCEPTED, RMAppState.FINISHED, RMAppState.FAILED,
            RMAppState.KILLED, RMAppState.FINAL_SAVING),
        RMAppEventType.RECOVER, new RMAppRecoveredTransition())
    .addTransition(RMAppState.NEW, RMAppState.KILLED, RMAppEventType.KILL,
        new AppKilledTransition())
    .addTransition(RMAppState.NEW, RMAppState.FINAL_SAVING,
        RMAppEventType.APP_REJECTED,
        new FinalSavingTransition(new AppRejectedTransition(),
          RMAppState.FAILED))

一)RMAppImpl - START

收到 RMAppEventType.START 事件之後,會執行 RMAppNewlySavingTransition()

  private static final class RMAppNewlySavingTransition extends RMAppTransition {
    @Override
    public void transition(RMAppImpl app, RMAppEvent event) {

      // If recovery is enabled then store the application information in a
      // non-blocking call so make sure that RM has stored the information
      // needed to restart the AM after RM restart without further client
      // communication
      LOG.info("Storing application with id " + app.applicationId);
      app.rmContext.getStateStore().storeNewApplication(app);
    }
  }

跟下去會發現它發出 RMStateStoreEventType.STORE_APP 事件,去 RMStateStore 中找一下對應的事件處理。發現也是個狀態機:

.addTransition(RMStateStoreState.ACTIVE,
    EnumSet.of(RMStateStoreState.ACTIVE, RMStateStoreState.FENCED),
    RMStateStoreEventType.STORE_APP, new StoreAppTransition())

跟著 StoreAppTransition 看看做了啥(發送 RMAppEventType.APP_NEW_SAVED 事件)

  private static class StoreAppTransition
      implements MultipleArcTransition<RMStateStore, RMStateStoreEvent,
          RMStateStoreState> {
    @Override
    public RMStateStoreState transition(RMStateStore store,
        RMStateStoreEvent event) {
      if (!(event instanceof RMStateStoreAppEvent)) {
        // should never happen
        LOG.error("Illegal event type: " + event.getClass());
        return RMStateStoreState.ACTIVE;
      }
      boolean isFenced = false;
      ApplicationStateData appState =
          ((RMStateStoreAppEvent) event).getAppState();
      ApplicationId appId =
          appState.getApplicationSubmissionContext().getApplicationId();
      LOG.info("Storing info for app: " + appId);
      try {
        store.storeApplicationStateInternal(appId, appState);
        // 這裡發送了 RMAppEventType.APP_NEW_SAVED 事件
        store.notifyApplication(new RMAppEvent(appId,
               RMAppEventType.APP_NEW_SAVED));
      } catch (Exception e) {
        LOG.error("Error storing app: " + appId, e);
        isFenced = store.notifyStoreOperationFailedInternal(e);
      }
      return finalState(isFenced);
    };
  }

二)RMAppImpl - APP_NEW_SAVED

我們再回到 RMAppImpl,找到對應的狀態轉移邏輯。

    // 剛剛我們的狀態是 NEW_SAVING,收到了 APP_NEW_SAVED 事件,執行 AddApplicationToSchedulerTransition() 後,轉換為 SUBMITTED 狀態
    .addTransition(RMAppState.NEW_SAVING, RMAppState.SUBMITTED,
        RMAppEventType.APP_NEW_SAVED, new AddApplicationToSchedulerTransition())

AddApplicationToSchedulerTransition() 中會發送 SchedulerEventType.APP_ADDED 事件。之後 RMAppImpl 轉換為 RMAppState.SUBMITTED 狀態。
SchedulerEventType.APP_ADDED 會被多個事件處理器捕獲處理:
1)ResourceSchedulerWrapper 事件處理器,僅記錄

      } else if (schedulerEvent.getType() == SchedulerEventType.APP_ADDED
          && schedulerEvent instanceof AppAddedSchedulerEvent) {
        AppAddedSchedulerEvent appAddEvent =
                (AppAddedSchedulerEvent) schedulerEvent;
        String queueName = appAddEvent.getQueue();
        appQueueMap.put(appAddEvent.getApplicationId(), queueName);
      }

2)各個 AbstractYarnScheduler 的實現類。以 CapacityScheduler 為例:
執行 addApplication()

    case APP_ADDED:
    {
      AppAddedSchedulerEvent appAddedEvent = (AppAddedSchedulerEvent) event;
      String queueName = resolveReservationQueueName(appAddedEvent.getQueue(),
          appAddedEvent.getApplicationId(), appAddedEvent.getReservationID(),
          appAddedEvent.getIsAppRecovering());
      if (queueName != null) {
        if (!appAddedEvent.getIsAppRecovering()) {
          addApplication(appAddedEvent.getApplicationId(), queueName,
              appAddedEvent.getUser(), appAddedEvent.getApplicatonPriority());
        } else {
          addApplicationOnRecovery(appAddedEvent.getApplicationId(), queueName,
              appAddedEvent.getUser(), appAddedEvent.getApplicatonPriority());
        }
      }
    }

addApplication() 中會提交 Application 併發送 RMAppEventType.APP_ACCEPTED 事件。

	queue.submitApplication(applicationId, user, queueName);
    rmContext.getDispatcher().getEventHandler()
        .handle(new RMAppEvent(applicationId, RMAppEventType.APP_ACCEPTED));

三)RMAppImpl - APP_ACCEPTED(重點)

繼續回到 RMAppImpl,執行 StartAppAttemptTransition(),創建 newAttempt,發送事件RMAppAttemptEventType.START

    .addTransition(RMAppState.SUBMITTED, RMAppState.ACCEPTED,
        RMAppEventType.APP_ACCEPTED, new StartAppAttemptTransition())
  private static final class StartAppAttemptTransition extends RMAppTransition {
    @Override
    public void transition(RMAppImpl app, RMAppEvent event) {
      app.createAndStartNewAttempt(false);
    };
  }
  private void
      createAndStartNewAttempt(boolean transferStateFromPreviousAttempt) {
    createNewAttempt();
    handler.handle(new RMAppStartAttemptEvent(currentAttempt.getAppAttemptId(),
      transferStateFromPreviousAttempt));
  }

RMAppAttemptImpl 中會捕獲這個事件,執行 AttemptStartedTransition(),其中會發送 SchedulerEventType.APP_ATTEMPT_ADDED 事件,由 AbstractYarnScheduler 實現類處理

      .addTransition(RMAppAttemptState.NEW, RMAppAttemptState.SUBMITTED,
          RMAppAttemptEventType.START, new AttemptStartedTransition())

如在 CapacityScheduler 中由 addApplicationAttempt 處理,會提交 ApplicationAttempt,併發送 RMAppAttemptEventType.ATTEMPT_ADDED 事件

private synchronized void addApplicationAttempt() {
    // 提交 attempt
	queue.submitApplicationAttempt(attempt, application.getUser());
    // 發送 RMAppAttemptEventType.ATTEMPT_ADDED 事件
	rmContext.getDispatcher().getEventHandler().handle(
    		new RMAppAttemptEvent(applicationAttemptId,
            RMAppAttemptEventType.ATTEMPT_ADDED));
}

RMAppAttemptImpl 收到 event 後繼續處理,在 ScheduleTransition 會 allocate am container 資源。

      .addTransition(RMAppAttemptState.SUBMITTED, 
          EnumSet.of(RMAppAttemptState.LAUNCHED_UNMANAGED_SAVING,
                     RMAppAttemptState.SCHEDULED),
          RMAppAttemptEventType.ATTEMPT_ADDED,
          new ScheduleTransition())
        // AM resource has been checked when submission
        Allocation amContainerAllocation =
            appAttempt.scheduler.allocate(
                appAttempt.applicationAttemptId,
                Collections.singletonList(appAttempt.amReq),
                EMPTY_CONTAINER_RELEASE_LIST,
                amBlacklist.getBlacklistAdditions(),
                amBlacklist.getBlacklistRemovals(), null, null);

ResourceScheduler 將資源返回給它之前,會向 RMContainerlmpl 發送一個 RMContainerEventType.ACQUIRED 事件。
RMContainerImpl 接到 RMContainerEventType.START,發送 RMAppAttemptEventType.CONTAINER_ALLOCATED 事件。

    .addTransition(RMContainerState.NEW, RMContainerState.ALLOCATED,
        RMContainerEventType.START, new ContainerStartedTransition())
  private static final class ContainerStartedTransition extends
      BaseTransition {

    @Override
    public void transition(RMContainerImpl container, RMContainerEvent event) {
      container.eventHandler.handle(new RMAppAttemptEvent(
          container.appAttemptId, RMAppAttemptEventType.CONTAINER_ALLOCATED));
    }
  }

又回到RMAppAttemptImpl 後續狀態機,執行 AMContainerAllocatedTransition,在其中又一次為 am allocate,和上一個狀態中 allocate 僅參數不同,沒搞懂為啥。這裡如果發現 allocate container 資源還是 0,會退回上一步,狀態還是 RMAppAttemptState.SCHEDULED 等待再次獲取資源。如果正常獲取到了資源,就會轉為 RMAppAttemptState.ALLOCATED_SAVING 狀態。

      .addTransition(RMAppAttemptState.SCHEDULED,
          EnumSet.of(RMAppAttemptState.ALLOCATED_SAVING,
            RMAppAttemptState.SCHEDULED),
          RMAppAttemptEventType.CONTAINER_ALLOCATED,
          new AMContainerAllocatedTransition())
      Allocation amContainerAllocation =
          appAttempt.scheduler.allocate(appAttempt.applicationAttemptId,
            EMPTY_CONTAINER_REQUEST_LIST, EMPTY_CONTAINER_RELEASE_LIST, null,
            null, null, null);

日誌記錄完成後,RMStateStoreRMAppAttemptImpl 發送 RMAppAttemptEventType.ATTEMPT_NEW_SAVED 事件。
RMAppAttemptImpl 後續向 ApplicationMasterLauncher 發 送 AMLauncherEventType.LAUNCH 事件(實際執行是在 AMLauncher 中),並將狀態從 ALLOCATED_SAVING 轉移為 ALLOCATED。

      .addTransition(RMAppAttemptState.ALLOCATED_SAVING, 
          RMAppAttemptState.ALLOCATED,
          RMAppAttemptEventType.ATTEMPT_NEW_SAVED, new AttemptStoredTransition())

ApplicationMasterLauncher 收到 AMLauncherEventType.LAUNCH 事件後,會將該事件放到事件隊列中,等待 AMLauncher 線程池中的線程處理該事件。它將與對應的 NodeManager 通信,啟動 ApplicationMaster,一旦成功啟動後,將向 RMAppAttemptImpl 發送 RMAppAttemptEventType.LAUNCHED 事件。

  public void run() {
    switch (eventType) {
    case LAUNCH:
      try {
        LOG.info("Launching master" + application.getAppAttemptId());
        launch();
        handler.handle(new RMAppAttemptEvent(application.getAppAttemptId(),
            RMAppAttemptEventType.LAUNCHED));

RMAppAttemptImpl 收到 RMAppAttemptEventType.LAUNCHED 事件後,會向 AMLivelinessMonitor 註冊,以監控運行狀態。RMAppAttemptImpl 狀態從 ALLOCATED 轉移為 LAUNCHED

之後,NodeManager 通過心跳機制彙報 ApplicationMaster 所在 Container 已經成功啟動,收到該信息後,ResourceScheduler 將發送一個 RMContainerEventType.LAUNCHED 事件,RMContainerImpl 收到該事件後,會從 ContainerAllocationExpirer 監控列表中移除。

啟動的 ApplicationMaster 通過RPC 函數 ApplicationMasterProtocol#registerApplicationMaster 向 ResourceManager 註冊,ResourceManager 中的 ApplicationMasterService 服務接收到該請求後,發送 RMAppAttemptEventType.REGISTERED 事件。

// ApplicationMasterService#registerApplicationMaster

	LOG.info("AM registration " + applicationAttemptId);
      this.rmContext
        .getDispatcher()
        .getEventHandler()
        .handle(
          // 這裡發送 RMAppAttemptEventType.REGISTERED 事件
          new RMAppAttemptRegistrationEvent(applicationAttemptId, request
            .getHost(), request.getRpcPort(), request.getTrackingUrl()));

RMAppAttemptImpl 收到該事件後,首先保存該 ApplicationMaster 的基本信息(比如所在 host、啟用的 RPC 埠號等),然後向 RMApplmpl 發送一個 RMAppEventType.ATTEMPT_REGISTERED 事件。RMAppAttemptImpl 狀態從 LAUNCHED 轉移為 RUNNING

      .addTransition(RMAppAttemptState.LAUNCHED, RMAppAttemptState.RUNNING,
          RMAppAttemptEventType.REGISTERED, REGISTERED_TRANSITION)
// AMRegisteredTransition
	appAttempt.eventHandler.handle(new RMAppEvent(appAttempt
          .getAppAttemptId().getApplicationId(),
          RMAppEventType.ATTEMPT_REGISTERED));

四)RMAppImpl - ATTEMPT_REGISTERED

RMAppImpl 收到 RMAppEventType.ATTEMPT_REGISTERED 事件後,將狀態從 ACCEPTED 轉換為 RUNNING。

    .addTransition(RMAppState.ACCEPTED, RMAppState.RUNNING,
        RMAppEventType.ATTEMPT_REGISTERED, new RMAppStateUpdateTransition(
            YarnApplicationState.RUNNING))

到這裡,啟動 ApplicationMaster 的整體流程分析完畢!

三、總結

本篇文章分析了從應用程式提交到啟動 ApplicationMaster 的整個過程,分析具體過程看的可能會有些繁瑣。但只要抓住核心本質,就很容易捋清楚。重點就是事件處理和狀態機,這兩個部件理解清楚,就很容易看明白程式的流轉。
實際邏輯無非就是幾個服務之間互相發送對應的事件,接收到事件後會執行啟動服務、記錄日誌、監控狀態,然後再發送個新的事件。
本身不難,但需要耐下心來一點點去梳理。


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • Nacos Nacos體系架構 領域模型 Nacos 領域模型描述了服務與實例之間的邊界和層級關係。Nacos 的服務領域模型是以“服 務”為維度構建起來的,這個服務並不是指集群中的單個伺服器,而是指微服務的服務名。 “服務”是 Nacos 中位於最上層的概念,在服務之下,還有集群和實例的概念。 服 ...
  • MyBatis的關聯映射 Mybatis的關聯映射 實際的開發中,對資料庫的操作常常會涉及到多張表,這在面向對象中就涉及到了對象與對象之間的關聯關係。針對多表之間的操作,MyBatis提供了關聯映射,通過關聯映射就可以很好的處理對象與對象之間的關聯關係。 1.關聯關係概述 在關係型資料庫中,多表之間 ...
  • Spring 源碼環境搭建 Spring 是面向 Bean 的編程,Bean 在其中起到了巨大的作用,而 Spring 提供了 IOC 容器來管理對象之間的依賴關係,使我們可以更加關註業務,輕鬆的構建一個企業應用。藉助 IOC 特性和 AOP 面向切麵編程,可以說 Spring 為開發者提供了無限的 ...
  • 我是3y,一年CRUD經驗用十年的markdown程式員👨🏻‍💻常年被譽為職業八股文選手 在前陣子我就已經接入了釘釘的群機器人和工作消息推送,一直沒寫文章同步到給大家。 像這種接入渠道的工作,雖然我沒接入過,但可預見性地就是看看官方文檔,然後對著文檔一頓學習,複製下接入的代碼,然後調試,最後就 ...
  • VL33 非整數倍數據位寬轉換8to12 和上一題一樣的,註意valid_out輸出時加一個valid_in(其實32題也要加,不過不加模擬也能過)。 `timescale 1ns/1ns module width_8to12( input clk , input rst_n , input val ...
  • 歡迎關註個人公眾號:愛喝可可牛奶 LeetCode演算法訓練-回溯總結 適用問題 組合問題:N個數裡面按一定規則找出k個數的集合 排列問題:N個數按一定規則全排列,有幾種排列方式 切割問題:一個字元串按一定規則有幾種切割方式 子集問題:一個N個數的集合里有多少符合條件的子集 棋盤問題:N皇後,解數獨等 ...
  • if條件語句 if語句 if條件語法結構: if 條件語句: 滿足條件運行的代碼1 滿足條件運行的代碼2 ... ps:條件語句(可以是單個數據,即本身就是布爾類型)需返回一個布爾類型,判斷是否進入條件分支語句 if True: print('條件成⽴執⾏的代碼1') print('條件成⽴執⾏的代 ...
  • Lambda表達式 Lambda表達式理解 Lambda表達式是Jdk 8 開始新增的一種語法形式;作用:用於簡化匿名內部類的代碼寫法 註意:Lambda表達式只能簡化函數式介面的匿名內部類!!! 什麼是函數式介面? 有且僅有一個抽象方法的介面。 註意:大部分函數式介面,上面可能會有一個@Funct ...
一周排行
    -Advertisement-
    Play Games
  • C#TMS系統代碼-基礎頁面BaseCity學習 本人純新手,剛進公司跟領導報道,我說我是java全棧,他問我會不會C#,我說大學學過,他說這個TMS系統就給你來管了。外包已經把代碼給我了,這幾天先把增刪改查的代碼背一下,說不定後面就要趕鴨子上架了 Service頁面 //using => impo ...
  • 委托與事件 委托 委托的定義 委托是C#中的一種類型,用於存儲對方法的引用。它允許將方法作為參數傳遞給其他方法,實現回調、事件處理和動態調用等功能。通俗來講,就是委托包含方法的記憶體地址,方法匹配與委托相同的簽名,因此通過使用正確的參數類型來調用方法。 委托的特性 引用方法:委托允許存儲對方法的引用, ...
  • 前言 這幾天閑來沒事看看ABP vNext的文檔和源碼,關於關於依賴註入(屬性註入)這塊兒產生了興趣。 我們都知道。Volo.ABP 依賴註入容器使用了第三方組件Autofac實現的。有三種註入方式,構造函數註入和方法註入和屬性註入。 ABP的屬性註入原則參考如下: 這時候我就開始疑惑了,因為我知道 ...
  • C#TMS系統代碼-業務頁面ShippingNotice學習 學一個業務頁面,ok,領導開完會就被裁掉了,很突然啊,他收拾東西的時候我還以為他要旅游提前請假了,還在尋思為什麼回家連自己買的幾箱飲料都要叫跑腿帶走,怕被偷嗎?還好我在他開會之前拿了兩瓶芬達 感覺感覺前面的BaseCity差不太多,這邊的 ...
  • 概述:在C#中,通過`Expression`類、`AndAlso`和`OrElse`方法可組合兩個`Expression<Func<T, bool>>`,實現多條件動態查詢。通過創建表達式樹,可輕鬆構建複雜的查詢條件。 在C#中,可以使用AndAlso和OrElse方法組合兩個Expression< ...
  • 閑來無聊在我的Biwen.QuickApi中實現一下極簡的事件匯流排,其實代碼還是蠻簡單的,對於初學者可能有些幫助 就貼出來,有什麼不足的地方也歡迎板磚交流~ 首先定義一個事件約定的空介面 public interface IEvent{} 然後定義事件訂閱者介面 public interface I ...
  • 1. 案例 成某三甲醫預約系統, 該項目在2024年初進行上線測試,在正常運行了兩天後,業務系統報錯:The connection pool has been exhausted, either raise MaxPoolSize (currently 800) or Timeout (curren ...
  • 背景 我們有些工具在 Web 版中已經有了很好的實踐,而在 WPF 中重新開發也是一種費時費力的操作,那麼直接集成則是最省事省力的方法了。 思路解釋 為什麼要使用 WPF?莫問為什麼,老 C# 開發的堅持,另外因為 Windows 上已經裝了 Webview2/edge 整體打包比 electron ...
  • EDP是一套集組織架構,許可權框架【功能許可權,操作許可權,數據訪問許可權,WebApi許可權】,自動化日誌,動態Interface,WebApi管理等基礎功能於一體的,基於.net的企業應用開發框架。通過友好的編碼方式實現數據行、列許可權的管控。 ...
  • .Net8.0 Blazor Hybird 桌面端 (WPF/Winform) 實測可以完整運行在 win7sp1/win10/win11. 如果用其他工具打包,還可以運行在mac/linux下, 傳送門BlazorHybrid 發佈為無依賴包方式 安裝 WebView2Runtime 1.57 M ...