SparkSubmit Submission Flow Analysis
Tip: the analysis is based on the following submit command
./spark-submit \
--class org.apache.spark.examples.SparkPi \
--master yarn \
--deploy-mode cluster \
./examples/jars/spark-examples_2.12-3.0.0.jar \
10
First, the spark-submit script itself runs, so let's look at its source:
#!/usr/bin/env bash
if [ -z "${SPARK_HOME}" ]; then
source "$(dirname "$0")"/find-spark-home
fi
# disable randomized hash for string in Python 3.3+
export PYTHONHASHSEED=0
# exec calls the spark-class script, passing in the org.apache.spark.deploy.SparkSubmit class plus all of the arguments above
exec "${SPARK_HOME}"/bin/spark-class org.apache.spark.deploy.SparkSubmit "$@"
Next, let's look at the spark-class script (trimmed to the key parts):
#!/usr/bin/env bash
if [ -z "${SPARK_HOME}" ]; then
source "$(dirname "$0")"/find-spark-home
fi
. "${SPARK_HOME}"/bin/load-spark-env.sh
if [ -n "${JAVA_HOME}" ]; then
RUNNER="${JAVA_HOME}/bin/java"
else
if [ "$(command -v java)" ]; then
RUNNER="java"
else
echo "JAVA_HOME is not set" >&2
exit 1
fi
fi
if [ -d "${SPARK_HOME}/jars" ]; then
SPARK_JARS_DIR="${SPARK_HOME}/jars"
else
SPARK_JARS_DIR="${SPARK_HOME}/assembly/target/scala-$SPARK_SCALA_VERSION/jars"
fi
if [ ! -d "$SPARK_JARS_DIR" ] && [ -z "$SPARK_TESTING$SPARK_SQL_TESTING" ]; then
echo "Failed to find Spark jars directory ($SPARK_JARS_DIR)." 1>&2
echo "You need to build Spark with the target \"package\" before running this program." 1>&2
exit 1
else
LAUNCH_CLASSPATH="$SPARK_JARS_DIR/*"
fi
if [ -n "$SPARK_PREPEND_CLASSES" ]; then
LAUNCH_CLASSPATH="${SPARK_HOME}/launcher/target/scala-$SPARK_SCALA_VERSION/classes:$LAUNCH_CLASSPATH"
fi
if [[ -n "$SPARK_TESTING" ]]; then
unset YARN_CONF_DIR
unset HADOOP_CONF_DIR
fi
# 3. $RUNNER ("${JAVA_HOME}/bin/java") runs the org.apache.spark.launcher.Main class from the launch classpath,
#    with all of the arguments given to spark-submit; the launcher builds the JVM command shown further below
build_command() {
"$RUNNER" -Xmx128m $SPARK_LAUNCHER_OPTS -cp "$LAUNCH_CLASSPATH" org.apache.spark.launcher.Main "$@"
printf "%d\0" $?
}
set +o posix
CMD=()
DELIM=$'\n'
CMD_START_FLAG="false"
while IFS= read -d "$DELIM" -r ARG; do
if [ "$CMD_START_FLAG" == "true" ]; then
#2. CMD keeps accumulating in this loop; the loop gets its input from build_command
CMD+=("$ARG")
else
if [ "$ARG" == $'\0' ]; then
DELIM=''
CMD_START_FLAG="true"
elif [ "$ARG" != "" ]; then
echo "$ARG"
fi
fi
done < <(build_command "$@")
#1. Finally a CMD is exec'd. Where does it come from? build_command appends the launcher's exit
#   code as the last array element (printf "%d\0" $?), so it is stripped off before exec.
COUNT=${#CMD[@]}
LAST=$((COUNT - 1))
CMD=("${CMD[@]:0:$LAST}")
exec "${CMD[@]}"
The command that is ultimately exec'd:
/usr/lib/java/jdk1.8.0_144/bin/java -cp \
/home/etluser/kong/spark/spark-3.0.0-bin/spark-3.0.0-bin-hadoop3.2/conf/:/home/etluser/kong/spark/spark-3.0.0-bin/spark-3.0.0-bin-hadoop3.2/jars/* \
-Xmx1g \
org.apache.spark.deploy.SparkSubmit \
--master yarn \
--deploy-mode cluster \
--class org.apache.spark.examples.SparkPi \
./examples/jars/spark-examples_2.12-3.0.0.jar \
10
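As a side note, the launcher library that spark-class invokes (org.apache.spark.launcher.Main) also has a public entry point, SparkLauncher, so the same submission can be built programmatically. A minimal sketch, assuming the paths from the example command above (not verified locations):

import org.apache.spark.launcher.SparkLauncher

object LaunchSparkPi {
  def main(args: Array[String]): Unit = {
    // builds and spawns a child JVM running SparkSubmit, much like spark-submit/spark-class do
    val process = new SparkLauncher()
      .setSparkHome("/home/etluser/kong/spark/spark-3.0.0-bin/spark-3.0.0-bin-hadoop3.2") // assumed SPARK_HOME
      .setAppResource("./examples/jars/spark-examples_2.12-3.0.0.jar")
      .setMainClass("org.apache.spark.examples.SparkPi")
      .setMaster("yarn")
      .setDeployMode("cluster")
      .addAppArgs("10")
      .launch()
    process.waitFor()
  }
}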
So the crux of the submit scripts is how the org.apache.spark.deploy.SparkSubmit class works; everything else is just arguments. Let's start with that class's code:
//a runnable class must have a main method, so that's where we start
override def main(args: Array[String]): Unit = {
//create an anonymous subclass of SparkSubmit
val submit = new SparkSubmit() {
self =>
override protected def parseArguments(args: Array[String]): SparkSubmitArguments = {
new SparkSubmitArguments(args) {
override protected def logInfo(msg: => String): Unit = self.logInfo(msg)
override protected def logWarning(msg: => String): Unit = self.logWarning(msg)
override protected def logError(msg: => String): Unit = self.logError(msg)
}
}
override protected def logInfo(msg: => String): Unit = printMessage(msg)
override protected def logWarning(msg: => String): Unit = printMessage(s"Warning: $msg")
override protected def logError(msg: => String): Unit = printMessage(s"Error: $msg")
//so this override is what actually runs, and it delegates to the parent class's doSubmit(args)
override def doSubmit(args: Array[String]): Unit = {
try {
super.doSubmit(args)
} catch {
case e: SparkUserAppException =>
exitFn(e.exitCode)
}
}
}
//then doSubmit is called on the anonymous subclass, where it has been overridden
submit.doSubmit(args)
}
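The "self =>" alias deserves a quick note: inside the nested anonymous SparkSubmitArguments subclass, a bare logInfo would resolve to its own method, so the alias is what lets it forward to the outer SparkSubmit subclass's overrides. A standalone sketch of the same pattern (Logger and Parser are made-up names, not Spark classes):

object SelfAliasDemo {
  class Parser { def logInfo(msg: String): Unit = println(s"parser: $msg") }
  class Logger {
    def logInfo(msg: String): Unit = println(s"logger: $msg")
    def makeParser(): Parser = new Parser()
  }

  def main(args: Array[String]): Unit = {
    val submit = new Logger() { self =>
      override def logInfo(msg: String): Unit = println(s"submit: $msg")
      override def makeParser(): Parser = new Parser() {
        // a bare logInfo here would be Parser's own; self.logInfo reaches the outer override
        override def logInfo(msg: String): Unit = self.logInfo(msg)
      }
    }
    submit.makeParser().logInfo("hello") // prints "submit: hello"
  }
}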
1. The submission flow, starting from super.doSubmit
def doSubmit(args: Array[String]): Unit = {
// Initialize logging if it hasn't been done yet. Keep track of whether logging needs to
// be reset before the application starts.
// logging setup; skip it for now
val uninitLog = initializeLogIfNecessary(true, silent = true)
// *parseArguments returns appArgs; its job is to parse the arguments
val appArgs = parseArguments(args)
if (appArgs.verbose) {
logInfo(appArgs.toString)
}
//pattern match: appArgs.action must be one of the four actions below, so let's start with parseArguments
appArgs.action match {
case SparkSubmitAction.SUBMIT => submit(appArgs, uninitLog)
case SparkSubmitAction.KILL => kill(appArgs)
case SparkSubmitAction.REQUEST_STATUS => requestStatus(appArgs)
case SparkSubmitAction.PRINT_VERSION => printVersion()
}
}
1.1 parseArguments(args)
protected def parseArguments(args: Array[String]): SparkSubmitArguments = {
//the constructor runs the two key steps below, 1.1.1 and 1.1.2
new SparkSubmitArguments(args)
}
1.1.1 SparkSubmitArguments(args)
try {
//constructor initializer block
parse(args.asJava)
} catch {
case e: IllegalArgumentException =>
SparkSubmit.printErrorAndExit(e.getMessage())
}
1.1.2 loadEnvironmentArguments() (also called by the constructor; covered below)
1.1.1.1 parse(args.asJava)
//pretty obvious what's going on here: the input arguments are being parsed and normalized
protected final void parse(List<String> args) {
//the regex that splits "--key=value" style options into name and value
Pattern eqSeparatedOpt = Pattern.compile("(--[^=]+)=(.+)");
int idx = 0;
for (idx = 0; idx < args.size(); idx++) {
String arg = args.get(idx);
String value = null;
Matcher m = eqSeparatedOpt.matcher(arg);
if (m.matches()) {
arg = m.group(1);
value = m.group(2);
}
// Look for options with a value.
String name = findCliOption(arg, opts);
if (name != null) {
if (value == null) {
if (idx == args.size() - 1) {
throw new IllegalArgumentException(
String.format("Missing argument for option '%s'.", arg));
}
idx++;
value = args.get(idx);
}
if (!handle(name, value)) {
break;
}
continue;
}
// Look for a switch.
name = findCliOption(arg, switches);
if (name != null) {
// * handle() here is the key function for argument parsing
if (!handle(name, null)) {
break;
}
continue;
}
if (!handleUnknown(arg)) {
break;
}
}
if (idx < args.size()) {
idx++;
}
handleExtraArgs(args.subList(idx, args.size()));
}
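To make the regex concrete, here is a small standalone Scala sketch; the sample arguments are taken from the submit command above:

import java.util.regex.Pattern

object EqSeparatedOptDemo {
  def main(args: Array[String]): Unit = {
    // same pattern as in parse(): captures "--key" and "value" out of "--key=value"
    val eqSeparatedOpt = Pattern.compile("(--[^=]+)=(.+)")
    for (arg <- Seq("--master=yarn", "--class=org.apache.spark.examples.SparkPi", "--verbose")) {
      val m = eqSeparatedOpt.matcher(arg)
      if (m.matches()) {
        println(s"name=${m.group(1)} value=${m.group(2)}") // e.g. name=--master value=yarn
      } else {
        println(s"$arg has no '=', so any value would be taken from the next argument")
      }
    }
  }
}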
1.1.1.1.1 handle(name, null)
//this pattern match makes things clear at a glance: find the option, then assign its value to the corresponding field
override protected def handle(opt: String, value: String): Boolean = {
opt match {
// protected final String NAME = "--name";
case NAME =>
name = value
// protected final String MASTER = "--master";
case MASTER =>
master = value
// protected final String CLASS = "--class";
case CLASS =>
mainClass = value
case NUM_EXECUTORS =>
numExecutors = value
case TOTAL_EXECUTOR_CORES =>
totalExecutorCores = value
case EXECUTOR_CORES =>
executorCores = value
case EXECUTOR_MEMORY =>
executorMemory = value
case DRIVER_MEMORY =>
driverMemory = value
case DRIVER_CORES =>
driverCores = value
case _ =>
throw new IllegalArgumentException(s"Unexpected argument '$opt'.")
}
true
}
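The same match-and-assign idea as a standalone sketch (MiniArgs and its two options are purely illustrative, not Spark's full option set):

object MiniArgsDemo {
  class MiniArgs {
    var master: String = _
    var mainClass: String = _

    // mirrors handle(opt, value): assign the matching field, return true to keep parsing
    def handle(opt: String, value: String): Boolean = {
      opt match {
        case "--master" => master = value
        case "--class"  => mainClass = value
        case _          => throw new IllegalArgumentException(s"Unexpected argument '$opt'.")
      }
      true
    }
  }

  def main(args: Array[String]): Unit = {
    val a = new MiniArgs
    a.handle("--master", "yarn")
    a.handle("--class", "org.apache.spark.examples.SparkPi")
    println(s"master=${a.master}, mainClass=${a.mainClass}")
  }
}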
1.1.2 loadEnvironmentArguments()
// Action should be SUBMIT unless otherwise specified
//on the first pass action is null, so action is assigned SUBMIT
action = Option(action).getOrElse(SUBMIT)
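The default kicks in because Option(x) is None when x is null; a tiny illustration, using a String as a stand-in for SparkSubmitAction:

object ActionDefaultDemo {
  def main(args: Array[String]): Unit = {
    val action: String = null
    println(Option(action).getOrElse("SUBMIT")) // prints SUBMIT, because Option(null) is None
  }
}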
1.2 submit(appArgs, uninitLog)
// after some proxy-user handling, submit() boils down to calling:
runMain(args, uninitLog)
1.2.1 runMain(args, uninitLog) (unimportant logging removed)
private def runMain(args: SparkSubmitArguments, uninitLog: Boolean): Unit = {
  // in yarn-cluster mode, childMainClass => "org.apache.spark.deploy.yarn.YarnClusterApplication"
  val (childArgs, childClasspath, sparkConf, childMainClass) = prepareSubmitEnvironment(args)
  // load that class by name
  val mainClass: Class[_] = Utils.classForName(childMainClass)
  // if it implements SparkApplication, instantiate it via reflection;
  // otherwise wrap the plain main()-style class in a JavaMainApplication
  val app: SparkApplication = if (classOf[SparkApplication].isAssignableFrom(mainClass)) {
    mainClass.getConstructor().newInstance().asInstanceOf[SparkApplication]
  } else {
    new JavaMainApplication(mainClass)
  }
  // here app is a YarnClusterApplication ("org.apache.spark.deploy.yarn.YarnClusterApplication")
  app.start(childArgs.toArray, sparkConf)
}
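The reflection dance here (classForName, isAssignableFrom, getConstructor().newInstance()) is plain Java reflection. A standalone sketch of the same pattern with made-up names (Greeter and ConsoleGreeter stand in for SparkApplication and YarnClusterApplication; they are not Spark classes):

trait Greeter { def greet(): Unit }
class ConsoleGreeter extends Greeter { def greet(): Unit = println("hello from reflection") }

object ReflectionDemo {
  def main(args: Array[String]): Unit = {
    val cls: Class[_] = Class.forName("ConsoleGreeter")  // like Utils.classForName(childMainClass)
    val app: Greeter =
      if (classOf[Greeter].isAssignableFrom(cls)) {
        cls.getConstructor().newInstance().asInstanceOf[Greeter]
      } else {
        throw new IllegalArgumentException(s"${cls.getName} does not implement Greeter")
      }
    app.greet()
  }
}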
1.2.1.1 prepareSubmitEnvironment(args) (trimmed)
if (isYarnCluster) {
  // YARN_CLUSTER_SUBMIT_CLASS = "org.apache.spark.deploy.yarn.YarnClusterApplication"
  childMainClass = YARN_CLUSTER_SUBMIT_CLASS
}
1.2.1.2 app.start(childArgs.toArray, sparkConf) (trimmed)
override def start(args: Array[String], conf: SparkConf): Unit = {
// SparkSubmit would use yarn cache to distribute files & jars in yarn mode,
// so remove them from sparkConf here for yarn mode.
conf.remove(JARS)
conf.remove(FILES)
// new ClientArguments(args) parses the arguments handed over; e.g. --class => userClass = value => the user's own main class
// inside Client:  private val yarnClient = YarnClient.createYarnClient
//                 YarnClient client = new YarnClientImpl();
//                 protected ApplicationClientProtocol rmClient;  -- an rmClient, i.e. this client talks to the ResourceManager
// now that we know what the object is, let's look at what run() does
new Client(new ClientArguments(args), conf, null).run()
}
1.2.1.2.1 Client.run() (trimmed)
def run(): Unit = {
this.appId = submitApplication()
}
def submitApplication(): ApplicationId = {
var appId: ApplicationId = null
try {
// connect to the launcher backend
launcherBackend.connect()
yarnClient.init(hadoopConf)
yarnClient.start()
// Get a new application from our RM (the ResourceManager)
val newApp = yarnClient.createApplication()
val newAppResponse = newApp.getNewApplicationResponse()
appId = newAppResponse.getApplicationId()
// Set up the appropriate contexts to launch our ApplicationMaster (AM):
// createContainerLaunchContext builds the AM container, whose command boils down to
// commands = JAVA_HOME/bin/java org.apache.spark.deploy.yarn.ApplicationMaster ...
val containerContext = createContainerLaunchContext(newAppResponse)
val appContext = createApplicationSubmissionContext(newApp, containerContext)
// Finally, submit and monitor the application
yarnClient.submitApplication(appContext)
launcherBackend.setAppId(appId.toString)
reportLauncherState(SparkAppHandle.State.SUBMITTED)
appId
} catch {
...
}
}
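For a feel of what createContainerLaunchContext produces, here is a heavily simplified, hedged sketch using the Hadoop YARN records API; the real method also prepares local resources, the environment, JVM options and security tokens, and the exact command string differs:

import java.util.Collections

import org.apache.hadoop.yarn.api.records.ContainerLaunchContext
import org.apache.hadoop.yarn.util.Records

object AmContainerSketch {
  def buildAmContainer(): ContainerLaunchContext = {
    val amContainer = Records.newRecord(classOf[ContainerLaunchContext])
    // "{{JAVA_HOME}}" and "<LOG_DIR>" are YARN placeholders expanded on the NodeManager
    amContainer.setCommands(Collections.singletonList(
      "{{JAVA_HOME}}/bin/java -server org.apache.spark.deploy.yarn.ApplicationMaster" +
        " --class org.apache.spark.examples.SparkPi" +
        " 1> <LOG_DIR>/stdout 2> <LOG_DIR>/stderr"))
    amContainer
  }
}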