SparkSql作為Spark的結構化數據處理模塊,提供了非常強大的API,讓分析人員用一次,就會為之傾倒,為之著迷,為之至死不渝。在內部,SparkSQL使用額外結構信息來執行額外的優化。在外部,可以使用SQL和DataSet 的API與之交互。本文筆者將帶你走進SparkSql的世界,領略Spa ...
SparkSql作為Spark的結構化數據處理模塊,提供了非常強大的API,讓分析人員用一次,就會為之傾倒,為之著迷,為之至死不渝。在內部,SparkSQL使用額外結構信息來執行額外的優化。在外部,可以使用SQL和DataSet 的API與之交互。本文筆者將帶你走進SparkSql的世界,領略SparkSql之諸多妙處。
一、DataSet和DataFrame
當使用編程語言對結構化數據進行操作時候,SparkSql中返回的數據類型是DataSet/DataFrame,因此開篇筆者就先對這兩種數據類型進行簡單的介紹。
Dataset 是分散式的數據集合。是Spark 1.6中添加的一個新介面,是特定域對象中的強類型集合,它可以使用函數或者相關操作並行地進行轉換等操作,數據集可以由JVM對象構造,然後使用函數轉換(map、flatmap、filter等)進行操作。Dataset 支持Scala和javaAPI,不支持Python API。
DataFrame是由列組成的數據集,它在概念上等同於關係資料庫中的表或R/Python中的data frame,但在查詢引擎上進行了豐富的優化。DataFrame可以由各種各樣的源構建,例如:結構化數據文件、hive中的表、外部資料庫或現有的RDD。
二、SparkSQL基於DataFrame的操作
import org.apache.spark.sql.SparkSession 2val spark = SparkSession 3 .builder() 4 .appName("Spark SQL basic example") 5 .getOrCreate() 6//引入Spark的隱式類型轉換,如將RDD轉換成 DataFrame 7import spark.implicits._ 8val df = spark.read.json("/data/tmp/SparkSQL/people.json") 9df.show() //將DataFrame的內容進行標準輸出 10//+---+-------+ 11//|age| name| 12//+---+-------+ 13//| |Michael| 14//| 19| Andy| 15//| 30| Justin| 16//+---+-------+ 17 18df.printSchema() //列印出DataFrame的表結構 19//root 20// |-- age: string (nullable = true) 21// |-- name: string (nullable = true) 22 23df.select("name").show() 24//類似於select name from DataFrame的SQL語句 25 26df.select($"name", $"age" + 1).show() 27//類似於select name,age+1 from DataFrame的SQL語句 28//此處註意,如果對列進行操作,所有列名前都必須加上$符號 29 30df.filter($"age" > 21).show() 31//類似於select * from DataFrame where age>21 的SQL語句 32 33df.groupBy("age").count().show() 34//類似於select age,count(age) from DataFrame group by age; 35 36//同時也可以直接寫SQL進行DataFrame數據的分析 37df.createOrReplaceTempView("people") 38val sqlDF = spark.sql("SELECT * FROM people") 39sqlDF.show()
三、SparkSQL基於DataSet的操作
由於DataSet吸收了RDD和DataFrame的優點,所有可以同時向操作RDD和DataFrame一樣來操作DataSet。看下邊一個簡單的例子。
1case class Person(name: String, age: Long) 2// 通過 case類創建DataSet 3val caseClassDS = Seq(Person("Andy", 32)).toDS() 4caseClassDS.show() 5// +----+---+ 6// |name|age| 7// +----+---+ 8// |Andy| 32| 9// +----+---+ 10 11// 通過基本類型創建DataSet 12importing spark.implicits._ 13val primitiveDS = Seq(1, 2, 3).toDS() 14primitiveDS.map(_ + 1).collect() 15// Returns: Array(2, 3, 4) 16 17// 將DataFrames轉換成DataSet 18val path = "examples/src/main/resources/people.json" 19val peopleDS = spark.read.json(path).as[Person] 20peopleDS.show() 21// +----+-------+ 22// | age| name| 23// +----+-------+ 24// |null|Michael| 25// | 30| Andy| 26// | 19| Justin| 27// +----+-------+
在上邊的例子中能夠發現DataSet的創建是非常簡單的,但是筆者需要強調一點,DataSet是強類型的,也就是說DataSet的每一列都有指定的列標識符和數據類型。下邊的列子將進一步介紹DataSet與RDD的交互。
1import spark.implicits._ 2//將RDD轉換成DataFrame 3val peopleDF = spark.sparkContext 4 .textFile("examples/src/main/resources/people.txt") 5 .map(_.split(",")) 6 .map(attributes=>Person(attributes(0),attributes(1).trim.toInt)) 7 .toDF() 8// 將RDD註冊為一個臨時視圖 9peopleDF.createOrReplaceTempView("people") 10//對臨時視圖進行Sql查詢 11val teenagersDF = spark.sql("SELECT name, age FROM people WHERE age BETWEEN 13 AND 19") 12 13// 對teenagersDF 對應的DataFrame進行RDD的運算元map操作 14teenagersDF.map(teenager => "Name: " + teenager(0)).show() 15// +------------+ 16// | value| 17// +------------+ 18// |Name: Justin| 19// +------------+ 20 21// 與上一條語句效果一樣 22teenagersDF.map(teenager => "Name: " + teenager.getAs[String]("name")).show() 23// +------------+ 24// | value| 25// +------------+ 26// |Name: Justin| 27// +------------+
四、SparkSQL操作HIve表
Spark SQL支持讀取和寫入存儲在Apache HIVE中的數據。然而,由於Hive具有大量的依賴關係,預設情況下這些依賴性不包含在Spark分佈中。如果能在classpath路徑找到Hive依賴文件,Spark將自動載入它們。另外需要註意的是,這些Hive依賴項須存在於所有Spark的Worker節點上,因為它們需要訪問Hive序列化和反序列化庫(SerDes),以便訪問存儲在Hive中的數據。
1import java.io.File 2import org.apache.spark.sql.{Row, SaveMode, SparkSession} 3 4case class Record(key: Int, value: String) 5 6// 設置hive資料庫預設的路徑 7val warehouseLocation = new File("spark-warehouse").getAbsolutePath 8 9val spark = SparkSession 10 .builder() 11 .appName("Spark Hive Example") 12 .config("spark.sql.warehouse.dir", warehouseLocation) 13 .enableHiveSupport() 14 .getOrCreate() 15 16import spark.implicits._ 17import spark.sql 18 19//創建hive表,導入數據,並且查詢數據 20sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING) USING hive") 21sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src") 22sql("SELECT * FROM src").show() 23 24// +---+-------+ 25// |key| value| 26// +---+-------+ 27// |238|val_238| 28// | 86| val_86| 29// |311|val_311| 30// ... 31 32//對hive表數據進行聚合操作 33sql("SELECT COUNT(*) FROM src").show() 34// +--------+ 35// |count(1)| 36// +--------+ 37// | 500 | 38// +--------+ 39 40// sql執行的查詢結果返回DataFrame類型數據,支持常用的RDD操作 41val sqlDF = sql("SELECT key, value FROM src WHERE key < 10 ORDER BY key") 42val stringsDS = sqlDF.map { 43 case Row(key: Int, value: String) => s"Key: $key, Value: $value" 44} 45stringsDS.show() 46// +--------------------+ 47// | value| 48// +--------------------+ 49// |Key: 0, Value: val_0| 50// |Key: 0, Value: val_0| 51// |Key: 0, Value: val_0| 52// ... 53 54// 通過DataFrames創建一個臨時視圖val recordsDF = spark.createDataFrame((1 to 100).map(i => Record(i, s"val_$i"))) 55recordsDF.createOrReplaceTempView("records") 56 57// 查詢操作可以將臨時的視圖與HIve表中數據進行關聯查詢 58sql("SELECT * FROM records r JOIN src s ON r.key = s.key").show() 59// +---+------+---+------+ 60// |key| value|key| value| 61// +---+------+---+------+ 62// | 2| val_2| 2| val_2| 63// | 4| val_4| 4| val_4| 64// | 5| val_5| 5| val_5| 65// ... 66 67// 創建一個Hive表,並且以parquet格式存儲數據 68sql("CREATE TABLE hive_records(key int, value string) STORED AS PARQUET") 69// 講DataFrame中數據保存到Hive表裡 70val df = spark.table("src") 71df.write.mode(SaveMode.Overwrite).saveAsTable("hive_records") 72sql("SELECT * FROM hive_records").show() 73// +---+-------+ 74// |key| value| 75// +---+-------+ 76// |238|val_238| 77// | 86| val_86| 78// |311|val_311| 79// ... 80 81// 在指定路徑創建一個Parquet文件並且寫入數據 82val dataDir = "/tmp/parquet_data" 83spark.range(10).write.parquet(dataDir) 84// 創建HIve外部表 85sql(s"CREATE EXTERNAL TABLE hive_ints(key int) STORED AS PARQUET LOCATION '$dataDir'") 86sql("SELECT * FROM hive_ints").show() 87// +---+ 88// |key| 89// +---+ 90// | 0| 91// | 1| 92// | 2| 93// ... 94 95// Turn on flag for Hive Dynamic Partitioning 96spark.sqlContext.setConf("hive.exec.dynamic.partition", "true") 97spark.sqlContext.setConf("hive.exec.dynamic.partition.mode", "nonstrict") 98// 通過DataFrame的API創建HIve分區表 99df.write.partitionBy("key").format("hive").saveAsTable("hive_part_tbl") 100sql("SELECT * FROM hive_part_tbl").show() 101// +-------+---+ 102// | value|key| 103// +-------+---+ 104// |val_238|238| 105// | val_86| 86| 106// |val_311|311| 107// ... 108 109spark.stop()
當然SparkSql的操作遠不止這些,它可以直接對文件快執行Sql查詢,也可以通過JDBC連接到關係型資料庫,對關係型資料庫中的數據進行一些運算分析操作。如果讀者感覺不過癮,可以留言與筆者交流,也可以通過Spark官網查閱相關例子進行學習。下一篇關於Spark的文章,筆者將詳細的介紹Spark的常用運算元,以滿足渴望進行數據分析的小伙伴們的求知的欲望。
更多精彩內容,歡迎掃碼關註以下微信公眾號:大數據技術宅。大數據、AI從關註開始