spark 1. Spark的四大特性 1. 速度快 spark比mapreduce快的兩個原因 1. 基於記憶體 2. 進程與線程 2. 易用性 1. 可以用java、scala、python、R等不同的語言來快速編寫spark程式 3. 通用性 4. 相容性 1. spark程式有多種運行模式 s ...
spark
1. Spark的四大特性
- 速度快
spark比mapreduce快的兩個原因
- 基於記憶體
1. mapreduce任務後期在計算的是時候,每一個job的輸出結果都會落地到磁碟,後續有其他的job要依賴於前面job的輸出結果,這個時候就需要進行大量的磁碟io操作,性能較低 2. spark任務後期在進行計算的時候,job的結果是可以保存在記憶體中的,後面有其他的job需要以言語前面job的輸出結果,這個時候可以直接從記憶體中讀取,避免了磁碟io操作,性能比較高 spark程式和mapreduce程式都會產生shuffle階段,在shuffle階段中他們產生的數據都會保留在磁碟中
- 進程與線程
1 mapreduce任務以進程的方式運行在yarn集群中,比如說有100個mapTask,一個task就需要一個進程,這些task要運行就需要開啟100個進程 2 spark任務以線程的方式運行在進程中,比如說有100個task,則一個task就對應一個線程
- 易用性
- 可以用java、scala、python、R等不同的語言來快速編寫spark程式
- 通用性
- 相容性
- spark程式有多種運行模式
- standAlone
- spark自帶的獨立運行模式,整個任務的資源分配由spark集群的的Master來負責
- yarn
- 可以把spark程式提交到yarn上運行,整個任務的資源分配由yarn中的ResourceManager負責
- mesos
- apache開源的一個類似於yarn的資源調度平臺
- standAlone
- spark程式有多種運行模式
2. spark集群架構
- Driver
- 他會執行客戶端寫好的main方法,構建一個SparkContext對象(該對象是所有spark程式的執行入口)
- Application
- 是一個spark的應用程式,包含了客戶端的代碼和任務運行的資源信息
- ClusterManager
- 給程式提供計算資源的外部服務
- standAlone
- spark自帶的集群模式,整個任務的資源分配由spark集群的Master負責
- yarn
- 把spark程式提交到yarn中運行,整個任務的資源分配由yarn中的ResourceManager負責
- mesos
- apache開源的一個類似於yarn的資源調度平臺
- standAlone
- 給程式提供計算資源的外部服務
- Master
- Spark集群的主節點,負責任務資源的分配
- Worker
- Spark集群的從節點,負責任務計算的節點
- Executor
- 是一個在worker節點啟動的進程
- Task
- spark任務的以task線程的方式運行在worker節點的executor進程中的
3. RDD是什麼
- RDD (Resilient Distributed Dataset)叫做彈性分散式數據集,是Spark中最基本的抽象單位。它代表的是是一個不可變的、可分區的、裡面元素可以並行計算的數據集合。
- Resilient 彈性的,表示RDD中的數據既可以保存在磁碟上也能保存在記憶體中
- Distibuted 分散式的,表示RDD的數據是分散式存儲的,方便後期的各種計算
- Dataset 一個數據集合,可以存儲很多數據
4. RDD的五大屬性
- A list of partitions
- 一個分區列表,數據集的基本組成單位
- 這裡表示的是一個RDD可能會有多個分區,每個分區會存儲該RDD的一部分數據,Spark中任務是以task線程的方式運行的,一個分區就對應一個task
- 一個分區列表,數據集的基本組成單位
- A function of computing each split
- 一個用來計算每個分區的函數
- Spark中RDD的計算是以分區為單位的
- 一個用來計算每個分區的函數
A list of dependencies on other RDDs
一個RDD會依賴於其他多個RDD
這裡是說RDD和RDD之間是有依賴關係的,spark任務的容錯機制就是根據這個特性(血統)而來
Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)
一個Patitioner,即RDD的分區函數(可選項)
spark中實現了兩種類型的分區函數 1 基於哈希的HashPartitioner,(key.hashcode % 分區數 = 分區號) 2 基於範圍的RangePartitioner 只有對於key-value的RDD,並且產生shuffle,才會有Partitioner 非key-value的RDD的Partitioner的值是None
Optionally, a list of preferred locations to compute each split in (e.g. block locations for an HDFS file)
一個列表,存儲每個Partition的優先位置(可選項)
spark任務在調度的時候會優先考慮存有數據的節點開啟計算任務,以減少數據的網路傳輸,提成計算效率
5. RDD運算元分類
- transformation(轉換)
- 根據已經存在的RDD轉換生成另外一個新的RDD,它是延遲載入,不會立即執行
- 如
- map、flatMap、reduceByKey
- action(動作)
- 會觸發任務的運行
- 將RDD計算的結果數據返回給Driver端,或者保存到外部存儲介質(磁碟、記憶體、HDFS)
- 如
- collect、saveAsTextFile
- 會觸發任務的運行
6. RDD常見的運算元操作
6.1 transformation運算元
轉換 | 含義 |
---|---|
map(func) | 返回一個新的RDD,該RDD由每一個輸入元素經過func函數轉換後組成 |
filter(func) | 返回一個新的RDD,該函數由經過func函數計算並且返回值為true的輸入元素組成 |
flatMap(func) | 類似於map,但每一個輸入元素可以被映射為0或多個輸出元素(所以func應該返回一個序列,而不是單一的元素) |
mapPartitions(func) | 類似於map,但獨立地在RDD的每一個分片上運行,因此在類型為T的RDD上運行時,func的函數類型必須是Iterator[T] => Iterator[U] |
mapPartitionsWithIndex(func) | 類似於mapPartitions,但func帶有一個整數參數表示分片的索引值,因此在類型為T的RDD上運行時,func的函數類型必須是Iterator[T] => Iterator[U] |
union(otherDataset) | 對源RDD和參數RDD求並集,並返回一個新的RDD |
intersection(otherDataset) | 對源RDD和參數RDD求交集,並返回一個新的RDD |
distinct([numTasks])) | 對源RDD進行去重之後返回一個新的RDD |
groupByKey([numTasks]) | 在一個(k,v)類型的RDD上調用,返回一個(k,v)的RDD |
reduceByKey(func, [numTasks]) | 在一個(k,v)類型的RDD上調用,返回一個(k,v)的RDD,使用指定的reduce函數,將相同key的值聚合到一起,與groupByKey類似,reduce任務的個數可以通過第二個參數來設置 |
sortByKey([ascending], [numTasks]) | 在一個(k,v)的RDD上調用,k必須實現Ordered介面,返回一個按照key進行排序的(k,v)RDD |
sortBy(func,[ascending], [numTasks]) | 與sortByKey類似,但是更靈活,可以自定義排序func |
join(otherDataset, [numTasks]) | 在類型為(k,v)和(k,w)的RDD上調用,返回一個相同 key對應的所有元素對在一起的(k,(v,w))的RDD |
cogroup(otherDataset, [numTasks]) | 在類型為(K,V)和(K,W)的RDD上調用,返回一個(K,(Iterable |
coalesce(numPartitions) | 減少RDD的分區數到指定值 |
repartition(numPartitions) | 重新給RDD分區 |
repartitionAndSortWithinPartitions(partitioner) | 重新給RDD分區,並且每個分區內以記錄的key排序 |
6.2 action運算元
動作 | 含義 |
---|---|
reduce(func) | reduce將RDD中元素前兩個傳給輸入函數,產生一個新的return值,新產生的return值與RDD中下一個元素(第三個元素)組成兩個元素,再被傳給輸入函數,直到最後只有一個值為止。 |
collect() | 在驅動程式中,以數組的形式返回數據集的所有元素 |
count() | 返回RDD的元素個數 |
first() | 返回RDD的第一個元素(類似於take(1)) |
take(n) | 返回一個由數據集的前n個元素組成的數組 |
takeOrdered(n, [ordering]) | 返回自然順序或者自定義順序的前 n 個元素 |
saveAsTextFile(path) | 將數據集中的元素以textFile的形式保存到HDFS文件系統或者其他支持的文件系統,對於每個元素,Spark將會調用toString方法,將它裝換為文件中的文本 |
saveAsSequenceFile(path) | 將數據集中的元素以Hadoop sequenceFile的格式保存到指定的目錄,可以是HDFS或者其他Hadoop支持的文件系統 |
saveAsObjectFile(path) | 將數據集的元素以Java序列化的方式保存到指定的目錄下 |
countByKey() | 針對(k,v)類型的RDD,返回一個(k,int)的map,表示每一個key對應的元素個數 |
foreach(func) | 在數據集上每個元素上,運行函數func |
foreachPartition(func) | 在數據集的每個分區上,運行函數func |