pyspark 內容介紹(一)

来源:http://www.cnblogs.com/wenBlog/archive/2017/01/20/6323678.html
-Advertisement-
Play Games

pyspark 包介紹 子包 pyspark.sql module pyspark.streaming module pyspark.ml package pyspark.mllib package 子包 pyspark.sql module pyspark.streaming module pys ...


 

pyspark 包介紹

子包

內容

PySpark是針對Spark的Python API。根據網上提供的資料,現在彙總一下這些類的基本用法,並舉例說明如何具體使用。也是總結一下經常用到的這些公有類的使用方式。方便初學者查詢及使用。

Public 類們:

  • SparkContext:

    Spark 功能的主入口。

  • RDD:

    彈性分散式數據集,就是在Spark中的基礎抽象

  • Broadcast:

    一個在task之間重用的廣播變數。

  • Accumulator:

    一個“add-only” 共用變數,task只能增加值。

  • SparkConf:

    用於配置Spark.

  • SparkFiles:

    在job中訪問文件。

  • StorageLevel:

    更細粒度的緩存持久化級別。

     

將分為兩篇介紹這些類的內容,這裡首先介紹SparkConf
1. class pyspark.SparkConf(loadDefaults=True, _jvm=None, _jconf=None)

配置一個Spark應用,一般用來設置各種Spark的鍵值對作為參數。

大多數時候,使用SparkConf()來創建SparkConf對象,也用於載入來自spark.* Java系統的屬性值。此時,在SparkConf對象上設置的任何參數都有高於系統屬性的優先順序。

對於單元測試,也能調用SparkConf(false)來略過額外的配置,無論系統屬性是什麼都可以獲得相同的配置。

這個類中的設值方法都是支持鏈式結構的,例如,你可以這樣編寫配置conf.setMaster(“local”).setAppName(“My app”)

註意:

一旦SparkConf對象被傳遞給Spark,它就被覆制並且不能被其他人修改。

contains(key)

配置中是否包含一個指定鍵。

get(key, defaultValue=None)

獲取配置的某些鍵值,或者返回預設值。

getAll()

得到所有的鍵值對的list。

set(key, value)

設置配置屬性。

setAll(pairs)

通過傳遞一個鍵值對的list,為多個參數賦值。

etAppName(value)

設置應用名稱

setExecutorEnv(key=None, value=None, pairs=None)

設置環境變數複製給執行器。

setIfMissing(key, value)

如果沒有,則設置一個配置屬性。

setMaster(value)

設置主連接地址。

setSparkHome(value)

設置工作節點上的Spark安裝路徑。

toDebugString()

返回一個可列印的配置版本。

2. class pyspark.SparkContext(master=None, appName=None, sparkHome=None, pyFiles=None, environment=None, batchSize=0, serializer=PickleSerializer(), conf=None, gateway=None, jsc=None, profiler_cls=<class 'pyspark.profiler.BasicProfiler'>)

Spark功能的主入口,SparkContext 代表到Spark 集群的連接,並且在集群上能創建RDD和broadcast。

PACKAGE_EXTENSIONS = ('.zip', '.egg', '.jar')
accumulator(value, accum_param=None)

用指定的初始化值創建一個Accumulator累加器。使用AccumulatorParam對象定義如何添加數據類型的值。預設AccumulatorParams為整型和浮點型。如果其他類型需要自定義。

addFile(path, recursive=False)

使用在每個節點上的Spark job添加文件下載。這裡path 參數可以使本地文件也可以使在HDFS中的文件,也可以是HTTP、HTTPS或者URI。

在Spark的job中訪問文件,使用L{SparkFiles.get(fileName)<pyspark.files.SparkFiles.get>}可以找到下載位置。

如果遞歸選項被設置為“TRUE”則路徑能被指定。當前路徑僅僅支持Hadoop文件系統。

 1 >>> from pyspark import SparkFiles
 2 >>> path = os.path.join(tempdir, "test.txt")
 3 >>> with open(path, "w") as testFile:
 4 ...    _ = testFile.write("100") 
 5 >>> sc.addFile(path)
 6 >>> def func(iterator):
 7 ...    with open(SparkFiles.get("test.txt")) as testFile:
 8 ...        fileVal = int(testFile.readline())
 9 ...        return [x * fileVal for x in iterator]
10 >>> sc.parallelize([1, 2, 3, 4]).mapPartitions(func).collect()
11 [100, 200, 300, 400]

 

addPyFile(path)

為所有將在SparkContext上執行的任務添加一個a.py或者.zip的附件。這裡path 參數可以使本地文件也可以使在HDFS中的文件,也可以是HTTP、HTTPS或者FTP URI。

applicationId

Spark應用的唯一ID,它的格式取決於調度器實現。

  • 本地模式下像這樣的ID‘local-1433865536131’
  • 模式下像這樣的ID‘application_1433865536131_34483’
>>> sc.applicationId  
u'local-...'

 

binaryFiles(path, minPartitions=None)

註意

  • 從HDFS上讀取二進位文件的路徑,本地文件系統(在所有節點上都可用),或者其他hadoop支持的文件系統URI黨組偶一個二進位數組。每個文件作為單獨的記錄,並且返回一個鍵值對,這個鍵就是每個文件的了路徑,值就是每個文件的內容。
  • 小文件優先選擇,大文件也可以,但是會引起性能問題。
binaryRecords(path, recordLength)
  • path – 輸入文件路徑
  • recordLength – 分割記錄的長度(位數)
註意

從平面二進位文件中載入數據,假設每個記錄都是一套指定數字格式的數字(ByteBuffer),並且每個記錄位數的數是恆定的。

broadcast(value)

廣播一個制度變數到集群,返回一個L{Broadcast<pyspark.broadcast.Broadcast>} 對象在分散式函數中讀取。這個變數將只發一次給每個集群。

cancelAllJobs()

取消所有已排程的或者正在運行的job。

cancelJobGroup(groupId)

 

取消指定組的已激活job,查看SparkContext.setJobGroup更多信息。

 

defaultMinPartitions

當不被用戶指定時,預設Hadoop RDDs 為最小分區。

defaultParallelism

當不被用戶指定時,預設並行級別執行。(例如reduce task)

dump_profiles(path)

轉存配置信息到目錄路徑下。

emptyRDD()

創建沒有分區或者元素的RDD。

getConf()
getLocalProperty(key)

在當前線程中得到一個本地設置屬性。

classmethod getOrCreate(conf=None)
參數:conf – SparkConf (optional)

獲取或者實例化一個SparkContext並且註冊為單例模式對象。

hadoopFile(path, inputFormatClass, keyClass, valueClass, keyConverter=None, valueConverter=None, conf=None, batchSize=0)、

用任意來自HDFS的鍵和值類讀取一個老的Hadoop輸入格式,本地系統(所有節點可用),或者任何支持Hadoop的文件系統的URI。這個機制是與sc.sequenceFile是一樣的。

Hadoop 配置可以作為Python的字典傳遞。這將被轉化成Java中的配置。

參數:

  • path – Hadoop文件路徑
  • inputFormatClass – 輸入的Hadoop文件的規範格式(例如 “org.apache.hadoop.mapred.TextInputFormat”)
  • keyClass – 可寫鍵類的合格類名 (例如“org.apache.hadoop.io.Text”)
  • valueClass –可寫值類的合格類名 (e.g. “org.apache.hadoop.io.LongWritable”)
  • keyConverter – (預設為none)
  • valueConverter – (預設為none)
  • conf – Hadoop配置,作為一個字典傳值 (預設為none)
  • batchSize – Python對象的數量代表一個單一的JAVA對象 (預設 0, 表示自動匹配batchSize)
hadoopRDD(inputFormatClass, keyClass, valueClass, keyConverter=None, valueConverter=None, conf=None, batchSize=0)

讀取Hadoop輸入格式用任意鍵值類。與上面的類相似。

參數:

  • inputFormatClass – 輸入的Hadoop文件的規範格式(例如 “org.apache.hadoop.mapred.TextInputFormat”)
  • keyClass – 可寫鍵類的合格類名 (例如“org.apache.hadoop.io.Text”)
  • valueClass –可寫值類的合格類名 (e.g. “org.apache.hadoop.io.LongWritable”)
  • keyConverter – (預設為none)
  • valueConverter – (預設為none)
  • conf – Hadoop配置,作為一個字典傳值 (預設為none)
  • batchSize – Python對象的數量代表一個單一的JAVA對象 (預設 0, 表示自動匹配batchSize)
newAPIHadoopFile(path, inputFormatClass, keyClass, valueClass, keyConverter=None, valueConverter=None, conf=None, batchSize=0)

與上面的功能類似.

newAPIHadoopRDD(inputFormatClass, keyClass, valueClass, keyConverter=None, valueConverter=None, conf=None, batchSize=0)

任意Hadoop的配置作為參數傳遞。

parallelize(c, numSlices=None)

分配一個本Python集合構成一個RDD。如果輸入代表了一個性能範圍,建議使用xrange。

>>> sc.parallelize([0, 2, 3, 4, 6], 5).glom().collect()
[[0], [2], [3], [4], [6]]
>>> sc.parallelize(xrange(0, 6, 2), 5).glom().collect()
[[], [0], [], [2], [4]]

 

pickleFile(name, minPartitions=None)

載入使用RDD.saveAsPickleFile方法保存的RDD。

>>> tmpFile = NamedTemporaryFile(delete=True)
>>> tmpFile.close()
>>> sc.parallelize(range(10)).saveAsPickleFile(tmpFile.name, 5)
>>> sorted(sc.pickleFile(tmpFile.name, 3).collect())
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

 

range(start, end=None, step=1, numSlices=None)

創建一個int類型元素組成的RDD,從開始值到結束(不包含結束),裡面都是按照步長增長的元素。這就要用到Python內置的函數range()。如果只有一個參數調用,這個參數就表示結束值,開始值預設為0.

參數:

  • start –起始值
  • end – 結束值(不包含)
  • step – 步長(預設: 1)
  • numSlices –RDD分區數量(切片數)

返回值:RDD

>>> sc.range(5).collect()
[0, 1, 2, 3, 4]
>>> sc.range(2, 4).collect()
[2, 3]
>>> sc.range(1, 7, 2).collect()
[1, 3, 5]

 

runJob(rdd, partitionFunc, partitions=None, allowLocal=False)

執行指定的partitionFunc 在指定的分區,返回一個元素數組。如果不指定分區,則將運行在所有分區上。

>>> myRDD = sc.parallelize(range(6), 3)
>>> sc.runJob(myRDD, lambda part: [x * x for x in part])
[0, 1, 4, 9, 16, 25]

>>> myRDD = sc.parallelize(range(6), 3)
>>> sc.runJob(myRDD, lambda part: [x * x for x in part], [0, 2], True)
[0, 1, 16, 25]

 


 
  • sequenceFile(path, keyClass=None, valueClass=None, keyConverter=None, valueConverter=None, minSplits=None, batchSize=0)
  • 讀取Hadoop 的SequenceFile,機制如下:

    1.一個Java RDD通過SequenceFile或者其他輸入格式創建,需要鍵值的可寫類參數。

    2.序列化

    3.如果失敗,則對每個鍵值調用‘toString’。

    4.在Python上,PickleSerializer用來反序列化。

參數:

path –序列化文件路徑

keyClass – 可用鍵類(例如 “org.apache.hadoop.io.Text”)

valueClass – 可用值類 (例如 “org.apache.hadoop.io.LongWritable”)

keyConverter

valueConverter

minSplits – 數據集最低分割數(預設 min(2, sc.defaultParallelism))

batchSize – 代表一個JAVA對象Python對象的數量 (預設0, 自動)

 

setCheckpointDir(dirName)

 

設定作為檢查點的RDD的目錄,如果運行在集群上,則目錄一定時HDFS路徑。

setJobGroup(groupId, description, interruptOnCancel=False)

分配一個組ID給所有被這個線程開啟的job。

通常,一個執行單位由多個Spark 的action或者job組成。應用程式可以將所有把所有job組成一個組,給一個組的描述。一旦設置好,Spark的web UI 將關聯job和組。

應用使用SparkContext.cancelJobGroup來取消組。

>>> import threading
>>> from time import sleep
>>> result = "Not Set"
>>> lock = threading.Lock()
>>> def map_func(x):
...     sleep(100)
...     raise Exception("Task should have been cancelled")
>>> def start_job(x):
...     global result
...     try:
...         sc.setJobGroup("job_to_cancel", "some description")
...         result = sc.parallelize(range(x)).map(map_func).collect()
...     except Exception as e:
...         result = "Cancelled"
...     lock.release()
>>> def stop_job():
...     sleep(5)
...     sc.cancelJobGroup("job_to_cancel")
>>> supress = lock.acquire()
>>> supress = threading.Thread(target=start_job, args=(10,)).start()
>>> supress = threading.Thread(target=stop_job).start()
>>> supress = lock.acquire()
>>> print(result)
Cancelled

 

如果對於job組,interruptOnCancel被設定為True,那麼那麼取消job將在執行線程中調用Thread.interrupt()。這對於確保任務實時停止是有作用的。但是預設情況下,HDFS可以通過標記節點為dead狀態來停止線程。

setLocalProperty(key, value)

設定本地影響提交工作的屬性,例如Spark 公平調度池。

setLogLevel(logLevel)

控制日誌級別。重寫任何用戶自定義的日誌設定。有效的日誌級別包括:ALL, DEBUG, ERROR, FATAL, INFO, OFF, TRACE, WARN。

classmethod setSystemProperty(key, value)

設定Java系統屬性,例如spark.executor.memory,這一定要在實例化SparkContext之前被激活。

show_profiles()

列印配置信息到標準輸出。

sparkUser()

為運行SparkContext 的用戶獲得SPARK_USER

startTime

當SparkContext被髮起,則返回新的時間紀元。

statusTracker()

Return StatusTracker object

返回StatusTracker對象

stop()

關閉SparkContext。

textFile(name, minPartitions=None, use_unicode=True)

從HDFS中讀取一個text文件,本地文件系統(所有節點可用),或者任何支持Hadoop的文件系統的URI,然後返回一個字元串類型的RDD。

如果用戶use_unicode為False,則strings類型將為str(用utf-8編碼),這是一種比unicode更快、更小的編碼(Spark1.2以後加入)。

>>> path = os.path.join(tempdir, "sample-text.txt")
>>> with open(path, "w") as testFile:
...    _ = testFile.write("Hello world!")
>>> textFile = sc.textFile(path)
>>> textFile.collect()
[u'Hello world!']

 

uiWebUrl

返回由SparkContext的SparkUI實例化開啟的URL。

union(rdds)

建立RDD列表的聯合。

支持不同序列化格式的RDD的unions()方法,需要使用預設的串列器將它們強制序列化(串列化):

>>> path = os.path.join(tempdir, "union-text.txt")
>>> with open(path, "w") as testFile:
...    _ = testFile.write("Hello")
>>> textFile = sc.textFile(path)
>>> textFile.collect()
[u'Hello']
>>> parallelized = sc.parallelize(["World!"])
>>> sorted(sc.union([textFile, parallelized]).collect())
[u'Hello', 'World!']

 

version

應用運行的Spark的版本。

wholeTextFiles(path, minPartitions=None, use_unicode=True)

讀取HDFS的文本文件的路徑,這是一個本地文件系統(所有節點可用),或者任何支持Hadoop的文件系統的URI。每個文件被當做一個獨立記錄來讀取,然後返回一個鍵值對,鍵為每個文件的路徑,值為每個文件的內容。

如果用戶use_unicode為False,則strings類型將為str(用utf-8編碼),這是一種比unicode更快、更小的編碼(Spark1.2以後加入)。

舉例說明,如果有如下文件:

hdfs://a-hdfs-path/part-00000
hdfs://a-hdfs-path/part-00001
...
hdfs://a-hdfs-path/part-nnnnn

如果執行 rdd = sparkContext.wholeTextFiles(“hdfs://a-hdfs-path”), 那麼rdd 包含:

(a-hdfs-path/part-00000, its content)
(a-hdfs-path/part-00001, its content)
...
(a-hdfs-path/part-nnnnn, its content)

註意

這種情況適合小文件,因為每個文件都會被載入到記憶體中。消耗很多記憶體啊!

>>> dirPath = os.path.join(tempdir, "files")
>>> os.mkdir(dirPath)
>>> with open(os.path.join(dirPath, "1.txt"), "w") as file1:
...    _ = file1.write("1")
>>> with open(os.path.join(dirPath, "2.txt"), "w") as file2:
...    _ = file2.write("2")
>>> textFiles = sc.wholeTextFiles(dirPath)
>>> sorted(textFiles.collect())
[(u'.../1.txt', u'1'), (u'.../2.txt', u'2')]

 

本篇接少了兩個類SparkContextSparkConf,下一篇將會介紹其餘的幾個類的內容,這是一篇彙總性質的文章主要便於以後使用時知道具體類中的方法調用為剛剛接觸Spark和我差不多人提供參考。還有理解不到位的請多多理解。


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  •    受人所托,做一個類似於qq賬號信息里的一個動畫,感覺挺有意思,也沒感覺有多難,就開始做了,結果才發現學的數學知識都還給體育老師了,研究了大半天才做出來。    先看一下 "動畫效果" : 用到的知識點: (1)三角函數 (2)CALayer (3)CAT ...
  • 博客也開了挺長時間了,一直都沒有來寫博客,主要原因是自己懶~~~此篇博客算是給2017年一個好的開始,同時也給2016年畫上一個句點,不留遺憾。 那就讓我們正式進入今天的主題:貝塞爾曲線。 首先,讓我們來瞭解下什麼是貝塞爾曲線。 貝塞爾曲線(Bézier curve),又稱貝茲曲線或貝濟埃曲線,是應 ...
  • /*可以更改列的大小,只要將滑鼠指針懸停到該列標題的右邊界,再單擊並拖動該列邊界到合適的位置。雙擊右邊界使得該列自動調整大小如果選擇幾個單元,然後將其剪切並複製到其他網格,則這幾個單元可作為單獨的單元處理(如果選擇"以文本格式顯示結果"選項,剪切的數據會全部粘帖到一個單元格中)可以從多行只選擇一列或 ...
  • Java代碼 Java代碼 關註流行國外網站 facebook:http://www.fb-on.com facebook官網:http://www.facebookzh.com facebook:http://www.cn-face-book.com youtube:http://www.yout ...
  • AlwaysOn是在SQL Server 2012中新引入的一種高可用技術,從名稱中可以看出,AlwaysOn的設計目標是保持資料庫系統永遠可用。AlwaysOn利用了Windows伺服器故障轉移集群(Windows Server Failover Clustering,簡稱WSFC)的健康檢測和自 ...
  • /*僅返回一個結果集,且該結果只有很窄的幾列想要以單個文本文件來保存返回結果返回多個結果集,但該結果比較小,且不需要使用多個滾動條就可以在同一頁面上查看多個結果集。*/ ...
  • 一、前言 今天天氣很好,大晴天,心情也好好的。就將MySQL常用的語句總結一下,記錄在隨筆里,也順便分享分享。日後,這篇隨筆我將會持續更新,作為我自己的MySQL語句大全。 二、常用SQL語句 我將由外到里進行編寫(資料庫到表再到數據) 庫層: 1、SHOW DATABASES; 2、CREATE ...
  • 這是一本書的名字,叫做【Hadoop大數據分析與挖掘實戰】,我從2017.1開始學習 軟體版本為Centos6.4 64bit,VMware,Hadoop2.6.0,JDK1.7. 但是這本書的出版時間為2016.1,待到我2017.1使用時,一部分內容已經發生了翻天覆地的變化。 於是我開始寫這麼一... ...
一周排行
    -Advertisement-
    Play Games
  • 示例項目結構 在 Visual Studio 中創建一個 WinForms 應用程式後,項目結構如下所示: MyWinFormsApp/ │ ├───Properties/ │ └───Settings.settings │ ├───bin/ │ ├───Debug/ │ └───Release/ ...
  • [STAThread] 特性用於需要與 COM 組件交互的應用程式,尤其是依賴單線程模型(如 Windows Forms 應用程式)的組件。在 STA 模式下,線程擁有自己的消息迴圈,這對於處理用戶界面和某些 COM 組件是必要的。 [STAThread] static void Main(stri ...
  • 在WinForm中使用全局異常捕獲處理 在WinForm應用程式中,全局異常捕獲是確保程式穩定性的關鍵。通過在Program類的Main方法中設置全局異常處理,可以有效地捕獲並處理未預見的異常,從而避免程式崩潰。 註冊全局異常事件 [STAThread] static void Main() { / ...
  • 前言 給大家推薦一款開源的 Winform 控制項庫,可以幫助我們開發更加美觀、漂亮的 WinForm 界面。 項目介紹 SunnyUI.NET 是一個基於 .NET Framework 4.0+、.NET 6、.NET 7 和 .NET 8 的 WinForm 開源控制項庫,同時也提供了工具類庫、擴展 ...
  • 說明 該文章是屬於OverallAuth2.0系列文章,每周更新一篇該系列文章(從0到1完成系統開發)。 該系統文章,我會儘量說的非常詳細,做到不管新手、老手都能看懂。 說明:OverallAuth2.0 是一個簡單、易懂、功能強大的許可權+可視化流程管理系統。 有興趣的朋友,請關註我吧(*^▽^*) ...
  • 一、下載安裝 1.下載git 必須先下載並安裝git,再TortoiseGit下載安裝 git安裝參考教程:https://blog.csdn.net/mukes/article/details/115693833 2.TortoiseGit下載與安裝 TortoiseGit,Git客戶端,32/6 ...
  • 前言 在項目開發過程中,理解數據結構和演算法如同掌握蓋房子的秘訣。演算法不僅能幫助我們編寫高效、優質的代碼,還能解決項目中遇到的各種難題。 給大家推薦一個支持C#的開源免費、新手友好的數據結構與演算法入門教程:Hello演算法。 項目介紹 《Hello Algo》是一本開源免費、新手友好的數據結構與演算法入門 ...
  • 1.生成單個Proto.bat內容 @rem Copyright 2016, Google Inc. @rem All rights reserved. @rem @rem Redistribution and use in source and binary forms, with or with ...
  • 一:背景 1. 講故事 前段時間有位朋友找到我,說他的窗體程式在客戶這邊出現了卡死,讓我幫忙看下怎麼回事?dump也生成了,既然有dump了那就上 windbg 分析吧。 二:WinDbg 分析 1. 為什麼會卡死 窗體程式的卡死,入口門檻很低,後續往下分析就不一定了,不管怎麼說先用 !clrsta ...
  • 前言 人工智慧時代,人臉識別技術已成為安全驗證、身份識別和用戶交互的關鍵工具。 給大家推薦一款.NET 開源提供了強大的人臉識別 API,工具不僅易於集成,還具備高效處理能力。 本文將介紹一款如何利用這些API,為我們的項目添加智能識別的亮點。 項目介紹 GitHub 上擁有 1.2k 星標的 C# ...