SparkSubmit提交yarn流程分析(學習版)

来源:https://www.cnblogs.com/Diona/archive/2022/07/13/16473965.html
-Advertisement-
Play Games

SparkSubmit提交流程分析 tips:分析基於如下執行命令開始 ./spark-submit \ --class org.apache.spark.examples.SparkPi \ --master yarn \ --deploy-mode cluster \ ./examples/ja ...


SparkSubmit提交流程分析

tips:分析基於如下執行命令開始

./spark-submit \
--class org.apache.spark.examples.SparkPi \
--master yarn \
--deploy-mode cluster \
./examples/jars/spark-example_2.12-3.0.0.jar \
10

首先執行了spark-submit這個腳本程式,找到這個腳本的代碼

#!/usr/bin/env bash

if [ -z "${SPARK_HOME}" ]; then
  source "$(dirname "$0")"/find-spark-home
fi

# disable randomized hash for string in Python 3.3+
export PYTHONHASHSEED=0

#exec 調用spark-class腳本  然後傳入SparkSubmit這個類 和 上面那一堆參數
exec "${SPARK_HOME}"/bin/spark-class org.apache.spark.deploy.SparkSubmit "$@"

然後我們去看spark-class這個腳本的代碼(只關註重點版):

#!/usr/bin/env bash

if [ -z "${SPARK_HOME}" ]; then
  source "$(dirname "$0")"/find-spark-home
fi

. "${SPARK_HOME}"/bin/load-spark-env.sh

if [ -n "${JAVA_HOME}" ]; then
  RUNNER="${JAVA_HOME}/bin/java"
else
  if [ "$(command -v java)" ]; then
    RUNNER="java"
  else
    echo "JAVA_HOME is not set" >&2
    exit 1
  fi
fi

if [ -d "${SPARK_HOME}/jars" ]; then
  SPARK_JARS_DIR="${SPARK_HOME}/jars"
else
  SPARK_JARS_DIR="${SPARK_HOME}/assembly/target/scala-$SPARK_SCALA_VERSION/jars"
fi

if [ ! -d "$SPARK_JARS_DIR" ] && [ -z "$SPARK_TESTING$SPARK_SQL_TESTING" ]; then
  echo "Failed to find Spark jars directory ($SPARK_JARS_DIR)." 1>&2
  echo "You need to build Spark with the target \"package\" before running this program." 1>&2
  exit 1
else
  LAUNCH_CLASSPATH="$SPARK_JARS_DIR/*"
fi

if [ -n "$SPARK_PREPEND_CLASSES" ]; then
  LAUNCH_CLASSPATH="${SPARK_HOME}/launcher/target/scala-$SPARK_SCALA_VERSION/classes:$LAUNCH_CLASSPATH"
fi

if [[ -n "$SPARK_TESTING" ]]; then
  unset YARN_CONF_DIR
  unset HADOOP_CONF_DIR
fi

#3.$RUNNER="${JAVA_HOME}/bin/java"  調用類路徑中的org.apache.spark.launcher.Main類 參數為spark-submit指定的所有參數,在這裡調用launcher生成下麵jvm command
build_command() {
  "$RUNNER" -Xmx128m $SPARK_LAUNCHER_OPTS -cp "$LAUNCH_CLASSPATH" org.apache.spark.launcher.Main "$@"
  printf "%d\0" $?
}

set +o posix
CMD=()
DELIM=$'\n'
CMD_START_FLAG="false"
while IFS= read -d "$DELIM" -r ARG; do
  if [ "$CMD_START_FLAG" == "true" ]; then
  	#2.CMD在這個迴圈里一直做累加,這個迴圈通過build_command把參數準備好
    CMD+=("$ARG")
  else
    if [ "$ARG" == $'\0' ]; then
      DELIM=''
      CMD_START_FLAG="true"
    elif [ "$ARG" != "" ]; then
      echo "$ARG"
    fi
  fi
done < <(build_command "$@")

#1。我們執行了一個cmd,這個cmd從哪兒來的
CMD=("${CMD[@]:0:$LAST}")
exec "${CMD[@]}"

最後執行的cmd:

/usr/lib/java/jdk1.8.0_144/bin/java -cp \
/home/etluser/kong/spark/spark-3.0.0-bin/spark-3.0.0-bin-hadoop3.2/conf/:/home/etluser/kong/spark/spark-3.0.0-bin/spark-3.0.0-bin-hadoop3.2/jars/* \
-Xmx1g \
org.apache.spark.deploy.SparkSubmit \
--master yarn \
--deploy-mode client \
--class org.apache.spark.examples.SparkPi \
./examples/jars/spark-example_2.12-3.0.0.jar

所以,spark提交腳本很關鍵的點在於org.apache.spark.deploy.SparkSubmit這個類是怎麼運作的,其他的都是參數,我們就先看看這個類的代碼:

//一個可以運行的類肯定有main方法,所以我們從main方法開始
  override def main(args: Array[String]): Unit = {
    //new 了一個sparksubmit的匿名內部類
    val submit = new SparkSubmit() {
      self =>

      override protected def parseArguments(args: Array[String]): SparkSubmitArguments = {
        new SparkSubmitArguments(args) {
          override protected def logInfo(msg: => String): Unit = self.logInfo(msg)

          override protected def logWarning(msg: => String): Unit = self.logWarning(msg)

          override protected def logError(msg: => String): Unit = self.logError(msg)
        }
      }

      override protected def logInfo(msg: => String): Unit = printMessage(msg)

      override protected def logWarning(msg: => String): Unit = printMessage(s"Warning: $msg")

      override protected def logError(msg: => String): Unit = printMessage(s"Error: $msg")
	 //所以是執行了這個方法,這個方法又調用的父類的doSubmit(args)
      override def doSubmit(args: Array[String]): Unit = {
        try {
          super.doSubmit(args)
        } catch {
          case e: SparkUserAppException =>
            exitFn(e.exitCode)
        }
      }

    }
	//然後用匿名內部類執行了一個dosubmit方法,此方法在匿名內部類里已被重寫
    submit.doSubmit(args)
  }

1.從super.dosubmit開始的提交流程

def doSubmit(args: Array[String]): Unit = {
    // Initialize logging if it hasn't been done yet. Keep track of whether logging needs to
    // be reset before the application starts.
    // 這個是日誌,暫且不看
    val uninitLog = initializeLogIfNecessary(true, silent = true)
	
    // *parseArguments這個方法返回了appArgs,作用在於解析參數
    val appArgs = parseArguments(args)
    if (appArgs.verbose) {
      logInfo(appArgs.toString)
    }
    //這裡模式匹配 appArgs.action屬性一定在下麵這四個之中,所以我們從parseArguments方法開始
    appArgs.action match {
      case SparkSubmitAction.SUBMIT => submit(appArgs, uninitLog)
      case SparkSubmitAction.KILL => kill(appArgs)
      case SparkSubmitAction.REQUEST_STATUS => requestStatus(appArgs)
      case SparkSubmitAction.PRINT_VERSION => printVersion()
    }
  }

1.1 parseArguments(args)

protected def parseArguments(args: Array[String]): SparkSubmitArguments = {
    //構造方法 執行了關鍵的1.1.1 和 1.1.2 兩個東西
  new SparkSubmitArguments(args)
}

1.1.1 SparkSubmitArguments(args)

try {
    //代碼塊兒
  parse(args.asJava)
} catch {
  case e: IllegalArgumentException =>
    SparkSubmit.printErrorAndExit(e.getMessage())
}

1.1.2 loadEnvironmentArguments()

1.1.1.1 parse(args.asJava)
//很明顯了嘛,在爪子嘛,在格式化輸入的參數撒
protected final void parse(List<String> args) {
    //這個就是分離參數的正則表達式
  Pattern eqSeparatedOpt = Pattern.compile("(--[^=]+)=(.+)");

  int idx = 0;
  for (idx = 0; idx < args.size(); idx++) {
    String arg = args.get(idx);
    String value = null;

    Matcher m = eqSeparatedOpt.matcher(arg);
    if (m.matches()) {
      arg = m.group(1);
      value = m.group(2);
    }

    // Look for options with a value.
    String name = findCliOption(arg, opts);
    if (name != null) {
      if (value == null) {
        if (idx == args.size() - 1) {
          throw new IllegalArgumentException(
              String.format("Missing argument for option '%s'.", arg));
        }
        idx++;
        value = args.get(idx);
      }
      if (!handle(name, value)) {
        break;
      }
      continue;
    }

    // Look for a switch.
    name = findCliOption(arg, switches);
    if (name != null) {
        // * 這裡就是參數解析的關鍵函數
      if (!handle(name, null)) {
        break;
      }
      continue;
    }

    if (!handleUnknown(arg)) {
      break;
    }
  }

  if (idx < args.size()) {
    idx++;
  }
  handleExtraArgs(args.subList(idx, args.size()));
}
1.1.1.1.1 handle(name, null)
//看到這個模式匹配是不是一下就清晰了,找到這個參數,然後給屬性賦值
override protected def handle(opt: String, value: String): Boolean = {
  opt match {
  // protected final String NAME = "--name";
    case NAME =>
      name = value
  // protected final String MASTER = "--master";
    case MASTER =>
      master = value
  // protected final String CLASS = "--class";
    case CLASS =>
      mainClass = value

    case NUM_EXECUTORS =>
      numExecutors = value

    case TOTAL_EXECUTOR_CORES =>
      totalExecutorCores = value

    case EXECUTOR_CORES =>
      executorCores = value

    case EXECUTOR_MEMORY =>
      executorMemory = value

    case DRIVER_MEMORY =>
      driverMemory = value

    case DRIVER_CORES =>
      driverCores = value

    case _ =>
      throw new IllegalArgumentException(s"Unexpected argument '$opt'.")
  }
  true
}

1.1.2 loadEnvironmentArguments()

// Action should be SUBMIT unless otherwise specified
//第一次執行action為空 那麼action賦值一定為submit
action = Option(action).getOrElse(SUBMIT)

1.2 submit(appArgs, uninitLog)

runMain(args, uninitLog)

1.2.1 runMain(args, uninitLog) 刪除不重要的log版

private def runMain(args: SparkSubmitArguments, uninitLog: Boolean): Unit = {
    // (childArgs, childClasspath, sparkConf, childMainClass)
    // childMainClass =》 "org.apache.spark.deploy.yarn.YarnClusterApplication"
    -- prepareSubmitEnvironment(args)
    
    // classForName(childMainClass)
    -- var mainClass: Class[_] = Utils.classForName(childMainClass)
    
    // classOf[SparkApplication].isAssignableFrom(mainClass)
    val app: SparkApplication =
    -- a)mainClass.getConstructor().newInstance().asInstanceOf[SparkApplication]
    -- b)new JavaMainApplication(mainClass)
    
    // "org.apache.spark.deploy.yarn.YarnClusterApplication"
    app.start(childArgs.toArray, sparkConf)
}
1.2.1.1 prepareSubmitEnvironment(args) 刪除不重要版
if (isYarnCluster) {
  childMainClass = YARN_CLUSTER_SUBMIT_CLASS
}
1.2.1.2 app.start(childArgs.toArray, sparkConf) 刪除不重要版
override def start(args: Array[String], conf: SparkConf): Unit = {
  // SparkSubmit would use yarn cache to distribute files & jars in yarn mode,
  // so remove them from sparkConf here for yarn mode.
  conf.remove(JARS)
  conf.remove(FILES)

  // new ClientArguments(args) 解析傳過來的參數 其中 --class => userClass = value =>自己執行的那個類
  // private val yarnClient = YarnClient.createYarnClient
    //  YarnClient client = new YarnClientImpl();
      //  protected ApplicationClientProtocol rmClient;  resourceManager的客戶端說明這個client 是用來和resourceManager做交互的
  // 對像明白了,接下來看看run裡面都是些啥
  new Client(new ClientArguments(args), conf, null).run()
}
1.2.1.2.1 rmClient.run() 刪除不重要版
def run(): Unit = {
  this.appId = submitApplication()
}

def submitApplication(): ApplicationId = {

    try {
        //啟動了連接
      launcherBackend.connect()
      yarnClient.init(hadoopConf)
      yarnClient.start()
       
       // 從我們的 RM 獲取新的應用程式
      val newApp = yarnClient.createApplication()
      val newAppResponse = newApp.getNewApplicationResponse()
      
      // 設置適當的上下文來啟動我們的 AM 進程
        // 創建容器
          // commands = JAVA_HOME/bin/java org.apache.spark.deploy.yarn.ApplicationMaster 
      val containerContext = createContainerLaunchContext(newAppResponse)
      val appContext = createApplicationSubmissionContext(newApp, containerContext)

      // 最後,提交並監控申請
      yarnClient.submitApplication(appContext)
      launcherBackend.setAppId(appId.toString)
      reportLauncherState(SparkAppHandle.State.SUBMITTED)

      appId
    } catch {
      ...
    }
  }

您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • #IO思維導圖總結 ##總覽: 1.文件 <目標:File類的創建和刪除的方法 > public boolean createNewFile() :當且僅當具有該名稱的文件尚不存在時, 創建一個新的空文件。 (幾乎不用的,因為以後文件都是自動創建的!) public boolean delete() ...
  • 因為有些主題的原因,一些新加欄目不能按照需求,在首頁調出部分列表數據。我們可以這樣做: 1.找到該主題的include.php文件,在該文件最後添加代碼如下: 註意: San_Tiger_GetArticleCategorys函數名中,需要將 San_Tiger換成網站所正在用的主題名 functi ...
  • instanceof instanceof是Java 的保留關鍵字。 它的作用是測試它左邊的對象是否是它右邊的類的實例,返回 boolean 的數據類型。 類的實例包含本身的實例,以及所有直接或間接子類的實例 instanceof左邊顯式聲明的類型與右邊操作元必須是同種類或存在繼承關係,也就是說需要 ...
  • 來源:https://juejin.cn/post/6844903954308939784 導語 自從畢業後,今年已經是我工作的第 8 個年頭了,我甚至都快忘記了到底是哪年畢業的。 從出來,本人一直在做 Java 相關的工作,現在終於有時間坐下來,寫一篇關於 Java 寫法的一篇文章,來探討一下如果 ...
  • java入門 java誕生過程:1972年誕生c語言,因為指針和記憶體管理複雜,難以移植性。1982年產生了改進後的c++但是還是很複雜,於是在1995年就建立了java語言。 java優點: 語法有點像c 沒有指針 沒有記憶體管理 運行在jvm上,實現了真正的一次編譯到處運行 面向對象 類型安全 …… ...
  • 前言 大家早好、午好、晚好吖~ 環境使用: Python 3.8 Pycharm 2021.2版本 ffmpeg <需要設置環境變數> 模塊使用: import requests >>> pip install requests 內置模塊 你安裝好python環境就可以了 import re imp ...
  • JDBC簡單一句話,就是用java代碼去控制資料庫,對資料庫進行增刪改查 JDBC 的相關API 總結 最常用是阿裡巴巴的德魯伊資料庫連接池技術 資料庫連接步驟 必須先創建資料庫哈 引入德魯伊的jar包 加入配置文件(properties),要放在src目錄下,根據一些提示信息去做相應的配置,如果是 ...
  • 用Python檢測用戶輸入密碼的複雜度,灰常簡單! 密碼強度檢測規則: 至少包含一個數字 至少包含一個大寫字母 長度至少 8 位 主要知識點 while 迴圈 推導式 列表 any 函數 命令行 input 代碼部分 密碼強度檢測 1、創建 python 文件 密碼強度檢測規則 1 至少包含一個數字 ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...