Spark官方文檔 中文翻譯 轉載請註明出處: "http://www.cnblogs.com/BYRans/" "1 概述(Overview)" "2 引入Spark(Linking with Spark)" "3 初始化Spark(Initializing Spark)" "3.1 使用Spar
Spark官方文檔 - 中文翻譯
Spark版本:1.6.0
轉載請註明出處:http://www.cnblogs.com/BYRans/
- 1 概述(Overview)
- 2 引入Spark(Linking with Spark)
- 3 初始化Spark(Initializing Spark)
- 4 彈性分散式數據集(RDDs)
- 4.1 並行集合(Parallelized Collections)
- 4.2 外部資料庫(External Datasets)
- 4.3 RDD操作(RDD Operations)
- 4.4 RDD持久化(RDD Persistence)
- 5 共用變數(Shared Variables)
- 6 將應用提交到集群(Deploying to a Cluster)
- 7 Java/Scala中啟動Spark作業(Launching Spark jobs from Java / Scala)
- 8 單元測試(Unit Testing)
- 9 從Spark1.0之前的版本遷移(Migrating from pre1.0 Versions of Spark)
- 10 下一步(Where to Go from Here)
1 概述(Overview)
總體來講,每一個Spark驅動程式應用都由一個驅動程式組成,該驅動程式包含一個由用戶編寫的main方法,該方法會在集群上並行執行一些列並行計算操作。Spark最重要的一個概念是彈性分散式數據集,簡稱RDD(resilient distributed dataset )。RDD是一個數據容器,它將分佈在集群上各個節點上的數據抽象為一個數據集,並且RDD能夠進行一系列的並行計算操作。可以將RDD理解為一個分散式的List,該List的數據為分佈在各個節點上的數據。RDD通過讀取Hadoop文件系統中的一個文件進行創建,也可以由一個RDD經過轉換得到。用戶也可以將RDD緩存至記憶體,從而高效的處理RDD,提高計算效率。另外,RDD有良好的容錯機制。
Spark另外一個重要的概念是共用變數(shared variables)。在並行計算時,可以方便的使用共用變數。在預設情況下,執行Spark任務時會在多個節點上並行執行多個task,Spark將每個變數的副本分發給各個task。在一些場景下,需要一個能夠在各個task間共用的變數。Spark支持兩種類型的共用變數:
廣播變數(broadcast variables):將一個只讀變數緩存到集群的每個節點上。例如,將一份數據的只讀緩存分發到每個節點。
累加變數(accumulators):只允許add操作,用於計數、求和。
2 引入Spark(Linking with Spark)
在Spark 1.6.0上編寫應用程式,支持使用Scala 2.10.X、Java 7+、Python 2.6+、R 3.1+。如果使用Java 8,支持lambda表達式(lambda expressions)。
在編寫Spark應用時,需要在Maven依賴中添加Spark,Spark的Maven Central為:
groupId = org.apache.spark
artifactId = spark-core_2.10
version = 1.6.0
另外,如果Spark應用中需要訪問HDFS集群,則需要在hadoop-client中添加對應版本的HDFS依賴:
groupId = org.apache.hadoop
artifactId = hadoop-client
version = <your-hdfs-version>
最後,需要在程式中添加Spark類。代碼如下:
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
(在Spark 1.3.0之前的版本,使用Scala語言編寫Spark應用程式時,需要添加import org.apache.spark.SparkContext._
來啟用必要的隱式轉換)
3 初始化Spark(Initializing Spark)
使用Scala編寫Spark程式的需要做的第一件事就是創建一個SparkContext對象(使用Java語言時創建JavaSparkContext)。SparkContext對象指定了Spark應用訪問集群的方式。創建SparkContext需要先創建一個SparkConf對象,SparkConf對象包含了Spark應用的一些列信息。代碼如下:
- Scala
val conf = new SparkConf().setAppName(appName).setMaster(master)
new SparkContext(conf)
- java
SparkConf conf = new SparkConf().setAppName(appName).setMaster(master);
JavaSparkContext sc = new JavaSparkContext(conf);
appName參數為應用程式在集群的UI上顯示的名字。master為Spark、Mesos、YARN URL或local。使用local值時,表示在本地模式下運行程式。應用程式的執行模型也可以在使用spark-submit
命令提交任務時進行指定。
3.1 使用Spark Shell(Using the Shell)
在Spark Shell下,一個特殊的SparkContext對象已經幫用戶創建好,變數為sc。使用參數--master
設置master參數值,使用參數--jars
設置依賴包,多個jar包使用逗號分隔。可以使用--packages
參數指定Maven坐標來添加依賴包,多個坐標使用逗號分隔。可以使用參數--repositories
添加外部的repository。示例如下:
- 本地模式下,使用4個核運行Spark程式:
$ ./bin/spark-shell --master local[4]
- 將code.jar包添加到classpath:
$ ./bin/spark-shell --master local[4] --jars code.jar
- 使用Maven坐標添加一個依賴:
$ ./bin/spark-shell --master local[4] --packages "org.example:example:0.1"
詳細的Spark Shell參數描述請執行命令spark-shell --help
。更多的spark-submit腳本請見spark-submit script。
4 彈性分散式數據集(RDDs)
Spark最重要的一個概念就是RDD,RDD是一個有容錯機制的元素容器,它可以進行並行運算操作。得到RDD的方式有兩個:
- 通過並行化驅動程式中已有的一個集合而獲得
- 通過外部存儲系統(例如共用的文件系統、HDFS、HBase等)的數據集進行創建
4.1 並行集合(Parallelized Collections)
在驅動程式中,在一個已經存在的集合上(例如一個Scala的Seq)調用SparkContext的parallelize方法可以創建一個並行集合。集合里的元素將被覆制到一個可被並行操作的分散式數據集中。下麵為並行化一個保存數字1到5的集合示例:
- Scala
val data = Array(1, 2, 3, 4, 5)
val distData = sc.parallelize(data)
- Java
List<Integer> data = Arrays.asList(1, 2, 3, 4, 5);
JavaRDD<Integer> distData = sc.parallelize(data);
當分散式數據集創建之後,就可以進行並行操作。例如,可以調用方法distData.reduce((a,b) => a + b)
求數組內元素的和。Spark支持的分散式數據集上的操作將在後面章節中詳細描述。
並行集合的一個重要的參數是表示將數據劃分為幾個分區(partition)的分區數。Spark將在集群上每個數據分區上啟動一個task。通常情況下,你可以在集群上為每個CPU設置2-4個分區。一般情況下,Spark基於集群自動設置分區數目。也可以手動進行設置,設置該參數需要將參數值作為第二參數傳給parallelize方法,例如:sc.parallelize(data, 10)
。註意:在代碼中,部分位置使用術語slices(而不是partition),這麼做的原因是為了保持版本的向後相容性。
4.2 外部資料庫(External Datasets)
Spark可以通過Hadoop支持的外部數據源創建分散式數據集,Hadoop支持的數據源有本地文件系統、HDFS、Cassandra、HBase、Amazon S3、Spark支持的文本文件、SequenceFiles、Hadoop InputFormat。
SparkContext的testFile方法可以創建文本文件RDD。使用這個方法需要傳遞文本文件的URI,URI可以為本機文件路徑、hdfs://、s3n://等。該方法讀取文本文件的每一行至容器中。示例如下:
- Scala
scala> val distFile = sc.textFile("data.txt")
distFile: RDD[String] = MappedRDD@1d4cee08
- Java
JavaRDD<String> distFile = sc.textFile("data.txt");
創建之後,distFile就可以進行數據集的通用操作。例如,使用map和reduce操作計算所有行的長度的總和:distFile.map(s => s.length).reduce((a, b) => a + b)
。
使用Spark讀取文件需要註意一下幾點:
- 程式中如果使用到本地文件路徑,在其它worker節點上該文件必須在同一目錄,並有訪問許可權。在這種情況下,可以將文件複製到所有的worker節點,也可以使用網路內的共用文件系統。
- Spark所有的基於文件輸入的方法(包括
textFile
),都支持文件夾、壓縮文件、通配符。例如:textFile("/my/directory")
、textFile("/my/directory/*.txt")
、textFile("/my/directory/*.gz")
。 - textFile方法提供了一個可選的第二參數,用於控制文件的分區數。預設情況下,Spark為文件的每個塊創建一個分區(塊使用HDFS的預設值64MB),通過設置這個第二參數可以修改這個預設值。需要註意的是,分區數不能小於塊數。
除了文本文件之外,Spark還支持其它的數據格式:
SparkContext.wholeTextFiles
能夠讀取指定目錄下的許多小文本文件,返回(filename,content)對。而textFile只能讀取一個文本文件,返回該文本文件的每一行。- 對於SequenceFiles可以使用SparkContext的
sequenceFile[K,V]
方法,其中K是文件中key和value的類型。它們必須為像IntWritable和Text那樣,是Hadoop的Writable介面的子類。另外,對於通用的Writable,Spark允許用戶指定原生類型。例如,sequenceFile[Int,String]
將自動讀取IntWritable和Text。 - 對於其他Hadoop InputFormat,可以使用
SparkContext.hadoopRDD
方法,該方法接收任意類型的JobConf和輸入格式類、鍵類型和值類型。可以像設置Hadoop job那樣設置輸入源。對於InputFormat還可以使用基於新版本MapReduce API(org.apache.hadoop.mapreduce
)的SparkContext.newAPIHadoopRDD
。(老版本介面為:SparkContext.newHadoopRDD
) RDD.saveAsObjectFile
和SparkContext.objectFile
能夠保存包含簡單的序列化Java對象的RDD。但是這個方法不如Avro高效,Avro能夠方便的保存任何RDD。
4.3 RDD操作(RDD Operations)
RDD支持兩種類型的操作:
- transformation:從一個RDD轉換為一個新的RDD。
- action:基於一個數據集進行運算,並返回RDD。
例如,map是一個transformation操作,map將數據集的每一個元素按指定的函數轉換為一個RDD返回。reduce是一個action操作,reduce將RDD的所有元素按指定的函數進行聚合併返回結果給驅動程式(還有一個並行的reduceByKey能夠返回一個分散式的數據集)。
Spark的所有transformation操作都是懶執行,它們並不立馬執行,而是先記錄對數據集的一系列transformation操作。在執行一個需要執行一個action操作時,會執行該數據集上所有的transformation操作,然後返回結果。這種設計讓Spark的運算更加高效,例如,對一個數據集map操作之後使用reduce只返回結果,而不返回龐大的map運算的結果集。
預設情況下,每個轉換的RDD在執行action操作時都會重新計算。即使兩個action操作會使用同一個轉換的RDD,該RDD也會重新計算。在這種情況下,可以使用persist
方法或cache
方法將RDD緩存到記憶體,這樣在下次使用這個RDD時將會提高計算效率。在這裡,也支持將RDD持久化到磁碟,或在多個節點上複製。
4.3.1 基礎(Basics)
參考下麵的程式,瞭解RDD的基本輪廓:
- Scala
val lines = sc.textFile("data.txt")
val lineLengths = lines.map(s => s.length)
val totalLength = lineLengths.reduce((a, b) => a + b)
- Java
JavaRDD<String> lines = sc.textFile("data.txt");
JavaRDD<Integer> lineLengths = lines.map(s -> s.length());
int totalLength = lineLengths.reduce((a, b) -> a + b);
第一行通過讀取一個文件創建了一個基本的RDD。這個數據集沒有載入到記憶體,也沒有進行其他的操作,變數lines僅僅是一個指向文件的指針。第二行為transformation操作map的結果。此時lineLengths也沒有進行運算,因為map操作為懶執行。最後,執行action操作reduce。此時Spark將運算分隔成多個任務分發給多個機器,每個機器執行各自部分的map併進行本地reduce,最後返回運行結果給驅動程式。
如果在後面的運算中仍會用到lineLengths,可以將其緩存,在reduce操作之前添加如下代碼,該persist操作將在lineLengths第一次被計算得到後將其緩存到記憶體:
- Scala
lineLengths.persist()
- Java
lineLengths.persist(StorageLevel.MEMORY_ONLY());
4.3.2 把函數傳遞到Spark(Passing Functions to Spark)
- Scala
Spark的API,在很大程度上依賴於把驅動程式中的函數傳遞到集群上運行。這有兩種推薦的實現方式:- 使用匿名函數的語法,這可以讓代碼更加簡潔。
- 使用全局單例對象的靜態方法。比如,你可以定義函數對象objectMyFunctions,然後將該對象的
MyFunction.func1
方法傳遞給Spark,如下所示:
object MyFunctions {
def func1(s: String): String = { ... }
}
myRdd.map(MyFunctions.func1)
註意:由於可能傳遞的是一個類實例方法的引用(而不是一個單例對象),在傳遞方法的時候,應該同時傳遞包含該方法的類對象。舉個例子:
class MyClass {
def func1(s: String): String = { ... }
def doStuff(rdd: RDD[String]): RDD[String] = { rdd.map(func1) }
}
上面示例中,如果我們創建了一個類實例new MyClass,並且調用了實例的doStuff方法,該方法中的map操作調用了這個MyClass實例的func1方法,所以需要將整個對象傳遞到集群中。類似於寫成:rdd.map(x=>this.func1(x))。
類似地,訪問外部對象的欄位時將引用整個對象:
class MyClass {
val field = "Hello"
def doStuff(rdd: RDD[String]): RDD[String] = { rdd.map(x => field + x) }
}
等同於寫成rdd.map(x=>this.field+x),引用了整個this。為了避免這種問題,最簡單的方式是把field拷貝到本地變數,而不是去外部訪問它:
def doStuff(rdd: RDD[String]): RDD[String] = {
val field_ = this.field
rdd.map(x => field_ + x)
}
- Java
Spark的API,在很大程度上依賴於把驅動程式中的函數傳遞到集群上運行。在Java中,函數由那些實現了org.apache.spark.api.java.function包中的介面的類表示。有兩種創建這樣的函數的方式:- 在你自己的類中實現Function介面,可以是匿名內部類,或者命名類,並且傳遞類的一個實例到Spark。
- 在Java8中,使用lambda表達式來簡明地定義函數的實現。
為了保持簡潔性,本指南中大量使用了lambda語法,這在長格式中很容易使用所有相同的APIs。比如,我們可以把上面的代碼寫成:
JavaRDD<String> lines = sc.textFile("data.txt");
JavaRDD lineLengths = lines.map(new Function Integer>() {
public Integer call(String s) { return s.length(); }
});
int totalLength = lineLengths.reduce(new Function2 Integer, Integer>() {
public Integer call(Integer a, Integer b) { return a + b; }
});
同樣的功能,使用內聯式的實現顯得更為笨重繁瑣,代碼如下:
class GetLength implements Function Integer> {
public Integer call(String s) { return s.length(); }
}
class Sum implements Function2 Integer, Integer> {
public Integer call(Integer a, Integer b) { return a + b; }
}
JavaRDD lines = sc.textFile("data.txt");
JavaRDD lineLengths = lines.map(new GetLength());
int totalLength = lineLengths.reduce(new Sum());
註意,java中的內部匿名類,只要帶有final關鍵字,就可以訪問類範圍內的變數。Spark也會把變數複製到每一個worker節點。
4.3.3 理解閉包(Understanding closures)
使用Spark的一個難點為:理解程式在集群中執行時變數和方法的生命周期。RDD操作可以在變數範圍之外修改變數,這是一個經常導致迷惑的地方。比如下麵的例子,使用foreach()
方法增加計數器(counter)的值(類似的情況,在其他的RDD操作中經常出現)。
4.3.3.1 示例(Example)
參考下麵簡單的RDD元素求和示例,求和運算是否在同一個JVM中執行,其複雜度也不同。Spark可以在local
模式下(--master = local[n]
)執行應用,也可以將該Spark應用提交到集群上執行(例如通過spark-submit
提交到YARN):
- Scala
var counter = 0
var rdd = sc.parallelize(data)
// Wrong: Don't do this!!
rdd.foreach(x => counter += x)
println("Counter value: " + counter)
- Java
int counter = 0;
JavaRDD<Integer> rdd = sc.parallelize(data);
// Wrong: Don't do this!!
rdd.foreach(x -> counter += x);
println("Counter value: " + counter);
4.3.3.2 本地模式 VS 集群模式(Local vs. cluster modes)
在本地模式下僅有一個JVM,上面的代碼將直接計算RDD中元素和,並存儲到counter中。此時RDD和變數counter都在driver節點的同一記憶體空間中。
然而,在集群模式下,情況會變得複雜,上面的代碼並不會按照預期的方式執行。為了執行這個job,Spark把處理RDD的操作分割成多個任務,每個任務將被一個executor處理。在執行之前,Spark首先計算閉包(closure)。閉包是必須對executor可見的變數和方法,在對RDD進行運算時將會用到這些變數和方法(在本例子中指foreach())。這個閉包會被序列化,併發送給每個executor。在local模式下,只有一個executor,所以所有的變數和方法都使用同一個閉包。在其他模式下情況跟local模式不一樣,每個executor在不同的worker節點上運行,每個executor都有一個單獨的閉包。
在這裡,發送給每個executor的閉包內的變數是當前變數的副本,因此當counter在foreach中被引用時,已經不是在driver節點上的counter了。在driver節點的記憶體中仍然有一個counter,但這個counter對executors不可見。executor只能操作序列化的閉包中的counter副本。因此,最終counter的值仍然是0,因為所有對counter的操作都是在序列化的閉包內的counter上進行的。
在類似這種場景下,為了保證良好的行為確保,應該使用累加器。Spark中的累加器專門為在集群中多個節點間更新變數提供了一種安全機制。在本手冊的累加器部分將對累加器進行詳細介紹。
一般情況下,像環或本地定義方法這樣的閉包結構,不應該用於更改全局狀態。Spark不定義也不保證來自閉包外引用導致的對象變化行為。有些情況下,在local模式下可以正常運行的代碼,在分散式模式下也許並不會像預期那樣執行。在分散式下運行時,建議使用累加器定義一些全局集合。
4.3.3.3 列印RDD的元素(Printing elements of an RDD)
列印一個RDD的元素也是一個常用的語法,帶引RDD元素可以使用方法rdd.foreach(println)
或rdd.map(println)
。在本地模式下,該方法將生成預期的輸出並列印RDD所有的元素。然而,在集群模式下各個executor調用stdout,將結果列印到executor的stdout中。因為不是列印到driver節點上,所以在driver節點的stdout上不會看到這些輸出。如果想將RDD的元素列印到driver節點上,可以使用collect()
方法將RDD發送到driver節點上,然後再列印該RDD:rdd.collect().foreach(println)
。這個操作可能會導致driver節點記憶體不足,因為collect()
方法將RDD全部的數據都發送到一臺節點上。如果僅僅列印RDD的部分元素,一個安全的方法是使用take()
方法:rdd.take(100).foreach(println)
。
4.3.4 操作鍵值對(Working with KeyValue Pairs)
Spark大部分的RDD操作都是對任意類型的對象的,但是,有部分特殊的操作僅支持對鍵值對的RDD進行操作。最常用的是分散式“shuffle”操作,比如按照key將RDD的元素進行分組或聚集操作。
- Scala
在Scala中,包含Tuple2對象在內的RDD鍵值對操作,都是可以自動可用的(Tuple2對象是Scala語言內置的元組類型,可以通過簡單的編寫進行(a,b)
創建)。鍵值對操作介面在PairRDDFunctions
類中,該類中的介面自動使用RDD的元組。
例如,在下麵的代碼中使用reduceByKey
操作對鍵值對進行計數,計算每行的文本出現的次數:
val lines = sc.textFile("data.txt")
val pairs = lines.map(s => (s, 1))
val counts = pairs.reduceByKey((a, b) => a + b)
- Java
在Java中,鍵值對使用的是scala.Tuple2
類。用戶可以使用特定的map操作將JavaRDDs轉換為JavaPairRDDs,例如mapToPair
和flatMapToPair
。JavaPairRDD擁有標準RDD和特殊鍵值對的方法。
例如,在下麵的代碼中使用reduceByKey
操作對鍵值對進行計數,計算每行的文本出現的次數:
JavaRDD<String> lines = sc.textFile("data.txt");
JavaPairRDD<String, Integer> pairs = lines.mapToPair(s -> new Tuple2(s, 1));
JavaPairRDD<String, Integer> counts = pairs.reduceByKey((a, b) -> a + b);
我們還可以使用counts.sortByKey()
按照字母順序將鍵值對排序,使用counts.collect()
將結果以一個數組的形式發送給driver節點。
註意,當在鍵值對操作中使用自定義對象作為key時,你必須保證自定義的equals()
方法有一個對應的hashCode()
方法。詳細的細節,請閱讀Object.hashCode() documentation。
4.3.5 Transformations
下麵列出了Spark常用的transformation操作。詳細的細節請參考RDD API文檔(Scala、Java、Python、R)和鍵值對RDD方法文檔(Scala、Java)。
map(func)
將原來RDD的每個數據項,使用map中用戶自定義的函數func進行映射,轉變為一個新的元素,並返回一個新的RDD。filter(func)
使用函數func對原RDD中數據項進行過濾,將符合func中條件的數據項組成新的RDD返回。flatMap(func)
類似於map,但是輸入數據項可以被映射到0個或多個輸出數據集合中,所以函數func的返回值是一個數據項集合而不是一個單一的數據項。mapPartitions(func)
類似於map,但是該操作是在每個分區上分別執行,所以當操作一個類型為T的RDD時func的格式必須是Iterator<T> => Iterator<U>
。即mapPartitions需要獲取到每個分區的迭代器,在函數中通過這個分區的迭代器對整個分區的元素進行操作。mapPartitionsWithIndex(func)
類似於mapPartitions,但是需要提供給func一個整型值,這個整型值是分區的索引,所以當處理T類型的RDD時,func的格式必須為(Int, Iterator<T>) => Iterator<U>
。sample(withReplacement, fraction, seed)
對數據採樣。用戶可以設定是否有放回(withReplacement)、採樣的百分比(fraction)、隨機種子(seed)。union(otherDataset)
返回原數據集和參數指定的數據集合併後的數據集。使用union函數時需要保證兩個RDD元素的數據類型相同,返回的RDD數據類型和被合併的RDD元素數據類型相同。該操作不進行去重操作,返回的結果會保存所有元素。如果想去重,可以使用distinct()。intersection(otherDataset)
返回兩個數據集的交集。distinct([numTasks]))
將RDD中的元素進行去重操作。groupByKey([numTasks])
操作(K,V)格式的數據集,返回 (K, Iterable)格式的數據集。
註意,如果分組是為了按key進行聚合操作(例如,計算sum、average),此時使用reduceByKey
或aggregateByKey
計算效率會更高。
註意,預設情況下,並行情況取決於父RDD的分區數,但可以通過參數numTasks
來設置任務數。reduceByKey(func, [numTasks])
使用給定的func,將(K,V)對格式的數據集中key相同的值進行聚集,其中func的格式必須為(V,V) => V。可選參數numTasks可以指定reduce任務的數目。aggregateByKey(zeroValue)(seqOp, combOp,[numTasks])
對(K,V)格式的數據按key進行聚合操作,聚合時使用給定的合併函數和一個初試值,返回一個(K,U)對格式數據。需要指定的三個參數:zeroValue為在每個分區中,對key值第一次讀取V類型的值時,使用的U類型的初始變數;seqOp用於在每個分區中,相同的key中V類型的值合併到zeroValue創建的U類型的變數中。combOp是對重新分區後兩個分區中傳入的U類型數據的合併函數。sortByKey([ascending], [numTasks])
(K,V)格式的數據集,其中K已實現了Ordered,經過sortByKey操作返回排序後的數據集。指定布爾值參數ascending來指定升序或降序排列。join(otherDataset, [numTasks])
用於操作兩個鍵值對格式的數據集,操作兩個數據集(K,V)和(K,W)返回(K, (V, W))格式的數據集。通過leftOuterJoin
、rightOuterJoin
、fullOuterJoin
完成外連接操作。cogroup(otherDataset, [numTasks])
用於操作兩個鍵值對格式數據集(K,V)和(K,W),返回數據集格式為 (K,(Iterable, Iterable )) 。這個操作也稱為 groupWith
。對在兩個RDD中的Key-Value類型的元素,每個RDD相同Key的元素分別聚合為一個集合,並且返回兩個RDD中對應Key的元素集合的迭代器。cartesian(otherDataset)
對類型為T和U的兩個數據集進行操作,返回包含兩個數據集所有元素對的(T,U)格式的數據集。即對兩個RDD內的所有元素進行笛卡爾積操作。pipe(command, [envVars])
以管道(pipe)方式將 RDD的各個分區(partition)使用 shell命令處理(比如一個 Perl或 bash腳本)。 RDD的元素會被寫入進程的標準輸入(stdin),將進程返回的一個字元串型 RDD(RDD of strings),以一行文本的形式寫入進程的標準輸出(stdout)中。coalesce(numPartitions)
把RDD的分區數降低到通過參數numPartitions指定的值。在得到的更大一些數據集上執行操作,會更加高效。repartition(numPartitions)
隨機地對RDD的數據重新洗牌(Reshuffle),從而創建更多或更少的分區,以平衡數據。總是對網路上的所有數據進行洗牌(shuffles)。repartitionAndSortWithinPartitions(partitioner)
根據給定的分區器對RDD進行重新分區,在每個結果分區中,按照key值對記錄排序。這在每個分區中比先調用repartition再排序效率更高,因為它可以將排序過程在shuffle操作的機器上進行。
4.3.6 Actions
下麵列出了Spark支持的常用的action操作。詳細請參考RDD API文檔(Scala、Java、Python、R)和鍵值對RDD方法文檔(Scala、Java)。
reduce(func)
使用函數func聚集數據集中的元素,這個函數func輸入為兩個元素,返回為一個元素。這個函數應該符合結合律和交換了,這樣才能保證數據集中各個元素計算的正確性。collect()
在驅動程式中,以數組的形式返回數據集的所有元素。通常用於filter或其它產生了大量小數據集的情況。count()
返回數據集中元素的個數。first()
返回數據集中的第一個元素(類似於take(1)
)。take(n)
返回數據集中的前n個元素。takeSample(withReplacement,num, [seed])
對一個數據集隨機抽樣,返回一個包含num個隨機抽樣元素的數組,參數withReplacement
指定是否有放回抽樣,參數seed
指定生成隨機數的種子。takeOrdered(n, [ordering])
返回RDD按自然順序或自定義順序排序後的前n個元素。saveAsTextFile(path)
將數據集中���元素以文本文件(或文本文件集合)的形式保存到指定的本地文件系統、HDFS或其它Hadoop支持的文件系統中。Spark將在每個元素上調用toString
方法,將數據元素轉換為文本文件中的一行記錄。saveAsSequenceFile(path) (Java and Scala)
將數據集中的元素以Hadoop Sequence文件的形式保存到指定的本地文件系統、HDFS或其它Hadoop支持的文件系統中。該操作只支持對實現了Hadoop的Writable
介面的鍵值對RDD進行操作。在Scala中,還支持隱式轉換為Writable的類型(Spark包括了基本類型的轉換,例如Int、Double、String等等)。saveAsObjectFile(path) (Java and Scala)
將數據集中的元素以簡單的Java序列化的格式寫入指定的路徑。這些保存該數據的文件,可以使用SparkContext.objectFile()進行載入。countByKey()
僅支持對(K,V)格式的鍵值對類型的RDD進行操作。返回(K,Int)格式的Hashmap,(K,Int)為每個key值對應的記錄數目。foreach(func)
對數據集中每個元素使用函數func進行處理。該操作通常用於更新一個累加器(Accumulator)或與外部數據源進行交互。註意:在foreach()之外修改累加器變數可能引起不確定的後果。詳細介紹請閱讀Understanding closures部分。
4.3.7 Shuffle操作(Shuffle operations)
Spark內的一個操作將會觸發shuffle事件。shuffle是Spark將多個分區的數據重新分組重新分佈數據的機制。shuffle是一個複雜且代價較高的操作,它需要完成將數據在executor和機器節點之間進行複製的工作。
4.3.7.1 背景(Background)
通過reduceByKey
操作的例子,來理解shuffle過程。reduceByKey
操作生成了一個新的RDD,原始數據中相同key的所有記錄的聚合值合併為一個元組,這個元組中的key對應的值為執行reduce函數之後的結果。這個操作的挑戰是,key相同的所有記錄不在同一各分區種,甚至不在同一臺機器上,但是該操作必須將這些記錄聯合運算。
在Spark中,通常一條數據不會垮分區分佈,除非為了一個特殊的操作在必要的地方纔會跨分區分佈。在計算過程中,一個分區由一個task進行處理。因此,為了組織所有的數據讓一個reduceByKey任務執行,Spark需要進行一個all-to-all操作。all-to-all操作需要讀取所有分區上的數據的所有的key,以及key對應的所有的值,然後將多個分區上的數據進行彙總,並將每個key對應的多個分區的數據進行計算得出最終的結果,這個過程稱為shuffle。
雖然每個分區中新shuffle後的數據元素是確定的,分區間的���序也是確定的,但是所有的元素是無序的。如果想在shuffle操作後將數據按指定規則進行排序,可以使用下麵的方法:
- 使用
mapPartitions
操作在每個分區上進行排序,排序可以使用.sorted
等方法。 - 使用
repartitionAndSortWithinPartitions
操作在重新分區的同時高效的對分區進行排序。 - 使用
sortBy
將RDD進行排序。
會引起shuffle過程的操作有:
repartition
操作,例如:repartition
、coalesce
ByKey
操作(除了counting相關操作),例如:groupByKey
、reduceByKey
join
操作,例如:cogroup
、join
4.3.7.2 性能影響(Performance Impact)
shuffle是一個代價比較高的操作,它涉及磁碟IO、數據序列化、網路IO。為了準備shuffle操作的數據,Spark啟動了一系列的map任務和reduce任務,map任務完成數據的處理工作,reduce完成map任務處理後的數據的收集工作。這裡的map、reduce來自MapReduce,跟Spark的map
操作和reduce
操作沒有關係。
在內部,一個map任務的所有結果數據會保存在記憶體,直到記憶體不能全部存儲為止。然後,這些數據將基於目標分區進行排序並寫入一個單獨的文件中。在reduce時,任務將讀取相關的已排序的數據塊。
某些shuffle操作會大量消耗堆記憶體空間,因為shuffle操作在數據轉換前後,需要在使用記憶體中的數據結構對數據進行組織。需要特別說明的是,reduceByKey
和aggregateByKey
在map時會創建這些數據結構,ByKey
操作在reduce時創建這些數據結構。當記憶體滿的時候,Spark會把溢出的數據存到磁碟上,這將導致額外的磁碟IO開銷和垃圾回收開銷的增加。
shuffle操作還會在磁碟上生成大量的中間文件。在Spark 1.3中,這些文件將會保留至對應的RDD不在使用並被垃圾回收為止。這麼做的好處是,如果在Spark重新計算RDD的血統關係(lineage)時,shuffle操作產生的這些中間文件不需要重新創建。如果Spark應用長期保持對RDD的引用,或者垃圾回收不頻繁,這將導致垃圾回收的周期比較長。這意味著,長期運行Spark任務可能會消耗大量的磁碟空間。臨時數據存儲路徑可以通過SparkContext中設置參數spark.local.dir
進行配置。
shuffle操作的行為可以通過調節多個參數進行設置。詳細的說明請看Configuration Guide中的“Shuffle Behavior”部分。
4.4 RDD持久化(RDD Persistence)
Spark中一個很重要的能力是將數據持久化(或稱為緩存),在多個操作間都可以訪問這些持久化的數據。當持久化一個RDD時,每個節點會將本節點計算的數據塊存儲到記憶體,在該數據上的其他action操作將直接使用記憶體中的數據。這樣會讓以後的action操作計算速度加快(通常運行速度會加速10倍)。緩存是迭代演算法和快速的互動式使用的重要工具。
RDD可以使用persist()
方法或cache()
方法進行持久化。數據將會在第一次action操作時進行計算,併在各個節點的記憶體中緩存。Spark的緩存具有容錯機制,如果一個緩存的RDD的某個分區丟失了,Spark將按照原來的計算過程,自動重新計算併進行緩存。
另外,每個持久化的RDD可以使用不同的存儲級別進行緩存,例如,持久化到磁碟、已序列化的Java對象形式持久化到記憶體(可以節省空間)、跨節點間複製、以off-heap的方式存儲在 Tachyon。這些存儲級別通過傳遞一個StorageLevel
對象(Scala、Java、Python)給persist()
方法進行設置。cache()
方法是使用預設存儲級別的快捷設置方法,預設的存儲級別是StorageLevel.MEMORY_ONLY
(將反序列化的對象存儲到記憶體中)。詳細的存儲級別介紹如下:
- MEMORY_ONLY:將RDD以反序列化Java對象的形式存儲在JVM中。如果記憶體空間不夠,部分數據分區將不再緩存,在每次需要用到這些數據時重新進行計算。這是預設的級別。
- MEMORY_AND_DISK:將RDD以反序列化Java對象的形式存儲在JVM中。如果記憶體空間不夠,將未緩存的數據分區存儲到磁碟,在需要使用這些分區時從磁碟讀取。
- MEMORY_ONLY_SER:將RDD以序列化的Java對象的形式進行存儲(每個分區為一個byte數組)。這種方式會比反序列化對象的方式節省很多空間,尤其是在使用fast serializer時會節省更多的空間,但是在讀取時會增加CPU的計算負擔。
- MEMORY_AND_DISK_SER:類似於MEMORY_ONLY_SER,但是溢出的分區會存儲到磁碟,而不是在用到它們時重新計算。
- DISK_ONLY:只在磁碟上緩存RDD。
- MEMORY_ONLY_2,MEMORY_AND_DISK_2,等等:與上面的級別功能相同,只不過每個分區在集群中兩個節點上建立副本。
- OFF_HEAP (實驗中):以序列化的格式 (serialized format) 將 RDD存儲到 Tachyon。相比於MEMORY_ONLY_SER, OFF_HEAP 降低了垃圾收集(garbage collection)的開銷,使得 executors變得更小,而且共用了記憶體池,在使用大堆(heaps)和多應用並行的環境下有更好的表現。此外,由於 RDD存儲在Tachyon中, executor的崩潰不會導致記憶體中緩存數據的丟失。在這種模式下, Tachyon中的記憶體是可丟棄的。因此,Tachyon不會嘗試重建一個在記憶體中被清除的分塊。如果你打算使用Tachyon進行off heap級別的緩存,Spark與Tachyon當前可用的版本相相容。詳細的版本配對使用建議請參考Tachyon的說明。
註意,在Python中,緩存的對象總是使用Pickle進行序列化,所以在Python中不關心你選擇的是哪一種序列化級別。
在shuffle操作中(例如reduceByKey
),即便是用戶沒有調用persist
方法,Spark也會自動緩存部分中間數據。這麼做的目的是,在shuffle的過程中某個節點運行失敗時,不需要重新計算所有的輸入數據。如果用戶想多次使用某個RDD,強烈推薦在該RDD上調用persist
方法。
4.4.1 如何選擇存儲級別(Which Storage Level to Choose?)
Spark的存儲級別的選擇,核心問題是在記憶體使用率和CPU效率之間進行權衡。建議按下麵的過程進行存儲級別的選擇:
- 如果使用預設的存儲級別(MEMORY_ONLY),存儲在記憶體中的RDD沒有發生溢出,那麼就選擇預設的存儲級別。預設存儲級別可以最大程度的提高CPU的效率,可以使在RDD上的操作以最快的速度運行。
- 如果記憶體不能全部存儲RDD,那麼使用MEMORY_ONLY_SER,並挑選一個快速序列化庫將對象序列化,以節省記憶體空間。使用這種存儲級別,計算速度仍然很快。
- 除了在計算該數據集的代價特別高,或者在需要過濾大量數據的情況下,儘量不要將溢出的數據存儲到磁碟。因為,重新計算這個數據分區的耗時與從磁碟讀取這些數據的耗時差不多。
- 如果想快速還原故障,建議使用多副本存儲界別(例如,使用Spark作為web應用的後臺服務,在服務出故障時需要快速恢復的場景下)。所有的存儲級別都通過重新計算丟失的數據的方式,提供了完全容錯機制。但是多副本級別在發生數據丟失時,不需要重新計算對應的資料庫,可以讓任務繼續運行。
- 在高記憶體消耗或者多任務的環境下,還處於實驗性的OFF_HEAP模式有下列幾個優勢:
- 它支持多個executor使用Tachyon中的同一個記憶體池。
- 它顯著減少了記憶體回收的代價。
- 如果個別executor崩潰掉,緩存的數據不會丟失。
4.4.2 移除數據(Removing Data)
Spark自動監控各個節點上的緩存使用率,並以最近最少使用的方式(LRU)將舊數據塊移除記憶體。如果想手動移除一個RDD,而不是等待該RDD被Spark自動移除,可以使用RDD.unpersist()
方法。
5 共用變數(Shared Variables)
通常情況下,一個傳遞給Spark操作(例如map
或reduce
)的方法是在遠程集群上的節點執行的。方法在多個節點執行過程中使用的變數,是同一份變數的多個副本。這些變數的以副本的方式拷貝到每個機器上,各個遠程機器上變數的更新並不會傳回driver程式。然而,為了滿足兩種常見的使用場景,Spark提供了兩種特定類型的共用變數:廣播變數(broadcast variables)和累加器(accumulators)。
5.1 廣播變數(broadcast variables)
廣播變數允許編程者將一個只讀變數緩存到每台機器上,而不是給每個任務傳遞一個副本。例如,廣播變數可以用一種高效的方式給每個節點傳遞一份比較大的數據集副本。在使用廣播變數時,Spark也嘗試使用高效廣播演算法分發變數,以降低通信成本。
Spark的action操作是通過一些列的階段(stage)進行執行的,這些階段(stage)是通過分散式的shuffle操作進行切分的。Spark自動廣播在每個階段內任務需要的公共數據。這種情況下廣播的數據使用序列化的形式進行緩存,併在每個任務在運行前進行反序列化。這明確說明瞭,只有在跨越多個階段的多個任務任務會使用相同的數據,或者在使用反序列化形式的數據特別重要的情況下,使用廣播變數會有比較好的效果。
廣播變數通過在一個變數v
上調用SparkContext.broadcast(v)
方法進行創建。廣播變數是v
的一個封裝器,可以通過value
方法訪問v
的值。代碼示例如下:
- Scala
scala> val broadcastVar = sc.broadcast(Array(1, 2, 3))
broadcastVar: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(0)
scala> broadcastVar.value
res0: Array[Int] = Array(1, 2, 3)
- Java
Broadcast<int[]> broadcastVar = sc.broadcast(new int[] {1, 2, 3});
broadcastVar.value();
// returns [1, 2, 3]
廣播變數創建之後,在集群上執行的所有的函數中,應該使用該廣播變數代替原來的v值。所以,每個節點上的v最多分發一次。另外,對象v在廣播後不應該再被修改,以保證分發到所有的節點上的廣播變數有同樣的值(例如,在分發廣播變數之後,又對廣播變數進行了修改,然後又需要將廣播變數分發到新的節點)。
5.2 累加器(Accumulators)
累加器只允許關聯操作進行"added"操作,因此在並行計算中可以支持特定的計算。累加器可以用於實現計數(類似在MapReduce中那樣)或者求和。原生Spark支持數值型的累加器,編程者可以添加新的支持類型。創建累加器並命名之後,在Spark的UI界面上將會顯示該累加器。這樣可以幫助理解正在運行的階段的運行情況(註意,在Python中還不支持)。
一個累加器可以通過在原始值v上調用SparkContext.accumulator(v)
。然後,集群上正在運行的任務就可以使用add
方法或+=
操作對該累加器進行累加操作。只有driver程式可以讀取累加器的值,讀取累加器的值使用value
方法。
下麵代碼將數組中的元素進行求和:
- Scala
scala> val accum = sc.accumulator(0, "My Accumulator")
accum: spark.Accumulator[Int] = 0
scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x)
...
10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s
scala> accum.value
res2: Int = 10
- Java
Accumulator<Integer> accum = sc.accumulator(0);
sc.parallelize(Arrays.asList(1, 2, 3, 4)).foreach(x -> accum.add(x));
// ...
// 10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s
accum.value();
// returns 10
上面的代碼示例使用的是Spark內置的Int類型的累加器,開發者可以通過集成AccumulatorParam類創建新的累加器類型。AccumulatorParam介面有兩個方法:zero
方法和addInPlace
方法。zero
方法給數據類型提供了一個0值,addInPlace
方法能夠將兩個值進行累加。例如,假設我們有一個表示數學上向量的Vector
類,我們可以寫成:
- Scala
object VectorAccumulatorParam extends AccumulatorParam[Vector] {
def zero(initialValue: Vector): Vector = {
Vector.zeros(initialValue.size)
}
def addInPlace(v1: Vector, v2: Vector): Vector = {
v1 += v2
}
}
// Then, create an Accumulator of this type:
val vecAccum = sc.accumulator(new Vector(...))(VectorAccumulatorParam)
- Java
class VectorAccumulatorParam implements AccumulatorParam<Vector> {
public Vector zero(Vector initialValue) {
return Vector.zeros(initialValue.size());
}
public Vector addInPlace(Vector v1, Vector v2) {
v1.addInPlace(v2); return v1;
}
}
// Then, create an Accumulator of this type:
Accumulator<Vector> vecAccum = sc.accumulator(new Vector(...), new VectorAccumulatorParam());
Spark也支持使用更通用的 Accumulable介面去累加數據,其結果數據的類型和累加的元素類型不同(例如,通過收集數據元素創建一個list)。在Scala中,SparkContext.accumulableCollection
方法可用於累加常用的Scala集合類型。
累加器的更新只發生在action操作中,Spark保證每個任務只能更新累加器一次,例如重新啟動一個任務,該重啟的任務不允許更新累加器的值。在transformation用戶需要註意的是,如果任務過job的階段重新執行,每個任務的更新操作將會執行多次。
累加器沒有改變Spark懶執行的模式。如果累加器在RDD中的一個操作中進行更新,該累加器的值只在該RDD進行action操作時進行更新。因此,在一個像map()
這樣的轉換操作中,累加器的更新並沒有執行。下麵的代碼片段證明瞭這個特性:
- Scala
val accum = sc.accumulator(0)
data.map { x => accum += x; f(x) }
// Here, accum is still 0 because no actions have caused the <code>map</code> to be computed.
- Java
Accumulator<Integer> accum = sc.accumulator(0);
data.map(x -> { accum.add(x); return f(x); });
// Here, accum is still 0 because no actions have caused the `map` to be computed.
6 將應用提交到集群(Deploying to a Cluster)
應用提交手冊描述瞭如何將應用提交到集群。簡單的說,當你將你的應用打包成一個JAR(Java/Scala)或者一組.py
或.zip
文件(Python)後,就可以通過bin/spark-submit
腳本將腳本提交到集群支持的管理器中。
7 Java/Scala中啟動Spark作業(Launching Spark jobs from Java / Scala)
使用org.apache.spark.launcher包提供的簡單的Java API,可以將Spark作業以該包中提供的類的子類的形式啟動。
8 單元測試(Unit Testing)
Spark可以友好的使用流行的單元測試框架進行單元測試。在test中簡單的創建一個SparkContext
,master的URL設置為local
,運行幾個操作,然後調用SparkContext.stop()
將該作業停止。因為Spark不支持在同一個程式中運行兩個context,所以需要請確保使用finally
塊或者測試框架的tearDown
方法將context停止。
9 從Spark1.0之前的版本遷移(Migrating from pre1.0 Versions of Spark)
Spark 1.0凍結了1.X系列的Spark核的API,因此,當前沒有標記為"experimental"或者“developer API”的API都將在未來的版本中進行支持。
- Scala的變化
對於Scala的變化是,分組操作(例如groupByKey
、cogroup
和join
)的返回類型由(Key,Seq[Value])
變為(Key,Iterable[Value])
。
- Java API的變化
- 1.0中
org.apache.spark.api.java.function
類中的Function
類變成了介面,這意味著舊的代碼中extends Function
應該改為implement Function
。 - 增加了新的
map
型操作,例如mapToPair
和mapToDouble
,增加的這些操作可用於創建特殊類型的RDD。 - 分組操作(例如
groupByKey
、cogroup
和join
)的返回類型由(Key,Seq[Value])
變為(Key,Iterable[Value])
。
- 1.0中
這些遷移指導對Spark Streaming、MLlib和GraphX同樣有效。
10 下一步(Where to Go from Here)
你可以在Spark網站看一些Spark編程示例。另外,Spark在examples
目錄下包含了許多例子(