使用Spark SQL,除了使用之前介紹的方法,實際上還可以使用SQLContext或者HiveContext通過編程的方式實現。前者支持SQL語法解析器(SQL-92語法),後者支持SQL語法解析器和HiveSQL語法解析器,預設為HiveSQL語法解析器,用戶可以通過配置切換成SQL語法解析器來 ...
使用Spark SQL,除了使用之前介紹的方法,實際上還可以使用SQLContext或者HiveContext通過編程的方式實現。前者支持SQL語法解析器(SQL-92語法),後者支持SQL語法解析器和HiveSQL語法解析器,預設為HiveSQL語法解析器,用戶可以通過配置切換成SQL語法解析器來運行HiveQL不支持的語法,如:select 1。實際上HiveContext是SQLContext的子類,因此在HiveContext運行過程中除了override的函數和變數,可以使用和SQLContext一樣的函數和變數。
因為spark-shell工具實際就是運行的scala程式片段,為了方便,下麵採用spark-shell進行演示。
首先來看SQLContext,因為是標準SQL,可以不依賴於Hive的metastore,比如下麵的例子(沒有啟動hive metastore):
[root@BruceCentOS4 ~]# $SPARK_HOME/bin/spark-shell --master yarn --conf spark.sql.catalogImplementation=in-memory
scala> case class offices(office:Int,city:String,region:String,mgr:Int,target:Double,sales:Double)
defined class offices
scala> val rddOffices=sc.textFile("/user/hive/warehouse/orderdb.db/offices/offices.txt").map(_.split("\t")).map(p=>offices(p(0).trim.toInt,p(1),p(2),p(3).trim.toInt,p(4).trim.toDouble,p(5).trim.toDouble))
rddOffices: org.apache.spark.rdd.RDD[offices] = MapPartitionsRDD[3] at map at <console>:26
scala> val officesDataFrame = spark.createDataFrame(rddOffices)
officesDataFrame: org.apache.spark.sql.DataFrame = [office: int, city: string ... 4 more fields]
scala> officesDataFrame.createOrReplaceTempView("offices")
scala> spark.sql("select city from offices where region='Eastern'").map(t=>"City: " + t(0)).collect.foreach(println)
City: NewYork
City: Chicago
City: Atlanta
scala>
執行上面的命令後,實際上在yarn集群中啟動了一個yarn client模式的Spark Application,然後在scala>提示符後輸入的語句會生成RDD的transformation,最後一條命令中的collect會生成RDD的action,即會觸發Job的提交和程式的執行。
命令行中之所以加上--conf spark.sql.catalogImplementation=in-memory選項,是因為spark-shell中的預設啟動的SparkSession對象spark是預設支持Hive的,不帶這個選項啟動的話,程式就會去連接hive metastore,因為這裡並沒有啟動hive metastore,因此程式在執行createDataFrame函數時會報錯。
程式中的第一行是1個case class語句,這裡是定義後面的數據文件的模式的(定義模式除了這個方法,其實還有另外一種方法,後面再介紹)。第二行從hdfs中讀取一個文本文件,並工通過map映射到了模式上面。第三行基於第二行的RDD生成DataFrame,第四行基於第三行的DataFrame註冊了一個邏輯上的臨時表,最後一行就可以通過SparkSession的sql函數來執行sql語句了。
實際上,SQLContext是Spark 1.x中的SQL入口,在Spark 2.x中,使用SparkSession作為SQL的入口,但是為了向後相容,Spark 2.x仍然支持SQLContext來操作SQL,不過會提示deprecated,所以上面的例子是採用Spark 2.x中的寫法。
實際上還有另外一種方法來操作SQL,針對同樣的數據,例如:
scala> import org.apache.spark.sql._
import org.apache.spark.sql._
scala> import org.apache.spark.sql.types._
import org.apache.spark.sql.types._
scala> val schema = new StructType(Array(StructField("office", IntegerType, false), StructField("city", StringType, false), StructField("region", StringType, false), StructField("mgr", IntegerType, true), StructField("target", DoubleType, true), StructField("sales", DoubleType, false)))
schema: org.apache.spark.sql.types.StructType = StructType(StructField(office,IntegerType,false), StructField(city,StringType,false), StructField(region,StringType,false), StructField(mgr,IntegerType,true), StructField(target,DoubleType,true), StructField(sales,DoubleType,false))
scala> val rowRDD = sc.textFile("/user/hive/warehouse/orderdb.db/offices/offices.txt").map(_.split("\t")).map(p => Row(p(0).trim.toInt,p(1),p(2),p(3).trim.toInt,p(4).trim.toDouble,p(5).trim.toDouble))
rowRDD: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[3] at map at <console>:30
scala> val dataFrame = spark.createDataFrame(rowRDD, schema)
dataFrame: org.apache.spark.sql.DataFrame = [office: int, city: string ... 4 more fields]
scala> dataFrame.createOrReplaceTempView("offices")
scala> spark.sql("select city from offices where region='Eastern'").map(t=>"City: " + t(0)).collect.foreach(println)
City: NewYork
City: Chicago
City: Atlanta
這個例子與之前的例子有一些不同,主要的地方有3個:
1. 之前的例子是採用case class定義模式,Spark採用反射來推斷Schema;而這個例子採用StructType類型的對象來定義模式,它接收一個數組,數組成員是StructField對象,代表一個欄位的定義,每個欄位的定義由欄位名稱、欄位類型和是否允許為空組成;
2. 對於代表數據的RDD,之前的例子是直接用case class定義的類型來分割欄位,而這個例子是用的Row類型;
3. 在使用createDataFrame函數生成DataFrame時,該函數的參數不一樣,之前的例子只要傳入RDD對象即可(對象中隱含了模式),而這個例子需要同時傳入RDD和定義的schema;
實際編程中建議採用第二種方法,因為其更加靈活,schema信息可以不必是寫死的,而是可以在程式運行的過程中生成。
下麵接著來看HiveContext的用法,使用HiveContext之前需要確保:
- 使用的Spark是支持Hive的;
- Hive的配置文件hive-site.xml已經在Spark的conf目錄下;
- hive metastore已經啟動;
舉例說明:
首先啟動hive metastore:
[root@BruceCentOS ~]# nohup hive --service metastore &
然後仍然通過spark-shell來舉例說明,啟動spark-shell,如下所示:
[root@BruceCentOS4 ~]# $SPARK_HOME/bin/spark-shell --master yarn
scala> spark.sql("show databases").collect.foreach(println)
[default]
[orderdb]
scala> spark.sql("use orderdb")
res2: org.apache.spark.sql.DataFrame = []
scala> spark.sql("show tables").collect.foreach(println)
[orderdb,customers,false]
[orderdb,offices,false]
[orderdb,orders,false]
[orderdb,products,false]
[orderdb,salesreps,false]
scala> spark.sql("select city from offices where region='Eastern'").map(t=>"City: " + t(0)).collect.foreach(println)
City: NewYork
City: Chicago
City: Atlanta
scala>
可以看到這次啟動spark-shell沒有帶上最後那個選項,這是因為這裡我們打算用HiveContext來操作Hive中的數據,需要支持Hive。前面說過spark-shell是預設開啟了Hive支持的。同SQLContext類似,Spark 2.x中也不需要再用HiveContext對象來操作SQL了,直接用SparkSession對象來操作就好了。可以看到這裡可以直接操作表,不用再定義schema,這是因為schema是由外部的hive metastore定義的,spark通過連接到hive metastore來讀取表的schema信息,因此這裡能直接操作SQL。
另外,除了上面的使用SQLContext操作普通文件(需要額外定義模式)和使用HiveContext操作Hive表數據(需要開啟hive metastore)之外,SQLContext還能操作JSON、PARQUET等文件,由於這兩種數據文件自己帶了模式信息,因此可以直接基於文件創建DataFrame,例如:
scala> val df = spark.read.json("file:///opt/spark/examples/src/main/resources/people.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
scala> df.createOrReplaceTempView("people")
scala> spark.sql("select name,age from people where age>19").map(t=>"Name :" + t(0) + ", Age: " + t(1)).collect.foreach(println)
Name :Andy, Age: 30
最後來看下DataFrame的另一種叫做DSL(Domain Specific Language)的用法。
scala> val df = spark.read.json("file:///opt/spark/examples/src/main/resources/people.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
scala> df.show()
+----+-------+
| age| name|
+----+-------+
|null|Michael|
| 30| Andy|
| 19| Justin|
+----+-------+
scala> df.select("name").show()
+-------+
| name|
+-------+
|Michael|
| Andy|
| Justin|
+-------+
scala> df.select(df("name"), df("age") + 1).show()
+-------+---------+
| name|(age + 1)|
+-------+---------+
|Michael| null|
| Andy| 31|
| Justin| 20|
+-------+---------+
scala> df.filter(df("age") > 21).show()
+---+----+
|age|name|
+---+----+
| 30|Andy|
+---+----+
scala> df.groupBy("age").count().show()
+----+-----+
| age|count|
+----+-----+
| 19| 1|
|null| 1|
| 30| 1|
+----+-----+
scala>
以上是對Spark SQL的SQLContext和HiveContext基本用法的一些總結,都是採用spark-shell工具舉的例子。實際上由於spark-shell是運行scala程式片段的工具,上述例子完全可以改成獨立的應用程式。我將在下一篇博文當中嘗試使用Scala、Java和Python來編寫獨立的程式來操作上面的示例hive資料庫orderdb,可以適當使用一些較為複雜的SQL來統計分析數據。