一、Spark SQL概述 Spark SQL是Spark用來處理結構化數據的一個模塊,它提供了兩個編程抽象叫做DataFrame和DataSet並且作為分散式SQL查詢引擎的作用,其實也是對RDD的再封裝。大數據Hadoop之——計算引擎Spark,官方文檔:https://spark.apach ...
目錄
- 一、Spark SQL概述
- 二、SparkSQL版本
- 三、RDD、DataFrames和DataSet
- 四、RDD、DataFrame和DataSet的共性與區別
- 五、spark-shell
- 六、SparkSQL和Hive的集成(Spark on Hive)
- 七、Spark beeline
- 八、Spark Streaming
一、Spark SQL概述
Spark SQL是Spark用來處理結構化數據的一個模塊,它提供了兩個編程抽象叫做DataFrame和DataSet並且作為分散式SQL查詢引擎的作用,其實也是對RDD的再封裝。大數據Hadoop之——計算引擎Spark,官方文檔:https://spark.apache.org/sql/
二、SparkSQL版本
1)SparkSQL的演變之路
-
1.0以前: Shark(入口:SQLContext和HiveContext)
- SQLContext:主要DataFrame的構建以及DataFrame的執行,SQLContext指的是spark中SQL模塊的程式入口。
- HiveContext:是SQLContext的子類,專門用於與Hive的集成,比如讀取Hive的元數據,數據存儲到Hive表、Hive的視窗分析函數等。
-
1.1.x開始:SparkSQL(只是測試性的)
-
1.3.x: SparkSQL(正式版本)+Dataframe
-
1.5.x: SparkSQL 鎢絲計劃
-
1.6.x: SparkSQL+DataFrame+DataSet(測試版本)
-
2.x:
- 入口:SparkSession(spark應用程式的一個整體入口),合併了SQLContext和HiveContext
- SparkSQL+DataFrame+DataSet(正式版本)
- Spark Streaming-》Structured Streaming(DataSet)
2)shark與SparkSQL對比
- shark
- 執行計劃優化完全依賴於Hive,不方便添加新的優化策略;
- Spark是線程級並行,而MapReduce是進程級並行。
- Spark在相容Hive的實現上存線上程安全問題,導致Shark
不得不使用另外一套獨立維護的打了補丁的Hive源碼分支;
- Spark SQL
- 作為Spark生態的一員繼續發展,而不再受限於Hive,
- 只是相容Hive;Hive on Spark作為Hive的底層引擎之一
- Hive可以採用Map-Reduce、Tez、Spark等引擎
3)SparkSession
- SparkSession是Spark 2.0引如的新概念。SparkSession為用戶提供了統一的切入點,來讓用戶學習spark的各項功能。
- 在spark的早期版本中,SparkContext是spark的主要切入點,由於RDD是主要的API,我們通過sparkcontext來創建和操作RDD。對於每個其他的API,我們需要使用不同的context。
【例如】對於Streming,我們需要使用StreamingContext;對於sql,使用sqlContext;對於Hive,使用hiveContext。但是隨著DataSet和DataFrame的API逐漸成為標準的API,就需要為他們建立接入點。所以在spark2.0中,引入SparkSession作為DataSet和DataFrame API的切入點,SparkSession封裝了SparkConf、SparkContext和SQLContext。為了向後相容,SQLContext和HiveContext也被保存下來。
- SparkSession實質上是SQLContext和HiveContext的組合(未來可能還會加上StreamingContext),所以在SQLContext和HiveContext上可用的API在SparkSession上同樣是可以使用的。SparkSession內部封裝了sparkContext,所以計算實際上是由sparkContext完成的,在spark 2.x中不推薦使用SparkContext對象讀取數據,而是推薦SparkSession。
三、RDD、DataFrames和DataSet
1)三者關聯關係
DataFrame 和 DataSet 是 Spark SQL 提供的基於 RDD 的結構化數據抽象。它既有 RDD 不可變、分區、存儲依賴關係等特性,又擁有類似於關係型資料庫的結構化信息。所以,基於 DataFrame 和 DataSet API 開發出的程式會被自動優化,使得開發人員不需要操作底層的 RDD API 來進行手動優化,大大提升開發效率。但是 RDD API 對於非結構化的數據處理有獨特的優勢,比如文本流數據,而且更方便我們做底層的操作。
1)RDD
RDD(Resilient Distributed Dataset)叫做彈性分散式數據集,是Spark中最基本的數據抽象,它代表一個不可變、可分區、裡面的元素可並行計算的集合。RDD具有數據流模型的特點:自動容錯、位置感知性調度和可伸縮性。RDD允許用戶在執行多個查詢時顯式地將工作集緩存在記憶體中,後續的查詢能夠重用工作集,這極大地提升了查詢速度。
1、核心概念
-
一組分片(Partition):即數據集的基本組成單位。對於RDD來說,每個分片都會被一個計算任務處理,並決定並行計算的粒度。用戶可以在創建RDD時指定RDD的分片個數,如果沒有指定,那麼就會採用預設值。預設值就是程式所分配到的CPU Core的數目。
-
一個計算每個分區的函數。Spark中RDD的計算是以分片為單位的,每個RDD都會實現compute函數以達到這個目的。compute函數會對迭代器進行複合,不需要保存每次計算的結果。
-
RDD之間的依賴關係:RDD的每次轉換都會生成一個新的RDD,所以RDD之間就會形成類似於流水線一樣的前後依賴關係。在部分分區數據丟失時,Spark可以通過這個依賴關係重新計算丟失的分區數據,而不是對RDD的所有分區進行重新計算。
-
一個Partitioner:即RDD的分片函數。當前Spark中實現了兩種類型的分片函數,一個是基於哈希的HashPartitioner,另外一個是基於範圍的RangePartitioner。只有對於於key-value的RDD,才會有Partitioner,非key-value的RDD的Parititioner的值是None。Partitioner函數不但決定了RDD本身的分片數量,也決定了parent RDD Shuffle輸出時的分片數量。
-
一個列表:存儲存取每個Partition的優先位置(preferred location)。對於一個HDFS文件來說,這個列表保存的就是每個Partition所在的塊的位置。按照“移動數據不如移動計算”的理念,Spark在進行任務調度的時候,會儘可能地將計算任務分配到其所要處理數據塊的存儲位置。
2、RDD簡單操作
啟動spark-shell,其實spark-shell低層也是調用spark-submit,首先需要配置好,當然也可以寫在命令行,但是不推薦。配置如下,僅供參考(這裡使用yarn模式):
$ cat spark-defaults.conf
啟動spark-shell(下麵會詳解講解)
$ spark-shell
【問題】發現有個WARN:WARN yarn.Client: Neither spark.yarn.jars nor spark.yarn.archive is set, falling back to uploading libraries under SPARK_HOME.
【原因】是因為Spark提交任務到yarn集群,需要上傳相關spark的jar包到HDFS。
【解決】 提前上傳到HDFS集群,並且在Spark配置文件指定文件路徑,就可以避免每次提交任務到Yarn都需要重覆上傳文件。下麵是解決的具體操作步驟:
### 打包jars,jar相關的參數說明
#-c 創建一個jar包
# -t 顯示jar中的內容列表
#-x 解壓jar包
#-u 添加文件到jar包中
#-f 指定jar包的文件名
#-v 生成詳細的報造,並輸出至標準設備
#-m 指定manifest.mf文件.(manifest.mf文件中可以對jar包及其中的內容作一些一設置)
#-0 產生jar包時不對其中的內容進行壓縮處理
#-M 不產生所有文件的清單文件(Manifest.mf)。這個參數與忽略掉-m參數的設置
#-i 為指定的jar文件創建索引文件
#-C 表示轉到相應的目錄下執行jar命令,相當於cd到那個目錄,然後不帶-C執行jar命令
$ cd /opt/bigdata/hadoop/server/spark-3.2.0-bin-hadoop3.2
$ jar cv0f spark-libs.jar -C ./jars/ .
$ ll
### 在hdfs上創建存放jar包目錄
$ hdfs dfs -mkdir -p /spark/jars
## 上傳jars到HDFS
$ hdfs dfs -put spark-libs.jar /spark/jars/
## 增加配置spark-defaults.conf
spark.yarn.archive=hdfs:///spark/jars/spark-libs.jar
然後再啟動spark-shell
在Spark Shell中,有一個專有的SparkContext已經為您創建好了,變數名叫做sc,自己創建的SparkContext將無法工作。
$ spark-shell
### 由一個已經存在的Scala集合創建。
val array = Array(1,2,3,4,5)
# spark使用parallelize方法創建RDD
val rdd = sc.parallelize(array)
這裡只是簡單的創建RDD操作,後面會有更多RDD相關的演示操作。
3、RDD API
Spark支持兩個類型(運算元)操作:Transformation和Action
1)Transformation
主要做的是就是將一個已有的RDD生成另外一個RDD。Transformation具有lazy特性(延遲載入)。Transformation運算元的代碼不會真正被執行。只有當我們的程式裡面遇到一個action運算元的時候,代碼才會真正的被執行。這種設計讓Spark更加有效率地運行。
常用的Transformation:
轉換 | 含義 |
---|---|
map(func) | 返回一個新的RDD,該RDD由每一個輸入元素經過func函數轉換後組成 |
filter(func) | 返回一個新的RDD,該RDD由經過func函數計算後返回值為true的輸入元素組成 |
flatMap(func) | 類似於map,但是每一個輸入元素可以被映射為0或多個輸出元素(所以func應該返回一個序列,而不是單一元素) |
mapPartitions(func) | 類似於map,但獨立地在RDD的每一個分片上運行,因此在類型為T的RDD上運行時,func的函數類型必須是Iterator[T] => Iterator[U] |
mapPartitionsWithIndex(func) | 類似於mapPartitions,但func帶有一個整數參數表示分片的索引值,因此在類型為T的RDD上運行時,func的函數類型必須是(Int, Interator[T]) => Iterator[U] |
sample(withReplacement, fraction, seed) | 根據fraction指定的比例對數據進行採樣,可以選擇是否使用隨機數進行替換,seed用於指定隨機數生成器種子 |
union(otherDataset) | 對源RDD和參數RDD求並集後返回一個新的RDD |
intersection(otherDataset) | 對源RDD和參數RDD求交集後返回一個新的RDD |
distinct([numTasks])) | 對源RDD進行去重後返回一個新的RDD |
groupByKey([numTasks]) | 在一個(K,V)的RDD上調用,返回一個(K, Iterator[V])的RDD |
reduceByKey(func, [numTasks]) | 在一個(K,V)的RDD上調用,返回一個(K,V)的RDD,使用指定的reduce函數,將相同key的值聚合到一起,與groupByKey類似,reduce任務的個數可以通過第二個可選的參數來設置 |
aggregateByKey(zeroValue)(seqOp, combOp, [numTasks]) | 先按分區聚合 再總的聚合 每次要跟初始值交流 例如:aggregateByKey(0)(+,+) 對k/y的RDD進行操作 |
sortByKey([ascending], [numTasks]) | 在一個(K,V)的RDD上調用,K必須實現Ordered介面,返回一個按照key進行排序的(K,V)的RDD |
sortBy(func,[ascending], [numTasks]) | 與sortByKey類似,但是更靈活 第一個參數是根據什麼排序 第二個是怎麼排序 false倒序 第三個排序後分區數 預設與原RDD一樣 |
join(otherDataset, [numTasks]) | 在類型為(K,V)和(K,W)的RDD上調用,返回一個相同key對應的所有元素對在一起的(K,(V,W))的RDD 相當於內連接(求交集) |
cogroup(otherDataset, [numTasks]) | 在類型為(K,V)和(K,W)的RDD上調用,返回一個(K,(Iterable |
cartesian(otherDataset) | 兩個RDD的笛卡爾積 的成很多個K/V |
pipe(command, [envVars]) | 調用外部程式 |
coalesce(numPartitions) | 重新分區 第一個參數是要分多少區,第二個參數是否shuffle 預設false 少分區變多分區 true 多分區變少分區 false |
repartition(numPartitions) | |
重新分區 必須shuffle 參數是要分多少區 少變多 | |
repartitionAndSortWithinPartitions(partitioner) | 重新分區+排序 比先分區再排序效率高 對K/V的RDD進行操作 |
foldByKey(zeroValue)(seqOp) | 該函數用於K/V做摺疊,合併處理 ,與aggregate類似 第一個括弧的參數應用於每個V值 第二括弧函數是聚合例如:+ |
combineByKey | 合併相同的key的值 rdd1.combineByKey(x => x, (a: Int, b: Int) => a + b, (m: Int, n: Int) => m + n) |
partitionBy(partitioner) | 對RDD進行分區 partitioner是分區器 例如new HashPartition(2) |
cache/persist | RDD緩存,可以避免重覆計算從而減少時間,區別:cache內部調用了persist運算元,cache預設就一個緩存級別MEMORY-ONLY ,而persist則可以選擇緩存級別 |
Subtract(rdd) | 返回前rdd元素不在後rdd的rdd |
leftOuterJoin | leftOuterJoin類似於SQL中的左外關聯left outer join,返回結果以前面的RDD為主,關聯不上的記錄為空。只能用於兩個RDD之間的關聯,如果要多個RDD關聯,多關聯幾次即可。 |
rightOuterJoin | rightOuterJoin類似於SQL中的有外關聯right outer join,返回結果以參數中的RDD為主,關聯不上的記錄為空。只能用於兩個RDD之間的關聯,如果要多個RDD關聯,多關聯幾次即可 |
subtractByKey | substractByKey和基本轉換操作中的subtract類似只不過這裡是針對K的,返回在主RDD中出現,並且不在otherRDD中出現的元素 |
2)Action
觸發代碼的運行,我們一段spark代碼裡面至少需要有一個action操作。
常用的Action:
動作 | 含義 |
---|---|
reduce(func) | 通過func函數聚集RDD中的所有元素,這個功能必須是課交換且可並聯的 |
collect() | 在驅動程式中,以數組的形式返回數據集的所有元素 |
count() | 返回RDD的元素個數 |
first() | 返回RDD的第一個元素(類似於take(1)) |
take(n) | 返回一個由數據集的前n個元素組成的數組 |
takeSample(withReplacement,num, [seed]) | 返回一個數組,該數組由從數據集中隨機採樣的num個元素組成,可以選擇是否用隨機數替換不足的部分,seed用於指定隨機數生成器種子 |
takeOrdered(n, [ordering]) | 返回原RDD排序(預設升序排)後,前n個元素組成的數組 |
saveAsTextFile(path) | 將數據集的元素以textfile的形式保存到HDFS文件系統或者其他支持的文件系統,對於每個元素,Spark將會調用toString方法,將它裝換為文件中的文本 |
saveAsSequenceFile(path) | 將數據集中的元素以Hadoop sequencefile的格式保存到指定的目錄下,可以使HDFS或者其他Hadoop支持的文件系統。 |
saveAsObjectFile(path) | saveAsObjectFile用於將RDD中的元素序列化成對象,存儲到文件中。使用方法和saveAsTextFile類似 |
countByKey() | 針對(K,V)類型的RDD,返回一個(K,Int)的map,表示每一個key對應的元素個數。 |
foreach(func) | 在數據集的每一個元素上,運行函數func進行更新。 |
aggregate | 先對分區進行操作,在總體操作 |
reduceByKeyLocally | 返回一個 dict 對象,同樣是將同 key 的元素進行聚合 |
lookup | lookup用於(K,V)類型的RDD,指定K值,返回RDD中該K對應的所有V值。 |
top | top函數用於從RDD中,按照預設(降序)或者指定的排序規則,返回前num個元素。 |
fold | fold是aggregate的簡化,將aggregate中的seqOp和combOp使用同一個函數op。 |
foreachPartition | 遍歷原RDD元素經過func函數運算過後的結果集,foreachPartition運算元分區操作 |
4、實戰操作
1、針對各個元素的轉化操作
我們最常用的轉化操作應該是map() 和filter(),轉化操作map() 接收一個函數,把這個函數用於RDD 中的每個元素,將函數的返回結果作為結果RDD 中對應元素的值。而轉化操作filter() 則接收一個函數,並將RDD 中滿足該函數的元素放入新的RDD 中返回。
讓我們看一個簡單的例子,用map() 對RDD 中的所有數求平方
# 通過parallelize創建RDD對象
val input = sc.parallelize(List(1, 2, 3, 4))
val result = input.map(x => x * x)
println(result.collect().mkString(","))
2、對一個數據為{1,2,3,3}的RDD進行基本的RDD轉化操作(去重)
var rdd = sc.parallelize(List(1,2,3,3))
rdd.distinct().collect().mkString(",")
3、對數據分別為{1,2,3}和{3,4,5}的RDD進行針對兩個RDD的轉化操作
var rdd = sc.parallelize(List(1,2,3))
var other = sc.parallelize(List(3,4,5))
# 生成一個包含兩個RDD中所有元素的RDD
rdd.union(other).collect().mkString(",")
# 求兩個RDD共同的元素RDD
rdd.intersection(other).collect().mkString(",")
4、行動操作
行動操作reduce(),它接收一個函數作為參數,這個函數要操作兩個RDD 的元素類型的數據並返回一個同樣類型的新元素。一個簡單的例子就是函數+,可以用它來對我們的RDD 進行累加。使用reduce(),可以很方便地計算出RDD中所有元素的總和、元素的個數,以及其他類型的聚合操作。
var rdd = sc.parallelize(List(1,2,3,4,5,6,7))
# 求和
var sum = rdd.reduce((x, y) => x + y)
# 求元素個數
var sum = rdd.count()
# 聚合操作
var rdd = sc.parallelize(List(1,2,3,4,5,6,7))
var result = rdd.aggregate((0,0))((acc,value) => (acc._1 + value,acc._2 + 1),(acc1,acc2) => (acc1._1 + acc2._1 , acc1._2 + acc2._2))
var avg = result._1/result._2.toDouble
這裡只是演示幾個簡單的示例,更多RDD的操作,可以參考官方文檔學習哦。
2)DataFrames
在Spark中,DataFrame提供了一個領域特定語言(DSL)和SQL來操作結構化數據,DataFrame是一種以RDD為基礎的分散式數據集,類似於傳統資料庫中的二維表格。
- RDD,由於無從得知所存數據元素的具體內部結構,Spark Core只能在stage層面進行簡單、通用的流水線優化。
- DataFrame底層是以RDD為基礎的分散式數據集,和RDD的主要區別的是:RDD中沒有schema信息,而DataFrame中數據每一行都包含schema。DataFrame = RDD + shcema
1、DSL風格語法操作
1)DataFrame創建
創建DataFrame的兩種基本方式:
- 已存在的RDD調用toDF()方法轉換得到DataFrame。
- 通過Spark讀取數據源直接創建DataFrame。
直接創建DataFarme對象
若使用SparkSession方式創建DataFrame,可以使用spark.read從不同類型的文件中載入數據創建DataFrame。spark.read的具體操作,如下所示。
方法名 | 描述 |
---|---|
spark.read.text(“people.txt”) | 讀取txt格式文件,創建DataFrame |
spark.read.csv (“people.csv”) | 讀取csv格式文件,創建DataFrame |
spark.read.text(“people.json”) | 讀取json格式文件,創建DataFrame |
spark.read.text(“people.parquet”) | 讀取parquet格式文件,創建DataFrame |
1、在本地創建一個person.txt文本文檔,用於讀取:運行spark-shell:
# person.txt,Name,Age,Height
p1_name,18,165
p2_name,19,170
p3_name,20,188
p4_name,21,190
# 啟動spark shell,預設會創建一個spark名稱的spark session對象
$ spark-shell
# 定義變數,【註意】所有節點都得創建這個person文件,要不然調度沒有這個文件的機器會報錯
var inputFile = "file:///opt/bigdata/hadoop/server/spark-3.2.0-bin-hadoop3.2/test/person.txt"
# 讀取本地文件
val personDF = spark.read.text("file:///opt/bigdata/hadoop/server/spark-3.2.0-bin-hadoop3.2/test/person.txt")
val personDF = spark.read.text(inputFile)
# 顯示
personDF.show()
# 將文件put到hdfs上
# 讀取hdfs文件(推薦)
val psersonDF = spark.read.text("hdfs:///person.txt")
2、有RDD轉換成DataFrame
動作 | 含義 |
---|---|
show() | 查看DataFrame中的具體內容信息 |
printSchema() | 查看DataFrame的Schema信息 |
select() | 查看DataFrame中選取部分列的數據及進行重命名 |
filter() | 實現條件查詢,過濾出想要的結果 |
groupBy() | 對記錄進行分組 |
sort() | 對特定欄位進行排序操作 |
toDF() | 把RDD數據類型轉成DataFarme |
# 讀取文本文檔,按逗號分割開來
val lineRDD = sc.textFile("hdfs:///person.txt").map(_.split(","))
case class Person(name:String, age:Int, height:Int)
# 按照樣式類對RDD數據進行分割成map
val personRDD = lineRDD.map(x => Person(x(0).toString, x(1).toInt, x(2).toInt))
# 把RDD數據類型轉成DataFarme
val personDF = personRDD.toDF()
# 查看這個表
personDF.show()
# 查看Schema數據
personDF.printSchema()
# 查看列
personDF.select(personDF.col("name")).show
# 過濾年齡小於25的
personDF.filter(col("age") >= 25).show
這裡提供常用的spark dataframe方法:
方法名 | 含義 |
---|---|
collect() | 返回值是一個數組,返回dataframe集合所有的行 |
collectAsList() | 返回值是一個java類型的數組,返回dataframe集合所有的行 |
count() | 返回一個number類型的,返回dataframe集合的行數 |
describe(cols: String*) | 返回一個通過數學計算的類表值(count, mean, stddev, min, and max),這個可以傳多個參數,中間用逗號分隔,如果有欄位為空,那麼不參與運算,只這對數值類型的欄位。例如df.describe("age", "height").show() |
first() | 返回第一行 ,類型是row類型 |
head() | 返回第一行 ,類型是row類型 |
head(n:Int) | 返回n行 ,類型是row 類型 |
show() | 返回dataframe集合的值 預設是20行,返回類型是unit |
show(n:Int) | 返回n行,返回值類型是unit |
table(n:Int) | 返回n行 ,類型是row 類型 |
cache() | 同步數據的記憶體 |
columns | 返回一個string類型的數組,返回值是所有列的名字 |
dtypes | 返回一個string類型的二維數組,返回值是所有列的名字以及類型 |
explan() | 列印執行計劃 物理的 |
explain(n:Boolean) | 輸入值為 false 或者true ,返回值是unit 預設是false ,如果輸入true 將會列印 邏輯的和物理的 |
isLocal | 返回值是Boolean類型,如果允許模式是local返回true 否則返回false |
persist(newlevel:StorageLevel) | 返回一個dataframe.this.type 輸入存儲模型類型 |
printSchema() | 列印出欄位名稱和類型 按照樹狀結構來列印 |
registerTempTable(tablename:String) | 返回Unit ,將df的對象只放在一張表裡面,這個表隨著對象的刪除而刪除了 |
schema | 返回structType 類型,將欄位名稱和類型按照結構體類型返回 |
toDF() | 返回一個新的dataframe類型的 |
toDF(colnames:String*) | 將參數中的幾個欄位返回一個新的dataframe類型的 |
unpersist() | 返回dataframe.this.type 類型,去除模式中的數據 |
unpersist(blocking:Boolean) | 返回dataframe.this.type類型 true 和unpersist是一樣的作用false 是去除RDD |
agg(expers:column*) | 返回dataframe類型 ,同數學計算求值 |
agg(exprs: Map[String, String]) | 返回dataframe類型 ,同數學計算求值 map類型的 |
agg(aggExpr: (String, String), aggExprs: (String, String)*) | 返回dataframe類型 ,同數學計算求值 |
apply(colName: String) | 返回column類型,捕獲輸入進去列的對象 |
as(alias: String) | 返回一個新的dataframe類型,就是原來的一個別名 |
col(colName: String) | 返回column類型,捕獲輸入進去列的對象 |
cube(col1: String, cols: String*) | 返回一個GroupedData類型,根據某些欄位來彙總 |
distinct | 去重 返回一個dataframe類型 |
drop(col: Column) | 刪除某列 返回dataframe類型 |
dropDuplicates(colNames: Array[String]) | 刪除相同的列 返回一個dataframe |
except(other: DataFrame) | 返回一個dataframe,返回在當前集合存在的在其他集合不存在的 |
filter(conditionExpr: String) | 刷選部分數據,返回dataframe類型 |
groupBy(col1: String, cols: String*) | 根據某寫欄位來彙總返回groupedate類型 |
intersect(other: DataFrame) | 返回一個dataframe,在2個dataframe都存在的元素 |
join(right: DataFrame, joinExprs: Column, joinType: String) | 一個是關聯的dataframe,第二個關聯的條件,第三個關聯的類型:inner, outer, left_outer, right_outer, leftsemi |
limit(n: Int) | 返回dataframe類型 去n 條數據出來 |
orderBy(sortExprs: Column*) | 做alise排序 |
sort(sortExprs: Column*) | 排序 df.sort(df("age").desc).show(); 預設是asc |
select(cols:string*) | dataframe 做欄位的刷選 df.select($"colA", $"colB" + 1) |
withColumnRenamed(existingName: String, newName: String) | 修改列表 df.withColumnRenamed("name","names").show(); |
withColumn(colName: String, col: Column) | 增加一列 df.withColumn("aa",df("name")).show(); |
這裡已經列出了很多常用方法了,基本上涵蓋了大部分操作,當然也可以參考官方文檔
2、SQL風格語法操作
DataFrame的一個強大之處就是我們可以將它看作是一個關係型數據表,然後可以通過在程式中使用spark.sql() 來執行SQL查詢,結果將作為一個DataFrame返回。因為spark session包含了Hive Context,所以spark.sql() 會自動啟動連接hive,預設模式就是hive里的local模式(內嵌derby)
啟動spark-shell
$ spark-shell
會在執行spark-shell當前目錄下生成兩個文件:derby.log,metastore_db
接下來就可以happy的寫sql了,這裡就演示幾個命令,跟之前的hive一樣,把sql語句放在spark.sql()方法里執行即可,不清楚hive sql的可以參考我之前的文章:大數據Hadoop之——數據倉庫Hive
# 有個預設default庫
$ spark.sql("show databases").show
# 預設當前庫是default
$ spark.sql("show tables").show
通過spark-sql啟動spark shell
操作就更像sql語法了,已經跟hive差不多了。接下來演示幾個命令,大家就很清楚了。
$ spark-sql
show databases;
create database test007
同樣也會在當前目錄下自動創建兩個文件:derby.log,metastore_db
3)DataSet
DataSet是分散式的數據集合,Dataset提供了強類型支持,也是在RDD的每行數據加了類型約束。DataSet是在Spark1.6中添加的新的介面。它集中了RDD的優點(強類型和可以用強大lambda函數)以及使用了Spark SQL優化的執行引擎。DataSet可以通過JVM的對象進行構建,可以用函數式的轉換(map/flatmap/filter)進行多種操作。
1、通過spark.createDataset通過集合進行創建dataSet
val ds1 = spark.createDataset(1 to 10)
ds1.show
2、從已經存在的rdd當中構建dataSet
val ds2 = spark.createDataset(sc.textFile("hdfs:////person.txt"))
3、通過樣例類配合創建DataSet
case class Person(name:String,age:Int)
val personDataList = List(Person("zhangsan",18),Person("lisi",28))
val personDS = personDataList.toDS
personDS.show
4、通過DataFrame轉化生成
Music.json文件內容如下:
{"name":"上海灘","singer":"葉麗儀","album":"香港電視劇主題歌","path":"mp3/shanghaitan.mp3"}
{"name":"一生何求","singer":"陳百強","album":"香港電視劇主題歌","path":"mp3/shanghaitan.mp3"}
{"name":"紅日","singer":"李克勤","album":"懷舊專輯","path":"mp3/shanghaitan.mp3"}
{"name":"愛如潮水","singer":"張信哲","album":"懷舊專輯","path":"mp3/airucaoshun.mp3"}
{"name":"紅茶館","singer":"陳惠嫻","album":"懷舊專輯","path":"mp3/redteabar.mp3"}
case class Music(name:String,singer:String,album:String,path:String)
# 註意把test.json傳到hdfs上
val jsonDF = spark.read.json("hdfs:///Music.json")
val jsonDS = jsonDF.as[Music]
jsonDS.show
RDD,DataFrame,DataSet互相轉化
四、RDD、DataFrame和DataSet的共性與區別
-
RDD[Person]:以Person為類型參數,但不瞭解 其內部結構。
-
DataFrame:提供了詳細的結構信息schema(結構)列的名稱和類型。這樣看起來就像一張表了
-
DataSet[Person]:不光有schema(結構)信息,還有類型信息
1)共性
- 三者都是spark平臺下的分散式彈性數據集,為處理超大型數據提供便利
- 三者都有惰性機制。在創建時、轉換時(如map)不會立即執行,只有在遇到action運算元的時候(比如foreach),才開始進行觸發計算。極端情況下,如果代碼中只有創建、轉換,但是沒有在後面的action中使用對應的結果,在執行時會被跳過。
- 三者都有partition的概念,都有緩存(cache)的操作,還可以進行檢查點操作(checkpoint)
- 三者都有許多共同的函數(如map、filter,sorted等等)。
在對DataFrame和DataSet操作的時候,大多數情況下需要引入隱式轉換(ssc.implicits._)
2)區別
- DataFrame:DataFrame是DataSet的特例,也就是說DataSet[Row]的別名;DataFrame = RDD + schema
- DataFrame的每一行的固定類型為Row,只有通過解析才能獲得各個欄位的值
- DataFrame與DataSet通常與spark ml同時使用
- DataFrame與DataSet均支持sparkSql操作,比如select,groupby等,也可以註冊成臨時表,進行sql語句操作
- DataFrame與DateSet支持一些方便的保存方式,比如csv,可以帶上表頭,這樣每一列的欄位名就可以一目瞭然
- DataSet:DataSet = RDD + case class
- DataSet與DataFrame擁有相同的成員函數,區別隻是只是每一行的數據類型不同。
- DataSet的每一行都是case class,在自定義case class之後可以很方便的獲取每一行的信息
五、spark-shell
Spark的shell作為一個強大的互動式數據分析工具,提供了一個簡單的方式學習API。它可以使用Scala(在Java虛擬機上運行現有的Java庫的一個很好方式)或Python。spark-shell的本質是在後臺調用了spark-submit腳本來啟動應用程式的,在spark-shell中會創建了一個名為sc的SparkContext對象。
【註】spark-shell只能以client方式啟動。
查看幫助
$ spark-shell --help
spark-shell常用選項
--master MASTER_URL 指定模式(spark://host:port, mesos://host:port, yarn,
k8s://https://host:port, or local (Default: local[*]))
--executor-memory MEM 指定每個Executor的記憶體,預設1GB
--total-executor-cores NUM 指定所有Executor所占的核數
--num-executors NUM 指定Executor的個數
--help, -h 顯示幫助信息
--version 顯示版本號
從上面幫助看,spark有五種運行模式:spark、mesos、yarn、k8s、local。這裡主要講local和yarn模式
Master URL | 含義 |
---|---|
local | 在本地運行,只有一個工作進程,無並行計算能力 |
local[K] | 在本地運行,有 K 個工作進程,通常設置 K 為機器的CPU 核心數量 |
local[*] | 在本地運行,工作進程數量等於機器的 CPU 核心數量。 |
spark://HOST:PORT | 以 Standalone 模式運行,這是 Spark 自身提供的集群運行模式,預設埠號: 7077 |
mesos://HOST:PORT | 在 Mesos 集群上運行,Driver 進程和 Worker 進程運行在 Mesos 集群上,部署模式必須使用固定值:--deploy-mode cluster |
yarn | 在yarn集群上運行,依賴於hadoop集群,yarn資源調度框架,將應用提交給yarn,在ApplactionMaster(相當於Stand alone模式中的Master)中運行driver,在集群上調度資源,開啟excutor執行任務。 |
k8s | 在k8s集群上運行 |
1)local
在Spark Shell中,有一個專有的SparkContext已經為您創建好了,變數名叫做sc。自己創建的SparkContext將無法工作。可以用--master參數來設置SparkContext要連接的集群,用--jars來設置需要添加到CLASSPATH的jar包,如果有多個jar包,可以使用逗號分隔符連接它們。例如,在一個擁有2核的環境上運行spark-shell,使用:
#資源存儲的位置,預設為本地,以及使用什麼調度框架 ,預設使用的是spark內置的資源管理和調度框架Standalone
# local單機版,只占用一個線程,local[*]占用當前所有線程,local[2]:2個CPU核運行
$ spark-shell --master local[2]
# --master 預設為 local[*]
#預設使用集群最大的記憶體大小
--executor-memorty
#預設使用最大核數
--total-executor-cores
$ spark-shell --master local[*] --executor-memory 1g --total-executor-cores 1
Web UI地址:http://hadoop-node1:4040
隨後,就可以使用spark-shell內使用Scala語言完成一定的操作。這裡做幾個簡單的操作,有興趣的話,可以自行去瞭解scala
val textFile = sc.textFile("file:///opt/bigdata/hadoop/server/spark-3.2.0-bin-hadoop3.2/README.md")
textFile.count()
textFile.first()
其中,count代表RDD中的總數據條數;first代表RDD中的第一行數據。
2)on Yarn(推薦)
# on yarn,也可以在配置文件中修改這個欄位spark.master
$ spark-shell --master yarn
--master用來設置context將要連接並使用的資源主節點,master的值是standalone模式中spark的集群地址、yarn或mesos集群的URL,或是一個local地址。
六、SparkSQL和Hive的集成(Spark on Hive)
1)創建軟鏈接
$ ln -s /opt/bigdata/hadoop/server/apache-hive-3.1.2-bin/conf/hive-site.xml /opt/bigdata/hadoop/server/spark-3.2.0-bin-hadoop3.2/conf/hive-site.xml
2)複製 hive lib目錄 下的mysql連接jar包到spark的jars下
$ cp /opt/bigdata/hadoop/server/apache-hive-3.1.2-bin/lib/mysql-connector-java-5.1.49-bin.jar /opt/bigdata/hadoop/server/spark-3.2.0-bin-hadoop3.2/jars/
3)配置
# 創建spark日誌在hdfs存儲目錄
$ hadoop fs -mkdir -p /tmp/spark
$ cd /opt/bigdata/hadoop/server/spark-3.2.0-bin-hadoop3.2/conf
$ cp spark-defaults.conf.template spark-defaults.conf
在spark-defaults.conf追加如下配置:
# 使用yarn模式
spark.master yarn
spark.eventLog.enabled true
spark.eventLog.dir hdfs://hadoop-node1:8082/tmp/spark
spark.serializer org.apache.spark.serializer.KryoSerializer
spark.driver.memory 512m
spark.executor.extraJavaOptions -XX:+PrintGCDetails -Dkey=value -Dnumbers="one two three"
4)啟動 spark-shell操作Hive(local)
支持多用戶得啟動metastore服務
$ nohup hive --service metastore &
$ ss -atnlp|grep 9083
在hive-site.xml加入如下配置:
<property>
<name>hive.metastore.uris</name>
<value>thrift://hadoop-node1:9083</value>
</property>
啟動spark-sql
# yarn模式,--master yarn可以不帶,因為上面在配置文件里已經配置了yarn模式了
$ spark-sql --master yarn
show databases;
從上圖就可發現,已經查到我之前創建的庫了,說明已經集成ok了。
七、Spark beeline
Spark Thrift Server 是 Spark 社區基於 HiveServer2 實現的一個 Thrift 服務。旨在無縫相容
HiveServer2。因為 Spark Thrift Server 的介面和協議都和 HiveServer2 完全一致,因此我們部署好Spark Thrift Server後,可以直接使用hive的beeline訪問Spark Thrift Server執行相關語句。Spark Thrift Server 的目的也只是取代 HiveServer2,因此它依舊可以和 Hive Metastore進行交互,獲取到 hive 的元數據。
1)Spark Thrift Server架構於HiveServer2架構對比
2)Spark Thrift Server和HiveServer2的區別
Hive on Spark | Spark Thrift Server | |
---|---|---|
任務提交模式 | 每個session都會創建一個RemoteDriver,也就是對於一個Application。之後將sql解析成執行的物理計劃序列化後發到RemoteDriver執行 | 本身的Server服務就是一個Driver,直接接收sql執行。也就是所有的session都共用一個Application |
性能 | 性能一般 | 如果存儲格式是orc或者parquet,性能會比hive高幾倍,某些語句甚至會高幾十倍。其他格式的話,性能相差不是很大,有時hive性能會更好 |
併發 | 如果任務執行不是非同步的,就是在thrift的worker線程中執行,受worker線程數量的限制。非同步的話則放到線程池執行,併發度受非同步線程池大小限制。 | 處理任務的模式和Hive一樣。 |
sql相容 | 主要支持ANSI SQL 2003,但並不完全遵守,只是大部分支持。並擴展了很多自己的語法 | Spark SQL也有自己的實現標準,因此和hive不會完全相容。具體哪些語句會不相容需要測試才能知道 |
HA | 可以通過zk實現HA | 沒有內置的HA實現,不過spark社區提了一個issue並帶上了patch,可以拿來用:https://issues.apache.org/jira/browse/SPARK-11100 |
【總結】Spark Thrift Server說白了就是小小的改動了下HiveServer2,代碼量也不多。雖然介面和HiveServer2完全一致,但是它以單個Application在集群運行的方式還是比較奇葩的。可能官方也是為了實現簡單而沒有再去做更多的優化。
3)配置啟動Spark Thrift Server
1、配置hive-site.xml
<!-- hs2埠 -->
<property>
<name>hive.server2.thrift.port</name>
<value>11000</value>
</property>
2、啟動spark thriftserver服務(不能起hs2,因為配置是一樣的,會有衝突)
$ cd /opt/bigdata/hadoop/server/spark-3.2.0-bin-hadoop3.2/sbin
$ ./start-thriftserver.sh
$ ss -tanlp|grep 11000
3、啟動beeline操作
# 為了和hive的區別,這裡使用絕對路徑啟動
$ cd /opt/bigdata/hadoop/server/spark-3.2.0-bin-hadoop3.2/bin
# 操作跟hive操作一模一樣,只是計算引擎不一樣了,換成了spark了
$ ./beeline
!connect jdbc:hive2://hadoop-node1:11000
show databases;
訪問HDFS WEB UI:http://hadoop-node1:8088/cluster/apps
八、Spark Streaming
Spark Streaming與其他大數據框架Storm、Flink一樣,Spark Streaming是基於Spark Core基礎之上用於處理實時計算業務的框架。其實現就是把輸入的流數據進行按時間切分,切分的數據塊用離線批處理的方式進行並行計算處理。原理如下圖:
支持多種數據源獲取數據:
Spark處理的是批量的數據(離線數據),Spark Streaming實際上處理並不是像Strom一樣來一條處理一條數據,而是將接收到的實時流數據,按照一定時間間隔,對數據進行拆分,交給Spark Engine引擎,最終得到一批批的結果。
由於考慮到本篇文章篇幅太長,所以這裡只是稍微提了一下,如果有時間會繼續補充Spark Streaming相關的知識點,請耐心等待……
官方文檔:https://spark.apache.org/docs/3.2.0/streaming-programming-guide.html