一、 RDD創建 從本地文件系統中載入數據創建RDD sc:SparkContext(shell自動創建) 本地文件系統中載入數據創建RDD Spark採用textFile()方法來從文件系統中載入數據創建RDD 該方法把文件的URI作為參數,這個URI可以是: 本地文件系統的地址 或者是分散式文件 ...
目錄
一、 RDD創建
從本地文件系統中載入數據創建RDD
-
sc:SparkContext(shell自動創建)
-
本地文件系統中載入數據創建RDD
Spark採用textFile()方法來從文件系統中載入數據創建RDD
該方法把文件的URI作為參數,這個URI可以是:
-
本地文件系統的地址
-
或者是分散式文件系統HDFS的地址
-
或者是Amazon S3的地址等等
-
從HDFS載入數據創建RDD
-
啟動hdfs
-
上傳文件
-
查看文件
-
spark內載入文件
textFile預設是讀hdfs,所以hdfs可以省略。
hdfs的預設目錄,前三條語句是完全等價的,可以使用其中任意一種方式
不是預設目錄,要還上路徑
-
停止hdfs
通過並行集合(列表)創建RDD
-
輸入列表、字元串、numpy生成數組
二、 RDD操作
轉換操作
對於RDD而言,每一次轉換操作都會產生新的RDD,供給下一個“轉換”使用
轉換得到的RDD是惰性求值的,也就是說,整個轉換過程只是記錄了轉換的軌跡,並不會發生真正的計算,只有遇到行動操作時,才會發生真正的計算,開始從血緣關係源頭開始,進行物理的轉換操作
操作 | 含義 |
---|---|
filter(func) | 篩選出滿足函數func的元素,並返回一個新的數據集 |
map(func) | 將每個元素傳遞到函數func中,並將結果返回為一個新的數據集 |
flatMap(func) | 與map()相似,但每個輸入元素都可以映射到0或多個輸出結果 |
groupByKey() | 應用於(K,V)鍵值對的數據集時,返回一個新的(K, Iterable)形式的數據集 |
reduceByKey(func) | 應用於(K,V)鍵值對的數據集時,返回一個新的(K, V)形式的數據集,其中每個值是將每個key傳遞到函數func中進行聚合後的結果 |
filter(func)
-
顯式定義函數
結果不明顯,換個關鍵詞
-
lambda函數
map(func)
-
字元串分詞
-
顯式定義函數
-
lambda函數
-
-
數字加100
-
顯式定義函數
-
lambda函數
-
-
字元串加固定首碼
-
顯式定義函數
-
lambda函數
-
flatMap(func)
-
分詞
-
單詞映射成鍵值對
reduceByKey()
-
統計詞頻,累加
-
乘法規則
groupByKey()
-
單詞分組
-
查看分組的內容
-
分組之後做累加 map
sortByKey()
-
詞頻統計按單詞排序
sortBy()
-
詞頻統計按詞頻排序
行動操作
行動操作是真正觸發計算的地方。Spark程式執行到行動操作時,才會執行真正的計算,從文件中載入數據,完成一次又一次轉換操作,最終,完成行動操作得到結果。
操作 | 含義 |
---|---|
count() | 返回數據集中的元素個數 |
collect() | 以數組的形式返回數據集中的所有元素 |
first() | 返回數據集中的第一個元素 |
take(n) | 以數組的形式返回數據集中的前n個元素 |
foreach(func) | 將數據集中的每個元素傳遞到函數func中運行 |
reduce(func) | 通過函數func(輸入兩個參數並返回一個值)聚合數據集中的元素 |
foreach(func)
-
foreach(print)
-
foreach(lambda a:print(a.upper())
collect()
count()
take(n)
reduce()
-
數值型的rdd元素做累加
-
與reduceByKey區別
reduceByKey(func)應用於(K,V)鍵值對的數據集時,返回一個新的(K, V)形式的數據集,其中的每個值是將每個key傳遞到函數func中進行聚合後得到的結果