Spark學習筆記之RDD中的Transformation和Action函數

来源:https://www.cnblogs.com/wmx24/archive/2018/03/13/8563514.html
-Advertisement-
Play Games

總算可以開始寫第一篇技術博客了,就從學習Spark開始吧。之前閱讀了很多關於Spark的文章,對Spark的工作機制及編程模型有了一定瞭解,下麵把Spark中對RDD的常用操作函數做一下總結,以pyspark庫為例。 RDD 的操作函數(operation)主要分為2種類型 Transformati ...


  總算可以開始寫第一篇技術博客了,就從學習Spark開始吧。之前閱讀了很多關於Spark的文章,對Spark的工作機制及編程模型有了一定瞭解,下麵把Spark中對RDD的常用操作函數做一下總結,以pyspark庫為例。

  RDD 的操作函數(operation)主要分為2種類型 Transformation 和 Action,如下圖:

  

  Transformation 操作不是馬上提交 Spark 集群執行的,Spark 在遇到 Transformation 操作時只會記錄需要這樣的操作,並不會去執行,需要等到有 Action 操作的時候才會真正啟動計算過程進行計算.針對每個 Action,Spark 會生成一個 Job, 從數據的創建開始,經過 Transformation, 結尾是 Action 操作.這些操作對應形成一個有向無環圖(DAG),形成 DAG 的先決條件是最後的函數操作是一個Action.   

 

Transformation:

map(f, preservesPartitioning=False):將一個函數應用到這個RDD的每個element上,返回一個新的RDD。下麵例子將rdd中每個element複製兩遍:

1 from pyspark import SparkContext
2 
3 sc = SparkContext('local', 'test')
4 
5 rdd = sc.parallelize(['a', 'b', 'c'])
6 
7 rdd.map(lambda x: x*2).collect()
8 
9 Out: ['aa', 'bb', 'cc']

 

filter(f):返回僅包含滿足應用到element函數的新RDD。下麵例子將過濾出rdd中的偶數:

1 rdd = sc.parallelize([1, 2, 3, 4])
2 
3 rdd.filter(lambda x: x%2 == 0).collect()
4 
5 Out: [2, 4]

 

flatMap(f, preservesPartitioning=False):返回一個新的RDD,首先將一個函數應用到這個RDD的所有element上,註意返回的是多個結果。

1 rdd.flatMap(lambda x: range(1, x)).collect()
2 
3 Out: [1, 1, 2, 1, 2, 3]

 

mapPartitions(f, preservesPartitioning=False):通過將一個函數應用到這個RDD的每個partition上,返回一個新的RDD。

1 rdd = sc.parallelize([1, 2, 3, 4], 2)
2 
3 def f(iterator): yield sum(iterator)
4 
5 rdd.mapPartitions(f).collect()
6 Out:[3, 7]

 

mapPartitionsWithIndex(f, preservesPartitioning=False):通過在RDD的每個partition上應用一個函數來返回一個新的RDD,同時跟蹤原始partition的索引。下麵例子返回索引和:

1 rdd = sc.parallelize([1, 2, 3, 4], 4)
2 
3 def f(splitIndex, iterator): yield splitIndex
4 
5 rdd.mapPartitionsWithIndex(f).sum()
6 
7 Out:6

 

sample(withReplacement, fraction, seed=None)根據給定的隨機種子seed,隨機抽樣出數量為frac的數據,返回RDD。

1 rdd = sc.parallelize(range(100), 4)
2 
3 rdd.sample(False, 0.2, 10).count()
4 
5 Out: 21

 

union(other):返回兩個RDD的並集。

1 rdd = sc.parallelize([1, 1, 2, 3])
2 
3 rdd.union(rdd).collect()
4 
5 Out: [1, 1, 2, 3, 1, 1, 2, 3]

 

distinct(numPartitions=None):類似於python中的set(),返回不重覆的元素集合。

1 sc.parallelize([1, 1, 2, 3]).distinct().collect()
2 
3 Out:[1, 2, 3]

 

groupByKey(numPartitions=None, partitionFunc=<function portable_hash>)在一個由(K,V)對組成的數據集上調用,返回一個(K,Seq[V])對的數據集。註意:預設情況下,使用8個並行任務進行分組,你可以傳入numTask可選參數,根據數據量設置不同數目的Task。

 

>>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
>>> sorted(rdd.groupByKey().mapValues(len).collect())
[('a', 2), ('b', 1)]
>>> sorted(rdd.groupByKey().mapValues(list).collect())
[('a', [1, 1]), ('b', [1])]

 

 

reduceByKey(func, numPartitions=None, partitionFunc=<function portable_hash>)在一個(K,V)對的數據集上使用,返回一個(K,V)對的數據集,key相同的值,都被使用指定的reduce函數聚合到一起。和groupbykey類似,任務的個數是可以通過第二個可選參數來配置的。

 

>>> from operator import add
>>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
>>> sorted(rdd.reduceByKey(add).collect())
[('a', 2), ('b', 1)]

 

 

sortByKey(ascending=True, numPartitions=None, keyfunc=<function RDD.<lambda>>):按照key來進行排序,是升序還是降序,ascending是boolean類型

 1 >>> tmp = [('a', 1), ('b', 2), ('1', 3), ('d', 4), ('2', 5)]
 2 >>> sc.parallelize(tmp).sortByKey().first()
 3 ('1', 3)
 4 >>> sc.parallelize(tmp).sortByKey(True, 1).collect()
 5 [('1', 3), ('2', 5), ('a', 1), ('b', 2), ('d', 4)]
 6 >>> sc.parallelize(tmp).sortByKey(True, 2).collect()
 7 [('1', 3), ('2', 5), ('a', 1), ('b', 2), ('d', 4)]
 8 >>> tmp2 = [('Mary', 1), ('had', 2), ('a', 3), ('little', 4), ('lamb', 5)]
 9 >>> tmp2.extend([('whose', 6), ('fleece', 7), ('was', 8), ('white', 9)])
10 >>> sc.parallelize(tmp2).sortByKey(True, 3, keyfunc=lambda k: k.lower()).collect()
11 [('a', 3), ('fleece', 7), ('had', 2), ('lamb', 5),...('white', 9), ('whose', 6)]

 

join(other, numPartitions=None)在類型為(K,V)和(K,W)類型的數據集上調用,返回一個(K,(V,W))對,每個key中的所有元素都在一起的數據集。預設為inner join

 

>>> x = sc.parallelize([("a", 1), ("b", 4)])
>>> y = sc.parallelize([("a", 2), ("a", 3)])
>>> sorted(x.join(y).collect())
[('a', (1, 2)), ('a', (1, 3))]

 

 

cogroup(other, numPartitions=None):當有兩個KV的dataset(K,V)和(K,W),返回的是(K,Seq[V],Seq[W])的dataset,即outer join

>>> x = sc.parallelize([("a", 1), ("b", 4)])
>>> y = sc.parallelize([("a", 2)])
>>> [(x, tuple(map(list, y))) for x, y in sorted(list(x.cogroup(y).collect()))]
[('a', ([1], [2])), ('b', ([4], []))]

 

cartesian(other)笛卡爾積。但在數據集T和U上調用時,返回一個(T,U)對的數據集,所有元素交互進行笛卡爾積。

>>> rdd = sc.parallelize([1, 2])
>>> sorted(rdd.cartesian(rdd).collect())
[(1, 1), (1, 2), (2, 1), (2, 2)]

 


Action:

reduce(f):說白了就是聚集,但是傳入的函數是兩個參數輸入返回一個值,這個函數必須是滿足交換律和結合律的

>>> from operator import add
>>> sc.parallelize([1, 2, 3, 4, 5]).reduce(add)
15
>>> sc.parallelize((2 for _ in range(10))).map(lambda x: 1).cache().reduce(add)
10
>>> sc.parallelize([]).reduce(add)
Traceback (most recent call last):
    ...
ValueError: Can not reduce() empty RDD

 

collect():一般在filter或者足夠小的結果的時候,再用collect封裝返回一個數組

 

count():返回的是dataset中的element的個數

 

first():返回的是dataset中的第一個元素 

 

take(n):返回一個數組,由數據集的前n個元素組成。註意,這個操作目前並非在多個節點上,並行執行,而是Driver程式所在機器,單機計算所有的元素(Gateway的記憶體壓力會增大,需要謹慎使用)

 

>>> sc.parallelize([2, 3, 4, 5, 6]).cache().take(2)
[2, 3]
>>> sc.parallelize([2, 3, 4, 5, 6]).take(10)
[2, 3, 4, 5, 6]
>>> sc.parallelize(range(100), 100).filter(lambda x: x > 90).take(3)
[91, 92, 93]

 

 

takeSample(withReplacement, num, seed=None):抽樣返回一個dataset中的num個元素,隨機種子seed

>>> rdd = sc.parallelize(range(0, 10))
>>> len(rdd.takeSample(True, 20, 1))
20
>>> len(rdd.takeSample(False, 5, 2))
5
>>> len(rdd.takeSample(False, 15, 3))
10

 

 

saveAsTextFile(path, compressionCodecClass=None):將數據集的元素,以textfile的形式,保存到本地文件系統,hdfs或者任何其它hadoop支持的文件系統。Spark將會調用每個元素的toString方法,並將它轉換為文件中的一行文本

 

saveAsSequenceFile(path, compressionCodecClass=None):將數據集的元素,以sequencefile的格式,保存到指定的目錄下,本地系統,hdfs或者任何其它hadoop支持的文件系統。RDD的元素必須由key-value對組成,並都實現了Hadoop的Writable介面,或隱式可以轉換為Writable(Spark包括了基本類型的轉換,例如Int,Double,String等等)

 

countByKey():返回的是key對應的個數的一個map,作用於一個RDD

 

>>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
>>> sorted(rdd.countByKey().items())
[('a', 2), ('b', 1)]

 

 

foreach(f):在數據集的每一個元素上,運行函數func。這通常用於更新一個累加器變數,或者和外部存儲系統做交互

 

>>> def f(x): print(x)
>>> sc.parallelize([1, 2, 3, 4, 5]).foreach(f)

 


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 轉載自:http://blog.csdn.net/j755ing/article/details/69400439 第一步: 下載 材料/工具: 下載 VMware Workstation 12 Pro , 下載 Mac OS安裝補丁(unlocker208), 下載 OS X 10.11.1(15 ...
  • 資源競爭 相互隔絕:兩個進程不會同時進入critical section progress: critical section之外的進程不會阻止其他進程進入他們的critical section bounded waiting: 每個進程等待有限時間即可進入critical section spee ...
  • 轉自:https://www.cnblogs.com/waynechou/p/xtrabackup_backup.html 閱讀目錄 xtrabackup 選項 xtrabackup 全量備份恢復 xtrabackup 增量備份恢復 轉自:https://www.cnblogs.com/waynec ...
  • 更新源列表 打開"終端視窗",輸入"sudo apt-get update" 安裝ssh 打開"終端視窗",輸入"sudo apt-get install openssh-server"-->回車-->輸入"y"-->回車-->安裝完成。 查看ssh服務是否啟動 打開"終端視窗",輸入"sudo p ...
  • Linux 的發行版實在是太多了。初次接觸 Linux 的同學,面對這麼的發行版,估計會有點暈。所以,在寫完《新手如何搞定 Linux 操作系統》一文之後,俺接著來掃盲一下 Linux 的發行版。 ★"內核"與"發行版"的關係 對於新手而言,需要先搞清楚這兩個概念(已經明白的同學,請跳過本節)。 ◇ ...
  • 大家好,我是痞子衡,是正經搞技術的痞子。今天痞子衡給大家介紹的是飛思卡爾i.MX RT系列MCU的基本特性。 ...
  • 等待事件介紹 關於等待事件RESOURCE_SEMAPHORE_QUERY_COMPILE,官方的介紹如下: Occurs when the number of concurrent query compilations reaches a throttling limit. High waits ... ...
  • 錯誤提示原因:安裝時檢測出電腦沒有安裝JDK,而且是版本7(其他版本不行) 解決方法:先進下麵這個網站安裝JDK,安裝好後配置環境變數,然後重新安裝SQL Server 2016即可 http://www.oracle.com/technetwork/java/javase/downloads/ja ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...