# 使用PySpark ## 配置python環境 在所有節點上按照python3,版本必須是python3.6及以上版本 ```Shell yum install -y python3 ``` 修改所有節點的環境變數 ```Shell export JAVA_HOME=/usr/local/jdk ...
使用PySpark
配置python環境
在所有節點上按照python3,版本必須是python3.6及以上版本
yum install -y python3
修改所有節點的環境變數
export JAVA_HOME=/usr/local/jdk1.8.0_251
export PYSPARK_PYTHON=python3
export HADOOP_HOME=/bigdata/hadoop-3.2.1
export HADOOP_CONF_DIR=/bigdata/hadoop-3.2.1/etc/hadoop
export PATH=$PATH:$JAVA_HOME/bin:$HADOOP_HOME/bin
使用pyspark shell
/bigdata/spark-3.2.3-bin-hadoop3.2/bin/pyspark \
--master spark://node-1.51doit.cn:7077 \
--executor-memory 1g --total-executor-cores 10
在pyspark shell使用python編寫wordcount
sc.textFile("hdfs://node-1.51doit.cn:8020/data/wc").flatMap(lambda line: line.split(' ')).map(lambda word: (word, 1)).reduceByKey(lambda a, b: a + b).sortBy(lambda t: t[1], False).saveAsTextFile('hdfs://node-1.51doit.cn:8020/out01')
在pycharm中使用python編寫wordcount
from pyspark import SparkConf, SparkContext
if __name__ == '__main__':
conf = SparkConf().setAppName('WordCount').setMaster('local[*]')
sc = SparkContext(conf=conf)
lines = sc.textFile('file:///Users/star/Desktop/data.txt')
words = lines.flatMap(lambda line: line.split(' '))
wordAndOne = words.map(lambda word: (word, 1))
reduced = wordAndOne.reduceByKey(lambda x, y: x + y)
result = reduced.sortBy(lambda t: t[1], False)
print(result.collect())
RDD
RDD的全稱為Resilient Distributed Dataset,是一個彈性、可複原的分散式數據集,是Spark中最基本的抽象,是一個不可變的、有多個分區的、可以並行計算的集合。RDD中並不裝真正要計算的數據,而裝的是描述信息,描述以後從哪裡讀取數據,調用了用什麼方法,傳入了什麼函數,以及依賴關係等。
RDD的特點
• 有一系列連續的分區:分區編號從0開始,分區的數量決定了對應階段Task的並行度
• 有一個函數作用在每個輸入切片上或對應的分區上: 每一個分區都會生成一個Task,對該分區的數據進行計算,這個函數就是具體的計算邏輯
• RDD和RDD之間存在一系列依賴關係:RDD調用Transformation後會生成一個新的RDD,子RDD會記錄父RDD的依賴關係,包括寬依賴(有shuffle)和窄依賴(沒有shuffle)
• (可選的)K-V的RDD在Shuffle會有分區器,預設使用HashPartitioner
• (可選的)如果從HDFS中讀取數據,會有一個最優位置:spark在調度任務之前會讀取NameNode的元數據信息,獲取數據的位置,移動計算而不是移動數據,這樣可以提高計算效率。
RDD的運算元(方法)分類
• Transformation:即轉換運算元,調用轉換運算元會生成一個新的RDD,Transformation是Lazy的,不會觸發job執行。
• Action:行動運算元,調用行動運算元會觸發job執行,本質上是調用了sc.runJob方法,該方法從最後一個RDD,根據其依賴關係,從後往前,劃分Stage,生成TaskSet。
創建RDD的方法
• 從HDFS指定的目錄據創建RDD
val lines: RDD[String] = sc.textFile("hdfs://node-1.51doit.cn:9000/log")
• 通過並行化方式,將Driver端的集合轉成RDD
val rdd1: RDD[Int] = sc.parallelize(Array(1,2,3,4,5,6,7,8,9))
查看RDD的分區數量
val rdd1: RDD[Int] = sc.parallelize(Array(1,2,3,4,5,6,7,8,9))
rdd1.partitions.length