初識spark,需要對其API有熟悉的瞭解才能方便開發上層應用。本文用圖形的方式直觀表達相關API的工作特點,並提供瞭解新的API介面使用的方法。例子代碼全部使用python實現。 1. 數據源準備 準備輸入文件: 啟動pyspark: 使用textFile創建RDD: 查看RDD分區與數據: 2. ...
初識spark,需要對其API有熟悉的瞭解才能方便開發上層應用。本文用圖形的方式直觀表達相關API的工作特點,並提供瞭解新的API介面使用的方法。例子代碼全部使用python實現。
1. 數據源準備
準備輸入文件:
$ cat /tmp/in
apple
bag bag
cat cat cat
啟動pyspark:
$ ./spark/bin/pyspark
使用textFile創建RDD:
>>> txt = sc.textFile("file:///tmp/in", 2)
查看RDD分區與數據:
>>> txt.glom().collect()
[[u'apple', u'bag bag'], [u'cat cat cat']]
2. transformation
flatMap
處理RDD的每一行,一對多映射。
代碼示例:
>>> txt.flatMap(lambda line: line.split()).collect()
[u'apple', u'bag', u'bag', u'cat', u'cat', u'cat']
示意圖:
map
處理RDD的每一行,一對一映射。
代碼示例:
>>> txt.flatMap(lambda line: line.split()).map(lambda word: (word, 1)).collect()
[(u'apple', 1), (u'bag', 1), (u'bag', 1), (u'cat', 1), (u'cat', 1), (u'cat', 1)]
示意圖:
filter
處理RDD的每一行,過濾掉不滿足條件的行。
代碼示例:
>>> txt.flatMap(lambda line: line.split()).filter(lambda word: word !='bag').collect()
[u'apple', u'cat', u'cat', u'cat']
示意圖:
mapPartitions
逐個處理每一個partition,使用迭代器it訪問每個partition的行。
代碼示例:
>>> txt.flatMap(lambda line: line.split()).mapPartitions(lambda it: [len(list(it))]).collect()
[3, 3]
示意圖:
mapPartitionsWithIndex
逐個處理每一個partition,使用迭代器it訪問每個partition的行,index保存partition的索引,等價於mapPartitionsWithSplit(過期函數)。
代碼示例:
>>> txt.flatMap(lambda line: line.split()).mapPartitionsWithIndex(lambda index, it: [index]).collect()
[0, 1]
示意圖:
sample
根據採樣因數指定的比例,對數據進行採樣,可以選擇是否用隨機數進行替換,seed用於指定隨機數生成器種子。第一個參數表示是否放回抽樣,第二個參數表示抽樣比例,第三個參數表示隨機數seed。
代碼示例:
>>> txt.flatMap(lambda line: line.split()).sample(False, 0.5, 5).collect()
[u'bag', u'bag', u'cat', u'cat']
示意圖:
union
合併RDD,不去重。
代碼示例:
>>> txt.union(txt).collect()
[u'apple', u'bag bag', u'cat cat cat', u'apple', u'bag bag', u'cat cat cat']
示意圖:
distinct
對RDD去重。
代碼示例:
>>> txt.flatMap(lambda line: line.split()).distinct().collect()
[u'bag', u'apple', u'cat']
示意圖:
groupByKey
在一個(K,V)對的數據集上調用,返回一個(K,Seq[V])對的數據集。
代碼示例:
>>> txt.flatMap(lambda line: line.split()).map(lambda word: (word, 1)).groupByKey().collect()
[(u'bag', <pyspark.resultiterable.ResultIterable object at 0x128a150>), (u'apple', <pyspark.resultiterable.ResultIterable object at 0x128a550>), (u'cat', <pyspark.resultiterable.ResultIterable object at 0x13234d0>)]
>>> txt.flatMap(lambda line: line.split()).map(lambda word: (word, 1)).groupByKey().collect()[0][1].data
[1, 1]
示意圖:
reduceByKey
在一個(K,V)對的數據集上調用時,返回一個(K,V)對的數據集,使用指定的reduce函數,將相同key的值聚合到一起。
代碼示例:
>>> txt.flatMap(lambda line: line.split()).map(lambda word: (word, 1)).reduceByKey(lambda a, b: a + b).collect()
[(u'bag', 2), (u'apple', 1), (u'cat', 3)]
示意圖:
aggregateByKey
自定義聚合函數,類似groupByKey。在一個(K,V)對的數據集上調用,不過可以返回一個(K,Seq[U])對的數據集。
代碼示例(實現groupByKey的功能):
>>> txt.flatMap(lambda line: line.split()).map(lambda word: (word, 1)).aggregateByKey([], lambda seq, elem: seq + [elem], lambda a, b: a + b).collect()
[(u'bag', [1, 1]), (u'apple', [1]), (u'cat', [1, 1, 1])]
sortByKey
在一個(K,V)對的數據集上調用,K必須實現Ordered介面,返回一個按照Key進行排序的(K,V)對數據集。升序或降序由ascending布爾參數決定。
代碼示例:
>>> txt.flatMap(lambda line: line.split()).map(lambda word: (word, 1)).reduceByKey(lambda a, b: a + b).sortByKey().collect()
[(u'apple', 1), (u'bag', 2), (u'cat', 3)]
示意圖:
join
在類型為(K,V)和(K,W)類型的數據集上調用時,返回一個相同key對應的所有元素對在一起的(K, (V, W))數據集。
代碼示例:
>>> sorted_txt = txt.flatMap(lambda line: line.split()).map(lambda word: (word, 1)).reduceByKey(lambda a, b: a + b).sortByKey()
>>> sorted_txt.join(sorted_txt).collect()
[(u'bag', (2, 2)), (u'apple', (1, 1)), (u'cat', (3, 3))]
示意圖:
cogroup
在類型為(K,V)和(K,W)的數據集上調用,返回一個 (K, (Seq[V], Seq[W]))元組的數據集。這個操作也可以稱之為groupwith。
代碼示例:
>>> sorted_txt = txt.flatMap(lambda line: line.split()).map(lambda word: (word, 1)).reduceByKey(lambda a, b: a + b).sortByKey()
>>> sorted_txt.cogroup(sorted_txt).collect()
[(u'bag', (<pyspark.resultiterable.ResultIterable object at 0x1323790>, <pyspark.resultiterable.ResultIterable object at 0x1323310>)), (u'apple', (<pyspark.resultiterable.ResultIterable object at 0x1323990>, <pyspark.resultiterable.ResultIterable object at 0x1323ad0>)), (u'cat', (<pyspark.resultiterable.ResultIterable object at 0x1323110>, <pyspark.resultiterable.ResultIterable object at 0x13230d0>))]
>>> sorted_txt.cogroup(sorted_txt).collect()[0][1][0].data
[2]
示意圖:
cartesian
笛卡爾積,在類型為 T 和 U 類型的數據集上調用時,返回一個 (T, U)對數據集(兩兩的元素對)。
代碼示例:
>>> sorted_txt = txt.flatMap(lambda line: line.split()).map(lambda word: (word, 1)).reduceByKey(lambda a, b: a + b).sortByKey()
>>> sorted_txt.cogroup(sorted_txt).collect()
[(u'bag', (<pyspark.resultiterable.ResultIterable object at 0x1323790>, <pyspark.resultiterable.ResultIterable object at 0x1323310>)), (u'apple', (<pyspark.resultiterable.ResultIterable object at 0x1323990>, <pyspark.resultiterable.ResultIterable object at 0x1323ad0>)), (u'cat', (<pyspark.resultiterable.ResultIterable object at 0x1323110>, <pyspark.resultiterable.ResultIterable object at 0x13230d0>))]
>>> sorted_txt.cogroup(sorted_txt).collect()[0][1][0].data
[2]
示意圖:
pipe
處理RDD的每一行作為shell命令輸入,shell命令結果為輸出。
代碼示例:
>>> txt.pipe("awk '{print $1}'").collect()
[u'apple', u'bag', u'cat']
示意圖:
coalesce
減少RDD分區數。
代碼示例:
>>> txt.coalesce(1).collect()
[u'apple', u'bag bag', u'cat cat cat']
示意圖:
repartition
對RDD重新分區,類似於coalesce。
代碼示例:
>>> txt.repartition(1).collect()
[u'apple', u'bag bag', u'cat cat cat']
zip
合併兩個RDD序列為元組,要求序列長度相等。
代碼示例:
>>> txt.zip(txt).collect()
[(u'apple', u'apple'), (u'bag bag', u'bag bag'), (u'cat cat cat', u'cat cat cat')]
示意圖:
3. action
reduce
聚集數據集中的所有元素。
代碼示例:
>>> txt.reduce(lambda a, b: a + " " + b)
u'apple bag bag cat cat cat'
示意圖:
collect
以數組的形式,返回數據集的所有元素。
代碼示例:
>>> txt.collect()
[u'apple', u'bag bag', u'cat cat cat']
count
返回數據集的元素的個數。
代碼示例:
>>> txt.count()
3
first
返回數據集第一個元素。
代碼示例:
>>> txt.first()
u'apple'
take
返回數據集前n個元素。
代碼示例:
>>> txt.take(2)
[u'apple', u'bag bag']
takeSample
採樣返回數據集前n個元素。第一個參數表示是否放回抽樣,第二個參數表示抽樣個數,第三個參數表示隨機數seed。
代碼示例:
>>> txt.takeSample(False, 2, 1)
[u'cat cat cat', u'bag bag']
takeOrdered
排序返回前n個元素。
代碼示例:
>>> txt.takeOrdered(2)
[u'apple', u'bag bag']
saveAsTextFile
將數據集的元素,以textfile的形式,保存到本地文件系統,HDFS或者任何其它hadoop支持的文件系統。
代碼示例:
>>> txt.flatMap(lambda line: line.split(" ")).map(lambda word: (word, 1)).reduceByKey(lambda a, b: a + b).saveAsTextFile("file:///tmp/out")
查看輸出文件:
$cat /tmp/out/part-00001
(u'bag', 2)
(u'apple', 1)
(u'cat', 3)
saveAsSequenceFile
將數據集的元素,以Hadoop sequencefile的格式,保存到指定的目錄下,本地系統,HDFS或者任何其它hadoop支持的文件系統。這個只限於由key-value對組成,並實現了Hadoop的Writable介面,或者隱式的可以轉換為Writable的RDD。
countByKey
對(K,V)類型的RDD有效,返回一個(K,Int)對的Map,表示每一個key對應的元素個數。
代碼示例:
>>> txt.flatMap(lambda line: line.split(" ")).map(lambda word: (word, 1)).countByKey()
defaultdict(<type 'int'>, {u'bag': 2, u'apple': 1, u'cat': 3})
foreach
在數據集的每一個元素上,運行函數func進行更新。這通常用於邊緣效果,例如更新一個累加器,或者和外部存儲系統進行交互。
代碼示例:
>>> def func(line): print line
>>> txt.foreach(lambda line: func(line))
apple
bag bag
cat cat cat
4. 其他
文中未提及的transformation和action函數可以通過如下命令查詢:
>>> dir(txt)
['__add__', '__class__', '__delattr__', '__dict__', '__doc__', '__format__', '__getattribute__', '__getnewargs__', '__hash__', '__init__', '__module__', '__new__', '__reduce__', '__reduce_ex__', '__repr__', '__setattr__', '__sizeof__', '__str__', '__subclasshook__', '__weakref__', '_computeFractionForSampleSize', '_defaultReducePartitions', '_id', '_jrdd', '_jrdd_deserializer', '_memory_limit', '_pickled', '_reserialize', '_to_java_object_rdd', 'aggregate', 'aggregateByKey', 'cache', 'cartesian', 'checkpoint', 'coalesce', 'cogroup', 'collect', 'collectAsMap', 'combineByKey', 'context', 'count', 'countApprox', 'countApproxDistinct', 'countByKey', 'countByValue', 'ctx', 'distinct', 'filter', 'first', 'flatMap', 'flatMapValues', 'fold', 'foldByKey', 'foreach', 'foreachPartition', 'fullOuterJoin', 'getCheckpointFile', 'getNumPartitions', 'getStorageLevel', 'glom', 'groupBy', 'groupByKey', 'groupWith', 'histogram', 'id', 'intersection', 'isCheckpointed', 'isEmpty', 'is_cached', 'is_checkpointed', 'join', 'keyBy', 'keys', 'leftOuterJoin', 'lookup', 'map', 'mapPartitions', 'mapPartitionsWithIndex', 'mapPartitionsWithSplit', 'mapValues', 'max', 'mean', 'meanApprox', 'min', 'name', 'partitionBy', 'partitioner', 'persist', 'pipe', 'randomSplit', 'reduce', 'reduceByKey', 'reduceByKeyLocally', 'repartition', 'repartitionAndSortWithinPartitions', 'rightOuterJoin', 'sample', 'sampleByKey', 'sampleStdev', 'sampleVariance', 'saveAsHadoopDataset', 'saveAsHadoopFile', 'saveAsNewAPIHadoopDataset', 'saveAsNewAPIHadoopFile', 'saveAsPickleFile', 'saveAsSequenceFile', 'saveAsTextFile', 'setName', 'sortBy', 'sortByKey', 'stats', 'stdev', 'subtract', 'subtractByKey', 'sum', 'sumApprox', 'take', 'takeOrdered', 'takeSample', 'toDF', 'toDebugString', 'toLocalIterator', 'top', 'treeAggregate', 'treeReduce', 'union', 'unpersist', 'values', 'variance', 'zip', 'zipWithIndex', 'zipWithUniqueId']
查詢具體函數的使用文檔:
>>> help(txt.zipWithIndex)
Help on method zipWithIndex in module pyspark.rdd:
zipWithIndex(self) method of pyspark.rdd.RDD instance
Zips this RDD with its element indices.
The ordering is first based on the partition index and then the
ordering of items within each partition. So the first item in
the first partition gets index 0, and the last item in the last
partition receives the largest index.
This method needs to trigger a spark job when this RDD contains
more than one partitions.
>>> sc.parallelize(["a", "b", "c", "d"], 3).zipWithIndex().collect()
[('a', 0), ('b', 1), ('c', 2), ('d', 3)]
(END)