上一篇說到Spark的yarn client運行模式,它與yarn cluster模式的主要區別就是前者Driver是運行在客戶端,後者Driver是運行在yarn集群中。yarn client模式一般用在互動式場景中,比如spark shell, spark sql等程式,但是該模式下運行在客戶端 ...
上一篇說到Spark的yarn client運行模式,它與yarn cluster模式的主要區別就是前者Driver是運行在客戶端,後者Driver是運行在yarn集群中。yarn client模式一般用在互動式場景中,比如spark shell, spark sql等程式,但是該模式下運行在客戶端的Driver與Yarn集群有大量的網路交互,如果客戶端與集群之間的網路不是很好,可能會導致性能問題。因此一般在生產環境中,大部分還是採用yarn cluster模式運行spark程式。
下麵具體還是用計算PI的程式來說明,examples中該程式有三個版本,分別採用Scala、Python和Java語言編寫。本次用Python程式pi.py做說明。
1 from __future__ import print_function 2 3 import sys 4 from random import random 5 from operator import add 6 7 from pyspark.sql import SparkSession 8 9 10 if __name__ == "__main__": 11 """ 12 Usage: pi [partitions] 13 """ 14 spark = SparkSession\ 15 .builder\ 16 .appName("PythonPi")\ 17 .getOrCreate() 18 19 partitions = int(sys.argv[1]) if len(sys.argv) > 1 else 2 20 n = 100000 * partitions 21 22 def f(_): 23 x = random() * 2 - 1 24 y = random() * 2 - 1 25 return 1 if x ** 2 + y ** 2 <= 1 else 0 26 27 count = spark.sparkContext.parallelize(range(1, n + 1), partitions).map(f).reduce(add) 28 print("Pi is roughly %f" % (4.0 * count / n)) 29 30 spark.stop()
程式邏輯與上一篇Scala程式一樣,就不再多做說明瞭。
下麵來以yarn cluster方式來執行這個程式,註意執行程式前先要啟動hdfs和yarn,最好同時啟動spark的history server,這樣即使在程式運行完以後也可以從Web UI中查看到程式運行情況。
輸入以下命令:
[root@BruceCentOS4 ~]# $SPARK_HOME/bin/spark-submit --master yarn --deploy-mode cluster $SPARK_HOME/examples/src/main/python/pi.py
以下是程式運行輸出信息部分截圖,
開始部分:
中間部分:
結束部分:
由於程式是以yarn cluster方式運行的,因此Driver是運行在Yarn集群當中(在BruceCentOS3上的ApplicationMaster進程當中),同時在BruceCentOS和BruceCentOS2上各運行了1個Executor進程(進程名字:CoarseGrainedExecutorBackend),而BruceCentOS4上的SparkSubmit進程僅僅作為yarn client向yarn集群提交spark程式。作為對比,在yarn client模式當中,客戶端SparkSubmit進程不僅作為yarn client提交程式,而且同時還會運行Driver,並啟動SparkContext,並且向Executor分配和管理Task,最後收集運行結果,因此yarn client模式程式輸出信息會顯示最終的列印結果。然而在yarn cluster模式當中,由於Driver運行在yarn集群的ApplicationMaster中,因此最終結果需要到ApplicationMaster進程的日誌中取查看。可以通過如下命令查看。
SparkUI上的Executor信息:
BruceCentOS4上的客戶端進程:
BruceCentOS3上的ApplicationMaster進程(包含Spark Driver):
BruceCentOS上的Executor:
BruceCentOS2上的Executor:
下麵具體描述下Spark程式在yarn cluster模式下運行的具體流程。
這裡是一個流程圖:
- Spark Yarn Client向YARN提交應用程式,類似於MapReduce向Yarn提交程式,會將程式文件、庫文件和配置文件等上傳到HDFS。
- ResourceManager收到請求後,在集群中選擇一個NodeManager,為該應用程式分配第一個Container,要求它在這個Container中啟動應用程式的ApplicationMaster,其中ApplicationMaster中會運行Spark Driver,併進行SparkContext的初始化。
- ApplicationMaster向ResourceManager註冊,這樣用戶可以直接通過ResourceManager查看應用程式的運行狀態,然後它將採用輪詢的方式通過RPC協議為各個任務申請資源,並監控它們的運行狀態直到運行結束。
- 一旦ApplicationMaster申請到資源(也就是Container)後,便與對應的NodeManager通信,要求它在獲得的Container中啟動CoarseGrainedExecutorBackend,CoarseGrainedExecutorBackend啟動後會向ApplicationMaster中的SparkContext註冊並申請Task。這一點和Standalone模式一樣,只不過SparkContext在Spark Application中初始化時,使用CoarseGrainedSchedulerBackend配合YarnClusterScheduler進行任務的調度。
- ApplicationMaster中的SparkContext分配Task給CoarseGrainedExecutorBackend執行,CoarseGrainedExecutorBackend運行Task並向ApplicationMaster彙報運行的狀態和進度,以讓ApplicationMaster隨時掌握各個任務的運行狀態,從而可以在任務失敗時重新啟動任務。
- 應用程式運行完成後,ApplicationMaster向ResourceManager申請註銷並關閉自己。
以上就是個人對Spark運行模式(yarn cluster)的一點理解,其中參考了“求知若渴 虛心若愚”博主的“Spark(一): 基本架構及原理”的部分內容(其中基於Spark2.3.0對某些細節進行了修正),在此表示感謝。