一、概述 Apache Spark 是專為大規模數據處理而設計的快速通用的計算引擎。Spark是UC Berkeley AMP lab (加州大學伯克利分校的AMP實驗室)所開源的類Hadoop MapReduce的通用並行框架,Spark,擁有Hadoop MapReduce所具有的優點;但不同於 ...
目錄
一、概述
Apache Spark 是專為大規模數據處理而設計的快速通用的計算引擎。Spark是UC Berkeley AMP lab (加州大學伯克利分校的AMP實驗室)所開源的類Hadoop MapReduce的通用並行框架,Spark,擁有Hadoop MapReduce所具有的優點;但不同於MapReduce的是——Job中間輸出結果可以保存在記憶體中,從而不再需要讀寫HDFS,因此Spark能更好地適用於數據挖掘與機器學習等需要迭代的MapReduce的演算法。官方地址
1)Spark特點
- 高效性:不同於MapReduce將中間計算結果放入磁碟中,Spark採用記憶體存儲中間計算結果,減少了迭代運算的磁碟IO,並通過並行計算DAG圖的優化,減少了不同任務之間的依賴,降低了延遲等待時間。記憶體計算下,Spark 比 MapReduce 快100倍。
- 通用性:Spark提供了統一的解決方案。Spark可以用於批處理、互動式查詢(Spark SQL)、實時流處理(Spark Streaming)、機器學習(Spark MLlib)和圖計算(GraphX)。
- 易用性:不同於MapReduce僅支持Map和Reduce兩種編程運算元,Spark提供了超過80種不同的Transformation和Action運算元,如map,reduce,filter,groupByKey,sortByKey,foreach等,並且採用函數式編程風格,實現相同的功能需要的代碼量極大縮小。
- 相容性:Spark能夠跟很多開源工程相容使用。如Spark可以使用Hadoop的YARN和Apache Mesos作為它的資源管理和調度器,並且Spark可以讀取多種數據源,如HDFS、HBase、MySQL等。
- 容錯性高:Spark引進了彈性分散式數據集RDD (Resilient Distributed Dataset) 的抽象,它是分佈在一組節點中的只讀對象集合,這些集合是彈性的,如果數據集一部分丟失,則可以根據“血統”(即充許基於數據衍生過程)對它們進行重建。另外在RDD計算時可以通過CheckPoint來實現容錯,而CheckPoint有兩種方式:CheckPoint Data,和Logging The Updates,用戶可以控制採用哪種方式來實現容錯。
- 適用場景廣泛:大數據分析統計,實時數據處理,圖計算及機器學習。
2)Spark適用場景
- 複雜的批量處理(Batch Data Processing),偏重點在於處理海量數據的能力,至於處理速度可忍受,通常的時間可能是在數十分鐘到數小時。
- 基於歷史數據的互動式查詢(Interactive Query),通常的時間在數十秒到數十分鐘之間。
- 基於實時數據流的數據處理(Streaming Data Processing),通常在數百毫秒到數秒之間。
二、Spark核心組件
- Spark Core:包含Spark的基本功能;尤其是定義RDD的API、操作以及這兩者上的動作。其他Spark的庫都是構建在RDD和Spark Core之上的。
- Spark SQL:提供通過Apache Hive的SQL變體Hive查詢語言(HiveQL)與Spark進行交互的API。每個資料庫表被當做一個RDD,Spark SQL查詢被轉換為Spark操作。Spark提供的sql形式的對接Hive、JDBC、HBase等各種數據渠道的API,用Java開發人員的思想來講就是面向介面、解耦合,ORMapping、Spring Cloud Stream等都是類似的思想。
- Spark Streaming:基於SparkCore實現的可擴展、高吞吐、高可靠性的實時數據流處理。支持從Kafka、Flume等數據源處理後存儲到HDFS、DataBase、Dashboard中。對實時數據流進行處理和控制。Spark Streaming允許程式能夠像普通RDD一樣處理實時數據。
- MLlib:一個常用機器學習演算法庫,演算法被實現為對RDD的Spark操作。這個庫包含可擴展的學習演算法,比如分類、回歸等需要對大量數據集進行迭代的操作。
三、Spark專業術語詳解
1)Application:Spark應用程式
指的是用戶編寫的Spark應用程式,包含了Driver功能代碼和分佈在集群中多個節點上運行的Executor代碼。Spark應用程式,由一個或多個作業JOB組成,如下圖所示:
2)Driver:驅動程式
Spark中的Driver即運行上述Application的Main()函數並且創建SparkContext,其中創建SparkContext的目的是為了準備Spark應用程式的運行環境。在Spark中由SparkContext負責和ClusterManager通信,進行資源的申請、任務的分配和監控等;當Executor部分運行完畢後,Driver負責將SparkContext關閉。通常SparkContext代表Driver,如下圖所示:
3)Cluster Manager:資源管理器
指的是在集群上獲取資源的外部服務,常用的有:Standalone,Spark原生的資源管理器,由Master負責資源的分配;Haddop Yarn,由Yarn中的ResearchManager負責資源的分配;Messos,由Messos中的Messos Master負責資源管理。
4)Executor:執行器
Application運行在Worker節點上的一個進程,該進程負責運行Task,並且負責將數據存在記憶體或者磁碟上,每個Application都有各自獨立的一批Executor,如下圖所示:
5)Worker:計算節點
集群中任何可以運行Application代碼的節點,類似於Yarn中的NodeManager節點。在Standalone模式中指的就是通過Slave文件配置的Worker節點,在Spark on Yarn模式中指的就是NodeManager節點,在Spark on Messos模式中指的就是Messos Slave節點,如下圖所示:
6)RDD:彈性分散式數據集
Resillient Distributed Dataset,Spark的基本計算單元,可以通過一系列運算元進行操作(主要有Transformation和Action操作),如下圖所示:
7)窄依賴
父RDD每一個分區最多被一個子RDD的分區所用;表現為一個父RDD的分區對應於一個子RDD的分區,或兩個父RDD的分區對應於一個子RDD 的分區。如圖所示:
8)寬依賴
父RDD的每個分區都可能被多個子RDD分區所使用,子RDD分區通常對應所有的父RDD分區。如圖所示:
- 常見的窄依賴有:map、filter、union、mapPartitions、mapValues、join(父RDD是hash-partitioned :如果JoinAPI之前被調用的RDD API是寬依賴(存在shuffle), 而且兩個join的RDD的分區數量一致,join結果的rdd分區數量也一樣,這個時候join api是窄依賴)。
- 常見的寬依賴有groupByKey、partitionBy、reduceByKey、join(父RDD不是hash-partitioned :除此之外的,rdd 的join api是寬依賴)。
9)DAG:有向無環圖
Directed Acycle graph,反應RDD之間的依賴關係,如圖所示:
10)DAGScheduler:有向無環圖調度器
基於DAG劃分Stage 並以TaskSet的形勢提交Stage給TaskScheduler;負責將作業拆分成不同階段的具有依賴關係的多批任務;最重要的任務之一就是:計算作業和任務的依賴關係,制定調度邏輯。在SparkContext初始化的過程中被實例化,一個SparkContext對應創建一個DAGScheduler。如圖所示:
11)TaskScheduler:任務調度器
將Taskset提交給worker(集群)運行並回報結果;負責每個具體任務的實際物理調度。如圖所示:
12)Job:作業
由一個或多個調度階段所組成的一次計算作業;包含多個Task組成的並行計算,往往由Spark Action催生,一個JOB包含多個RDD及作用於相應RDD上的各種Operation。如圖所示:
13)Stage:調度階段
一個任務集對應的調度階段;每個Job會被拆分很多組Task,每組任務被稱為Stage,也可稱TaskSet,一個作業分為多個階段;Stage分成兩種類型ShuffleMapStage、ResultStage。如圖所示:
14)TaskSet:任務集
由一組關聯的,但相互之間沒有Shuffle依賴關係的任務所組成的任務集。如圖所示:
15)Task:任務
被送到某個Executor上的工作任務;單個分區數據集上的最小處理流程單元。如圖所示:
總體如圖所示:
四、Spark運行基本流程
Spark運行基本流程,如下圖:
計算流程:
七,Spark支持的資源管理器
Spark與資源管理器無關,只要能夠獲取executor進程,並能保持相互通信就可以了,Spark支持資源管理器包含: Standalone(Spark)、On Mesos、On YARN、Or On K8S,當然還有local模式。
模式 | 含義 |
---|---|
local | 在本地運行,只有一個工作進程,無並行計算能力 |
local[K] | 在本地運行,有 K 個工作進程,通常設置 K 為機器的CPU 核心數量 |
local[*] | 在本地運行,工作進程數量等於機器的 CPU 核心數量。 |
spark://HOST:PORT | 以 Standalone 模式運行,這是 Spark 自身提供的集群運行模式,預設埠號: 7077 |
mesos://HOST:PORT | 在 Mesos 集群上運行,Driver 進程和 Worker 進程運行在 Mesos 集群上,部署模式必須使用固定值:--deploy-mode cluster |
yarn | 在yarn集群上運行,依賴於hadoop集群,yarn資源調度框架,將應用提交給yarn,在ApplactionMaster(相當於Stand alone模式中的Master)中運行driver,在集群上調度資源,開啟excutor執行任務。 |
k8s | 在k8s集群上運行 |
七、Spark環境搭建(Spark on Yarn)
1)下載
Spark下載地址:http://spark.apache.org/downloads.html
這裡需要註意版本,我的hadoop版本是3.3.1,這裡spark就下載最新版本的3.2.0,而Spark3.2.0依賴的Scala的2.13,所以後面用到Scala編程時註意Scala的版本。
$ cd /opt/bigdata/hadoop/software
# 下載
$ wget https://dlcdn.apache.org/spark/spark-3.2.0/spark-3.2.0-bin-hadoop3.2.tgz
# 解壓
$ tar -zxvf spark-3.2.0-bin-hadoop3.2.tgz -C /opt/bigdata/hadoop/server/
2)修改配置文件
# 進入spark配置目錄
$ cd /opt/bigdata/hadoop/server/spark-3.2.0-bin-hadoop3.2/conf
# copy 一個模板配置
$ cp spark-env.sh.template spark-env.sh
在spark-env.sh下加入如下配置
# Hadoop 的配置文件目錄
export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop
# YARN 的配置文件目錄
export YARN_CONF_DIR=$HADOOP_HOME/etc/hadoop
# SPARK 的目錄
export SPARK_HOME=/opt/bigdata/hadoop/server/spark-3.2.0-bin-hadoop3.2
# SPARK 執行文件目錄
export PATH=$SPARK_HOME/bin:$PATH
複製/opt/bigdata/hadoop/server/spark-3.2.0-bin-hadoop3.2 到其它節點
$ scp -r /opt/bigdata/hadoop/server/spark-3.2.0-bin-hadoop3.2 hadoop-node2:/opt/bigdata/hadoop/server/
$ scp -r /opt/bigdata/hadoop/server/spark-3.2.0-bin-hadoop3.2 hadoop-node3:/opt/bigdata/hadoop/server/
3)配置環境變數
在/etc/profile文件中追加如下內容:
export SPARK_HOME=/opt/bigdata/hadoop/server/spark-3.2.0-bin-hadoop3.2
export PATH=$SPARK_HOME/bin:$PATH
source 載入生效
$ source /etc/profile
4)運行SparkPi(圓周率) 測試驗證
spark-submit 詳細參數說明
參數名 | 參數說明 |
---|---|
--master | master 的地址,提交任務到哪裡執行,例如 spark://host:port, yarn, local |
--deploy-mode | 在本地 (client) 啟動 driver 或在 cluster 上啟動,預設是 client |
--class | 應用程式的主類,僅針對 java 或 scala 應用 |
--name | 應用程式的名稱 |
--jars | 用逗號分隔的本地 jar 包,設置後,這些 jar 將包含在 driver 和 executor 的 classpath 下 |
--packages | 包含在driver 和executor 的 classpath 中的 jar 的 maven 坐標 |
--exclude-packages | 為了避免衝突 而指定不包含的 package |
--repositories | 遠程 repository |
--conf PROP=VALUE | 指定 spark 配置屬性的值, 例如 -conf spark.executor.extraJavaOptions="-XX:MaxPermSize=256m" |
--properties-file | 載入的配置文件,預設為 conf/spark-defaults.conf |
--driver-memory | Driver記憶體,預設 1G |
--driver-java-options | 傳給 driver 的額外的 Java 選項 |
--driver-library-path | 傳給 driver 的額外的庫路徑 |
--driver-class-path | 傳給 driver 的額外的類路徑 |
--driver-cores | Driver 的核數,預設是1。在 yarn 或者 standalone 下使用 |
--executor-memory | 每個 executor 的記憶體,預設是1G |
--total-executor-cores | 所有 executor 總共的核數。僅僅在 mesos 或者 standalone 下使用 |
--num-executors | 啟動的 executor 數量。預設為2。在 yarn 下使用 |
--executor-core | 每個 executor 的核數。在yarn或者standalone下使用 |
spark-submit \
--class org.apache.spark.examples.SparkPi \
--master yarn \
--deploy-mode cluster \
--driver-memory 1G \
--num-executors 3 \
--executor-memory 1G \
--executor-cores 1 \
/opt/bigdata/hadoop/server/spark-3.2.0-bin-hadoop3.2/examples/jars/spark-examples_2.12-3.2.0.jar 100
如果看到控制台出現這個,說明運行成功。
查看yarn任務
查看任務日誌
【註意】預設情況下,Hadoop歷史服務historyserver是沒有啟動的,我們可以通過下麵的命令來啟動Hadoop歷史伺服器。查看日誌依賴於historyserver服務
#啟動JobHistoryServer服務
$ mapred --daemon start historyserver
#查看進程
$ jps
#停止JobHistoryServer服務
$ mapred --daemon stop historyserver
至此已經完成的Spark on Yarn 的環境搭建,並通過測試SparkPi的運行成功了。