大數據Hadoop之——Spark SQL+Spark Streaming

来源:https://www.cnblogs.com/liugp/archive/2022/04/23/16183056.html
-Advertisement-
Play Games

一、Spark SQL概述 Spark SQL是Spark用來處理結構化數據的一個模塊,它提供了兩個編程抽象叫做DataFrame和DataSet並且作為分散式SQL查詢引擎的作用,其實也是對RDD的再封裝。大數據Hadoop之——計算引擎Spark,官方文檔:https://spark.apach ...


目錄

一、Spark SQL概述

Spark SQL是Spark用來處理結構化數據的一個模塊,它提供了兩個編程抽象叫做DataFrame和DataSet並且作為分散式SQL查詢引擎的作用,其實也是對RDD的再封裝。大數據Hadoop之——計算引擎Spark,官方文檔:https://spark.apache.org/sql/

二、SparkSQL版本

1)SparkSQL的演變之路

  • 1.0以前: Shark(入口:SQLContext和HiveContext)

    1. SQLContext:主要DataFrame的構建以及DataFrame的執行,SQLContext指的是spark中SQL模塊的程式入口。
    2. HiveContext:是SQLContext的子類,專門用於與Hive的集成,比如讀取Hive的元數據,數據存儲到Hive表、Hive的視窗分析函數等。
  • 1.1.x開始:SparkSQL(只是測試性的)

  • 1.3.x: SparkSQL(正式版本)+Dataframe

  • 1.5.x: SparkSQL 鎢絲計劃

  • 1.6.x: SparkSQL+DataFrame+DataSet(測試版本)

  • 2.x:

    1. 入口:SparkSession(spark應用程式的一個整體入口),合併了SQLContext和HiveContext
    2. SparkSQL+DataFrame+DataSet(正式版本)
    3. Spark Streaming-》Structured Streaming(DataSet)

2)shark與SparkSQL對比

  • shark
    1. 執行計劃優化完全依賴於Hive,不方便添加新的優化策略;
    2. Spark是線程級並行,而MapReduce是進程級並行。
    3. Spark在相容Hive的實現上存線上程安全問題,導致Shark
      不得不使用另外一套獨立維護的打了補丁的Hive源碼分支;
  • Spark SQL
    1. 作為Spark生態的一員繼續發展,而不再受限於Hive,
    2. 只是相容Hive;Hive on Spark作為Hive的底層引擎之一
    3. Hive可以採用Map-Reduce、Tez、Spark等引擎

3)SparkSession

  • SparkSession是Spark 2.0引如的新概念。SparkSession為用戶提供了統一的切入點,來讓用戶學習spark的各項功能。
  • 在spark的早期版本中,SparkContext是spark的主要切入點,由於RDD是主要的API,我們通過sparkcontext來創建和操作RDD。對於每個其他的API,我們需要使用不同的context。

【例如】對於Streming,我們需要使用StreamingContext;對於sql,使用sqlContext;對於Hive,使用hiveContext。但是隨著DataSet和DataFrame的API逐漸成為標準的API,就需要為他們建立接入點。所以在spark2.0中,引入SparkSession作為DataSet和DataFrame API的切入點,SparkSession封裝了SparkConf、SparkContext和SQLContext。為了向後相容,SQLContext和HiveContext也被保存下來。

  • SparkSession實質上是SQLContext和HiveContext的組合(未來可能還會加上StreamingContext),所以在SQLContext和HiveContext上可用的API在SparkSession上同樣是可以使用的。SparkSession內部封裝了sparkContext,所以計算實際上是由sparkContext完成的,在spark 2.x中不推薦使用SparkContext對象讀取數據,而是推薦SparkSession

三、RDD、DataFrames和DataSet

1)三者關聯關係

DataFrame 和 DataSet 是 Spark SQL 提供的基於 RDD 的結構化數據抽象。它既有 RDD 不可變、分區、存儲依賴關係等特性,又擁有類似於關係型資料庫的結構化信息。所以,基於 DataFrame 和 DataSet API 開發出的程式會被自動優化,使得開發人員不需要操作底層的 RDD API 來進行手動優化,大大提升開發效率。但是 RDD API 對於非結構化的數據處理有獨特的優勢,比如文本流數據,而且更方便我們做底層的操作

1)RDD

RDD(Resilient Distributed Dataset)叫做彈性分散式數據集,是Spark中最基本的數據抽象,它代表一個不可變、可分區、裡面的元素可並行計算的集合。RDD具有數據流模型的特點:自動容錯、位置感知性調度和可伸縮性。RDD允許用戶在執行多個查詢時顯式地將工作集緩存在記憶體中,後續的查詢能夠重用工作集,這極大地提升了查詢速度。

1、核心概念

  • 一組分片(Partition):即數據集的基本組成單位。對於RDD來說,每個分片都會被一個計算任務處理,並決定並行計算的粒度。用戶可以在創建RDD時指定RDD的分片個數,如果沒有指定,那麼就會採用預設值。預設值就是程式所分配到的CPU Core的數目。

  • 一個計算每個分區的函數。Spark中RDD的計算是以分片為單位的,每個RDD都會實現compute函數以達到這個目的。compute函數會對迭代器進行複合,不需要保存每次計算的結果。

  • RDD之間的依賴關係:RDD的每次轉換都會生成一個新的RDD,所以RDD之間就會形成類似於流水線一樣的前後依賴關係。在部分分區數據丟失時,Spark可以通過這個依賴關係重新計算丟失的分區數據,而不是對RDD的所有分區進行重新計算。

  • 一個Partitioner:即RDD的分片函數。當前Spark中實現了兩種類型的分片函數,一個是基於哈希的HashPartitioner,另外一個是基於範圍的RangePartitioner。只有對於於key-value的RDD,才會有Partitioner,非key-value的RDD的Parititioner的值是None。Partitioner函數不但決定了RDD本身的分片數量,也決定了parent RDD Shuffle輸出時的分片數量。

  • 一個列表:存儲存取每個Partition的優先位置(preferred location)。對於一個HDFS文件來說,這個列表保存的就是每個Partition所在的塊的位置。按照“移動數據不如移動計算”的理念,Spark在進行任務調度的時候,會儘可能地將計算任務分配到其所要處理數據塊的存儲位置。

2、RDD簡單操作

啟動spark-shell,其實spark-shell低層也是調用spark-submit,首先需要配置好,當然也可以寫在命令行,但是不推薦。配置如下,僅供參考(這裡使用yarn模式):

$ cat spark-defaults.conf

啟動spark-shell(下麵會詳解講解)

$ spark-shell

【問題】發現有個WARN:WARN yarn.Client: Neither spark.yarn.jars nor spark.yarn.archive is set, falling back to uploading libraries under SPARK_HOME.
【原因】是因為Spark提交任務到yarn集群,需要上傳相關spark的jar包到HDFS。
【解決】 提前上傳到HDFS集群,並且在Spark配置文件指定文件路徑,就可以避免每次提交任務到Yarn都需要重覆上傳文件。下麵是解決的具體操作步驟:

### 打包jars,jar相關的參數說明
#-c  創建一個jar包
# -t 顯示jar中的內容列表
#-x 解壓jar包
#-u 添加文件到jar包中
#-f 指定jar包的文件名
#-v  生成詳細的報造,並輸出至標準設備
#-m 指定manifest.mf文件.(manifest.mf文件中可以對jar包及其中的內容作一些一設置)
#-0 產生jar包時不對其中的內容進行壓縮處理
#-M 不產生所有文件的清單文件(Manifest.mf)。這個參數與忽略掉-m參數的設置
#-i    為指定的jar文件創建索引文件
#-C 表示轉到相應的目錄下執行jar命令,相當於cd到那個目錄,然後不帶-C執行jar命令
$ cd /opt/bigdata/hadoop/server/spark-3.2.0-bin-hadoop3.2
$ jar cv0f spark-libs.jar -C ./jars/ .
$ ll
### 在hdfs上創建存放jar包目錄
$ hdfs dfs -mkdir -p /spark/jars
## 上傳jars到HDFS
$ hdfs dfs -put spark-libs.jar /spark/jars/
## 增加配置spark-defaults.conf 
spark.yarn.archive=hdfs:///spark/jars/spark-libs.jar

然後再啟動spark-shell

在Spark Shell中,有一個專有的SparkContext已經為您創建好了,變數名叫做sc,自己創建的SparkContext將無法工作。

$ spark-shell

### 由一個已經存在的Scala集合創建。
val array = Array(1,2,3,4,5)
# spark使用parallelize方法創建RDD
val rdd = sc.parallelize(array)

這裡只是簡單的創建RDD操作,後面會有更多RDD相關的演示操作。

3、RDD API

Spark支持兩個類型(運算元)操作:Transformation和Action

1)Transformation

主要做的是就是將一個已有的RDD生成另外一個RDD。Transformation具有lazy特性(延遲載入)。Transformation運算元的代碼不會真正被執行。只有當我們的程式裡面遇到一個action運算元的時候,代碼才會真正的被執行。這種設計讓Spark更加有效率地運行。

常用的Transformation:

轉換 含義
map(func) 返回一個新的RDD,該RDD由每一個輸入元素經過func函數轉換後組成
filter(func) 返回一個新的RDD,該RDD由經過func函數計算後返回值為true的輸入元素組成
flatMap(func) 類似於map,但是每一個輸入元素可以被映射為0或多個輸出元素(所以func應該返回一個序列,而不是單一元素)
mapPartitions(func) 類似於map,但獨立地在RDD的每一個分片上運行,因此在類型為T的RDD上運行時,func的函數類型必須是Iterator[T] => Iterator[U]
mapPartitionsWithIndex(func) 類似於mapPartitions,但func帶有一個整數參數表示分片的索引值,因此在類型為T的RDD上運行時,func的函數類型必須是(Int, Interator[T]) => Iterator[U]
sample(withReplacement, fraction, seed) 根據fraction指定的比例對數據進行採樣,可以選擇是否使用隨機數進行替換,seed用於指定隨機數生成器種子
union(otherDataset) 對源RDD和參數RDD求並集後返回一個新的RDD
intersection(otherDataset) 對源RDD和參數RDD求交集後返回一個新的RDD
distinct([numTasks])) 對源RDD進行去重後返回一個新的RDD
groupByKey([numTasks]) 在一個(K,V)的RDD上調用,返回一個(K, Iterator[V])的RDD
reduceByKey(func, [numTasks]) 在一個(K,V)的RDD上調用,返回一個(K,V)的RDD,使用指定的reduce函數,將相同key的值聚合到一起,與groupByKey類似,reduce任務的個數可以通過第二個可選的參數來設置
aggregateByKey(zeroValue)(seqOp, combOp, [numTasks]) 先按分區聚合 再總的聚合 每次要跟初始值交流 例如:aggregateByKey(0)(+,+) 對k/y的RDD進行操作
sortByKey([ascending], [numTasks]) 在一個(K,V)的RDD上調用,K必須實現Ordered介面,返回一個按照key進行排序的(K,V)的RDD
sortBy(func,[ascending], [numTasks]) 與sortByKey類似,但是更靈活 第一個參數是根據什麼排序 第二個是怎麼排序 false倒序 第三個排序後分區數 預設與原RDD一樣
join(otherDataset, [numTasks]) 在類型為(K,V)和(K,W)的RDD上調用,返回一個相同key對應的所有元素對在一起的(K,(V,W))的RDD 相當於內連接(求交集)
cogroup(otherDataset, [numTasks]) 在類型為(K,V)和(K,W)的RDD上調用,返回一個(K,(Iterable,Iterable))類型的RDD
cartesian(otherDataset) 兩個RDD的笛卡爾積 的成很多個K/V
pipe(command, [envVars]) 調用外部程式
coalesce(numPartitions) 重新分區 第一個參數是要分多少區,第二個參數是否shuffle 預設false 少分區變多分區 true 多分區變少分區 false
repartition(numPartitions)
重新分區 必須shuffle 參數是要分多少區 少變多
repartitionAndSortWithinPartitions(partitioner) 重新分區+排序 比先分區再排序效率高 對K/V的RDD進行操作
foldByKey(zeroValue)(seqOp) 該函數用於K/V做摺疊,合併處理 ,與aggregate類似 第一個括弧的參數應用於每個V值 第二括弧函數是聚合例如:+
combineByKey 合併相同的key的值 rdd1.combineByKey(x => x, (a: Int, b: Int) => a + b, (m: Int, n: Int) => m + n)
partitionBy(partitioner) 對RDD進行分區 partitioner是分區器 例如new HashPartition(2)
cache/persist RDD緩存,可以避免重覆計算從而減少時間,區別:cache內部調用了persist運算元,cache預設就一個緩存級別MEMORY-ONLY ,而persist則可以選擇緩存級別
Subtract(rdd) 返回前rdd元素不在後rdd的rdd
leftOuterJoin leftOuterJoin類似於SQL中的左外關聯left outer join,返回結果以前面的RDD為主,關聯不上的記錄為空。只能用於兩個RDD之間的關聯,如果要多個RDD關聯,多關聯幾次即可。
rightOuterJoin rightOuterJoin類似於SQL中的有外關聯right outer join,返回結果以參數中的RDD為主,關聯不上的記錄為空。只能用於兩個RDD之間的關聯,如果要多個RDD關聯,多關聯幾次即可
subtractByKey substractByKey和基本轉換操作中的subtract類似只不過這裡是針對K的,返回在主RDD中出現,並且不在otherRDD中出現的元素
2)Action

觸發代碼的運行,我們一段spark代碼裡面至少需要有一個action操作。

常用的Action:

動作 含義
reduce(func) 通過func函數聚集RDD中的所有元素,這個功能必須是課交換且可並聯的
collect() 在驅動程式中,以數組的形式返回數據集的所有元素
count() 返回RDD的元素個數
first() 返回RDD的第一個元素(類似於take(1))
take(n) 返回一個由數據集的前n個元素組成的數組
takeSample(withReplacement,num, [seed]) 返回一個數組,該數組由從數據集中隨機採樣的num個元素組成,可以選擇是否用隨機數替換不足的部分,seed用於指定隨機數生成器種子
takeOrdered(n, [ordering]) 返回原RDD排序(預設升序排)後,前n個元素組成的數組
saveAsTextFile(path) 將數據集的元素以textfile的形式保存到HDFS文件系統或者其他支持的文件系統,對於每個元素,Spark將會調用toString方法,將它裝換為文件中的文本
saveAsSequenceFile(path) 將數據集中的元素以Hadoop sequencefile的格式保存到指定的目錄下,可以使HDFS或者其他Hadoop支持的文件系統。
saveAsObjectFile(path) saveAsObjectFile用於將RDD中的元素序列化成對象,存儲到文件中。使用方法和saveAsTextFile類似
countByKey() 針對(K,V)類型的RDD,返回一個(K,Int)的map,表示每一個key對應的元素個數。
foreach(func) 在數據集的每一個元素上,運行函數func進行更新。
aggregate 先對分區進行操作,在總體操作
reduceByKeyLocally 返回一個 dict 對象,同樣是將同 key 的元素進行聚合
lookup lookup用於(K,V)類型的RDD,指定K值,返回RDD中該K對應的所有V值。
top top函數用於從RDD中,按照預設(降序)或者指定的排序規則,返回前num個元素。
fold fold是aggregate的簡化,將aggregate中的seqOp和combOp使用同一個函數op。
foreachPartition 遍歷原RDD元素經過func函數運算過後的結果集,foreachPartition運算元分區操作

4、實戰操作

1、針對各個元素的轉化操作

我們最常用的轉化操作應該是map() 和filter(),轉化操作map() 接收一個函數,把這個函數用於RDD 中的每個元素,將函數的返回結果作為結果RDD 中對應元素的值。而轉化操作filter() 則接收一個函數,並將RDD 中滿足該函數的元素放入新的RDD 中返回。

讓我們看一個簡單的例子,用map() 對RDD 中的所有數求平方

# 通過parallelize創建RDD對象
val input = sc.parallelize(List(1, 2, 3, 4))
val result = input.map(x => x * x)
println(result.collect().mkString(","))

2、對一個數據為{1,2,3,3}的RDD進行基本的RDD轉化操作(去重)

var rdd = sc.parallelize(List(1,2,3,3))
rdd.distinct().collect().mkString(",")

3、對數據分別為{1,2,3}和{3,4,5}的RDD進行針對兩個RDD的轉化操作

var rdd = sc.parallelize(List(1,2,3))
var other = sc.parallelize(List(3,4,5))
# 生成一個包含兩個RDD中所有元素的RDD
rdd.union(other).collect().mkString(",")
# 求兩個RDD共同的元素RDD
rdd.intersection(other).collect().mkString(",")

4、行動操作

行動操作reduce(),它接收一個函數作為參數,這個函數要操作兩個RDD 的元素類型的數據並返回一個同樣類型的新元素。一個簡單的例子就是函數+,可以用它來對我們的RDD 進行累加。使用reduce(),可以很方便地計算出RDD中所有元素的總和、元素的個數,以及其他類型的聚合操作。

var rdd = sc.parallelize(List(1,2,3,4,5,6,7))
# 求和
var sum = rdd.reduce((x, y) => x + y)
# 求元素個數
var sum = rdd.count()

# 聚合操作
var rdd = sc.parallelize(List(1,2,3,4,5,6,7))
var result = rdd.aggregate((0,0))((acc,value) => (acc._1 + value,acc._2 + 1),(acc1,acc2) => (acc1._1 + acc2._1 , acc1._2 + acc2._2))
var avg = result._1/result._2.toDouble

這裡只是演示幾個簡單的示例,更多RDD的操作,可以參考官方文檔學習哦。

2)DataFrames

在Spark中,DataFrame提供了一個領域特定語言(DSL)和SQL來操作結構化數據,DataFrame是一種以RDD為基礎的分散式數據集,類似於傳統資料庫中的二維表格。

  • RDD,由於無從得知所存數據元素的具體內部結構,Spark Core只能在stage層面進行簡單、通用的流水線優化。
  • DataFrame底層是以RDD為基礎的分散式數據集,和RDD的主要區別的是:RDD中沒有schema信息,而DataFrame中數據每一行都包含schema。DataFrame = RDD + shcema

1、DSL風格語法操作

1)DataFrame創建

創建DataFrame的兩種基本方式:

  • 已存在的RDD調用toDF()方法轉換得到DataFrame。
  • 通過Spark讀取數據源直接創建DataFrame。

直接創建DataFarme對象

若使用SparkSession方式創建DataFrame,可以使用spark.read從不同類型的文件中載入數據創建DataFrame。spark.read的具體操作,如下所示。

方法名 描述
spark.read.text(“people.txt”) 讀取txt格式文件,創建DataFrame
spark.read.csv (“people.csv”) 讀取csv格式文件,創建DataFrame
spark.read.text(“people.json”) 讀取json格式文件,創建DataFrame
spark.read.text(“people.parquet”) 讀取parquet格式文件,創建DataFrame

1、在本地創建一個person.txt文本文檔,用於讀取:運行spark-shell:

# person.txt,Name,Age,Height
p1_name,18,165
p2_name,19,170
p3_name,20,188
p4_name,21,190
# 啟動spark shell,預設會創建一個spark名稱的spark session對象
$ spark-shell
# 定義變數,【註意】所有節點都得創建這個person文件,要不然調度沒有這個文件的機器會報錯
var inputFile = "file:///opt/bigdata/hadoop/server/spark-3.2.0-bin-hadoop3.2/test/person.txt"
# 讀取本地文件
val personDF = spark.read.text("file:///opt/bigdata/hadoop/server/spark-3.2.0-bin-hadoop3.2/test/person.txt")
val personDF = spark.read.text(inputFile)
# 顯示
personDF.show()
# 將文件put到hdfs上
# 讀取hdfs文件(推薦)
val psersonDF = spark.read.text("hdfs:///person.txt")

2、有RDD轉換成DataFrame

動作 含義
show() 查看DataFrame中的具體內容信息
printSchema() 查看DataFrame的Schema信息
select() 查看DataFrame中選取部分列的數據及進行重命名
filter() 實現條件查詢,過濾出想要的結果
groupBy() 對記錄進行分組
sort() 對特定欄位進行排序操作
toDF() 把RDD數據類型轉成DataFarme
# 讀取文本文檔,按逗號分割開來
val lineRDD = sc.textFile("hdfs:///person.txt").map(_.split(","))
case class Person(name:String, age:Int, height:Int)
# 按照樣式類對RDD數據進行分割成map
val personRDD = lineRDD.map(x => Person(x(0).toString, x(1).toInt, x(2).toInt))
# 把RDD數據類型轉成DataFarme
val personDF = personRDD.toDF()
# 查看這個表
personDF.show()
# 查看Schema數據
personDF.printSchema()
# 查看列
personDF.select(personDF.col("name")).show
# 過濾年齡小於25的
personDF.filter(col("age") >= 25).show


這裡提供常用的spark dataframe方法:

方法名 含義
collect() 返回值是一個數組,返回dataframe集合所有的行
collectAsList() 返回值是一個java類型的數組,返回dataframe集合所有的行
count() 返回一個number類型的,返回dataframe集合的行數
describe(cols: String*) 返回一個通過數學計算的類表值(count, mean, stddev, min, and max),這個可以傳多個參數,中間用逗號分隔,如果有欄位為空,那麼不參與運算,只這對數值類型的欄位。例如df.describe("age", "height").show()
first() 返回第一行 ,類型是row類型
head() 返回第一行 ,類型是row類型
head(n:Int) 返回n行 ,類型是row 類型
show() 返回dataframe集合的值 預設是20行,返回類型是unit
show(n:Int) 返回n行,返回值類型是unit
table(n:Int) 返回n行 ,類型是row 類型
cache() 同步數據的記憶體
columns 返回一個string類型的數組,返回值是所有列的名字
dtypes 返回一個string類型的二維數組,返回值是所有列的名字以及類型
explan() 列印執行計劃 物理的
explain(n:Boolean) 輸入值為 false 或者true ,返回值是unit 預設是false ,如果輸入true 將會列印 邏輯的和物理的
isLocal 返回值是Boolean類型,如果允許模式是local返回true 否則返回false
persist(newlevel:StorageLevel) 返回一個dataframe.this.type 輸入存儲模型類型
printSchema() 列印出欄位名稱和類型 按照樹狀結構來列印
registerTempTable(tablename:String) 返回Unit ,將df的對象只放在一張表裡面,這個表隨著對象的刪除而刪除了
schema 返回structType 類型,將欄位名稱和類型按照結構體類型返回
toDF() 返回一個新的dataframe類型的
toDF(colnames:String*) 將參數中的幾個欄位返回一個新的dataframe類型的
unpersist() 返回dataframe.this.type 類型,去除模式中的數據
unpersist(blocking:Boolean) 返回dataframe.this.type類型 true 和unpersist是一樣的作用false 是去除RDD
agg(expers:column*) 返回dataframe類型 ,同數學計算求值
agg(exprs: Map[String, String]) 返回dataframe類型 ,同數學計算求值 map類型的
agg(aggExpr: (String, String), aggExprs: (String, String)*) 返回dataframe類型 ,同數學計算求值
apply(colName: String) 返回column類型,捕獲輸入進去列的對象
as(alias: String) 返回一個新的dataframe類型,就是原來的一個別名
col(colName: String) 返回column類型,捕獲輸入進去列的對象
cube(col1: String, cols: String*) 返回一個GroupedData類型,根據某些欄位來彙總
distinct 去重 返回一個dataframe類型
drop(col: Column) 刪除某列 返回dataframe類型
dropDuplicates(colNames: Array[String]) 刪除相同的列 返回一個dataframe
except(other: DataFrame) 返回一個dataframe,返回在當前集合存在的在其他集合不存在的
filter(conditionExpr: String) 刷選部分數據,返回dataframe類型
groupBy(col1: String, cols: String*) 根據某寫欄位來彙總返回groupedate類型
intersect(other: DataFrame) 返回一個dataframe,在2個dataframe都存在的元素
join(right: DataFrame, joinExprs: Column, joinType: String) 一個是關聯的dataframe,第二個關聯的條件,第三個關聯的類型:inner, outer, left_outer, right_outer, leftsemi
limit(n: Int) 返回dataframe類型 去n 條數據出來
orderBy(sortExprs: Column*) 做alise排序
sort(sortExprs: Column*) 排序 df.sort(df("age").desc).show(); 預設是asc
select(cols:string*) dataframe 做欄位的刷選 df.select($"colA", $"colB" + 1)
withColumnRenamed(existingName: String, newName: String) 修改列表 df.withColumnRenamed("name","names").show();
withColumn(colName: String, col: Column) 增加一列 df.withColumn("aa",df("name")).show();

這裡已經列出了很多常用方法了,基本上涵蓋了大部分操作,當然也可以參考官方文檔

2、SQL風格語法操作

DataFrame的一個強大之處就是我們可以將它看作是一個關係型數據表,然後可以通過在程式中使用spark.sql() 來執行SQL查詢,結果將作為一個DataFrame返回。因為spark session包含了Hive Context,所以spark.sql() 會自動啟動連接hive,預設模式就是hive里的local模式(內嵌derby)

啟動spark-shell

$ spark-shell

會在執行spark-shell當前目錄下生成兩個文件:derby.log,metastore_db

接下來就可以happy的寫sql了,這裡就演示幾個命令,跟之前的hive一樣,把sql語句放在spark.sql()方法里執行即可,不清楚hive sql的可以參考我之前的文章:大數據Hadoop之——數據倉庫Hive

# 有個預設default庫
$ spark.sql("show databases").show
# 預設當前庫是default
$ spark.sql("show tables").show

通過spark-sql啟動spark shell

操作就更像sql語法了,已經跟hive差不多了。接下來演示幾個命令,大家就很清楚了。

$ spark-sql
show databases;
create database test007

同樣也會在當前目錄下自動創建兩個文件:derby.log,metastore_db

3)DataSet

DataSet是分散式的數據集合,Dataset提供了強類型支持,也是在RDD的每行數據加了類型約束。DataSet是在Spark1.6中添加的新的介面。它集中了RDD的優點(強類型和可以用強大lambda函數)以及使用了Spark SQL優化的執行引擎。DataSet可以通過JVM的對象進行構建,可以用函數式的轉換(map/flatmap/filter)進行多種操作。

1、通過spark.createDataset通過集合進行創建dataSet

val ds1 = spark.createDataset(1 to 10)
ds1.show

2、從已經存在的rdd當中構建dataSet

官方文檔

val ds2 = spark.createDataset(sc.textFile("hdfs:////person.txt"))

3、通過樣例類配合創建DataSet

case class Person(name:String,age:Int)
val personDataList = List(Person("zhangsan",18),Person("lisi",28))
val personDS = personDataList.toDS
personDS.show

4、通過DataFrame轉化生成
Music.json文件內容如下:

{"name":"上海灘","singer":"葉麗儀","album":"香港電視劇主題歌","path":"mp3/shanghaitan.mp3"}
{"name":"一生何求","singer":"陳百強","album":"香港電視劇主題歌","path":"mp3/shanghaitan.mp3"}
{"name":"紅日","singer":"李克勤","album":"懷舊專輯","path":"mp3/shanghaitan.mp3"}
{"name":"愛如潮水","singer":"張信哲","album":"懷舊專輯","path":"mp3/airucaoshun.mp3"}
{"name":"紅茶館","singer":"陳惠嫻","album":"懷舊專輯","path":"mp3/redteabar.mp3"}

case class Music(name:String,singer:String,album:String,path:String)
# 註意把test.json傳到hdfs上
val jsonDF = spark.read.json("hdfs:///Music.json")
val jsonDS = jsonDF.as[Music]
jsonDS.show

RDD,DataFrame,DataSet互相轉化

四、RDD、DataFrame和DataSet的共性與區別

  • RDD[Person]:以Person為類型參數,但不瞭解 其內部結構。

  • DataFrame:提供了詳細的結構信息schema(結構)列的名稱和類型。這樣看起來就像一張表了

  • DataSet[Person]:不光有schema(結構)信息,還有類型信息

1)共性

  • 三者都是spark平臺下的分散式彈性數據集,為處理超大型數據提供便利
  • 三者都有惰性機制。在創建時、轉換時(如map)不會立即執行,只有在遇到action運算元的時候(比如foreach),才開始進行觸發計算。極端情況下,如果代碼中只有創建、轉換,但是沒有在後面的action中使用對應的結果,在執行時會被跳過。
  • 三者都有partition的概念,都有緩存(cache)的操作,還可以進行檢查點操作(checkpoint)
  • 三者都有許多共同的函數(如map、filter,sorted等等)。
    在對DataFrame和DataSet操作的時候,大多數情況下需要引入隱式轉換(ssc.implicits._)

2)區別

  • DataFrame:DataFrame是DataSet的特例,也就是說DataSet[Row]的別名;DataFrame = RDD + schema
    1. DataFrame的每一行的固定類型為Row,只有通過解析才能獲得各個欄位的值
    2. DataFrame與DataSet通常與spark ml同時使用
    3. DataFrame與DataSet均支持sparkSql操作,比如select,groupby等,也可以註冊成臨時表,進行sql語句操作
    4. DataFrame與DateSet支持一些方便的保存方式,比如csv,可以帶上表頭,這樣每一列的欄位名就可以一目瞭然
  • DataSet:DataSet = RDD + case class
    1. DataSet與DataFrame擁有相同的成員函數,區別隻是只是每一行的數據類型不同。
    2. DataSet的每一行都是case class,在自定義case class之後可以很方便的獲取每一行的信息

五、spark-shell

Spark的shell作為一個強大的互動式數據分析工具,提供了一個簡單的方式學習API。它可以使用Scala(在Java虛擬機上運行現有的Java庫的一個很好方式)或Python。spark-shell的本質是在後臺調用了spark-submit腳本來啟動應用程式的,在spark-shell中會創建了一個名為sc的SparkContext對象。

【註】spark-shell只能以client方式啟動。

查看幫助

$ spark-shell --help

spark-shell常用選項

--master MASTER_URL 指定模式(spark://host:port, mesos://host:port, yarn,
                              k8s://https://host:port, or local (Default: local[*]))
--executor-memory MEM 指定每個Executor的記憶體,預設1GB
--total-executor-cores NUM 指定所有Executor所占的核數
--num-executors NUM 指定Executor的個數
--help, -h 顯示幫助信息
--version 顯示版本號

從上面幫助看,spark有五種運行模式:spark、mesos、yarn、k8s、local。這裡主要講local和yarn模式

Master URL 含義
local 在本地運行,只有一個工作進程,無並行計算能力
local[K] 在本地運行,有 K 個工作進程,通常設置 K 為機器的CPU 核心數量
local[*] 在本地運行,工作進程數量等於機器的 CPU 核心數量。
spark://HOST:PORT 以 Standalone 模式運行,這是 Spark 自身提供的集群運行模式,預設埠號: 7077
mesos://HOST:PORT 在 Mesos 集群上運行,Driver 進程和 Worker 進程運行在 Mesos 集群上,部署模式必須使用固定值:--deploy-mode cluster
yarn 在yarn集群上運行,依賴於hadoop集群,yarn資源調度框架,將應用提交給yarn,在ApplactionMaster(相當於Stand alone模式中的Master)中運行driver,在集群上調度資源,開啟excutor執行任務。
k8s 在k8s集群上運行

1)local

在Spark Shell中,有一個專有的SparkContext已經為您創建好了,變數名叫做sc。自己創建的SparkContext將無法工作。可以用--master參數來設置SparkContext要連接的集群,用--jars來設置需要添加到CLASSPATH的jar包,如果有多個jar包,可以使用逗號分隔符連接它們。例如,在一個擁有2核的環境上運行spark-shell,使用:

#資源存儲的位置,預設為本地,以及使用什麼調度框架 ,預設使用的是spark內置的資源管理和調度框架Standalone 
# local單機版,只占用一個線程,local[*]占用當前所有線程,local[2]:2個CPU核運行
$ spark-shell --master local[2]
# --master 預設為 local[*] 
#預設使用集群最大的記憶體大小
--executor-memorty
#預設使用最大核數
--total-executor-cores 
$ spark-shell --master local[*] --executor-memory 1g --total-executor-cores 1

Web UI地址:http://hadoop-node1:4040

隨後,就可以使用spark-shell內使用Scala語言完成一定的操作。這裡做幾個簡單的操作,有興趣的話,可以自行去瞭解scala

val textFile = sc.textFile("file:///opt/bigdata/hadoop/server/spark-3.2.0-bin-hadoop3.2/README.md")
textFile.count()
textFile.first()

其中,count代表RDD中的總數據條數;first代表RDD中的第一行數據。

2)on Yarn(推薦)

# on yarn,也可以在配置文件中修改這個欄位spark.master
$ spark-shell --master yarn 

--master用來設置context將要連接並使用的資源主節點,master的值是standalone模式中spark的集群地址、yarn或mesos集群的URL,或是一個local地址。

六、SparkSQL和Hive的集成(Spark on Hive)

1)創建軟鏈接

$ ln -s /opt/bigdata/hadoop/server/apache-hive-3.1.2-bin/conf/hive-site.xml /opt/bigdata/hadoop/server/spark-3.2.0-bin-hadoop3.2/conf/hive-site.xml

2)複製 hive lib目錄 下的mysql連接jar包到spark的jars下

$ cp /opt/bigdata/hadoop/server/apache-hive-3.1.2-bin/lib/mysql-connector-java-5.1.49-bin.jar /opt/bigdata/hadoop/server/spark-3.2.0-bin-hadoop3.2/jars/

3)配置

# 創建spark日誌在hdfs存儲目錄
$ hadoop fs -mkdir -p /tmp/spark
$ cd /opt/bigdata/hadoop/server/spark-3.2.0-bin-hadoop3.2/conf
$ cp spark-defaults.conf.template spark-defaults.conf

在spark-defaults.conf追加如下配置:

# 使用yarn模式
spark.master                     yarn
spark.eventLog.enabled           true
spark.eventLog.dir               hdfs://hadoop-node1:8082/tmp/spark
spark.serializer                 org.apache.spark.serializer.KryoSerializer
spark.driver.memory              512m
spark.executor.extraJavaOptions  -XX:+PrintGCDetails -Dkey=value -Dnumbers="one two three"

4)啟動 spark-shell操作Hive(local)

支持多用戶得啟動metastore服務

$ nohup hive --service metastore &
$ ss -atnlp|grep 9083

在hive-site.xml加入如下配置:

<property>  
  <name>hive.metastore.uris</name>  
  <value>thrift://hadoop-node1:9083</value>  
</property>  

啟動spark-sql

# yarn模式,--master yarn可以不帶,因為上面在配置文件里已經配置了yarn模式了
$ spark-sql --master yarn
show databases;

從上圖就可發現,已經查到我之前創建的庫了,說明已經集成ok了。

七、Spark beeline

Spark Thrift Server 是 Spark 社區基於 HiveServer2 實現的一個 Thrift 服務。旨在無縫相容
HiveServer2。因為 Spark Thrift Server 的介面和協議都和 HiveServer2 完全一致,因此我們部署好Spark Thrift Server後,可以直接使用hive的beeline訪問Spark Thrift Server執行相關語句。Spark Thrift Server 的目的也只是取代 HiveServer2,因此它依舊可以和 Hive Metastore進行交互,獲取到 hive 的元數據。

1)Spark Thrift Server架構於HiveServer2架構對比

2)Spark Thrift Server和HiveServer2的區別

Hive on Spark Spark Thrift Server
任務提交模式 每個session都會創建一個RemoteDriver,也就是對於一個Application。之後將sql解析成執行的物理計劃序列化後發到RemoteDriver執行 本身的Server服務就是一個Driver,直接接收sql執行。也就是所有的session都共用一個Application
性能 性能一般 如果存儲格式是orc或者parquet,性能會比hive高幾倍,某些語句甚至會高幾十倍。其他格式的話,性能相差不是很大,有時hive性能會更好
併發 如果任務執行不是非同步的,就是在thrift的worker線程中執行,受worker線程數量的限制。非同步的話則放到線程池執行,併發度受非同步線程池大小限制。 處理任務的模式和Hive一樣。
sql相容 主要支持ANSI SQL 2003,但並不完全遵守,只是大部分支持。並擴展了很多自己的語法 Spark SQL也有自己的實現標準,因此和hive不會完全相容。具體哪些語句會不相容需要測試才能知道
HA 可以通過zk實現HA 沒有內置的HA實現,不過spark社區提了一個issue並帶上了patch,可以拿來用:https://issues.apache.org/jira/browse/SPARK-11100

【總結】Spark Thrift Server說白了就是小小的改動了下HiveServer2,代碼量也不多。雖然介面和HiveServer2完全一致,但是它以單個Application在集群運行的方式還是比較奇葩的。可能官方也是為了實現簡單而沒有再去做更多的優化。

3)配置啟動Spark Thrift Server

1、配置hive-site.xml

<!-- hs2埠 -->
<property>
  <name>hive.server2.thrift.port</name>
  <value>11000</value>
</property>

2、啟動spark thriftserver服務(不能起hs2,因為配置是一樣的,會有衝突)

$ cd /opt/bigdata/hadoop/server/spark-3.2.0-bin-hadoop3.2/sbin
$ ./start-thriftserver.sh
$ ss -tanlp|grep 11000

3、啟動beeline操作

# 為了和hive的區別,這裡使用絕對路徑啟動
$ cd /opt/bigdata/hadoop/server/spark-3.2.0-bin-hadoop3.2/bin
# 操作跟hive操作一模一樣,只是計算引擎不一樣了,換成了spark了
$ ./beeline
!connect jdbc:hive2://hadoop-node1:11000
show databases;

訪問HDFS WEB UI:http://hadoop-node1:8088/cluster/apps


八、Spark Streaming

Spark Streaming與其他大數據框架Storm、Flink一樣,Spark Streaming是基於Spark Core基礎之上用於處理實時計算業務的框架。其實現就是把輸入的流數據進行按時間切分,切分的數據塊用離線批處理的方式進行並行計算處理。原理如下圖:

支持多種數據源獲取數據:

Spark處理的是批量的數據(離線數據),Spark Streaming實際上處理並不是像Strom一樣來一條處理一條數據,而是將接收到的實時流數據,按照一定時間間隔,對數據進行拆分,交給Spark Engine引擎,最終得到一批批的結果。

由於考慮到本篇文章篇幅太長,所以這裡只是稍微提了一下,如果有時間會繼續補充Spark Streaming相關的知識點,請耐心等待……

官方文檔:https://spark.apache.org/docs/3.2.0/streaming-programming-guide.html


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 之前寫過一篇如使用阿裡雲上部署.NET 3.1自定義運行時的文章,吐槽一下,雖然現在已經2022年了,但是阿裡雲函數計算的支持依然停留在.NET Core 2.1,更新緩慢,由於程式解包大小的限制,也不能放太複雜的東西的上去,雖然現在.NET 6裁剪包能挺好地解決這個問題,但是心裡還是不爽。 需求 ...
  • 之前曾在《C# 中容易忽視的 Encoding.GetByteCount 記憶體問題》中提到過,可以使用 Encoding.Default.GetByteCount 方法來判斷字元是全寬(寬度為 2)還是半寬(寬度為 1)。 這個方法實際上是計算對字元編碼後產生的位元組數,只是在中文環境下,寬字元在使用 ...
  • 本筆記有特殊目錄,點擊開啟: 專有目錄 在Linux系統中編輯文本總是離不開一位老幫手——Vi。而因為其誕生的年代有些久遠,有些操作在現在看來可能有點“反直覺”。 於是我決定寫這樣一篇小筆記,記錄一下我記憶Vi的這些這些 常用 操作和指令的方法(主要靠的是英語和圖示了)。 當然,正如“好記性不如爛筆 ...
  • Vmware Station安裝Arch Linux 一、測試環境及工具 當前教程在win10環境中進行,網路為有線連接 vmware station版本為 16.2.1 arch linux鏡像為 archlinux-2022.04.05-x86_64.iso 終端工具Xshell 6,也可使用p ...
  • 一篇科普文章,介紹什麼是 Linux 信號,以及它的基本用法,內含精美圖表。 ...
  • MySQL 回表 五花馬,千金裘,呼兒將出換美酒,與爾同銷萬古愁。 一、簡述 回表,顧名思義就是回到表中,也就是先通過普通索引掃描出數據所在的行,再通過行主鍵ID 取出索引中未包含的數據。所以回表的產生也是需要一定條件的,如果一次索引查詢就能獲得所有的select 記錄就不需要回表,如果select ...
  • 分享嘉賓:高大月@美團點評,Apache Kylin PMC成員,Druid Commiter 編輯整理:Druid中國用戶組 6th MeetUp 出品平臺:DataFunTalk -- 導讀: 長久以來,對SQL和許可權的支持一直是Druid的軟肋。雖然社區早在0.9和0.12版本就分別添加了對S ...
  • Quantexa 服務特色是場景決策智能CDI(contextual decision intelligence) 主要落地場景是金融以及涉及交易的各類機構的反洗錢反金融詐騙, 客戶畫像, 風控 解決的問題是監管合規, 降低誤判率, 提高準確率, 降成本, 提高行業競爭力 面向的主要客戶是銀行, 保... ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...