spark和mapreduce差不多,都是一種計算引擎,spark相對於MapReduce來說,他的區別是,MapReduce會把計算結果放 在磁碟,spark把計算結果既放在磁碟中有放在記憶體中,mapreduce把可能會把一個大任務分成多個stage,瓶頸發生在IO,spark有一個叫DAG(有向 ...
spark和mapreduce差不多,都是一種計算引擎,spark相對於MapReduce來說,他的區別是,MapReduce會把計算結果放
在磁碟,spark把計算結果既放在磁碟中有放在記憶體中,mapreduce把可能會把一個大任務分成多個stage,瓶頸發生在IO,spark有一個叫DAG(有向無環圖)的東西,可以把多個運算元都放在一個stage進行合併。
spark shuffle的時候一定會把數據放在磁碟中,因為如果在shuffle的時候數據丟失,代價特別的昂貴
spark和mapreduce最大的區別是,spark可以把中間結果即放在記憶體,又可以放在磁碟中。
因為現在記憶體和以前比已經很大了,能放在記憶體就放在記憶體里。但mapreduce是把中間結果放在磁碟中的
,磁碟的IO速度太慢了,所以spark比mapreduce快很多了
spark僅僅可以替代的是mapreduce。spark有個很重要的東西是DAG(有向無環圖)
可以將多個相同的運算元合併到一個stage裡面(以後學。。)
spark在shuffle的時候一定是在記憶體中的,但為了保證數據的安全性,也會把數據寫入磁碟,不然恢復的時候代價比較大
spark可以運行在hadoop,yarn,Mesos(Apache的一個資源調度框架),standalone(spark自帶的)。。。
spark主節點master,工作節點worker,master可以部署兩個,解決單點故障,這時候可以引入zookeeper
安裝
首先我們的電腦上要有jdk8以上的版本
還有hadoop的hdfs就行
我們機器的規劃可以是兩個master,一個zookeeper
我們現在官網上下載一個spark,然後傳到你的機器集群中,解壓安裝到指定目錄
然後進入到conf目錄
然後修改兩個配置文件
vim spark-env.sh
裡面含義,指定你的jdk路徑,指定你的zookeeper機器都有哪台,指定每台機器的內核數(線程數量),每台機器的使用記憶體
這裡面不用安裝scala了,因為spark裡面已經安裝好了
:r! echo /root/spark //小知識在vim裡面輸出
linux腳本
一個簡單的for迴圈
for i in {5..8}; do echo $i; done
scp的時候如果機器太多可以寫個腳本
for i in {1..5}; do scp -r /root/apps/spark/ hdp-0$i:/root/apps/; done
然後修改slaves文件,把你的worker機器給寫入進去就行了
然後把你的spark 給拷貝到其他機器上
然後啟動的話sbin下有一個start-all.sh
sbin/start-all.sh 啟動即可,這個最好首先配置好master上對其他機器的免密登錄
master上的網頁顯示埠是8080,內部通信埠是7077
現在可以試著配置spark的高可用
修改spark-env.sh
export JAVA_HOME=/root/apps/jdk1.8.0_141/
#export SPARK_MASTER_HOST=hdp-01
#export SPARK_MASTER_PORT=7077export SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER -Dspark.deploy.zookeeper.url=hdp-01:2181,hdp-02:2181,hdp-03:2181,hdp-04:2181 -Dspark.deploy.zookeeper.dir=/root/zk_spark"
export SPARK_WORKER_CORES=8
export SPARK_WORKER_MEMORY=6g
修改後啟動你的zookeeper
縣啟動你的集群
sbin/start-all.sh
然後再一臺有zookeeper的機器上在啟動一個master
sbin/start-master.sh
提交第一個spark程式
提交一個程式需要一個客戶端 人家名字叫sparkSubmit(Driver)
給活躍的master提交任務,
隨便找一臺機器當做客戶端,在安裝包里有一個example
[root@hdp-02 spark-2.4.3-bin-hadoop2.7]# bin/spark-submit --master spark://hdp-01:7077 --class org.apache.spark.examples.SparkPi examples/jars/spark-examples_2.11-2.4.3.jar 500
這裡只是用了人家的樣例,最後數字是採多少樣本
在執行任務的時候,master會啟動sparksubmit,還有個coarserGrainedExecutorBackend
這個東西是真正執行任務的東西
程式執行完,這幾個進程就會被釋放。
可以指定參數,可以指定多個master,比如用多大記憶體,內核數
參數說明:
--master spark://node1.edu360.cn:7077 指定Master的地址
--executor-memory 2g 指定每個worker可用記憶體為2G
--total-executor-cores 2 指定整個集群使用的cup核數為2個
[root@hdp-02 spark-2.4.3-bin-hadoop2.7]# bin/spark-submit --master spark://hdp-01:7077,hdp-02:7077 --class org.apache.spark.examples.SparkPi --executor-memory 2048mb --total-executor-cores 12 examples/jars/spark-examples_2.11-2.4.3.jar 500
spark shell互動式命令行,方便學習和測試,可以寫spark程式,也是一個客戶端
啟動spark shell
bin/spark-shell,這樣啟動沒有啟動master,是local模式運行的,是模擬的
bin/spark-shell --master spark://hdp-01:7077
此時執行spark-shell的機器也會執行subsubmit
workder也會啟動coarserGrainedExecutorBackend,即使現在還沒有提交任務,他們在準備工作
在spark-shell中提交一個wordcount程式.....scale一句搞定
sc.textFile 指定讀取哪裡的文件,註意要是所有的機器都能訪問的,不能只是本地的,最好是HDFS的
sc.textFile("hdfs://hdp-01:9000/wordcount/input").flatMap(_.split(" ")).map((_, 1)).reduceByKey(_+_).collect
sc是spark core(RDD)的執行入口
spark都有哪些進程
首先啟動master,然後啟動worker,worker會向master註冊,然後不停發送心跳檢測
master,如果只有一個,會把這些信息保存到磁碟,如果有多個,會保存在zookeeper
1.客戶端通過命令行參數知道master在哪,並設置一些參數,比如記憶體資源和內核數,會先添加任務,然後向master申請資源
2.然後master就會在worker裡面查找,負責資源調度(就是將executor在哪些worker啟動)
3.master和work進行RPC通信,讓worker啟動executor(將分配的參數傳遞過去)然後worker啟動executor。
4.接下來,executor和客戶端sparksubmit進行通信(通過master-》worker-》execotor這樣知道客戶端在哪)。
5.然後把sparksubmit把真正的計算邏輯生成task發送給executor。
6.然後再executor執行真正的計算邏輯
yarn和spark的standalone調度模式對比
resourcemanager master 管理子節點,資源調度,接受任務請求
nodemanager worker 管理當前結點,並管理子進程
yarnchild executor 運行真正的計算邏輯的
client sparksubmit (client+ApplicationMaster)提交任務,管理該任務的executor,並將task提交給集群
ApplicationMaster
用scala語言簡單寫一個wordcount
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
/
object wordcount {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
//spark程式的入口
val sc = new SparkContext(conf)
//得到在哪開始讀取數據
val lines: RDD[String] = sc.textFile(args(0))
//切分壓平
val words: RDD[String] = lines.flatMap(_.split(" "))
//把每個單詞都變成(單詞,1)的元組
val wordAndOne: RDD[(String,Int)] = words.map((_,1))
//然後進行聚合
val reduced: RDD[(String, Int)] = wordAndOne.reduceByKey(_+_)
//排序
val sorts = reduced.sortBy(_._2, false)
//保存
sorts.saveAsTextFile(args(1))
sc.stop()
}
}
然後把這個程式打包放到集群中
./spark-submit --master spark://hdp-01:7077 --class test.ScalaWordCount /root/test.jar hdfs://hdp-01:9000/wc hdfs://hdp-01:9000/wcout