pyspark 內容介紹(一)

来源:http://www.cnblogs.com/wenBlog/archive/2017/01/20/6323678.html
-Advertisement-
Play Games

pyspark 包介紹 子包 pyspark.sql module pyspark.streaming module pyspark.ml package pyspark.mllib package 子包 pyspark.sql module pyspark.streaming module pys ...


 

pyspark 包介紹

子包

內容

PySpark是針對Spark的Python API。根據網上提供的資料,現在彙總一下這些類的基本用法,並舉例說明如何具體使用。也是總結一下經常用到的這些公有類的使用方式。方便初學者查詢及使用。

Public 類們:

  • SparkContext:

    Spark 功能的主入口。

  • RDD:

    彈性分散式數據集,就是在Spark中的基礎抽象

  • Broadcast:

    一個在task之間重用的廣播變數。

  • Accumulator:

    一個“add-only” 共用變數,task只能增加值。

  • SparkConf:

    用於配置Spark.

  • SparkFiles:

    在job中訪問文件。

  • StorageLevel:

    更細粒度的緩存持久化級別。

     

將分為兩篇介紹這些類的內容,這裡首先介紹SparkConf
1. class pyspark.SparkConf(loadDefaults=True, _jvm=None, _jconf=None)

配置一個Spark應用,一般用來設置各種Spark的鍵值對作為參數。

大多數時候,使用SparkConf()來創建SparkConf對象,也用於載入來自spark.* Java系統的屬性值。此時,在SparkConf對象上設置的任何參數都有高於系統屬性的優先順序。

對於單元測試,也能調用SparkConf(false)來略過額外的配置,無論系統屬性是什麼都可以獲得相同的配置。

這個類中的設值方法都是支持鏈式結構的,例如,你可以這樣編寫配置conf.setMaster(“local”).setAppName(“My app”)

註意:

一旦SparkConf對象被傳遞給Spark,它就被覆制並且不能被其他人修改。

contains(key)

配置中是否包含一個指定鍵。

get(key, defaultValue=None)

獲取配置的某些鍵值,或者返回預設值。

getAll()

得到所有的鍵值對的list。

set(key, value)

設置配置屬性。

setAll(pairs)

通過傳遞一個鍵值對的list,為多個參數賦值。

etAppName(value)

設置應用名稱

setExecutorEnv(key=None, value=None, pairs=None)

設置環境變數複製給執行器。

setIfMissing(key, value)

如果沒有,則設置一個配置屬性。

setMaster(value)

設置主連接地址。

setSparkHome(value)

設置工作節點上的Spark安裝路徑。

toDebugString()

返回一個可列印的配置版本。

2. class pyspark.SparkContext(master=None, appName=None, sparkHome=None, pyFiles=None, environment=None, batchSize=0, serializer=PickleSerializer(), conf=None, gateway=None, jsc=None, profiler_cls=<class 'pyspark.profiler.BasicProfiler'>)

Spark功能的主入口,SparkContext 代表到Spark 集群的連接,並且在集群上能創建RDD和broadcast。

PACKAGE_EXTENSIONS = ('.zip', '.egg', '.jar')
accumulator(value, accum_param=None)

用指定的初始化值創建一個Accumulator累加器。使用AccumulatorParam對象定義如何添加數據類型的值。預設AccumulatorParams為整型和浮點型。如果其他類型需要自定義。

addFile(path, recursive=False)

使用在每個節點上的Spark job添加文件下載。這裡path 參數可以使本地文件也可以使在HDFS中的文件,也可以是HTTP、HTTPS或者URI。

在Spark的job中訪問文件,使用L{SparkFiles.get(fileName)<pyspark.files.SparkFiles.get>}可以找到下載位置。

如果遞歸選項被設置為“TRUE”則路徑能被指定。當前路徑僅僅支持Hadoop文件系統。

 1 >>> from pyspark import SparkFiles
 2 >>> path = os.path.join(tempdir, "test.txt")
 3 >>> with open(path, "w") as testFile:
 4 ...    _ = testFile.write("100") 
 5 >>> sc.addFile(path)
 6 >>> def func(iterator):
 7 ...    with open(SparkFiles.get("test.txt")) as testFile:
 8 ...        fileVal = int(testFile.readline())
 9 ...        return [x * fileVal for x in iterator]
10 >>> sc.parallelize([1, 2, 3, 4]).mapPartitions(func).collect()
11 [100, 200, 300, 400]

 

addPyFile(path)

為所有將在SparkContext上執行的任務添加一個a.py或者.zip的附件。這裡path 參數可以使本地文件也可以使在HDFS中的文件,也可以是HTTP、HTTPS或者FTP URI。

applicationId

Spark應用的唯一ID,它的格式取決於調度器實現。

  • 本地模式下像這樣的ID‘local-1433865536131’
  • 模式下像這樣的ID‘application_1433865536131_34483’
>>> sc.applicationId  
u'local-...'

 

binaryFiles(path, minPartitions=None)

註意

  • 從HDFS上讀取二進位文件的路徑,本地文件系統(在所有節點上都可用),或者其他hadoop支持的文件系統URI黨組偶一個二進位數組。每個文件作為單獨的記錄,並且返回一個鍵值對,這個鍵就是每個文件的了路徑,值就是每個文件的內容。
  • 小文件優先選擇,大文件也可以,但是會引起性能問題。
binaryRecords(path, recordLength)
  • path – 輸入文件路徑
  • recordLength – 分割記錄的長度(位數)
註意

從平面二進位文件中載入數據,假設每個記錄都是一套指定數字格式的數字(ByteBuffer),並且每個記錄位數的數是恆定的。

broadcast(value)

廣播一個制度變數到集群,返回一個L{Broadcast<pyspark.broadcast.Broadcast>} 對象在分散式函數中讀取。這個變數將只發一次給每個集群。

cancelAllJobs()

取消所有已排程的或者正在運行的job。

cancelJobGroup(groupId)

 

取消指定組的已激活job,查看SparkContext.setJobGroup更多信息。

 

defaultMinPartitions

當不被用戶指定時,預設Hadoop RDDs 為最小分區。

defaultParallelism

當不被用戶指定時,預設並行級別執行。(例如reduce task)

dump_profiles(path)

轉存配置信息到目錄路徑下。

emptyRDD()

創建沒有分區或者元素的RDD。

getConf()
getLocalProperty(key)

在當前線程中得到一個本地設置屬性。

classmethod getOrCreate(conf=None)
參數:conf – SparkConf (optional)

獲取或者實例化一個SparkContext並且註冊為單例模式對象。

hadoopFile(path, inputFormatClass, keyClass, valueClass, keyConverter=None, valueConverter=None, conf=None, batchSize=0)、

用任意來自HDFS的鍵和值類讀取一個老的Hadoop輸入格式,本地系統(所有節點可用),或者任何支持Hadoop的文件系統的URI。這個機制是與sc.sequenceFile是一樣的。

Hadoop 配置可以作為Python的字典傳遞。這將被轉化成Java中的配置。

參數:

  • path – Hadoop文件路徑
  • inputFormatClass – 輸入的Hadoop文件的規範格式(例如 “org.apache.hadoop.mapred.TextInputFormat”)
  • keyClass – 可寫鍵類的合格類名 (例如“org.apache.hadoop.io.Text”)
  • valueClass –可寫值類的合格類名 (e.g. “org.apache.hadoop.io.LongWritable”)
  • keyConverter – (預設為none)
  • valueConverter – (預設為none)
  • conf – Hadoop配置,作為一個字典傳值 (預設為none)
  • batchSize – Python對象的數量代表一個單一的JAVA對象 (預設 0, 表示自動匹配batchSize)
hadoopRDD(inputFormatClass, keyClass, valueClass, keyConverter=None, valueConverter=None, conf=None, batchSize=0)

讀取Hadoop輸入格式用任意鍵值類。與上面的類相似。

參數:

  • inputFormatClass – 輸入的Hadoop文件的規範格式(例如 “org.apache.hadoop.mapred.TextInputFormat”)
  • keyClass – 可寫鍵類的合格類名 (例如“org.apache.hadoop.io.Text”)
  • valueClass –可寫值類的合格類名 (e.g. “org.apache.hadoop.io.LongWritable”)
  • keyConverter – (預設為none)
  • valueConverter – (預設為none)
  • conf – Hadoop配置,作為一個字典傳值 (預設為none)
  • batchSize – Python對象的數量代表一個單一的JAVA對象 (預設 0, 表示自動匹配batchSize)
newAPIHadoopFile(path, inputFormatClass, keyClass, valueClass, keyConverter=None, valueConverter=None, conf=None, batchSize=0)

與上面的功能類似.

newAPIHadoopRDD(inputFormatClass, keyClass, valueClass, keyConverter=None, valueConverter=None, conf=None, batchSize=0)

任意Hadoop的配置作為參數傳遞。

parallelize(c, numSlices=None)

分配一個本Python集合構成一個RDD。如果輸入代表了一個性能範圍,建議使用xrange。

>>> sc.parallelize([0, 2, 3, 4, 6], 5).glom().collect()
[[0], [2], [3], [4], [6]]
>>> sc.parallelize(xrange(0, 6, 2), 5).glom().collect()
[[], [0], [], [2], [4]]

 

pickleFile(name, minPartitions=None)

載入使用RDD.saveAsPickleFile方法保存的RDD。

>>> tmpFile = NamedTemporaryFile(delete=True)
>>> tmpFile.close()
>>> sc.parallelize(range(10)).saveAsPickleFile(tmpFile.name, 5)
>>> sorted(sc.pickleFile(tmpFile.name, 3).collect())
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

 

range(start, end=None, step=1, numSlices=None)

創建一個int類型元素組成的RDD,從開始值到結束(不包含結束),裡面都是按照步長增長的元素。這就要用到Python內置的函數range()。如果只有一個參數調用,這個參數就表示結束值,開始值預設為0.

參數:

  • start –起始值
  • end – 結束值(不包含)
  • step – 步長(預設: 1)
  • numSlices –RDD分區數量(切片數)

返回值:RDD

>>> sc.range(5).collect()
[0, 1, 2, 3, 4]
>>> sc.range(2, 4).collect()
[2, 3]
>>> sc.range(1, 7, 2).collect()
[1, 3, 5]

 

runJob(rdd, partitionFunc, partitions=None, allowLocal=False)

執行指定的partitionFunc 在指定的分區,返回一個元素數組。如果不指定分區,則將運行在所有分區上。

>>> myRDD = sc.parallelize(range(6), 3)
>>> sc.runJob(myRDD, lambda part: [x * x for x in part])
[0, 1, 4, 9, 16, 25]

>>> myRDD = sc.parallelize(range(6), 3)
>>> sc.runJob(myRDD, lambda part: [x * x for x in part], [0, 2], True)
[0, 1, 16, 25]

 


 
  • sequenceFile(path, keyClass=None, valueClass=None, keyConverter=None, valueConverter=None, minSplits=None, batchSize=0)
  • 讀取Hadoop 的SequenceFile,機制如下:

    1.一個Java RDD通過SequenceFile或者其他輸入格式創建,需要鍵值的可寫類參數。

    2.序列化

    3.如果失敗,則對每個鍵值調用‘toString’。

    4.在Python上,PickleSerializer用來反序列化。

參數:

path –序列化文件路徑

keyClass – 可用鍵類(例如 “org.apache.hadoop.io.Text”)

valueClass – 可用值類 (例如 “org.apache.hadoop.io.LongWritable”)

keyConverter

valueConverter

minSplits – 數據集最低分割數(預設 min(2, sc.defaultParallelism))

batchSize – 代表一個JAVA對象Python對象的數量 (預設0, 自動)

 

setCheckpointDir(dirName)

 

設定作為檢查點的RDD的目錄,如果運行在集群上,則目錄一定時HDFS路徑。

setJobGroup(groupId, description, interruptOnCancel=False)

分配一個組ID給所有被這個線程開啟的job。

通常,一個執行單位由多個Spark 的action或者job組成。應用程式可以將所有把所有job組成一個組,給一個組的描述。一旦設置好,Spark的web UI 將關聯job和組。

應用使用SparkContext.cancelJobGroup來取消組。

>>> import threading
>>> from time import sleep
>>> result = "Not Set"
>>> lock = threading.Lock()
>>> def map_func(x):
...     sleep(100)
...     raise Exception("Task should have been cancelled")
>>> def start_job(x):
...     global result
...     try:
...         sc.setJobGroup("job_to_cancel", "some description")
...         result = sc.parallelize(range(x)).map(map_func).collect()
...     except Exception as e:
...         result = "Cancelled"
...     lock.release()
>>> def stop_job():
...     sleep(5)
...     sc.cancelJobGroup("job_to_cancel")
>>> supress = lock.acquire()
>>> supress = threading.Thread(target=start_job, args=(10,)).start()
>>> supress = threading.Thread(target=stop_job).start()
>>> supress = lock.acquire()
>>> print(result)
Cancelled

 

如果對於job組,interruptOnCancel被設定為True,那麼那麼取消job將在執行線程中調用Thread.interrupt()。這對於確保任務實時停止是有作用的。但是預設情況下,HDFS可以通過標記節點為dead狀態來停止線程。

setLocalProperty(key, value)

設定本地影響提交工作的屬性,例如Spark 公平調度池。

setLogLevel(logLevel)

控制日誌級別。重寫任何用戶自定義的日誌設定。有效的日誌級別包括:ALL, DEBUG, ERROR, FATAL, INFO, OFF, TRACE, WARN。

classmethod setSystemProperty(key, value)

設定Java系統屬性,例如spark.executor.memory,這一定要在實例化SparkContext之前被激活。

show_profiles()

列印配置信息到標準輸出。

sparkUser()

為運行SparkContext 的用戶獲得SPARK_USER

startTime

當SparkContext被髮起,則返回新的時間紀元。

statusTracker()

Return StatusTracker object

返回StatusTracker對象

stop()

關閉SparkContext。

textFile(name, minPartitions=None, use_unicode=True)

從HDFS中讀取一個text文件,本地文件系統(所有節點可用),或者任何支持Hadoop的文件系統的URI,然後返回一個字元串類型的RDD。

如果用戶use_unicode為False,則strings類型將為str(用utf-8編碼),這是一種比unicode更快、更小的編碼(Spark1.2以後加入)。

>>> path = os.path.join(tempdir, "sample-text.txt")
>>> with open(path, "w") as testFile:
...    _ = testFile.write("Hello world!")
>>> textFile = sc.textFile(path)
>>> textFile.collect()
[u'Hello world!']

 

uiWebUrl

返回由SparkContext的SparkUI實例化開啟的URL。

union(rdds)

建立RDD列表的聯合。

支持不同序列化格式的RDD的unions()方法,需要使用預設的串列器將它們強制序列化(串列化):

>>> path = os.path.join(tempdir, "union-text.txt")
>>> with open(path, "w") as testFile:
...    _ = testFile.write("Hello")
>>> textFile = sc.textFile(path)
>>> textFile.collect()
[u'Hello']
>>> parallelized = sc.parallelize(["World!"])
>>> sorted(sc.union([textFile, parallelized]).collect())
[u'Hello', 'World!']

 

version

應用運行的Spark的版本。

wholeTextFiles(path, minPartitions=None, use_unicode=True)

讀取HDFS的文本文件的路徑,這是一個本地文件系統(所有節點可用),或者任何支持Hadoop的文件系統的URI。每個文件被當做一個獨立記錄來讀取,然後返回一個鍵值對,鍵為每個文件的路徑,值為每個文件的內容。

如果用戶use_unicode為False,則strings類型將為str(用utf-8編碼),這是一種比unicode更快、更小的編碼(Spark1.2以後加入)。

舉例說明,如果有如下文件:

hdfs://a-hdfs-path/part-00000
hdfs://a-hdfs-path/part-00001
...
hdfs://a-hdfs-path/part-nnnnn

如果執行 rdd = sparkContext.wholeTextFiles(“hdfs://a-hdfs-path”), 那麼rdd 包含:

(a-hdfs-path/part-00000, its content)
(a-hdfs-path/part-00001, its content)
...
(a-hdfs-path/part-nnnnn, its content)

註意

這種情況適合小文件,因為每個文件都會被載入到記憶體中。消耗很多記憶體啊!

>>> dirPath = os.path.join(tempdir, "files")
>>> os.mkdir(dirPath)
>>> with open(os.path.join(dirPath, "1.txt"), "w") as file1:
...    _ = file1.write("1")
>>> with open(os.path.join(dirPath, "2.txt"), "w") as file2:
...    _ = file2.write("2")
>>> textFiles = sc.wholeTextFiles(dirPath)
>>> sorted(textFiles.collect())
[(u'.../1.txt', u'1'), (u'.../2.txt', u'2')]

 

本篇接少了兩個類SparkContextSparkConf,下一篇將會介紹其餘的幾個類的內容,這是一篇彙總性質的文章主要便於以後使用時知道具體類中的方法調用為剛剛接觸Spark和我差不多人提供參考。還有理解不到位的請多多理解。


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  •    受人所托,做一個類似於qq賬號信息里的一個動畫,感覺挺有意思,也沒感覺有多難,就開始做了,結果才發現學的數學知識都還給體育老師了,研究了大半天才做出來。    先看一下 "動畫效果" : 用到的知識點: (1)三角函數 (2)CALayer (3)CAT ...
  • 博客也開了挺長時間了,一直都沒有來寫博客,主要原因是自己懶~~~此篇博客算是給2017年一個好的開始,同時也給2016年畫上一個句點,不留遺憾。 那就讓我們正式進入今天的主題:貝塞爾曲線。 首先,讓我們來瞭解下什麼是貝塞爾曲線。 貝塞爾曲線(Bézier curve),又稱貝茲曲線或貝濟埃曲線,是應 ...
  • /*可以更改列的大小,只要將滑鼠指針懸停到該列標題的右邊界,再單擊並拖動該列邊界到合適的位置。雙擊右邊界使得該列自動調整大小如果選擇幾個單元,然後將其剪切並複製到其他網格,則這幾個單元可作為單獨的單元處理(如果選擇"以文本格式顯示結果"選項,剪切的數據會全部粘帖到一個單元格中)可以從多行只選擇一列或 ...
  • Java代碼 Java代碼 關註流行國外網站 facebook:http://www.fb-on.com facebook官網:http://www.facebookzh.com facebook:http://www.cn-face-book.com youtube:http://www.yout ...
  • AlwaysOn是在SQL Server 2012中新引入的一種高可用技術,從名稱中可以看出,AlwaysOn的設計目標是保持資料庫系統永遠可用。AlwaysOn利用了Windows伺服器故障轉移集群(Windows Server Failover Clustering,簡稱WSFC)的健康檢測和自 ...
  • /*僅返回一個結果集,且該結果只有很窄的幾列想要以單個文本文件來保存返回結果返回多個結果集,但該結果比較小,且不需要使用多個滾動條就可以在同一頁面上查看多個結果集。*/ ...
  • 一、前言 今天天氣很好,大晴天,心情也好好的。就將MySQL常用的語句總結一下,記錄在隨筆里,也順便分享分享。日後,這篇隨筆我將會持續更新,作為我自己的MySQL語句大全。 二、常用SQL語句 我將由外到里進行編寫(資料庫到表再到數據) 庫層: 1、SHOW DATABASES; 2、CREATE ...
  • 這是一本書的名字,叫做【Hadoop大數據分析與挖掘實戰】,我從2017.1開始學習 軟體版本為Centos6.4 64bit,VMware,Hadoop2.6.0,JDK1.7. 但是這本書的出版時間為2016.1,待到我2017.1使用時,一部分內容已經發生了翻天覆地的變化。 於是我開始寫這麼一... ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...