Table of Contents
· Overview
· Manual Cluster Setup
· Introduction
· Installing Scala
· Configuration Files
· Startup and Testing
· Application Deployment
· Deployment Architecture
· Deploying Applications
· Core Principles
· RDD Concept
· RDD Core Components
· RDD Dependencies
· DAG
· RDD Fault Recovery
· Spark Architecture in Standalone Mode
· Spark Architecture in YARN Mode
· Application Resource Allocation
· API
· WordCount Example
· RDD Construction
· RDD Partition Count
· Shared Variables
· Common DoubleRDDFunctions Operations
· Common PairRDDFunctions Operations
· Common OrderedRDDFunctions Operations
· Data Preparation
· Loading & Preprocessing
· Computing DAU
· Computing MAU
Overview
1. Advantages of Spark over MapReduce:
a) Support for iterative computation;
b) Intermediate results are kept in memory rather than on disk, reducing latency.
2. Spark has become a lightweight, unified platform for fast big-data processing ("One stack to rule them all"): one platform covers ad-hoc queries, batch processing, and stream processing.
3. Ways to set up a Spark cluster:
a) Integrated deployment tools, such as Cloudera Manager;
b) Manual setup.
4. Ways to build Spark from source:
a) SBT build;
b) Maven build.
Manual Cluster Setup
Introduction
1. Environment:
Role   | Host name
Master | centos1
Slave  | centos2, centos3
2. Standalone mode must be deployed on the Master and all Slave nodes; YARN mode only needs to be deployed on the job-submission machine.
3. It is assumed that the JDK and a Hadoop cluster are already installed.
Installing Scala
1. [Master (Standalone mode) or the job-submission machine (YARN mode)] Install Scala under /opt/app.
tar zxvf scala-2.10.6.tgz -C /opt/app
2. [Master (Standalone mode) or the job-submission machine (YARN mode)] Configure environment variables.
vi /etc/profile
export SCALA_HOME=/opt/app/scala-2.10.6
export PATH=$SCALA_HOME/bin:$PATH
source /etc/profile      # apply the changes
env | grep SCALA_HOME    # verify
Configuration Files
3. [Master (Standalone mode) or the job-submission machine (YARN mode)] Extract Spark and edit its configuration files.
tar zxvf spark-1.6.3-bin-hadoop2.6.tgz -C /opt/app
cd /opt/app/spark-1.6.3-bin-hadoop2.6/conf
cp spark-env.sh.template spark-env.sh
vi spark-env.sh
export JAVA_HOME=/opt/app/jdk1.8.0_121
export SCALA_HOME=/opt/app/scala-2.10.6
export HADOOP_HOME=/opt/app/hadoop-2.6.5
export HADOOP_CONF_DIR=${HADOOP_HOME}/etc/hadoop
export YARN_CONF_DIR=${HADOOP_HOME}/etc/hadoop
# For standalone mode
export SPARK_WORKER_CORES=1
export SPARK_DAEMON_MEMORY=512m
cp spark-defaults.conf.template spark-defaults.conf
hadoop fs -mkdir /spark.eventLog.dir
vi spark-defaults.conf
spark.driver.extraClassPath /opt/app/apache-hive-1.2.2-bin/lib/mysql-connector-java-5.1.22-bin.jar
spark.eventLog.enabled true
spark.eventLog.dir hdfs://centos1:9000/spark.eventLog.dir
cp slaves.template slaves
vi slaves
centos2
centos3
ln -s /opt/app/apache-hive-1.2.2-bin/conf/hive-site.xml .
4. [Master (Standalone mode)] Copy the Spark directory from the Master to each Slave. Note: this step is only needed for a Standalone cluster.
scp -r /opt/app/spark-1.6.3-bin-hadoop2.6 hadoop@centos2:/opt/app
scp -r /opt/app/spark-1.6.3-bin-hadoop2.6 hadoop@centos3:/opt/app
Startup and Testing
5. [Master (Standalone mode) or the job-submission machine (YARN mode)] Configure Spark environment variables.
export SPARK_HOME=/opt/app/spark-1.6.3-bin-hadoop2.6
export PATH=$PATH:$SPARK_HOME/bin
6. [Master (Standalone mode)] Start Spark and verify the processes.
sbin/start-all.sh
jps
Master    # process on the Master machine
Worker    # process on the Slave machines
7. [Master (Standalone mode) or the job-submission machine (YARN mode)] Run test jobs.
bin/spark-submit --master spark://centos1:7077 --deploy-mode client --class org.apache.spark.examples.SparkPi --driver-memory 512m --executor-memory 512m --num-executors 1 --executor-cores 1 lib/spark-examples-1.6.3-hadoop2.6.0.jar     # run in Standalone client mode
bin/spark-submit --master spark://centos1:7077 --deploy-mode cluster --class org.apache.spark.examples.SparkPi --driver-memory 512m --executor-memory 512m --num-executors 1 --executor-cores 1 lib/spark-examples-1.6.3-hadoop2.6.0.jar    # run in Standalone cluster mode
bin/spark-submit --master yarn-client --class org.apache.spark.examples.SparkPi --driver-memory 512m --executor-memory 512m --num-executors 1 --executor-cores 1 lib/spark-examples-1.6.3-hadoop2.6.0.jar    # run in YARN client mode
bin/spark-submit --master yarn-cluster --class org.apache.spark.examples.SparkPi --driver-memory 512m --executor-memory 512m --num-executors 1 --executor-cores 1 lib/spark-examples-1.6.3-hadoop2.6.0.jar    # run in YARN cluster mode
bin/yarn application -list                  # list applications running on YARN
bin/yarn application -kill ApplicationID    # kill an application running on YARN
bin/spark-shell --master spark://centos1:7077 --deploy-mode client --driver-memory 512m --executor-memory 512m --num-executors 1 --executor-cores 1    # run in Standalone client mode
bin/spark-shell --master yarn --deploy-mode client --driver-memory 512m --executor-memory 512m --num-executors 1 --executor-cores 1                    # run in YARN client mode
8. Monitoring UIs.
http://centos1:8080 | Spark UI
http://centos1:8088 | YARN UI
Application Deployment
Deployment Architecture
1. Application: a Spark application, consisting of one Driver Program plus Executors on multiple Worker Nodes in the cluster; each Worker Node provides only one Executor per Application.
2. Driver Program: runs the Application's main function and is usually represented by its SparkContext. It is responsible for building the DAG, dividing it into Stages, managing and scheduling Tasks, and creating the SchedulerBackend used for Akka communication; its main components are the DAGScheduler, TaskScheduler, and SchedulerBackend.
3. Cluster Manager: an abstraction over different cluster managers such as Spark Standalone and YARN. The Driver Program obtains resources through the Cluster Manager and dispatches tasks to multiple Worker Nodes for execution.
4. Worker Node: a cluster node. The application's Tasks run inside Executors on Worker Nodes.
5. Executor: a process that a Worker Node launches for an Application, responsible for executing Tasks.
6. Stage: an Application generally contains one or more Stages.
7. Task: the unit of computation sent by the Driver Program to an Executor. A Task normally processes one split (i.e. one partition), and a split is usually one Block in size. A Stage contains one or more Tasks, and parallelism is achieved by running its Tasks concurrently.
8. DAGScheduler: splits the Application into one or more Stages; the number of Tasks in each Stage is determined by the number of RDD partitions (see the sketch after this list). It then generates the corresponding TaskSet and hands it to the TaskScheduler.
9. DeployMode: the deployment mode of the Driver process, either cluster or client.
10. Notes:
a) The Driver Program must be in the same network environment as the Spark cluster, because the SparkContext sends tasks to Executors on different Worker Nodes and receives their results.
b) In production, the machine running the Driver Program should be well provisioned, especially in terms of CPU.
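As a quick illustration of the Stage/Task relationship described above, the following spark-shell sketch (hypothetical file path and partition count) shows that the number of Tasks a Stage launches follows the partition count of the RDD it computes:
val rdd = sc.textFile("/test/wordcount/README.md", 4)   // ask for at least 4 partitions
println(rdd.partitions.length)                          // typically 4 here
rdd.count()                                             // the resulting Stage runs one Task per partition (visible at http://centos1:4040)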
Deploying Applications
1. Two ways to run an application:
a) spark-shell: interactive, for development and debugging. The instances “val sc: SparkContext” and “val sqlContext: SQLContext” are already created for you.
b) spark-submit: batch submission, for production deployment.
2. spark-shell options:
bin/spark-shell --help
Usage: ./bin/spark-shell [options]

Options:
  --master MASTER_URL         spark://host:port, mesos://host:port, yarn, or local.
  --deploy-mode DEPLOY_MODE   Whether to launch the driver program locally ("client") or
                              on one of the worker machines inside the cluster ("cluster")
                              (Default: client).
  --class CLASS_NAME          Your application's main class (for Java / Scala apps).
  --name NAME                 A name of your application.
  --jars JARS                 Comma-separated list of local jars to include on the driver
                              and executor classpaths.
  --packages                  Comma-separated list of maven coordinates of jars to include
                              on the driver and executor classpaths. Will search the local
                              maven repo, then maven central and any additional remote
                              repositories given by --repositories. The format for the
                              coordinates should be groupId:artifactId:version.
  --exclude-packages          Comma-separated list of groupId:artifactId, to exclude while
                              resolving the dependencies provided in --packages to avoid
                              dependency conflicts.
  --repositories              Comma-separated list of additional remote repositories to
                              search for the maven coordinates given with --packages.
  --py-files PY_FILES         Comma-separated list of .zip, .egg, or .py files to place
                              on the PYTHONPATH for Python apps.
  --files FILES               Comma-separated list of files to be placed in the working
                              directory of each executor.
  --conf PROP=VALUE           Arbitrary Spark configuration property.
  --properties-file FILE      Path to a file from which to load extra properties. If not
                              specified, this will look for conf/spark-defaults.conf.
  --driver-memory MEM         Memory for driver (e.g. 1000M, 2G) (Default: 1024M).
  --driver-java-options       Extra Java options to pass to the driver.
  --driver-library-path       Extra library path entries to pass to the driver.
  --driver-class-path         Extra class path entries to pass to the driver. Note that
                              jars added with --jars are automatically included in the
                              classpath.
  --executor-memory MEM       Memory per executor (e.g. 1000M, 2G) (Default: 1G).
  --proxy-user NAME           User to impersonate when submitting the application.
  --help, -h                  Show this help message and exit
  --verbose, -v               Print additional debug output
  --version,                  Print the version of current Spark

 Spark standalone with cluster deploy mode only:
  --driver-cores NUM          Cores for driver (Default: 1).

 Spark standalone or Mesos with cluster deploy mode only:
  --supervise                 If given, restarts the driver on failure.
  --kill SUBMISSION_ID        If given, kills the driver specified.
  --status SUBMISSION_ID      If given, requests the status of the driver specified.

 Spark standalone and Mesos only:
  --total-executor-cores NUM  Total cores for all executors.

 Spark standalone and YARN only:
  --executor-cores NUM        Number of cores per executor. (Default: 1 in YARN mode,
                              or all available cores on the worker in standalone mode)

 YARN-only:
  --driver-cores NUM          Number of cores used by the driver, only in cluster mode
                              (Default: 1).
  --queue QUEUE_NAME          The YARN queue to submit to (Default: "default").
  --num-executors NUM         Number of executors to launch (Default: 2).
  --archives ARCHIVES         Comma separated list of archives to be extracted into the
                              working directory of each executor.
  --principal PRINCIPAL       Principal to be used to login to KDC, while running on
                              secure HDFS.
  --keytab KEYTAB             The full path to the file that contains the keytab for the
                              principal specified above. This keytab will be copied to
                              the node running the Application Master via the Secure
                              Distributed Cache, for renewing the login tickets and the
                              delegation tokens periodically.
3. spark-submit options (apart from the Usage lines, the options are the same as for spark-shell):
bin/spark-submit --help
Usage: spark-submit [options] <app jar | python file> [app arguments]
Usage: spark-submit --kill [submission ID] --master [spark://...]
Usage: spark-submit --status [submission ID] --master [spark://...]

Options: identical to the spark-shell options listed above.
4. Defaults:
a) Default application configuration file: conf/spark-defaults.conf
b) Default JVM configuration file: conf/spark-env.sh
c) Commonly used jar files can be added via the “--jars” option.
5. Configuration precedence (highest to lowest), illustrated by the sketch after this list:
a) Properties set explicitly on SparkConf;
b) Options passed to spark-submit;
c) Settings in conf/spark-defaults.conf.
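A minimal sketch of the precedence rules (the app name and property value are hypothetical): a key set explicitly on SparkConf wins over the same setting passed to spark-submit, which in turn wins over conf/spark-defaults.conf.
import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("PrecedenceDemo")
  .set("spark.executor.memory", "512m")   // explicit SparkConf setting: highest priority
val sc = new SparkContext(conf)
// Prints 512m even if spark-submit was given --executor-memory 1g or
// spark-defaults.conf contains a different spark.executor.memory value.
println(sc.getConf.get("spark.executor.memory"))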
6. MASTER_URL formats
MASTER_URL        | Description
local             | Run locally with a single thread (no parallelism at all)
local[K]          | Run locally with K worker threads (ideally K equals the number of CPU cores)
local[*]          | K = number of CPU cores
spark://HOST:PORT | Connect to the Master of a Standalone cluster, i.e. the URL shown on the Spark UI; the port defaults to 7077 and cannot be omitted
yarn-client       | Connect to a YARN cluster in client mode; the cluster is located via the HADOOP_CONF_DIR environment variable
yarn-cluster      | Connect to a YARN cluster in cluster mode; the cluster is located via the HADOOP_CONF_DIR environment variable
7. Notes:
a) spark-shell uses port 4040 by default; if 4040 is occupied, the program logs a WARN message and keeps incrementing the port (4041, 4042, ...) until it finds a free one.
b) Each Driver Program's jars and files are copied to the working directory on the Executor nodes and may take up considerable space. A YARN cluster cleans them up automatically; a Standalone cluster needs “spark.worker.cleanup.enabled” (with the retention period “spark.worker.cleanup.appDataTtl”) configured to enable automatic cleanup.
8. Application template
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.hive.HiveContext

object Test {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("Test")
    val sc = new SparkContext(conf)
    // ...
  }
}
9. Submission example:
bin/spark-submit --master spark://centos1:7077 --class org.apache.spark.examples.SparkPi lib/spark-examples-1.6.3-hadoop2.6.0.jar
Core Principles
RDD Concept
1. RDD: Resilient Distributed Dataset.
2. Significance: the most fundamental abstraction in Spark; a fault-tolerant, memory-based model for cluster computing.
RDD Core Components
1. The five core methods.
a) getPartitions: the list of partitions (data blocks).
b) compute: the function that computes the data of each partition.
c) getDependencies: the list of dependencies on parent RDDs.
d) partitioner: the partitioner for key-value RDDs.
e) getPreferredLocations: the list of preferred locations for each partition (e.g. the block locations of an HDFS file).
2. Grouping the five methods by purpose:
a) The first three describe the lineage between RDDs and are mandatory;
b) The last two are used to optimize execution.
3. An RDD is parameterized by an element type T, written RDD[T]; its elements are instances of T.
4. Partitions:
a) Concept: the large collection of T instances is split into several smaller sub-collections of T instances.
b) In the source code, a partition's data is effectively an Iterator[T].
c) Storage: for example, stored as Blocks on HDFS.
5. Dependencies:
a) Dependency list: an RDD can depend on several parents, so it holds a list of parent RDD dependencies.
b) Relation to partitions: dependencies are expressed between RDD partitions; from the dependency list and getPartitions one can tell how each partition of an RDD depends on a set of parent RDD partitions.
6. The compute method:
a) It is lazy: compute only runs when an Action is triggered;
b) The unit of computation is a partition, not an individual element of T.
7. The partitioner method: only defined for RDDs whose elements are key-value pairs.
8. The RDD abstract class (excerpt from v1.6.3); a toy custom RDD sketch follows the excerpt:
package org.apache.spark.rdd

// ...

/**
 * A Resilient Distributed Dataset (RDD), the basic abstraction in Spark. Represents an immutable,
 * partitioned collection of elements that can be operated on in parallel. This class contains the
 * basic operations available on all RDDs, such as `map`, `filter`, and `persist`. In addition,
 * [[org.apache.spark.rdd.PairRDDFunctions]] contains operations available only on RDDs of key-value
 * pairs, such as `groupByKey` and `join`;
 * [[org.apache.spark.rdd.DoubleRDDFunctions]] contains operations available only on RDDs of
 * Doubles; and
 * [[org.apache.spark.rdd.SequenceFileRDDFunctions]] contains operations available on RDDs that
 * can be saved as SequenceFiles.
 * All operations are automatically available on any RDD of the right type (e.g. RDD[(Int, Int)]
 * through implicit.
 *
 * Internally, each RDD is characterized by five main properties:
 *
 *  - A list of partitions
 *  - A function for computing each split
 *  - A list of dependencies on other RDDs
 *  - Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)
 *  - Optionally, a list of preferred locations to compute each split on (e.g. block locations for
 *    an HDFS file)
 *
 * All of the scheduling and execution in Spark is done based on these methods, allowing each RDD
 * to implement its own way of computing itself. Indeed, users can implement custom RDDs (e.g. for
 * reading data from a new storage system) by overriding these functions. Please refer to the
 * [[http://www.cs.berkeley.edu/~matei/papers/2012/nsdi_spark.pdf Spark paper]] for more details
 * on RDD internals.
 */
abstract class RDD[T: ClassTag](
    @transient private var _sc: SparkContext,
    @transient private var deps: Seq[Dependency[_]]
  ) extends Serializable with Logging {
  // ...

  // =======================================================================
  // Methods that should be implemented by subclasses of RDD
  // =======================================================================

  /**
   * :: DeveloperApi ::
   * Implemented by subclasses to compute a given partition.
   */
  @DeveloperApi
  def compute(split: Partition, context: TaskContext): Iterator[T]

  /**
   * Implemented by subclasses to return the set of partitions in this RDD. This method will only
   * be called once, so it is safe to implement a time-consuming computation in it.
   */
  protected def getPartitions: Array[Partition]

  /**
   * Implemented by subclasses to return how this RDD depends on parent RDDs. This method will only
   * be called once, so it is safe to implement a time-consuming computation in it.
   */
  protected def getDependencies: Seq[Dependency[_]] = deps

  /**
   * Optionally overridden by subclasses to specify placement preferences.
   */
  protected def getPreferredLocations(split: Partition): Seq[String] = Nil

  /** Optionally overridden by subclasses to specify how they are partitioned. */
  @transient val partitioner: Option[Partitioner] = None

  // ...
}
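To make the five methods concrete, here is a minimal custom RDD sketch (not part of the Spark sources; class and field names are illustrative). It wraps a local Seq, overriding only getPartitions and compute and keeping the defaults for the optional methods:
import org.apache.spark.{Partition, SparkContext, TaskContext}
import org.apache.spark.rdd.RDD
import scala.reflect.ClassTag

// Each partition remembers its index and the slice of data it owns.
class SeqPartition[T](val index: Int, val slice: Seq[T]) extends Partition

// A toy RDD backed by a local Seq, split into roughly numSlices partitions.
class SeqRDD[T: ClassTag](sc: SparkContext, data: Seq[T], numSlices: Int)
  extends RDD[T](sc, Nil) {   // Nil: no parent RDDs, hence no dependencies

  override protected def getPartitions: Array[Partition] =
    data.grouped(math.max(1, data.size / numSlices))
      .zipWithIndex
      .map { case (slice, i) => new SeqPartition(i, slice): Partition }
      .toArray

  override def compute(split: Partition, context: TaskContext): Iterator[T] =
    split.asInstanceOf[SeqPartition[T]].slice.iterator

  // getDependencies, partitioner and getPreferredLocations keep their defaults.
}

// Usage: new SeqRDD(sc, 1 to 10, 3).collect()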
RDD Dependencies
1. Narrow vs. wide dependencies (in the figure, outer boxes are RDDs and inner boxes are partitions):
2. Narrow dependency: each partition of the parent RDD is used by at most one partition of the child RDD.
3. Wide dependency: each partition of the child RDD depends on several (possibly all) partitions of the parent RDD.
4. Properties:
a) Pipelining: narrow dependencies can be pipelined, i.e. executed as a pipeline on a single cluster node that computes all the parent partitions.
b) Fault recovery: with narrow dependencies, only the lost parent partitions need to be recomputed for the failed node, and the recomputation can run in parallel on different nodes; with wide dependencies, a failed node can lose partitions from all parents of an RDD, which must then all be recomputed. (The sketch after this list shows how to inspect an RDD's dependencies.)
5. WordCount dependency graph:
a) The ShuffledRDD introduces a wide dependency and splits the DAG into two Stages: the first Stage runs from the HadoopRDD to a MapPartitionsRDD and produces ShuffleMapTasks; the second Stage runs from the ShuffledRDD to a MapPartitionsRDD and produces ResultTasks.
b) The first Stage is executed by three ShuffleMapTasks running in parallel in pipeline fashion, until all three Tasks have reached the MapPartitionsRDD.
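The dependency types can be inspected directly from spark-shell; a minimal sketch (hypothetical input path) for the WordCount lineage above:
val words  = sc.textFile("/test/wordcount/README.md").flatMap(_.split(" "))
val pairs  = words.map(w => (w, 1))
val counts = pairs.reduceByKey(_ + _)
println(pairs.dependencies)    // narrow: a OneToOneDependency on words
println(counts.dependencies)   // wide: a ShuffleDependency introduced by reduceByKey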
DAG
1. DAG: in graph theory, a directed graph is a directed acyclic graph (DAG) if it is impossible to start from any vertex and return to it by following a sequence of directed edges.
2. Spark DAG: Spark partitions the data across the distributed environment, converts the Job into a DAG, and then schedules the DAG stage by stage, executing the tasks in a distributed, parallel fashion.
3. Stage: during DAG scheduling, the Job is divided into Stages at shuffle boundaries. In the figure, a shuffle is needed between RDD A and RDD B and between RDD F and RDD G, so there are three Stages: RDD A; RDD C through RDD F; and RDD B together with RDD F through RDD G.
4. Pipelining:
a) Spark divides Stages greedily: if the dependency from an RDD's partitions to its parent's partitions is narrow, the classic fusion optimization is applied and the corresponding operations are placed in the same Stage.
b) If a consecutive sequence of RDDs is linked only by narrow dependencies, their operations are merged into one Stage, until a wide dependency is reached.
c) Benefit of pipelining: it removes a large number of global barriers and avoids materializing many intermediate RDDs, which greatly improves performance. (The sketch below shows the resulting Stage structure for WordCount.)
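A minimal spark-shell sketch (hypothetical path) of this fusion: toDebugString prints the lineage, where each new indentation level marks a shuffle (a Stage boundary) while the narrow flatMap/map steps stay fused in the same Stage.
val counts = sc.textFile("/test/wordcount/README.md").flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
println(counts.toDebugString)
// Output is roughly of the form:
// (2) ShuffledRDD[4] at reduceByKey ...        <- second Stage
//  +-(2) MapPartitionsRDD[3] at map ...        <- first Stage (pipelined narrow dependencies)
//     |  MapPartitionsRDD[2] at flatMap ...
//     |  MapPartitionsRDD[1] at textFile ...
//     |  HadoopRDD[0] at textFile ...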
RDD Fault Recovery
1. If a partition of an RDD is lost, only the corresponding partitions of its parent RDDs need to be recomputed, as determined by the dependencies and the partitioning.
2. Re-execution across a wide dependency involves multiple parent RDDs. To avoid recomputing a large number of parents of the failed RDD, Spark persists the intermediate output of the map stage, so that re-execution can fetch the intermediate data for the corresponding partitions.
3. Spark offers checkpointing and logging to persist intermediate RDDs. A checkpoint writes the RDD directly to disk or to storage such as HDFS; unlike cache/persist, a checkpointed RDD is not removed when the job finishes, so it remains available and can be loaded directly by later jobs. (A minimal usage sketch follows.)
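A minimal usage sketch (hypothetical HDFS paths) contrasting cache with checkpoint:
sc.setCheckpointDir("hdfs://centos1:9000/spark-checkpoints")   // assumed checkpoint directory
val cleaned = sc.textFile("/test/wordcount/README.md").filter(_.nonEmpty)
cleaned.cache()        // kept in memory only for the lifetime of this application
cleaned.checkpoint()   // written to the checkpoint directory, truncating the lineage
cleaned.count()        // the first action triggers both the caching and the checkpoint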
Spark Architecture in Standalone Mode
1. Standalone mode has two run modes (controlled by the --deploy-mode option):
a) cluster: the Driver runs on a Worker node.
b) client: the Driver runs on the client machine.
2. Job execution flow (cluster mode):
a) The client submits the Application to the Master; the Master tells one Worker to start the Driver, i.e. the SchedulerBackend (the Worker creates a DriverRunner thread, and the DriverRunner starts the SchedulerBackend process).
b) The Master tells the remaining Workers to start Executors, i.e. ExecutorBackends (each Worker creates an ExecutorRunner thread, and the ExecutorRunner starts the ExecutorBackend process).
c) Once started, each ExecutorBackend registers with the Driver's SchedulerBackend.
d) The SchedulerBackend process contains the DAGScheduler, which builds the execution plan from the user program and schedules it. The Tasks of each Stage are placed in the TaskScheduler; when an ExecutorBackend reports to the SchedulerBackend, Tasks in the TaskScheduler are dispatched to that ExecutorBackend for execution.
e) The Application finishes when all Stages are complete.
3. Fault recovery
a) If a Worker fails: before exiting, the Worker kills the Executors running on it; the Master detects the failure through the periodic heartbeats, reports it to the Driver, and removes the Worker; the Driver then knows that the Executors on that Worker have been killed.
b) If an Executor fails: the ExecutorRunner reports it to the Master; since the Executor's Worker is still healthy, the Master sends a LaunchExecutor command to that Worker to start a new Executor.
c) If the Master fails: a ZooKeeper-based Master HA setup (one Active Master, the others Standby) fails over to a new Master.
Spark Architecture in YARN Mode
1. YARN mode has two run modes (controlled by the --deploy-mode option):
a) cluster: the Driver runs on a NodeManager node.
b) client: the Driver runs on the client machine.
i. SparkAppMaster: the counterpart of the SchedulerBackend in Standalone mode; it contains the DAGScheduler and the YARNClusterScheduler.
ii. Executor: the counterpart of the ExecutorBackend in Standalone mode.
2. Job execution flow (cluster mode):
a) The client submits the Application to the ResourceManager; when a NodeManager reports in, the ResourceManager assigns the SparkAppMaster to that NodeManager, which then starts the SparkAppMaster.
b) Once started, the SparkAppMaster initializes the Application and requests resources from the ResourceManager.
c) After the resources are granted, it instructs the corresponding NodeManagers via RPC to start SparkExecutors.
d) The SparkExecutors report to the SparkAppMaster and execute the Tasks.
e) In addition, the SparkClient obtains the Application's running status through the SparkAppMaster.
Application Resource Allocation
1. Two resource allocation modes
a) Coarse-grained: after the application is submitted and before it runs, all the resources the application needs are acquired in one go; no further resources are requested for the entire run.
b) Fine-grained: after the application is submitted, resources are requested from the Cluster Manager dynamically; as soon as there are enough resources to run one Task, that Task starts, without waiting for all resources to be in place.
2. Spark on YARN supports only the coarse-grained mode.
API
WordCount Example
1. Prepare the data.
hadoop fs -mkdir -p /test/wordcount
hadoop fs -put README.md /test/wordcount
2. Run the program.
spark-shell --master spark://centos1:7077
import org.apache.log4j.{Logger, Level}
Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
Logger.getLogger("org.apache.spark.sql").setLevel(Level.WARN)
val textFile = sc.textFile("/test/wordcount/README.md")
val wordCounts = textFile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey((count1, count2) => count1 + count2)
wordCounts.saveAsTextFile("/test/wordcount/result")
wordCounts.collect
a) “flatMap(line => line.split(" "))”: splits each line on spaces, producing an RDD of words.
b) “map(word => (word, 1))”: maps each word to a (word, 1) pair, producing an RDD of tuples.
c) “reduceByKey((count1, count2) => count1 + count2)”: groups by key (i.e. by word) and sums the counts within each group.
d) “collect”: an Action that returns all elements of the RDD to the Driver Program as a Scala Array. If the data set is too large, this can exhaust the Driver Program's memory. (A safer alternative is sketched after this example.)
3. View the result.
hadoop fs -cat /test/wordcount/result/part-*
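Back in the spark-shell session above, a minimal sketch of safer alternatives to collect when the result may be large:
wordCounts.take(10).foreach(println)    // bring back only the first 10 (word, count) pairs
println(wordCounts.count())             // just the number of distinct words
val top10 = wordCounts.top(10)(Ordering.by((p: (String, Int)) => p._2))   // the 10 most frequent words
top10.foreach(println)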
RDD Construction
1. Building an RDD by loading files from an external storage system
a) Method definition
def textFile(path: String, minPartitions: Int = defaultMinPartitions): RDD[String]
b) “sc.textFile("/test/directory")”: loads all files under the given directory.
c) “sc.textFile("/test/directory/*.txt")”: loads all .txt files under the given directory.
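A minimal sketch (hypothetical directory) of the optional minPartitions argument from the definition above:
val logs = sc.textFile("/test/directory", minPartitions = 8)   // hint: ask for at least 8 partitions
println(logs.partitions.length)                                // usually >= 8 for splittable input of sufficient size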