目錄 · 概述 · 原理 · 組成 · 執行流程 · 性能 · API · 應用程式模板 · 通用讀寫方法 · RDD轉為DataFrame · Parquet文件數據源 · JSON文件數據源 · Hive數據源 · 資料庫JDBC數據源 · DataFrame Operation · 性能調優 ...
目錄
· 概述
· 原理
· 組成
· 執行流程
· 性能
· API
· 應用程式模板
· 通用讀寫方法
· Hive數據源
· 性能調優
· 緩存數據
· 參數調優
· 案例
· 數據準備
· 查詢部門職工數
概述
1. Spark SQL是Spark的結構化數據處理模塊。
2. Spark SQL特點
a) 數據相容:可從Hive表、外部資料庫(JDBC)、RDD、Parquet文件、JSON文件獲取數據,可通過Scala方法或SQL方式操作這些數據,並把結果轉回RDD。
b) 組件擴展:SQL語法解析器、分析器、優化器均可重新定義。
c) 性能優化:記憶體列存儲、動態位元組碼生成等優化技術,記憶體緩存數據。
d) 多語言支持:Scala、Java、Python、R。
原理
組成
1. Catalyst優化:優化處理查詢語句的整個過程,包括解析、綁定、優化、物理計劃等,主要由關係代數(relation algebra)、表達式(expression)以及查詢優化(query optimization)組成。
2. Spark SQL內核:處理數據的輸入輸出,從不同數據源(結構化數據Parquet文件JSON文件、Hive表、外部資料庫、已有RDD)獲取數據,執行查詢(expression of queries),並將查詢結果輸出成DataFrame。
3. Hive支持:對Hive數據的處理,主要包括HiveQL、MetaStore、SerDes、UDFs等。
執行流程
1. SqlParser對SQL語句解析,生成Unresolved邏輯計劃(未提取Schema信息);
2. Catalyst分析器結合數據字典(catalog)進行綁定,生成Analyzed邏輯計劃,過程中Schema Catalog要提取Schema信息;
3. Catalyst優化器對Analyzed邏輯計劃優化,按照優化規則得到Optimized邏輯計劃;
4. 與Spark Planner交互,應用策略(strategy)到plan,使用Spark Planner將邏輯計劃轉換成物理計劃,然後調用next函數,生成可執行物理計劃。
性能
1. 記憶體列式緩存:記憶體列式(in-memory columnar format)緩存(再次執行時無需重覆讀取),僅掃描需要的列,並自動調整壓縮比使記憶體使用率和GC壓力最小化。
2. 動態代碼和位元組碼生成技術:提升重覆表達式求值查詢的速率。
3. Tungsten優化:
a) 由Spark自己管理記憶體而不是JVM,避免了JVM GC帶來的性能損失。
b) 記憶體中Java對象被存儲成Spark自己的二進位格式,直接在二進位格式上計算,省去序列化和反序列化時間;此格式更緊湊,節省記憶體空間。
API
應用程式模板
1 import org.apache.spark.SparkConf 2 import org.apache.spark.SparkContext 3 import org.apache.spark.sql.SQLContext 4 import org.apache.spark.sql.hive.HiveContext 5 6 object Test { 7 def main(args: Array[String]): Unit = { 8 val conf = new SparkConf().setAppName("Test") 9 val sc = new SparkContext(conf) 10 val sqlContext = new HiveContext(sc) 11 12 // ... 13 } 14 }
通用讀寫方法
1. Spark SQL內置數據源短名稱有json、parquet、jdbc,預設parquet(通過“spark.sql.sources.default”配置)。
2. 保存模式:
Scala/Java |
Python |
說明 |
SaveMode.ErrorIfExists |
"error" |
預設,如果資料庫已經存在,拋出異常 |
SaveMode.Append |
"append" |
如果資料庫已經存在,追加DataFrame數據 |
SaveMode.Overwrite |
"overwrite" |
如果資料庫已經存在,重寫DataFrame數據 |
SaveMode.Ignore |
"ignore" |
如果資料庫已經存在,忽略DataFrame數據 |
3. 讀寫文件代碼(統一使用sqlContext.read和dataFrame.write)模板:
1 val dataFrame = sqlContext.read.format("數據源名稱").load("文件路徑") 2 val newDataFrame = dataFrame // 操作數據得到新DataFrame 3 newDataFrame.write.format("數據源名稱").save("文件路徑")
RDD轉為DataFrame
1. 方法1
a) 方法:使用反射機制推斷RDD Schema。
b) 場景:運行前知道Schema。
c) 特點:代碼簡潔。
d) 示例:
1 import org.apache.spark.SparkConf 2 import org.apache.spark.SparkContext 3 import org.apache.spark.sql.SQLContext 4 5 object Test { 6 7 def main(args: Array[String]): Unit = { 8 val conf = new SparkConf().setAppName("Test") 9 val sc = new SparkContext(conf) 10 val sqlContext = new SQLContext(sc) 11 12 // 將一個RDD隱式轉換為一個DataFrame 13 import sqlContext.implicits._ 14 // 使用case定義Schema(不能超過22個屬性) 15 case class Person(name: String, age: Int) 16 // 讀取文件創建MappedRDD,再將數據寫入Person類,隱式轉換為DataFrame 17 val peopleDF = sc.textFile("/test/people.csv").map(_.split(",")).map(cols => Person(cols(0), cols(1).trim.toInt)).toDF() 18 // DataFrame註冊臨時表 19 peopleDF.registerTempTable("table_people") 20 21 // SQL 22 val teenagers = sqlContext.sql("select name, age from table_people where age >= 13 and age <= 19") 23 teenagers.collect.foreach(println) 24 } 25 26 }
2. 方法2
a) 方法:以編程方式定義RDD Schema。
b) 場景:運行前不知道Schema。
c) 示例:
1 import org.apache.spark.SparkConf 2 import org.apache.spark.SparkContext 3 import org.apache.spark.sql.SQLContext 4 import org.apache.spark.sql.types.StructType 5 import org.apache.spark.sql.types.StringType 6 import org.apache.spark.sql.types.StructField 7 import org.apache.spark.sql.Row 8 9 object Test { 10 11 def main(args: Array[String]): Unit = { 12 val conf = new SparkConf().setAppName("Test") 13 val sc = new SparkContext(conf) 14 val sqlContext = new SQLContext(sc) 15 16 // 將一個RDD隱式轉換為一個DataFrame 17 import sqlContext.implicits._ 18 // 使用case定義Schema(不能超過22個屬性) 19 case class Person(name: String, age: Int) 20 // 讀取文件創建MappedRDD 21 val peopleFile = sc.textFile("/test/people.csv") 22 // 運行時從某處獲取的Schema結構 23 val schemaArray = Array("name", "age") 24 // 創建Schema 25 val schema = StructType(schemaArray.map(fieldName => StructField(fieldName, StringType, true))) 26 // 將文本轉為RDD 27 val rowRDD = peopleFile.map(_.split(",")).map(cols => Row(cols(0), cols(1).trim)) 28 // 將Schema應用於RDD 29 val peopleDF = sqlContext.createDataFrame(rowRDD, schema) 30 // DataFrame註冊臨時表 31 peopleDF.registerTempTable("table_people") 32 33 // SQL 34 val teenagers = sqlContext.sql("select name, age from table_people where age >= 13 and age <= 19") 35 teenagers.collect.foreach(println) 36 } 37 38 }
Parquet文件數據源
1. Parquet優點:
a) 高效、Parquet採用列式存儲避免讀入不需要的數據,具有極好的性能和GC;
b) 方便的壓縮和解壓縮,並具有極好的壓縮比例;
c) 可直接讀寫Parquet文件,比磁碟更好的緩存效果。
2. Spark SQL支持根據Parquet文件自描述自動推斷Schema,生成DataFrame。
3. 編程示例:
1 // 載入文件創建DataFrame 2 val peopleDF = sqlContext.read.load("/test/people.parquet") 3 peopleDF.printSchema 4 // DataFrame註冊臨時表 5 peopleDF.registerTempTable("table_people") 6 7 // SQL 8 val teenagers = sqlContext.sql("select name, age from table_people where age >= 13 and age <= 19") 9 teenagers.collect.foreach(println)
4. 分區發現(partition discovery)
a) 與Hive分區表類似,通過分區列的值對錶設置分區目錄,載入Parquet數據源可自動發現和推斷分區信息。
b) 示例:有一個分區列為gender和country的分區表,載入路徑“/path/to/table”可自動提取分區信息
path └── to └── table ├── gender=male │ ├── ... │ │ │ ├── country=US │ │ └── data.parquet │ ├── country=CN │ │ └── data.parquet │ └── ... └── gender=female ├── ... │ ├── country=US │ └── data.parquet ├── country=CN │ └── data.parquet └── ...
創建的DataFrame的Schema:
root |-- name: string (nullable = true) |-- age: long (nullable = true) |-- gender: string (nullable = true) |-- country: string (nullable = true)
c) 分區列數據類型:支持numeric和string類型的自動推斷,通過“spark.sql.sources.partitionColumnTypeInference.enabled”配置開啟或關閉(預設開啟),關閉後分區列全為string類型。
JSON文件數據源
1. Spark SQL支持根據JSON文件自描述自動推斷Schema,生成DataFrame。
2. 示例:
1 // 載入文件創建DataFrame(JSON文件自描述Schema) 2 val peopleDF = sqlContext.read.format("json").load("/test/people.json") 3 peopleDF.printSchema 4 // DataFrame註冊臨時表 5 peopleDF.registerTempTable("table_people") 6 7 // SQL 8 val teenagers = sqlContext.sql("select name, age from table_people where age >= 13 and age <= 19") 9 teenagers.collect.foreach(println)
Hive數據源
1. HiveContext
a) 操作Hive數據源須創建SQLContext的子類HiveContext對象。
b) Standalone集群:添加hive-site.xml到$SPARK_HOME/conf目錄。
c) YARN集群:添加hive-site.xml到$YARN_CONF_DIR目錄;添加Hive元資料庫JDBC驅動jar文件到$HADOOP_HOME/lib目錄。
d) 最簡單方法:通過spark-submit命令參數--file和--jar參數分別指定hive-site.xml和Hive元資料庫JDBC驅動jar文件。
e) 未找到hive-site.xml:當前目錄下自動創建metastore_db和warehouse目錄。
f) 模板:
val sqlContext = new HiveContext(sc)
2. 使用HiveQL
a) “spark.sql.dialect”配置:SQLContext僅sql,HiveContext支持sql、hiveql(預設)。
b) 模板:
sqlContext.sql("HiveQL")
3. 支持Hive特性
a) Hive查詢語句,包括select、group by、order by、cluster by、sort by;
b) Hive運算符,包括:關係運算符(=、⇔、==、<>、<、>、>=、<=等)、算術運算符(+、-、*、/、%等)、邏輯運算符(and、&&、or、||等)、複雜類型構造函數、數據函數(sign、ln、cos等)、字元串函數(instr、length、printf);
c) 用戶自定義函數(UDF);
d) 用戶自定義聚合函數(UDAF);
e) 用戶自定義序列化格式(SerDes);
f) 連接操作,包括join、{left | right | full} outer join、left semi join、cross join;
g) 聯合操作(union);
h) 子查詢:select col from (select a + b as col from t1) t2;
i) 抽樣(Sampling);
j) 解釋(Explain);
k) 分區表(Partitioned table);
l) 所有Hive DDL操作函數,包括create table、create table as select、alter table;
m) 大多數Hive數據類型tinyint、smallint、int、bigint、boolean、float、double、string、binary、timestamp、date、array<>、map<>、struct<>。
資料庫JDBC數據源
1. Spark SQL支持載入資料庫表生成DataFrame。
2. 模板(註意:需要相關JDBC驅動jar文件)
val jdbcOptions = Map("url" -> "", "driver" -> "", "dbtable" -> "")
sqlContext.read.format("jdbc").options(jdbcOptions).load
3. JDBC參數
名稱 |
說明 |
url |
The JDBC URL to connect to. |
dbtable |
The JDBC table that should be read. Note that anything that is valid in a FROM clause of a SQL query can be used. For example, instead of a full table you could also use a subquery in parentheses. |
driver |
The class name of the JDBC driver to use to connect to this URL. |
partitionColumn, lowerBound, upperBound, numPartitions |
These options must all be specified if any of them is specified. They describe how to partition the table when reading in parallel from multiple workers. partitionColumn must be a numeric column from the table in question. Notice that lowerBound and upperBound are just used to decide the partition stride, not for filtering the rows in table. So all rows in the table will be partitioned and returned. |
fetchSize |
The JDBC fetch size, which determines how many rows to fetch per round trip. This can help performance on JDBC drivers which default to low fetch size (eg. Oracle with 10 rows). |
DataFrame Operation
1. 分類:
a) DataFrameAction
名稱 |
說明 |
collect: Array[Row] |
以Array形式返回DataFrame的所有Row |
collectAsList: List[Row] |
以List形式返回DataFrame的所有Row |
count(): Long |
返回DataFrame的Row數目 |
first(): Row |
返回第一個Row |
head(): Row |
返回第一個Row |
show(): Unit |
以表格形式顯示DataFrame的前20個Row |
take(n: Int): Array[Row] |
返回DataFrame前n個Row |
b) 基礎DataFrame函數(basic DataFrame functions)
名稱 |
說明 |
cache(): DataFrame.this.type |
緩存DataFrame |
columns: Array[String] |
以Array形式返回全部的列名 |
dtypes: Array[(String, String)] |
以Array形式返回全部的列名和數據類型 |
explain: Unit |
列印physical plan到控制台 |
isLocal: Boolean |
返回collect和take是否可以本地運行 |
persist(newLevel: StorageLevel: DataFrame.this.type |
根據StorageLevel持久化 |
printSchema(): Unit |
以樹格式列印Schema |
registerTempTable(tableName: String): Unit |
使用給定的名字註冊DataFrame為臨時表 |
schema: StructType |
返回DataFrame的Schema |
toDF(colNames: String*): DataFrame |
返回一個重新指定column的DataFrame |
unpersist(): DataFrame.this.type |
移除持久化 |
c) 集成語言查詢(language integrated queries)
名稱 |
說明 |
agg(aggExpr: (String, String), aggExpr: (String, String)*): DataFrame agg(exprs: Map[String, String]): DataFrame agg(expr: Column, exprs: Column*): DataFrame |
在整體DataFrame不分組聚合 |
apply(colName: String): Column |
以Column形式返回列名為colName的列 |
as(alias: String): DataFrame as(alias: Symbol): DataFrame |
以一個別名集方式返回一個新DataFrame |
col(colName: String): Column |
同apply |
cube(col: String, cols: String*): GroupedData |
使用專門的列(以便聚合),給當前DataFrame創建一個多維數據集 |
distinct: DataFrame |
對Row去重,返回新DataFrame |
drop(col: Column): DataFrame |
刪除一個列,返回新DataFrame |
except(other: DataFrame): DataFrame |
集合差,返回新DataFrame |
filter(conditionExpr: String): DataFrame filter(condition: Column): DataFrame |
使用給定的SQL表達式過濾 |
groupBy(col: String, cols: String*): GroupedData |
使用給定的列分組DataFrame,以便能夠聚合 |
intersect(other: DataFrame): DataFrame |
交集,返回新DataFrame |
limit(n: Int): DataFrame |
獲取前n行數據,返回新DataFrame |
join(right: DataFrame):DataFrame join(right: DataFrame, joinExprs: Column):DataFrame join(right: DataFrame, joinExprs: Column, joinType: String):DataFrame |
Join,第1個為笛卡爾積(Cross Join),第2個為Inner Join |
orderBy(col: String, cols: String*): DataFrame orderBy(sortExprs: Columns*): DataFrame |
使用給定表達式排序,返回新DataFrame |
sample(withReplacement: Boolean, fraction: Double): DataFrame |
使用隨機種子,抽樣部分行返回新DataFrame |
select(col: String, cols: String*): DataFrame select(cols: Column*): DataFrame selectExpr(exprs: String*): DataFrame |
選擇一個列集合 |
sort(col: String, cols: String*): DataFrame sort(sortExprs: Column*): DataFrame |
同orderBy |
unionAll(other: DataFrame): DataFrame |
集合和,返回新DataFrame |
where(conditionExpr: String): DataFrame where(condition: Column): DataFrame |
同filter |
withColumn(colName, col: Column) |
添加新列,返回新DataFrame |
withColumnRenamed(existingName: String, newName: String) |
重命名列,返回新DataFrame |
d) 輸出操作
名稱 |
說明 |
write |
保存DataFrame內容到外部文件存儲、Hive表: dataFrame.write.save("路徑") // 預設Parquet數據源 dataFrame.write.format("數據源名稱").save("路徑") dataFrame.write.saveAsTable("表名") dataFrame.write.insertInto("表名") |
e) RDD Operation
DataFrame本質是一個擁有多個分區的RDD,支持RDD Operation:coalesce、flatMap、foreach、foreachPartition、javaRDD、map、mapPartitions、repartition、toJSON、toJavaRDD等。
性能調優
緩存數據
1. 記憶體列式(in-memory columnar format)緩存:Spark SQL僅掃描需要的列,並自動調整壓縮比使記憶體使用率和GC壓力最小化。
2. 相關配置:
名稱 |
預設值 |
說明 |
spark.sql.inMemoryColumnarStorage.compressed |
true |
true時,Spark SQL基於數據統計為每列自動選擇壓縮編碼 |
spark.sql.inMemoryColumnarStorage.batchSize |
10000 |
控制記憶體列式緩存的批處理大小,大批量可提升記憶體使用率,但會增加記憶體溢出風險 |
3. 緩存/移除緩存代碼模板:
// 緩存方法1(lazy) sqlContext.cacheTable("表名") // 緩存方法2(lazy) dataFrame.cache() // 移除緩存(eager) sqlContext.uncacheTable("表名") // 註意:RDD的cache方法不是列式緩存 rdd.cache()
參數調優
名稱 |
預設值 |
說明 |
spark.sql.autoBroadcastJoinThreshold |
10485760 (10MB) |
當執行Join時,對一個將要被廣播到所有Worker的表配置最大位元組,通過設置為-1禁止廣播 |
spark.sql.tungsten.enabled |
true |
配置是否開啟Tungsten優化,預設開啟 |
spark.sql.shuffle.partitions |
200 |
當執行Join或Aggregation進行Shuffle時,配置可用分區數 |
案例
數據準備
1. 數據結構
a) 職工基本信息(people)
欄位 |
說明 |
name |
姓名 |
id |
ID |
gender |
性別 |
age |
年齡 |
year |
入職年份 |
position |
職位 |
deptid |
所在部門ID |
b) 部門基本信息(department)
欄位 |
說明 |
name |
名稱 |
deptid |
ID |
c) 職工考勤信息(attendance)
欄位 |
說明 |
id |
職工ID |
year |
年 |
month |
月 |
overtime |
加班 |
latetime |
遲到 |
absenteeism |
曠工 |
leaveearlytime |
早退小時 |
d) 職工工資清單(salary)
欄位 |
說明 |
id |
職工ID |
salary |
工資 |
2. 建庫、建表(spark-shell方式)
1 sqlContext.sql("create database hrs") 2 sqlContext.sql("use hrs") 3 sqlContext.sql("create external table if not exists people(name string, id int, gender string, age int, year int, position string, deptid int) row format delimited fields terminated by ',' lines terminated by '\n' location '/test/hrs/people'") 4 sqlContext.sql("create external table if not exists department(name string, deptid int) row format delimited fields terminated by ',' lines terminated by '\n' location '/test/hrs/department'") 5 sqlContext.sql("create external table if not exists attendance(id int, year int, month int, overtime int, latetime int, absenteeism int, leaveearlytime int) row format delimited fields terminated by ',' lines terminated by '\n' location '/test/hrs/attendance'") 6 sqlContext.sql("create external table if not exists salary(id int, salary int) row format delimited fields terminated by ',' lines terminated by '\n' location '/test/hrs/salary'")
3. 測試數據
a) 職工基本信息(people.csv)
Michael,1,male,37,2001,developer,2 Andy,2,female,33,2003,manager,1 Justin,3,female,23,2013,recruitingspecialist,3 John,4,male,22,2014,developer,2 Herry,5,male,27,2010,developer,1 Brewster,6,male,37,2001,manager,2 Brice,7,female,30,2003,manager,3 Justin,8,male,23,2013,recruitingspecialist,3 John,9,male,22,2014,developer,1 Herry,10,female,27,2010,recruitingspecialist,3
b) 部門基本信息(department.csv)
manager,1 researchhanddevelopment,2 humanresources,3
c) 職工考勤信息(attendance.csv)
1,2015,12,0,2,4,0 2,2015,8,5,0,5,3 3,2015,3,16,4,1,5 4,2015,3,0,0,0,0 5,2015,3,0,3,0,0 6,2015,3,32,0,0,0 7,2015,3,0,16,3,32 8,2015,19,36,0,0,0 9,2015,5,6,30,0,2 10,2015,10,6,56,40,0 1,2014,12,0,2,4,0 2,2014,8,5,0,5,3 3,2014,3,16,4,1,5 4,2014,3,0,0,0,0 5,2014,3,0,3,0,0 6,2014,3,32,0,0,0 7,2014,3,0,16,3,32 8,2014,19,36,0,0,0 9,2014,5,6,30,0,2 10,2014,10,6,56,40,0
d) 職工工資清單(salary.csv)
1,5000 2,10000 3,6000 4,7000 5,5000 6,11000 7,12000 8,5500 9,6500 10,4500
4. 上傳數據文件至HDFS
hadoop fs -mkdir /test/hrs/people hadoop fs -mkdir /test/hrs/department hadoop fs -mkdir /test/hrs/attendance hadoop fs -mkdir /test/hrs/salary hadoop fs -put people.csv /test/hrs/people hadoop fs -put department.csv /test/hrs/department hadoop fs -put attendance.csv /test/hrs/attendance hadoop fs -put salary.csv /test/hrs/salary
查詢部門職工數
1. HiveQL方式
1 sqlContext.sql("select d.name, count(p.id) from people p join department d on p.deptid = d.deptid group by d.name").show
2. Scala方式
1 val peopleDF = sqlContext.table("people") 2 val departmentDF = sqlContext.table("department") 3 peopleDF.join(departmentDF, peopleDF("deptid") === departmentDF("deptid")).groupBy(departmentDF("name")).agg(count(peopleDF("id")).as("cnt")).select(departmentDF("name"), col("cnt")).show
3. 結果
查詢各部門職工工資總數,併排序
1. HiveQL方式
1 sqlContext.sql("select d.name, sum(s.salary) as salarysum from people p join department d on p.deptid = d.deptid join salary s on p.id = s.id group by d.name order by salarysum").show
2. Scala方式
1 val peopleDF = sqlContext.table("people") 2 val departmentDF = sqlContext.table("department") 3 val salaryDF = sqlContext.table("salary") 4 peopleDF.join(departmentDF, peopleDF("deptid") === departmentDF("deptid")).join(salaryDF, peopleDF("id") === salaryDF("id")).groupBy(departmentDF("name")).agg(sum(salaryDF("salary")).as("salarysum")).orderBy("salarysum").select(departmentDF("name"), col("salarysum")).show
3. 結果
查詢各部門職工考勤信息
1. HiveQL方式
1 sqlContext.sql("select d.name, ai.year, sum(ai.attinfo) from (select p.id, p.deptid, a.year, a.month, (a.overtime - a.latetime - a.absenteeism - a.leaveearlytime) as attinfo from attendance a join people p on a.id = p.id) ai join department d on ai.deptid = d.deptid group by d.name, ai.year").show
2. Scala方式
1 val attendanceDF = sqlContext.table("attendance") 2 val peopleDF = sqlContext.table("people") 3 val departmentDF = sqlContext.table("department") 4 val subqueryDF = attendanceDF.join(peopleDF, attendanceDF("id") === peopleDF("id")).select(peopleDF("id"), peopleDF("deptid"), attendanceDF("year"), attendanceDF("month"), (attendanceDF("overtime") - attendanceDF("latetime") - attendanceDF("absenteeism") - attendanceDF("leaveearlytime")).as("attinfo")) 5 subqueryDF.join(departmentDF, subqueryDF("deptid") === departmentDF("deptid")).groupBy(departmentDF("name"), subqueryDF("year")).agg(sum(subqueryDF("attinfo")).as("attinfosum")).select(departmentDF("name"), subqueryDF("year"), col("attinfosum")).show
3. 結果