總算可以開始寫第一篇技術博客了,就從學習Spark開始吧。之前閱讀了很多關於Spark的文章,對Spark的工作機制及編程模型有了一定瞭解,下麵把Spark中對RDD的常用操作函數做一下總結,以pyspark庫為例。 RDD 的操作函數(operation)主要分為2種類型 Transformati ...
總算可以開始寫第一篇技術博客了,就從學習Spark開始吧。之前閱讀了很多關於Spark的文章,對Spark的工作機制及編程模型有了一定瞭解,下麵把Spark中對RDD的常用操作函數做一下總結,以pyspark庫為例。
RDD 的操作函數(operation)主要分為2種類型 Transformation 和 Action,如下圖:
Transformation 操作不是馬上提交 Spark 集群執行的,Spark 在遇到 Transformation 操作時只會記錄需要這樣的操作,並不會去執行,需要等到有 Action 操作的時候才會真正啟動計算過程進行計算.針對每個 Action,Spark 會生成一個 Job, 從數據的創建開始,經過 Transformation, 結尾是 Action 操作.這些操作對應形成一個有向無環圖(DAG),形成 DAG 的先決條件是最後的函數操作是一個Action.
Transformation:
map
(f, preservesPartitioning=False):將一個函數應用到這個RDD的每個element上,返回一個新的RDD。下麵例子將rdd中每個element複製兩遍:
1 from pyspark import SparkContext 2 3 sc = SparkContext('local', 'test') 4 5 rdd = sc.parallelize(['a', 'b', 'c']) 6 7 rdd.map(lambda x: x*2).collect() 8 9 Out: ['aa', 'bb', 'cc']
filter
(f):返回僅包含滿足應用到element函數的新RDD。下麵例子將過濾出rdd中的偶數:
1 rdd = sc.parallelize([1, 2, 3, 4]) 2 3 rdd.filter(lambda x: x%2 == 0).collect() 4 5 Out: [2, 4]
flatMap
(f, preservesPartitioning=False):返回一個新的RDD,首先將一個函數應用到這個RDD的所有element上,註意返回的是多個結果。
1 rdd.flatMap(lambda x: range(1, x)).collect() 2 3 Out: [1, 1, 2, 1, 2, 3]
mapPartitions
(f, preservesPartitioning=False):通過將一個函數應用到這個RDD的每個partition上,返回一個新的RDD。
1 rdd = sc.parallelize([1, 2, 3, 4], 2) 2 3 def f(iterator): yield sum(iterator) 4 5 rdd.mapPartitions(f).collect() 6 Out:[3, 7]
mapPartitionsWithIndex
(f, preservesPartitioning=False):通過在RDD的每個partition上應用一個函數來返回一個新的RDD,同時跟蹤原始partition的索引。下麵例子返回索引和:
1 rdd = sc.parallelize([1, 2, 3, 4], 4) 2 3 def f(splitIndex, iterator): yield splitIndex 4 5 rdd.mapPartitionsWithIndex(f).sum() 6 7 Out:6
sample
(withReplacement, fraction, seed=None):根據給定的隨機種子seed,隨機抽樣出數量為frac的數據,返回RDD。
1 rdd = sc.parallelize(range(100), 4) 2 3 rdd.sample(False, 0.2, 10).count() 4 5 Out: 21
union
(other):返回兩個RDD的並集。
1 rdd = sc.parallelize([1, 1, 2, 3]) 2 3 rdd.union(rdd).collect() 4 5 Out: [1, 1, 2, 3, 1, 1, 2, 3]
distinct
(numPartitions=None):類似於python中的set(),返回不重覆的元素集合。
1 sc.parallelize([1, 1, 2, 3]).distinct().collect() 2 3 Out:[1, 2, 3]
groupByKey
(numPartitions=None, partitionFunc=<function portable_hash>):在一個由(K,V)對組成的數據集上調用,返回一個(K,Seq[V])對的數據集。註意:預設情況下,使用8個並行任務進行分組,你可以傳入numTask可選參數,根據數據量設置不同數目的Task。
>>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
>>> sorted(rdd.groupByKey().mapValues(len).collect())
[('a', 2), ('b', 1)]
>>> sorted(rdd.groupByKey().mapValues(list).collect())
[('a', [1, 1]), ('b', [1])]
reduceByKey
(func, numPartitions=None, partitionFunc=<function portable_hash>):在一個(K,V)對的數據集上使用,返回一個(K,V)對的數據集,key相同的值,都被使用指定的reduce函數聚合到一起。和groupbykey類似,任務的個數是可以通過第二個可選參數來配置的。
>>> from operator import add >>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)]) >>> sorted(rdd.reduceByKey(add).collect()) [('a', 2), ('b', 1)]
sortByKey
(ascending=True, numPartitions=None, keyfunc=<function RDD.<lambda>>):按照key來進行排序,是升序還是降序,ascending是boolean類型
1 >>> tmp = [('a', 1), ('b', 2), ('1', 3), ('d', 4), ('2', 5)] 2 >>> sc.parallelize(tmp).sortByKey().first() 3 ('1', 3) 4 >>> sc.parallelize(tmp).sortByKey(True, 1).collect() 5 [('1', 3), ('2', 5), ('a', 1), ('b', 2), ('d', 4)] 6 >>> sc.parallelize(tmp).sortByKey(True, 2).collect() 7 [('1', 3), ('2', 5), ('a', 1), ('b', 2), ('d', 4)] 8 >>> tmp2 = [('Mary', 1), ('had', 2), ('a', 3), ('little', 4), ('lamb', 5)] 9 >>> tmp2.extend([('whose', 6), ('fleece', 7), ('was', 8), ('white', 9)]) 10 >>> sc.parallelize(tmp2).sortByKey(True, 3, keyfunc=lambda k: k.lower()).collect() 11 [('a', 3), ('fleece', 7), ('had', 2), ('lamb', 5),...('white', 9), ('whose', 6)]
join
(other, numPartitions=None):在類型為(K,V)和(K,W)類型的數據集上調用,返回一個(K,(V,W))對,每個key中的所有元素都在一起的數據集。預設為inner join
>>> x = sc.parallelize([("a", 1), ("b", 4)]) >>> y = sc.parallelize([("a", 2), ("a", 3)]) >>> sorted(x.join(y).collect()) [('a', (1, 2)), ('a', (1, 3))]
cogroup
(other, numPartitions=None):當有兩個KV的dataset(K,V)和(K,W),返回的是(K,Seq[V],Seq[W])的dataset,即outer join
>>> x = sc.parallelize([("a", 1), ("b", 4)]) >>> y = sc.parallelize([("a", 2)]) >>> [(x, tuple(map(list, y))) for x, y in sorted(list(x.cogroup(y).collect()))] [('a', ([1], [2])), ('b', ([4], []))]
cartesian
(other):笛卡爾積。但在數據集T和U上調用時,返回一個(T,U)對的數據集,所有元素交互進行笛卡爾積。
>>> rdd = sc.parallelize([1, 2]) >>> sorted(rdd.cartesian(rdd).collect()) [(1, 1), (1, 2), (2, 1), (2, 2)]
Action:
reduce
(f):說白了就是聚集,但是傳入的函數是兩個參數輸入返回一個值,這個函數必須是滿足交換律和結合律的
>>> from operator import add >>> sc.parallelize([1, 2, 3, 4, 5]).reduce(add) 15 >>> sc.parallelize((2 for _ in range(10))).map(lambda x: 1).cache().reduce(add) 10 >>> sc.parallelize([]).reduce(add) Traceback (most recent call last): ... ValueError: Can not reduce() empty RDD
collect
():一般在filter或者足夠小的結果的時候,再用collect封裝返回一個數組
count():返回的是dataset中的element的個數
first():返回的是dataset中的第一個元素
take(n):返回一個數組,由數據集的前n個元素組成。註意,這個操作目前並非在多個節點上,並行執行,而是Driver程式所在機器,單機計算所有的元素(Gateway的記憶體壓力會增大,需要謹慎使用)
>>> sc.parallelize([2, 3, 4, 5, 6]).cache().take(2) [2, 3] >>> sc.parallelize([2, 3, 4, 5, 6]).take(10) [2, 3, 4, 5, 6] >>> sc.parallelize(range(100), 100).filter(lambda x: x > 90).take(3) [91, 92, 93]
takeSample
(withReplacement, num, seed=None):抽樣返回一個dataset中的num個元素,隨機種子seed
>>> rdd = sc.parallelize(range(0, 10)) >>> len(rdd.takeSample(True, 20, 1)) 20 >>> len(rdd.takeSample(False, 5, 2)) 5 >>> len(rdd.takeSample(False, 15, 3)) 10
saveAsTextFile
(path, compressionCodecClass=None):將數據集的元素,以textfile的形式,保存到本地文件系統,hdfs或者任何其它hadoop支持的文件系統。Spark將會調用每個元素的toString方法,並將它轉換為文件中的一行文本
saveAsSequenceFile
(path, compressionCodecClass=None):將數據集的元素,以sequencefile的格式,保存到指定的目錄下,本地系統,hdfs或者任何其它hadoop支持的文件系統。RDD的元素必須由key-value對組成,並都實現了Hadoop的Writable介面,或隱式可以轉換為Writable(Spark包括了基本類型的轉換,例如Int,Double,String等等)
countByKey
():返回的是key對應的個數的一個map,作用於一個RDD
>>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)]) >>> sorted(rdd.countByKey().items()) [('a', 2), ('b', 1)]
foreach
(f):在數據集的每一個元素上,運行函數func。這通常用於更新一個累加器變數,或者和外部存儲系統做交互
>>> def f(x): print(x) >>> sc.parallelize([1, 2, 3, 4, 5]).foreach(f)