Spark計算框架為適應高併發和高吞吐的數據處理需求,封裝了三大數據結構,以處理不同應用: 1)RDD:彈性分散式數據集 2)累加器:分散式共用只寫變數 3)廣播變數:分散式共用只讀變數 ##RDD(1) ###什麼是RDD RDD(Resilient Distributed Dataset)彈性分 ...
Spark計算框架為適應高併發和高吞吐的數據處理需求,封裝了三大數據結構,以處理不同應用:
1)RDD:彈性分散式數據集
2)累加器:分散式共用只寫變數
3)廣播變數:分散式共用只讀變數
RDD(1)
什麼是RDD
RDD(Resilient Distributed Dataset)彈性分散式數據集,為Spark中最基本的數據處理模型。
它是一個抽象類,代表彈性、不可變、可分區且其中元素可並行計算的集合。
1)彈性:多方面(存儲、容錯、計算、分片)
2)分散式:數據存儲在集群不同節點上
3)數據集:RDD只封裝計算邏輯,不保存數據
4)數據抽象:RDD需要子類實現
5)不可變:RDD封裝的計算邏輯不可改變,只能產生根據原來的RDD產生新的RDD,併在其中封裝計算邏輯
6)可分區、並行計算
核心屬性
1)分區列表
RDD數據結構中存在分區列表,用於並行計算,是實現分散式計算的重要屬性
2)分區計算函數
Spark在計算時,是使用分區函數對每一個分區進行計算
3)RDD之間依賴關係
RDD是計算模型的封裝,如果需求包含多個計算模型的組合,就需要多個RDD建立依賴關係
4)分區器(option)
數據為KV類型(key-value),可以通過設定分區器自定義數據的分區
5)首選位置(option)
計算數據時,可根據計算節點狀態選擇節點位置進行計算
基礎編程
RDD創建
在Spark中從創建RDD的方式有四種:
1)集合中創建RDD,Spark主要提供兩個方法:parallelize、makeRDD
e.g.
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("spark")
val sparkContext = new SparkContext(sparkConf)
val rdd1 = sparkContext.parallelize(List(1,2,3,4))
val rdd2 = sparkContext.makeRDD(list(1,2,3,4))
rdd1.collect().foreach(println)
rdd2.collect().foreach(println)
sparkContext.stop()
makeRDD方法從底層實現看就是parallelize方法
2)從外部存儲(文件)創建RDD
由外部存儲系統的數據集創建RDD包括:本地的文件系統、所用Hadoop支持的數據集(譬如HDFS、HBase)
e.g.
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("spark")
val sparkContext = new SparkContext(sparkConf)
val fileRDD:RDD[String] = sparkContext.textFile("input")
fileRDD.collect().foreach(println)
sparkContext.stop()
3)從其他RDD創建
通過一個RDD運算完後,再產生新的RDD。
4)直接創建RDD(new)
使用new的方式直接構造RDD,一般由Spark框架自身使用
RDD並行度與分區
e.g.
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("spark")
val sparkContext = new SparkContext(sparkConf)
val dataRDD:RDD[Int] = sparkContext.makeRDD(List(1,2,3,4),4)
val fileRDD:RDD[String] = sparkContext.textFile("input",2)
fileRDD.collect(),foreach(println)
sparkContext.stop()
預設情況下,Spark可以將一個作業切分多個任務後,發送給Executor節點並行計算,能夠並行計算的任務數量稱之為並行度。這個
數量可以再構建RDD時指定。不過需要註意的是,這裡並行執行的任務數量,不是指切分任務的數量