Spark筆記——技術點彙總

来源:http://www.cnblogs.com/netoxi/archive/2017/08/02/7223412.html
-Advertisement-
Play Games

目錄 · 概況 · 手工搭建集群 · 引言 · 安裝Scala · 配置文件 · 啟動與測試 · 應用部署 · 部署架構 · 應用程式部署 · 核心原理 · RDD概念 · RDD核心組成 · RDD依賴關係 · DAG圖 · RDD故障恢復機制 · Standalone模式的S ...


目錄

· 概況

· 手工搭建集群

    · 引言

    · 安裝Scala

    · 配置文件

    · 啟動與測試

· 應用部署

    · 部署架構

    · 應用程式部署

· 核心原理

    · RDD概念

    · RDD核心組成

    · RDD依賴關係

    · DAG圖

    · RDD故障恢復機制

    · Standalone模式的Spark架構

    · YARN模式的Spark架構

    · 應用程式資源構建

· API

    · WordCount示例

    · RDD構建

    · RDD緩存與持久化

    · RDD分區數

    · 共用變數

    · RDD Operation

    · RDD Operation隱式轉換

    · RDD[T]分區Operation

    · RDD[T]常用聚合Operation

    · RDD間操作Operation

    · DoubleRDDFunctions常用Operation

    · PairRDDFunctions聚合Operation

    · PairRDDFunctions間操作Operation

    · OrderedRDDFunctions常用Operation

· 案例:移動終端上網數據分析

    · 數據準備

    · 載入&預處理

    · 統計App訪問次數

    · 統計DAU

    · 統計MAU

    · 統計App上下流量


 

概況

1. Spark相對MapReduce的優勢:

    a) 支持迭代計算;

    b) 中間結果存儲在記憶體而不是硬碟,降低延遲。

2. Spark已成為輕量級大數據快速處理統一平臺,“One stack to rule them all”,一個平臺完成:即席查詢(ad-hoc queries)、批處理(batch processing)、流式處理(stream processing)。

3. Spark集群搭建方式:

    a) 集成部署工具,如Cloudera Manager;

    b) 手工搭建。

4. Spark源碼編譯方式:

    a) SBT編譯

    b) Maven編譯

手工搭建集群

引言

1. 環境:

Role

Host name

Master

centos1

Slave

centos2

centos3

2. Standalone模式需在MasterSlave節點部署,YARN模式僅需在命令提交機器部署。

3. 假設已成功安裝JDKHadoop集群。

安裝Scala

1. [Master(Standalone模式)或命令提交機器(YARN模式)]安裝Scala到/opt/app目錄下。 

tar zxvf scala-2.10.6.tgz -C /opt/app

2. [Master(Standalone模式)或命令提交機器(YARN模式)]配置環境變數。

vi /etc/profile
export SCALA_HOME=/opt/app/scala-2.10.6
export PATH=$SCALA_HOME/bin:$PATH
source /etc/profile # 生效
env | grep SCALA_HOME # 驗證

配置文件

3. [MasterStandalone模式)或命令提交機器(YARN模式)]

tar zxvf spark-1.6.3-bin-hadoop2.6.tgz -C /opt/app
cd /opt/app/spark-1.6.3-bin-hadoop2.6/conf
cp spark-env.sh.template spark-env.sh
vi spark-env.sh
export JAVA_HOME=/opt/app/jdk1.8.0_121
export SCALA_HOME=/opt/app/scala-2.10.6
export HADOOP_HOME=/opt/app/hadoop-2.6.5
export HADOOP_CONF_DIR=${HADOOP_HOME}/etc/hadoop
export YARN_CONF_DIR=${HADOOP_HOME}/etc/hadoop
# For standalone mode
export SPARK_WORKER_CORES=1
export SPARK_DAEMON_MEMORY=512m
cp spark-defaults.conf.template spark-defaults.conf
hadoop fs -mkdir /spark.eventLog.dir
vi spark-defaults.conf
spark.driver.extraClassPath        /opt/app/apache-hive-1.2.2-bin/lib/mysql-connector-java-5.1.22-bin.jar
spark.eventLog.enabled           true
spark.eventLog.dir               hdfs://centos1:9000/spark.eventLog.dir
cp slaves.template slaves
vi slaves
centos2
centos3
ln -s /opt/app/apache-hive-1.2.2-bin/conf/hive-site.xml .

4. [Master(Standalone模式)]從Master複製Spark目錄到各Slave。註意:僅Standalone集群需要執行本步驟。

scp -r /opt/app/spark-1.6.3-bin-hadoop2.6 hadoop@centos2:/opt/app
scp -r /opt/app/spark-1.6.3-bin-hadoop2.6 hadoop@centos3:/opt/app

啟動與測試

5. [MasterStandalone模式)或命令提交機器(YARN模式)]配置Spark環境變數。

export SPARK_HOME=/opt/app/spark-1.6.3-bin-hadoop2.6
export PATH=$PATH:$SPARK_HOME/bin

6. [Master(Standalone模式)]啟動Spark,測試。 

sbin/start-all.sh
jps
Master # Master機器進程
Worker # Slave機器進程

7. [Master(Standalone模式)或命令提交機器(YARN模式)]測試。

bin/spark-submit --master spark://centos1:7077 --deploy-mode client --class org.apache.spark.examples.SparkPi --driver-memory 512m --executor-memory 512m --num-executors 1 --executor-cores 1 lib/spark-examples-1.6.3-hadoop2.6.0.jar # Standalone Client模式運行
bin/spark-submit --master spark://centos1:7077 --deploy-mode cluster --class org.apache.spark.examples.SparkPi --driver-memory 512m --executor-memory 512m --num-executors 1 --executor-cores 1 lib/spark-examples-1.6.3-hadoop2.6.0.jar # Standalone Cluster模式運行
bin/spark-submit --master yarn-client --class org.apache.spark.examples.SparkPi --driver-memory 512m --executor-memory 512m --num-executors 1 --executor-cores 1 lib/spark-examples-1.6.3-hadoop2.6.0.jar # Yarn Client模式運行
bin/spark-submit --master yarn-cluster --class org.apache.spark.examples.SparkPi --driver-memory 512m --executor-memory 512m --num-executors 1 --executor-cores 1 lib/spark-examples-1.6.3-hadoop2.6.0.jar # Yarn Custer模式運行
bin/yarn application -list # 查看YARN運行的應用
bin/yarn application -kill ApplicationID # 殺死YARN運行的應用
bin/spark-shell --master spark://centos1:7077 --deploy-mode client --driver-memory 512m --executor-memory 512m --num-executors 1 --executor-cores 1 # Standalone Client模式運行
bin/spark-shell --master yarn --deploy-mode client --driver-memory 512m --executor-memory 512m --num-executors 1 --executor-cores 1 # Yarn Client模式運行

8. 監控頁面。

http://centos1:8080

Spark監控

http://centos1:8088

YARN監控

應用部署

部署架構

1. ApplicationSpark應用程式,包括一個Driver Program和集群中多個WorkNode中的Executor,其中每個WorkNode為每個Application僅提供一個Executor

2. Driver Program:運行Applicationmain函數。通常也用SparkContext表示。負責DAG構建、Stage劃分、Task管理及調度、生成SchedulerBackend用於Akka通信,主要組件有DAGScheduler、TaskScheduler、SchedulerBackend。

3. Cluster Manager:集群管理器,可封裝如Spark Standalone、YARN等不同集群管理器。Driver Program通過Cluster Manager分配資源,並將任務發送到多個Work Node執行。

4. WorkNode:集群節點。應用程式在運行時的TaskWorkNodeExecutor中執行。

5. ExecutorWorkNodeApplication啟動的一個進程,負責執行Task

6. Stage:一個Applicatoin一般包含一到多個Stage

7. Task:被Driver Program發送到Executor的計算單元,通常一個Task處理一個split(即一個分區),每個split一般是一個Block大小。一個Stage包含一到多個Task,通過多個Task實現並行計算

8. DAGScheduler:將Application分解成一到多個Stage,每個Stage根據RDD分區數決定Task個數,然後生成相應TaskSet放到TaskScheduler

9. DeployMode:Driver進程部署模式,有clusterclient兩種。

10. 註意:

    a) Driver Program必須與Spark集群處於同一網路環境。因為SparkContext要發送任務給不同WorkNodeExecutor並接受Executor的執行結果。

    b) 生產環境中,Driver Program所在機器性能配置,尤其CPU較好。

應用程式部署

1. 分類:

    a) spark-shell:互動式,用於開發調試。已創建好“val sc: SparkContext”和“val sqlContext: SQLContext”實例。

    b) spark-submit:應用提交式,用於生產部署。

2. spark-shell參數:

bin/spark-shell --help
Usage: ./bin/spark-shell [options]

Options:
  --master MASTER_URL         spark://host:port, mesos://host:port, yarn, or local.
  --deploy-mode DEPLOY_MODE   Whether to launch the driver program locally ("client") or
                              on one of the worker machines inside the cluster ("cluster")
                              (Default: client).
  --class CLASS_NAME          Your application's main class (for Java / Scala apps).
  --name NAME                 A name of your application.
  --jars JARS                 Comma-separated list of local jars to include on the driver
                              and executor classpaths.
  --packages                  Comma-separated list of maven coordinates of jars to include
                              on the driver and executor classpaths. Will search the local
                              maven repo, then maven central and any additional remote
                              repositories given by --repositories. The format for the
                              coordinates should be groupId:artifactId:version.
  --exclude-packages          Comma-separated list of groupId:artifactId, to exclude while
                              resolving the dependencies provided in --packages to avoid
                              dependency conflicts.
  --repositories              Comma-separated list of additional remote repositories to
                              search for the maven coordinates given with --packages.
  --py-files PY_FILES         Comma-separated list of .zip, .egg, or .py files to place
                              on the PYTHONPATH for Python apps.
  --files FILES               Comma-separated list of files to be placed in the working
                              directory of each executor.

  --conf PROP=VALUE           Arbitrary Spark configuration property.
  --properties-file FILE      Path to a file from which to load extra properties. If not
                              specified, this will look for conf/spark-defaults.conf.

  --driver-memory MEM         Memory for driver (e.g. 1000M, 2G) (Default: 1024M).
  --driver-java-options       Extra Java options to pass to the driver.
  --driver-library-path       Extra library path entries to pass to the driver.
  --driver-class-path         Extra class path entries to pass to the driver. Note that
                              jars added with --jars are automatically included in the
                              classpath.

  --executor-memory MEM       Memory per executor (e.g. 1000M, 2G) (Default: 1G).

  --proxy-user NAME           User to impersonate when submitting the application.

  --help, -h                  Show this help message and exit
  --verbose, -v               Print additional debug output
  --version,                  Print the version of current Spark

 Spark standalone with cluster deploy mode only:
  --driver-cores NUM          Cores for driver (Default: 1).

 Spark standalone or Mesos with cluster deploy mode only:
  --supervise                 If given, restarts the driver on failure.
  --kill SUBMISSION_ID        If given, kills the driver specified.
  --status SUBMISSION_ID      If given, requests the status of the driver specified.

 Spark standalone and Mesos only:
  --total-executor-cores NUM  Total cores for all executors.

 Spark standalone and YARN only:
  --executor-cores NUM        Number of cores per executor. (Default: 1 in YARN mode,
                              or all available cores on the worker in standalone mode)

 YARN-only:
  --driver-cores NUM          Number of cores used by the driver, only in cluster mode
                              (Default: 1).
  --queue QUEUE_NAME          The YARN queue to submit to (Default: "default").
  --num-executors NUM         Number of executors to launch (Default: 2).
  --archives ARCHIVES         Comma separated list of archives to be extracted into the
                              working directory of each executor.
  --principal PRINCIPAL       Principal to be used to login to KDC, while running on
                              secure HDFS.
  --keytab KEYTAB             The full path to the file that contains the keytab for the
                              principal specified above. This keytab will be copied to
                              the node running the Application Master via the Secure
                              Distributed Cache, for renewing the login tickets and the
                              delegation tokens periodically.

3. spark-submit參數(除Usage外,其他參數與spark-shell一樣):

bin/spark-submit --help
Usage: spark-submit [options] <app jar | python file> [app arguments]
Usage: spark-submit --kill [submission ID] --master [spark://...]
Usage: spark-submit --status [submission ID] --master [spark://...]

Options:
  --master MASTER_URL         spark://host:port, mesos://host:port, yarn, or local.
  --deploy-mode DEPLOY_MODE   Whether to launch the driver program locally ("client") or
                              on one of the worker machines inside the cluster ("cluster")
                              (Default: client).
  --class CLASS_NAME          Your application's main class (for Java / Scala apps).
  --name NAME                 A name of your application.
  --jars JARS                 Comma-separated list of local jars to include on the driver
                              and executor classpaths.
  --packages                  Comma-separated list of maven coordinates of jars to include
                              on the driver and executor classpaths. Will search the local
                              maven repo, then maven central and any additional remote
                              repositories given by --repositories. The format for the
                              coordinates should be groupId:artifactId:version.
  --exclude-packages          Comma-separated list of groupId:artifactId, to exclude while
                              resolving the dependencies provided in --packages to avoid
                              dependency conflicts.
  --repositories              Comma-separated list of additional remote repositories to
                              search for the maven coordinates given with --packages.
  --py-files PY_FILES         Comma-separated list of .zip, .egg, or .py files to place
                              on the PYTHONPATH for Python apps.
  --files FILES               Comma-separated list of files to be placed in the working
                              directory of each executor.

  --conf PROP=VALUE           Arbitrary Spark configuration property.
  --properties-file FILE      Path to a file from which to load extra properties. If not
                              specified, this will look for conf/spark-defaults.conf.

  --driver-memory MEM         Memory for driver (e.g. 1000M, 2G) (Default: 1024M).
  --driver-java-options       Extra Java options to pass to the driver.
  --driver-library-path       Extra library path entries to pass to the driver.
  --driver-class-path         Extra class path entries to pass to the driver. Note that
                              jars added with --jars are automatically included in the
                              classpath.

  --executor-memory MEM       Memory per executor (e.g. 1000M, 2G) (Default: 1G).

  --proxy-user NAME           User to impersonate when submitting the application.

  --help, -h                  Show this help message and exit
  --verbose, -v               Print additional debug output
  --version,                  Print the version of current Spark

 Spark standalone with cluster deploy mode only:
  --driver-cores NUM          Cores for driver (Default: 1).

 Spark standalone or Mesos with cluster deploy mode only:
  --supervise                 If given, restarts the driver on failure.
  --kill SUBMISSION_ID        If given, kills the driver specified.
  --status SUBMISSION_ID      If given, requests the status of the driver specified.

 Spark standalone and Mesos only:
  --total-executor-cores NUM  Total cores for all executors.

 Spark standalone and YARN only:
  --executor-cores NUM        Number of cores per executor. (Default: 1 in YARN mode,
                              or all available cores on the worker in standalone mode)

 YARN-only:
  --driver-cores NUM          Number of cores used by the driver, only in cluster mode
                              (Default: 1).
  --queue QUEUE_NAME          The YARN queue to submit to (Default: "default").
  --num-executors NUM         Number of executors to launch (Default: 2).
  --archives ARCHIVES         Comma separated list of archives to be extracted into the
                              working directory of each executor.
  --principal PRINCIPAL       Principal to be used to login to KDC, while running on
                              secure HDFS.
  --keytab KEYTAB             The full path to the file that contains the keytab for the
                              principal specified above. This keytab will be copied to
                              the node running the Application Master via the Secure
                              Distributed Cache, for renewing the login tickets and the
                              delegation tokens periodically.

4. 預設參數:

    a) 預設應用程式參數配置文件:conf/spark-defaults.conf

    b) 預設JVM參數配置文件:conf/spark-env.sh

    c) 常用的jar文件可通過“--jar”參數配置。

5. 參數優先順序(由高到低):

    a) SparkConf顯示配置參數;

    b) spark-submit指定參數

    c) conf/spark-defaults.conf配置文件參數

6. MASTER_URL格式

MASTER_URL

說明

local

以單線程在本地運行(完全無並行)

local[K]

在本地以KWorker線程運行,K設置為CPU核數較理想

local[*]

K=CPU核數

spark://HOST:PORT

連接Standalone集群的Master,即Spark監控頁面的URL,埠預設為7077(不支持省略)

yarn-client

client模式連接到YARN集群,通過HADOOP_CONF_DIR環境變數查找集群

yarn-cluster

cluster模式連接到YARN集群,通過HADOOP_CONF_DIR環境變數查找集群

 7. 註意:

    a) spark-shell預設使用4040埠,當4040埠被占用時,程式列印日誌警告WARN並嘗試遞增埠(4041、4042……)直到找到可用埠為止。

    b) Executor節點上每個Driver Programjar包和文件會被覆制到工作目錄下,可能占用大量空間。YARN集群會自動清除,Standalone集群需配置“spark.worker.cleanup.appDataTtl”開啟自動清除。

8. 應用程式模板

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.hive.HiveContext

object Test {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("Test")
    val sc = new SparkContext(conf)
    
    // ...
  }
}

9. 提交示例:

bin/spark-submit --master spark://ubuntu1:7077 --class org.apache.spark.examples.SparkPi lib/spark-examples-1.6.3-hadoop2.6.0.jar

核心原理

RDD概念

1. RDDResilient Distributed Dataset,彈性分散式數據集。

2. 意義:Spark最核心的抽象概念;具有容錯性基於記憶體的集群計算方法。

RDD核心組成

1. 5個核心方法。

    a) getPartitions:分區列表(數據塊列表)

    b) compute:計算各分區數據的函數。

    c) getDependencies:對父RDD的依賴列表。

    d) partitionerkey-value RDD的分區器。

    e) getPreferredLocations:每個分區的預定義地址列表(如HDFS上的數據塊地址)。

2. 按用途分類以上5個方法:

    a) 前3個:描述RDD間的血統關係(Lineage),必須有的方法;

    b) 後2個:用於優化執行。

3. RDD的實例:RDD[T]T為泛型,即實例。

4. 分區:

    a) 分區概念:將大數據量T實例集合split成多個小數據量的T實例子集合。

    b) 分區源碼:實際上是Iterator[T]

    c) 分區存儲:例如以Block方式存在HDFS

5. 依賴:

    a) 依賴列表:一個RDD可有多個父依賴,所以是父RDD依賴列表。

    b) 與分區關係:依賴是通過RDD分區間的依賴體現的,通過依賴列表和getPartitions方法可知RDD各分區是如何依賴一組父RDD分區的。

6. compute方法:

    a) 延時(lazy)特性,當觸發Action時才真正執行compute方法;

    b) 計算粒度是分區,而不是T元素。

7. partitioner方法:T實例為key-value對類型的RDD

8. RDD抽象類源碼(節選自v1.6.3):

 1 package org.apache.spark.rdd
 2 
 3 //
 4 
 5 /**
 6  * A Resilient Distributed Dataset (RDD), the basic abstraction in Spark. Represents an immutable,
 7  * partitioned collection of elements that can be operated on in parallel. This class contains the
 8  * basic operations available on all RDDs, such as `map`, `filter`, and `persist`. In addition,
 9  * [[org.apache.spark.rdd.PairRDDFunctions]] contains operations available only on RDDs of key-value
10  * pairs, such as `groupByKey` and `join`;
11  * [[org.apache.spark.rdd.DoubleRDDFunctions]] contains operations available only on RDDs of
12  * Doubles; and
13  * [[org.apache.spark.rdd.SequenceFileRDDFunctions]] contains operations available on RDDs that
14  * can be saved as SequenceFiles.
15  * All operations are automatically available on any RDD of the right type (e.g. RDD[(Int, Int)]
16  * through implicit.
17  *
18  * Internally, each RDD is characterized by five main properties:
19  *
20  *  - A list of partitions
21  *  - A function for computing each split
22  *  - A list of dependencies on other RDDs
23  *  - Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)
24  *  - Optionally, a list of preferred locations to compute each split on (e.g. block locations for
25  *    an HDFS file)
26  *
27  * All of the scheduling and execution in Spark is done based on these methods, allowing each RDD
28  * to implement its own way of computing itself. Indeed, users can implement custom RDDs (e.g. for
29  * reading data from a new storage system) by overriding these functions. Please refer to the
30  * [[http://www.cs.berkeley.edu/~matei/papers/2012/nsdi_spark.pdf Spark paper]] for more details
31  * on RDD internals.
32  */
33 abstract class RDD[T: ClassTag](
34     @transient private var _sc: SparkContext,
35     @transient private var deps: Seq[Dependency[_]]
36   ) extends Serializable with Logging {
37   // ...
38   
39   // =======================================================================
40   // Methods that should be implemented by subclasses of RDD
41   // =======================================================================
42 
43   /**
44    * :: DeveloperApi ::
45    * Implemented by subclasses to compute a given partition.
46    */
47   @DeveloperApi
48   def compute(split: Partition, context: TaskContext): Iterator[T]
49 
50   /**
51    * Implemented by subclasses to return the set of partitions in this RDD. This method will only
52    * be called once, so it is safe to implement a time-consuming computation in it.
53    */
54   protected def getPartitions: Array[Partition]
55 
56   /**
57    * Implemented by subclasses to return how this RDD depends on parent RDDs. This method will only
58    * be called once, so it is safe to implement a time-consuming computation in it.
59    */
60   protected def getDependencies: Seq[Dependency[_]] = deps
61 
62   /**
63    * Optionally overridden by subclasses to specify placement preferences.
64    */
65   protected def getPreferredLocations(split: Partition): Seq[String] = Nil
66 
67   /** Optionally overridden by subclasses to specify how they are partitioned. */
68   @transient val partitioner: Option[Partitioner] = None
69 
70   // ...
71 }

RDD依賴關係

1. 窄依賴與寬依賴(外框表示RDD,內框表示分區):

 

2. 窄依賴:父RDD每個分區最多被一個子RDD分區所用。

3. 寬依賴:子RDD每個分區都依賴所有分區或多個分區。

4. 特性:

    a) pipeline操作:窄依賴可pipeline操作,即允許在單個集群節點上流水線式執行,該節點可計算所有父分區。

    b) RDD故障恢復:窄依賴只需在故障集群節點上重新計算丟失的父分區,並且可在不同節點上並行重新計算;對於寬依賴,失敗節點可能導致一個RDD所有父RDD分區丟失,都需重新計算。

5. WordCount依賴圖

 

    a) ShuffledRDD為寬依賴,將DAG劃分成兩個Stage:第1StageHadoopRDDMapPartitionsRDD,生成ShuffleMapTask;第2StageShuffledRDDMapPartitionsRDD,生成ResultTask

    b) 第一個Stage3ShuffleMapTask通過pipeline方式並行執行,直至3Task均執行結束至MapPartitionsRDD處。

DAG圖

1. DAG:在圖論中,如果一個有向圖無法從任一頂點出發經過若幹條邊回到該點,則這個圖是一個有向無環圖(Directed Acyclic Graph)。

2. Spark DAGSpark將數據在分散式環境下分區,再將作業(Job)轉化為DAG,並分階段進行DAG調度和任務分散式並行處理。

3. StageDAG調度時,會根據ShuffleJob劃分Stage。如圖,RDD ARDD B、RDD FRDD G間都需要Shuffle,所以有3StageRDD A、RDD CRDD F、RDD BRDD FRDD G

4. 流水線(pipeline):

    a) Spark採用貪心演算法劃分Stage,即如果RDD的分區到父RDD分區是窄依賴,則實施經典的Fusion(融合)優化,把對應的Operation劃分到一個Stage

    b) 如果連續RDD序列都是窄依賴,則把多個Operation併到一個Stage,直到遇到寬依賴。

    c) pipeline好處:減少大量的全局屏障(barrier),並無須物化很多中間結果RDD,極大地提升性能。

RDD故障恢復機制

1. 假設一個RDD故障,根據依賴關係和分區,僅需要再執行一遍父RDD的相應分區。

2. 跨寬依賴的再執行涉及多個父RDD,為避免故障RDD的大量父RDD再執行,Spark保持Map階段中間數據輸出的持久,再執行可獲取相應分區的中間數據。

3. Spark提供數據checkpoint和記錄日誌持久化中間RDDcheckpoint直接將RDD持久化到磁碟或HDFS等存儲,與cache/persist方法不同,checkpointRDD不會因作業結束而被消除,一直存在並被後續作業直接讀取載入。

Standalone模式的Spark架構

1. Standalone模式兩種運行方式--deploy-mode參數控制)

    a) cluster方式Driver運行在Worker節點。

 

    b) client方式:Driver運行在客戶端。

2. 作業執行流程(cluster方式):

    a) 客戶端提交ApplicationMasterMaster讓一個Worker啟動Driver,即SchedulerBackendWorker創建一個DriverRunner線程,DriverRunner啟動SchedulerBackend進程)。

    b) Master會讓其餘Worker啟動Executor,即ExecutorBackendWorker創建一個ExecutorRunner線程,ExecutorRunner會啟動ExecutorBackend進程)。

    c) ExecutorBackend啟動後向DriverSchedulerBackend註冊。

    d) SchedulerBackend進程中包含DAGScheduler,它根據用戶程式生成執行計劃,並調度執行。對於每個StageTask,都被存放到TaskScheduler中,ExecutorBackendSchedulerBackend彙報時把TaskScheduler中的Task調度到ExecutorBackend執行。

    e) 所有Stage都完成後Application結束。

3. 故障恢復

    a) 如果Worker發生故障:Worker退出前,將該Worker上的Executor殺掉;Worker通過定時心跳讓Master感知Worker故障,而後彙報給Driver,並將該Worker移除;Driver可知該Worker上的Executor已被殺死。

    b) 如果Executor發生故障:ExecutorRunner彙報給Master,由於Executor所在Worker正常,Master則發送LaunchExecutor指令給該Worker,讓其再次啟動一個Executor

    c) 如果Master發生故障:通過ZooKeeper搭建的Master HA(一個Active,其他Standby)切換Master

YARN模式的Spark架構

1. YARN模式兩種運行方式--deploy-mode參數控制)

    a) cluster方式Driver運行在NodeManager節點。

    b) client方式Driver運行在客戶端。

        i. SparkAppMaster:相當於Standalone模式的SchedulerBackendSparkAppMaster包括DAGSchedulerYARNClusterScheduler

        ii. Executor:相當於Standalone模式的ExecutorBackend

2. 作業執行流程(cluster方式):

    a) 客戶端提交ApplicationResourceManagerResourceManager在某一NodeManager彙報時把SparkAppMaster分配給NodeManagerNodeManager啟動SparkAppMaster

    b) SparkAppMaster啟動後初始化Application,然後向ResourceManager申請資源,申請後通過RPC讓相應SparkAppMaster:相當於Standalone模式的SchedulerBackendSparkAppMaster包括DAGSchedulerYARNClusterScheduler

    c) Executor:相當於Standalone模式的ExecutorBackendNodeManager啟動SparkExecutor

    d) SparkExecutorSparkAppMaster彙報並完成Task

    e) 此外,SparkClient通過SparkAppMaster獲取Application運行狀態

應用程式資源構建

1. 兩種資源構建方式

    a) 粗粒度:應用程式提交後,運行前,根據應用程式資源需求一次性湊齊資源,整個運行時不再申請資源。

    b) 細粒度:應用程式提交後,動態向Cluster Manager申請資源,只要等到資源滿足一個Task的運行,便開始運行該Task,而不必等到所有資源全部到位。

2. Spark on YARN僅支持粗粒度構建方式

API

WordCount示例

1. 準備數據。

hadoop fs -mkdir -p /test/wordcount
hadoop fs -put README.md /test/wordcount

2. 執行程式。

spark-shell --master spark://centos1:7077
1 import org.apache.log4j.{Logger,Level}
2 Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
3 Logger.getLogger("org.apache.spark.sql").setLevel(Level.WARN)
4 val textFile = sc.textFile("/test/wordcount/README.md")
5 val wordCounts = textFile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey((count1, count2) => count1 + count2)
6 wordCounts.saveAsTextFile("/test/wordcount/result")
7 wordCounts.collect

    a) “flatMap(line => line.split(" "))”:將文本每一行按空格拆分成單詞RDD

    b) “map(word => (word, 1))”:將每個單詞轉換為單詞+單詞數的二元組RDD

    c) “reduceByKey((count1, count2) => count1 + count2)”:按key分組(即按單詞分組)後,每組內單詞數求和。

    d) “collect”:Action操作,將RDD全部元素轉換為Scala Array返回給Driver Program。如果數據量過大,會導致Driver Program記憶體不足。

3. 查看結果。

hadoop fs -cat /test/wordcount/WordCounts

RDD構建

1. 載入外部存儲系統的文件構建RDD

    a) 方法定義

def textFile(path: String, minPartitions: Int = defaultMinPartitions): RDD[String]

    b) “sc.textFile("/test/directory")”:載入指定目錄下所有文件。

    c) “sc.textFile("/test/directory/*.txt")”:載入指定目錄下所有txt格式的文件

您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • activity啟動模式之standard 一、簡介 這種模式是預設的,不用我們自己設定 就像一隻疊加在棧中 如果退出,就一個個退出,其實就是我們自己用手機的那種感受 二、代碼實例 activityLaunchStandard.MainActivity 每次激活activity都會新建一個activ ...
  • 請求碼和結果碼 一、簡介 請求碼: 例如請求頁面有多個button,根據請求碼就知道是哪個button在請求 結果碼: 多個請求可以打開多個頁面,根據結果碼就知道我們打開的是哪個界面 請求碼是用來標識請求源的,結果碼是用來標識結果源的。 二、具體步驟 這裡演示結果碼的 1、界面1裡面的結果碼是100 ...
  • 從Activity中返回數據 一、簡介 這裡也就是使用intent方式返回數據。 二、具體步驟 在MainActivity通過一個button訪問Activity01頁面,然後將Activity01頁面的數據返回到MainActivity頁面。 調用Activity頁面:MainActivity 被 ...
  • 二、CocoaPods 安裝 CocoaPods可以方便地通過Mac自帶的RubyGems安裝。 打開Terminal(Mac電腦自帶的終端): (1).設置ruby的軟體源 這是因為ruby的軟體源rubygems.org因為使用亞馬遜的雲服務,被我天朝屏蔽了,需要更新一下ruby的源,過程如下: ...
  • 一,代碼。 二,輸出。 ...
  • [20170728]oracle保留字.txt--//oracle有許多保留字,我印象最深的就是使用rman備份表空間test,test就是rman裡面的保留字.--//還有rman也是rman裡面的保留字.如果在應用中儘量規避不要使用這些保留字.--//探究一下,oracle內部是否也會不小心這些 ...
  • 作者: kent鵬 轉載請註明出處: http://www.cnblogs.com/xieyupeng/p/7272236.html Oracle的集群 Oracle的體繫結構 SQL> --當前用戶 SQL> show user USER 為 "SCOTT" SQL> --當前用戶下的表 SQL> ...
  • 原文地址:http://blogxinxiucan.sh1.newtouch.com/2017/08/01/Java-curator%E6%93%8D%E4%BD%9Czookeeper%E8%8E%B7%E5%8F%96kafka/ Curator是Netflix公司開源的一個Zookeeper... ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...