圖解Spark API

来源:http://www.cnblogs.com/fanzhidongyzby/archive/2016/12/15/6185293.html
-Advertisement-
Play Games

初識spark,需要對其API有熟悉的瞭解才能方便開發上層應用。本文用圖形的方式直觀表達相關API的工作特點,並提供瞭解新的API介面使用的方法。例子代碼全部使用python實現。 1. 數據源準備 準備輸入文件: 啟動pyspark: 使用textFile創建RDD: 查看RDD分區與數據: 2. ...


初識spark,需要對其API有熟悉的瞭解才能方便開發上層應用。本文用圖形的方式直觀表達相關API的工作特點,並提供瞭解新的API介面使用的方法。例子代碼全部使用python實現。

1. 數據源準備

準備輸入文件:

$ cat /tmp/in
apple
bag bag
cat cat cat

啟動pyspark:

$ ./spark/bin/pyspark

使用textFile創建RDD:

>>> txt = sc.textFile("file:///tmp/in", 2)

查看RDD分區與數據:

>>> txt.glom().collect()
[[u'apple', u'bag bag'], [u'cat cat cat']]

2. transformation

flatMap

處理RDD的每一行,一對多映射。

代碼示例:

>>> txt.flatMap(lambda line: line.split()).collect()
[u'apple', u'bag', u'bag', u'cat', u'cat', u'cat']

示意圖:

map

處理RDD的每一行,一對一映射。

代碼示例:

>>> txt.flatMap(lambda line: line.split()).map(lambda word: (word, 1)).collect()
[(u'apple', 1), (u'bag', 1), (u'bag', 1), (u'cat', 1), (u'cat', 1), (u'cat', 1)]

示意圖:

filter

處理RDD的每一行,過濾掉不滿足條件的行。

代碼示例:

>>> txt.flatMap(lambda line: line.split()).filter(lambda word: word !='bag').collect()
[u'apple', u'cat', u'cat', u'cat']

示意圖:

mapPartitions

逐個處理每一個partition,使用迭代器it訪問每個partition的行。

代碼示例:

>>> txt.flatMap(lambda line: line.split()).mapPartitions(lambda it: [len(list(it))]).collect()
[3, 3]

示意圖:

mapPartitionsWithIndex

逐個處理每一個partition,使用迭代器it訪問每個partition的行,index保存partition的索引,等價於mapPartitionsWithSplit(過期函數)。

代碼示例:

>>> txt.flatMap(lambda line: line.split()).mapPartitionsWithIndex(lambda index, it: [index]).collect()
[0, 1]

示意圖:

sample

根據採樣因數指定的比例,對數據進行採樣,可以選擇是否用隨機數進行替換,seed用於指定隨機數生成器種子。第一個參數表示是否放回抽樣,第二個參數表示抽樣比例,第三個參數表示隨機數seed。

代碼示例:

>>> txt.flatMap(lambda line: line.split()).sample(False, 0.5, 5).collect()
[u'bag', u'bag', u'cat', u'cat']

示意圖:

union

合併RDD,不去重。

代碼示例:

>>> txt.union(txt).collect()
[u'apple', u'bag bag', u'cat cat cat', u'apple', u'bag bag', u'cat cat cat']

示意圖:

distinct

對RDD去重。

代碼示例:

>>> txt.flatMap(lambda line: line.split()).distinct().collect()
[u'bag', u'apple', u'cat']

示意圖:

groupByKey

在一個(K,V)對的數據集上調用,返回一個(K,Seq[V])對的數據集。

代碼示例:

>>> txt.flatMap(lambda line: line.split()).map(lambda word: (word, 1)).groupByKey().collect()
[(u'bag', <pyspark.resultiterable.ResultIterable object at 0x128a150>), (u'apple', <pyspark.resultiterable.ResultIterable object at 0x128a550>), (u'cat', <pyspark.resultiterable.ResultIterable object at 0x13234d0>)]
>>> txt.flatMap(lambda line: line.split()).map(lambda word: (word, 1)).groupByKey().collect()[0][1].data
[1, 1]

示意圖:

reduceByKey

在一個(K,V)對的數據集上調用時,返回一個(K,V)對的數據集,使用指定的reduce函數,將相同key的值聚合到一起。

代碼示例:

>>> txt.flatMap(lambda line: line.split()).map(lambda word: (word, 1)).reduceByKey(lambda a, b: a + b).collect()
[(u'bag', 2), (u'apple', 1), (u'cat', 3)]

示意圖:

aggregateByKey

自定義聚合函數,類似groupByKey。在一個(K,V)對的數據集上調用,不過可以返回一個(K,Seq[U])對的數據集。

代碼示例(實現groupByKey的功能):

>>> txt.flatMap(lambda line: line.split()).map(lambda word: (word, 1)).aggregateByKey([], lambda seq, elem: seq + [elem], lambda a, b: a + b).collect()
[(u'bag', [1, 1]), (u'apple', [1]), (u'cat', [1, 1, 1])]

sortByKey

在一個(K,V)對的數據集上調用,K必須實現Ordered介面,返回一個按照Key進行排序的(K,V)對數據集。升序或降序由ascending布爾參數決定。

代碼示例:

>>> txt.flatMap(lambda line: line.split()).map(lambda word: (word, 1)).reduceByKey(lambda a, b: a + b).sortByKey().collect()
[(u'apple', 1), (u'bag', 2), (u'cat', 3)]

示意圖:

join

在類型為(K,V)和(K,W)類型的數據集上調用時,返回一個相同key對應的所有元素對在一起的(K, (V, W))數據集。

代碼示例:

>>> sorted_txt = txt.flatMap(lambda line: line.split()).map(lambda word: (word, 1)).reduceByKey(lambda a, b: a + b).sortByKey()
>>> sorted_txt.join(sorted_txt).collect()
[(u'bag', (2, 2)), (u'apple', (1, 1)), (u'cat', (3, 3))]

示意圖:

cogroup

在類型為(K,V)和(K,W)的數據集上調用,返回一個 (K, (Seq[V], Seq[W]))元組的數據集。這個操作也可以稱之為groupwith。

代碼示例:

>>> sorted_txt = txt.flatMap(lambda line: line.split()).map(lambda word: (word, 1)).reduceByKey(lambda a, b: a + b).sortByKey()
>>> sorted_txt.cogroup(sorted_txt).collect()
[(u'bag', (<pyspark.resultiterable.ResultIterable object at 0x1323790>, <pyspark.resultiterable.ResultIterable object at 0x1323310>)), (u'apple', (<pyspark.resultiterable.ResultIterable object at 0x1323990>, <pyspark.resultiterable.ResultIterable object at 0x1323ad0>)), (u'cat', (<pyspark.resultiterable.ResultIterable object at 0x1323110>, <pyspark.resultiterable.ResultIterable object at 0x13230d0>))]
>>> sorted_txt.cogroup(sorted_txt).collect()[0][1][0].data
[2]

示意圖:

cartesian

笛卡爾積,在類型為 T 和 U 類型的數據集上調用時,返回一個 (T, U)對數據集(兩兩的元素對)。

代碼示例:

>>> sorted_txt = txt.flatMap(lambda line: line.split()).map(lambda word: (word, 1)).reduceByKey(lambda a, b: a + b).sortByKey()
>>> sorted_txt.cogroup(sorted_txt).collect()
[(u'bag', (<pyspark.resultiterable.ResultIterable object at 0x1323790>, <pyspark.resultiterable.ResultIterable object at 0x1323310>)), (u'apple', (<pyspark.resultiterable.ResultIterable object at 0x1323990>, <pyspark.resultiterable.ResultIterable object at 0x1323ad0>)), (u'cat', (<pyspark.resultiterable.ResultIterable object at 0x1323110>, <pyspark.resultiterable.ResultIterable object at 0x13230d0>))]
>>> sorted_txt.cogroup(sorted_txt).collect()[0][1][0].data
[2]

示意圖:

pipe

處理RDD的每一行作為shell命令輸入,shell命令結果為輸出。

代碼示例:

>>> txt.pipe("awk '{print $1}'").collect()
[u'apple', u'bag', u'cat']

示意圖:

coalesce

減少RDD分區數。

代碼示例:

>>> txt.coalesce(1).collect()
[u'apple', u'bag bag', u'cat cat cat']

示意圖:

repartition

對RDD重新分區,類似於coalesce。

代碼示例:

>>> txt.repartition(1).collect()
[u'apple', u'bag bag', u'cat cat cat']

zip

合併兩個RDD序列為元組,要求序列長度相等。

代碼示例:

>>> txt.zip(txt).collect()
[(u'apple', u'apple'), (u'bag bag', u'bag bag'), (u'cat cat cat', u'cat cat cat')]

示意圖:

3. action

reduce

聚集數據集中的所有元素。

代碼示例:

>>> txt.reduce(lambda a, b: a + " " + b)
u'apple bag bag cat cat cat'

示意圖:

collect

以數組的形式,返回數據集的所有元素。

代碼示例:

>>> txt.collect()
[u'apple', u'bag bag', u'cat cat cat']

count

返回數據集的元素的個數。

代碼示例:

>>> txt.count()
3

first

返回數據集第一個元素。

代碼示例:

>>> txt.first()
u'apple'

take

返回數據集前n個元素。

代碼示例:

>>> txt.take(2)
[u'apple', u'bag bag']

takeSample

採樣返回數據集前n個元素。第一個參數表示是否放回抽樣,第二個參數表示抽樣個數,第三個參數表示隨機數seed。

代碼示例:

>>> txt.takeSample(False, 2, 1)
[u'cat cat cat', u'bag bag']

takeOrdered

排序返回前n個元素。

代碼示例:

>>> txt.takeOrdered(2)
[u'apple', u'bag bag']

saveAsTextFile

將數據集的元素,以textfile的形式,保存到本地文件系統,HDFS或者任何其它hadoop支持的文件系統。

代碼示例:

>>> txt.flatMap(lambda line: line.split(" ")).map(lambda word: (word, 1)).reduceByKey(lambda a, b: a + b).saveAsTextFile("file:///tmp/out")

查看輸出文件:

$cat /tmp/out/part-00001
(u'bag', 2)
(u'apple', 1)
(u'cat', 3)

saveAsSequenceFile

將數據集的元素,以Hadoop sequencefile的格式,保存到指定的目錄下,本地系統,HDFS或者任何其它hadoop支持的文件系統。這個只限於由key-value對組成,並實現了Hadoop的Writable介面,或者隱式的可以轉換為Writable的RDD。

countByKey

對(K,V)類型的RDD有效,返回一個(K,Int)對的Map,表示每一個key對應的元素個數。

代碼示例:

>>> txt.flatMap(lambda line: line.split(" ")).map(lambda word: (word, 1)).countByKey()
defaultdict(<type 'int'>, {u'bag': 2, u'apple': 1, u'cat': 3})

foreach

在數據集的每一個元素上,運行函數func進行更新。這通常用於邊緣效果,例如更新一個累加器,或者和外部存儲系統進行交互。

代碼示例:

>>> def func(line): print line
>>> txt.foreach(lambda line: func(line))
apple
bag bag
cat cat cat

4. 其他

文中未提及的transformation和action函數可以通過如下命令查詢:

>>> dir(txt)
['__add__', '__class__', '__delattr__', '__dict__', '__doc__', '__format__', '__getattribute__', '__getnewargs__', '__hash__', '__init__', '__module__', '__new__', '__reduce__', '__reduce_ex__', '__repr__', '__setattr__', '__sizeof__', '__str__', '__subclasshook__', '__weakref__', '_computeFractionForSampleSize', '_defaultReducePartitions', '_id', '_jrdd', '_jrdd_deserializer', '_memory_limit', '_pickled', '_reserialize', '_to_java_object_rdd', 'aggregate', 'aggregateByKey', 'cache', 'cartesian', 'checkpoint', 'coalesce', 'cogroup', 'collect', 'collectAsMap', 'combineByKey', 'context', 'count', 'countApprox', 'countApproxDistinct', 'countByKey', 'countByValue', 'ctx', 'distinct', 'filter', 'first', 'flatMap', 'flatMapValues', 'fold', 'foldByKey', 'foreach', 'foreachPartition', 'fullOuterJoin', 'getCheckpointFile', 'getNumPartitions', 'getStorageLevel', 'glom', 'groupBy', 'groupByKey', 'groupWith', 'histogram', 'id', 'intersection', 'isCheckpointed', 'isEmpty', 'is_cached', 'is_checkpointed', 'join', 'keyBy', 'keys', 'leftOuterJoin', 'lookup', 'map', 'mapPartitions', 'mapPartitionsWithIndex', 'mapPartitionsWithSplit', 'mapValues', 'max', 'mean', 'meanApprox', 'min', 'name', 'partitionBy', 'partitioner', 'persist', 'pipe', 'randomSplit', 'reduce', 'reduceByKey', 'reduceByKeyLocally', 'repartition', 'repartitionAndSortWithinPartitions', 'rightOuterJoin', 'sample', 'sampleByKey', 'sampleStdev', 'sampleVariance', 'saveAsHadoopDataset', 'saveAsHadoopFile', 'saveAsNewAPIHadoopDataset', 'saveAsNewAPIHadoopFile', 'saveAsPickleFile', 'saveAsSequenceFile', 'saveAsTextFile', 'setName', 'sortBy', 'sortByKey', 'stats', 'stdev', 'subtract', 'subtractByKey', 'sum', 'sumApprox', 'take', 'takeOrdered', 'takeSample', 'toDF', 'toDebugString', 'toLocalIterator', 'top', 'treeAggregate', 'treeReduce', 'union', 'unpersist', 'values', 'variance', 'zip', 'zipWithIndex', 'zipWithUniqueId']

查詢具體函數的使用文檔:

>>> help(txt.zipWithIndex)
Help on method zipWithIndex in module pyspark.rdd:

zipWithIndex(self) method of pyspark.rdd.RDD instance
    Zips this RDD with its element indices.
    
    The ordering is first based on the partition index and then the
    ordering of items within each partition. So the first item in
    the first partition gets index 0, and the last item in the last
    partition receives the largest index.
    
    This method needs to trigger a spark job when this RDD contains
    more than one partitions.
    
    >>> sc.parallelize(["a", "b", "c", "d"], 3).zipWithIndex().collect()
    [('a', 0), ('b', 1), ('c', 2), ('d', 3)]
(END)




您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 一.SAX解析方法介紹 SAX(Simple API for XML)是一個解析速度快並且占用記憶體少的XML解析器,非常適合用於Android等移動設備。 SAX解析器是一種基於事件的解析器,事件驅動的流式解析方式是,從文件的開始順序解析到文檔的結束,不可暫停或倒退。它的核心是事件處理模式,主要是圍 ...
  • ublic class MemorySpaceCheck { /** * 計算剩餘空間 * @param path * @return */ public static String getAvailableSize(String path) { StatFs fileStats = new Sta ...
  • 1:https關閉證書跟功能變數名稱的驗證 如果報 In order to validate a domain name for self signed certificates, you MUST use pinning 也是上面這種方式進行解決 2: iOS UIWebView 訪問https繞過證書驗 ...
  • 作為Android四大組件之一,Activity可以說是最基本也是最常見的組件,它提供了一個顯示界面,從而實現與用戶的交互,作為初學者,必須熟練掌握。今天我們就來通過實驗演示,來幫助大家理解Activity的四大啟動模式。 演示效果如下: 第一步:實驗前準備,相關配置文件以及Activity的建立 ...
  • 我們都知道,靜態變數用起來是挺方便的,可是一不小心那就say拜拜了。說一說我在項目中遇到的情況,測試了好多次,最後才發現原因。感動啊! private static String UserRootPath = "/sdcard/User/"+UserManager.username; private ...
  • 這次搭建iOS的ProtocolBuffer編譯器和把*.proto源文件編譯成*.pbobjc.h 和 *.pbobjc.m文件時,碰到不少問題! 搭建pb編譯器到時沒有什麼問題,只是在把*.proto文件編譯出來後,我用cocoaPods集成ProtocolBuffers到自己項目, cocoa ...
  • 從iOS8系統開始,用戶可以在設置裡面設置在WiFi環境下,自動更新安裝的App。此功能大大方便了用戶,但是一些用戶沒有開啟此項功能,因此還是需要在程式裡面提示用戶的 方法一:在伺服器介面約定對應的數據,這樣,伺服器直接傳遞信息,提示用戶有新版本,可以去商店升級 註意:這個方法是有毛病的,若您的Ap ...
  • 這是一篇我曾經拜讀過的資料庫基礎總結性的文章,原文出自園友 "游戲世界" 。最近想重新鞏固一遍,不過原文訪問受限,我在某網站找到爬蟲版,重新排版後轉載至此處。 1.什麼是SQL語句 SQL語言,結構化的查詢語言(Structured Query Language),是關係資料庫管理系統的標準語言。它 ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...