理解Spark SQL(二)—— SQLContext和HiveContext

来源:https://www.cnblogs.com/roushi17/archive/2019/11/21/sqlcontext_hivecontext.html
-Advertisement-
Play Games

使用Spark SQL,除了使用之前介紹的方法,實際上還可以使用SQLContext或者HiveContext通過編程的方式實現。前者支持SQL語法解析器(SQL-92語法),後者支持SQL語法解析器和HiveSQL語法解析器,預設為HiveSQL語法解析器,用戶可以通過配置切換成SQL語法解析器來 ...


使用Spark SQL,除了使用之前介紹的方法,實際上還可以使用SQLContext或者HiveContext通過編程的方式實現。前者支持SQL語法解析器(SQL-92語法),後者支持SQL語法解析器和HiveSQL語法解析器,預設為HiveSQL語法解析器,用戶可以通過配置切換成SQL語法解析器來運行HiveQL不支持的語法,如:select 1。實際上HiveContext是SQLContext的子類,因此在HiveContext運行過程中除了override的函數和變數,可以使用和SQLContext一樣的函數和變數。

因為spark-shell工具實際就是運行的scala程式片段,為了方便,下麵採用spark-shell進行演示。

首先來看SQLContext,因為是標準SQL,可以不依賴於Hive的metastore,比如下麵的例子(沒有啟動hive metastore):

[root@BruceCentOS4 ~]# $SPARK_HOME/bin/spark-shell --master yarn --conf spark.sql.catalogImplementation=in-memory

 

 scala> case class offices(office:Int,city:String,region:String,mgr:Int,target:Double,sales:Double)
defined class offices

scala> val rddOffices=sc.textFile("/user/hive/warehouse/orderdb.db/offices/offices.txt").map(_.split("\t")).map(p=>offices(p(0).trim.toInt,p(1),p(2),p(3).trim.toInt,p(4).trim.toDouble,p(5).trim.toDouble))
rddOffices: org.apache.spark.rdd.RDD[offices] = MapPartitionsRDD[3] at map at <console>:26

scala> val officesDataFrame = spark.createDataFrame(rddOffices)
officesDataFrame: org.apache.spark.sql.DataFrame = [office: int, city: string ... 4 more fields]

scala> officesDataFrame.createOrReplaceTempView("offices")

scala> spark.sql("select city from offices where region='Eastern'").map(t=>"City: " + t(0)).collect.foreach(println)
City: NewYork                                                                   
City: Chicago
City: Atlanta

scala>

 執行上面的命令後,實際上在yarn集群中啟動了一個yarn client模式的Spark Application,然後在scala>提示符後輸入的語句會生成RDD的transformation,最後一條命令中的collect會生成RDD的action,即會觸發Job的提交和程式的執行。

命令行中之所以加上--conf spark.sql.catalogImplementation=in-memory選項,是因為spark-shell中的預設啟動的SparkSession對象spark是預設支持Hive的,不帶這個選項啟動的話,程式就會去連接hive metastore,因為這裡並沒有啟動hive metastore,因此程式在執行createDataFrame函數時會報錯。

程式中的第一行是1個case class語句,這裡是定義後面的數據文件的模式的(定義模式除了這個方法,其實還有另外一種方法,後面再介紹)。第二行從hdfs中讀取一個文本文件,並工通過map映射到了模式上面。第三行基於第二行的RDD生成DataFrame,第四行基於第三行的DataFrame註冊了一個邏輯上的臨時表,最後一行就可以通過SparkSession的sql函數來執行sql語句了。

實際上,SQLContext是Spark 1.x中的SQL入口,在Spark 2.x中,使用SparkSession作為SQL的入口,但是為了向後相容,Spark 2.x仍然支持SQLContext來操作SQL,不過會提示deprecated,所以上面的例子是採用Spark 2.x中的寫法。

實際上還有另外一種方法來操作SQL,針對同樣的數據,例如:

scala> import org.apache.spark.sql._
import org.apache.spark.sql._

scala> import org.apache.spark.sql.types._
import org.apache.spark.sql.types._

scala> val schema = new StructType(Array(StructField("office", IntegerType, false), StructField("city", StringType, false), StructField("region", StringType, false), StructField("mgr", IntegerType, true), StructField("target", DoubleType, true), StructField("sales", DoubleType, false)))
schema: org.apache.spark.sql.types.StructType = StructType(StructField(office,IntegerType,false), StructField(city,StringType,false), StructField(region,StringType,false), StructField(mgr,IntegerType,true), StructField(target,DoubleType,true), StructField(sales,DoubleType,false))

scala> val rowRDD = sc.textFile("/user/hive/warehouse/orderdb.db/offices/offices.txt").map(_.split("\t")).map(p => Row(p(0).trim.toInt,p(1),p(2),p(3).trim.toInt,p(4).trim.toDouble,p(5).trim.toDouble))
rowRDD: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[3] at map at <console>:30

scala> val dataFrame = spark.createDataFrame(rowRDD, schema)
dataFrame: org.apache.spark.sql.DataFrame = [office: int, city: string ... 4 more fields]

scala> dataFrame.createOrReplaceTempView("offices")

scala> spark.sql("select city from offices where region='Eastern'").map(t=>"City: " + t(0)).collect.foreach(println)
City: NewYork                                                                   
City: Chicago
City: Atlanta

這個例子與之前的例子有一些不同,主要的地方有3個:

1. 之前的例子是採用case class定義模式,Spark採用反射來推斷Schema;而這個例子採用StructType類型的對象來定義模式,它接收一個數組,數組成員是StructField對象,代表一個欄位的定義,每個欄位的定義由欄位名稱、欄位類型和是否允許為空組成;

2. 對於代表數據的RDD,之前的例子是直接用case class定義的類型來分割欄位,而這個例子是用的Row類型;

3. 在使用createDataFrame函數生成DataFrame時,該函數的參數不一樣,之前的例子只要傳入RDD對象即可(對象中隱含了模式),而這個例子需要同時傳入RDD和定義的schema;

實際編程中建議採用第二種方法,因為其更加靈活,schema信息可以不必是寫死的,而是可以在程式運行的過程中生成。

 

下麵接著來看HiveContext的用法,使用HiveContext之前需要確保:

  • 使用的Spark是支持Hive的;
  • Hive的配置文件hive-site.xml已經在Spark的conf目錄下;
  • hive metastore已經啟動;

舉例說明:

首先啟動hive metastore:

[root@BruceCentOS ~]# nohup hive --service metastore &

然後仍然通過spark-shell來舉例說明,啟動spark-shell,如下所示:

[root@BruceCentOS4 ~]# $SPARK_HOME/bin/spark-shell --master yarn

scala> spark.sql("show databases").collect.foreach(println)
[default]
[orderdb]

scala> spark.sql("use orderdb")
res2: org.apache.spark.sql.DataFrame = []

scala> spark.sql("show tables").collect.foreach(println)
[orderdb,customers,false]
[orderdb,offices,false]
[orderdb,orders,false]
[orderdb,products,false]
[orderdb,salesreps,false]

scala> spark.sql("select city from offices where region='Eastern'").map(t=>"City: " + t(0)).collect.foreach(println)
City: NewYork                                                                   
City: Chicago
City: Atlanta

scala>

可以看到這次啟動spark-shell沒有帶上最後那個選項,這是因為這裡我們打算用HiveContext來操作Hive中的數據,需要支持Hive。前面說過spark-shell是預設開啟了Hive支持的。同SQLContext類似,Spark 2.x中也不需要再用HiveContext對象來操作SQL了,直接用SparkSession對象來操作就好了。可以看到這裡可以直接操作表,不用再定義schema,這是因為schema是由外部的hive metastore定義的,spark通過連接到hive metastore來讀取表的schema信息,因此這裡能直接操作SQL。

 

另外,除了上面的使用SQLContext操作普通文件(需要額外定義模式)和使用HiveContext操作Hive表數據(需要開啟hive metastore)之外,SQLContext還能操作JSON、PARQUET等文件,由於這兩種數據文件自己帶了模式信息,因此可以直接基於文件創建DataFrame,例如:

scala> val df = spark.read.json("file:///opt/spark/examples/src/main/resources/people.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]                

scala> df.createOrReplaceTempView("people")

scala> spark.sql("select name,age from people where age>19").map(t=>"Name :" + t(0) + ", Age: " + t(1)).collect.foreach(println)
Name :Andy, Age: 30    

 

最後來看下DataFrame的另一種叫做DSL(Domain Specific Language)的用法。

scala> val df = spark.read.json("file:///opt/spark/examples/src/main/resources/people.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]                

scala> df.show()
+----+-------+                                                                  
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+


scala> df.select("name").show()
+-------+                                                                       
|   name|
+-------+
|Michael|
|   Andy|
| Justin|
+-------+


scala> df.select(df("name"), df("age") + 1).show()
+-------+---------+                                                             
|   name|(age + 1)|
+-------+---------+
|Michael|     null|
|   Andy|       31|
| Justin|       20|
+-------+---------+


scala> df.filter(df("age") > 21).show()
+---+----+
|age|name|
+---+----+
| 30|Andy|
+---+----+


scala> df.groupBy("age").count().show()
+----+-----+                                                                    
| age|count|
+----+-----+
|  19|    1|
|null|    1|
|  30|    1|
+----+-----+


scala>

以上是對Spark SQL的SQLContext和HiveContext基本用法的一些總結,都是採用spark-shell工具舉的例子。實際上由於spark-shell是運行scala程式片段的工具,上述例子完全可以改成獨立的應用程式。我將在下一篇博文當中嘗試使用Scala、Java和Python來編寫獨立的程式來操作上面的示例hive資料庫orderdb,可以適當使用一些較為複雜的SQL來統計分析數據。

 

 

 

 


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 1、ifconfig命令臨時配置IP地址 2、setup工具永久配置IP地址 3、修改網路配置文件 4、圖形界面配置IP地址 ifconfig命令臨時配置IP地址 主要的作用是查看網路信息,也可以臨時設置網卡IP地址 輸出的第一行信息: 首先標明瞭是乙太網和當前電腦的網卡的MAC地址 第二行信息: ...
  • OSI/ISO七層模型和TCP/IP四層模型 網路層協議和IP劃分 OSI的七層框架 物理層:設備之間的比特流的傳輸、物理介面、電氣特性等。 數據鏈路層:成幀、用MAC地址訪問媒介、錯誤檢測與修正。 網路層:提供邏輯地址、選路。 傳輸層:可靠與不可靠的傳輸、傳輸前的錯誤檢測、流量控。 會話層:對應用 ...
  • Ctrl + Alt + Fx Linux多用戶、多任務切換 ...
  • 一 Pod生命周期管理 1.1 Pod生命周期 Pod在整個生命周期過程中被系統定義瞭如下各種狀態。 狀態值 描述 Pending API Server已經創建該Pod,且Pod內還有一個或多個容器的鏡像沒有創建,包括正在下載鏡像的過程。 Running Pod內所有容器均已創建,且至少有一個容器處 ...
  • 最近學習了linux關於進程間通信的相關知識,所以決定藉助進程和共用記憶體,並按照生產者消費者模型來創建一個簡易聊天程式。下麵簡單的說一下程式的思路。 首先是服務端程式,服務端會創建兩個進程,進程1負責接收客戶端傳送過來的消息,並存儲起來。進程2負責讀取進程1存取的消息。這裡使用到了生產者和消費者編程 ...
  • glibc是gnu發佈的libc庫,即c運行庫,glibc是linux系統中最底層的api,幾乎其它任何運行庫都會依賴於glibc。glibc除了封裝linux操作系統所提供的系統服務外,它本身也提供了許多其它一些必要功能服務的實現。很多linux的基本命令,比如ls,mv,cp, rm, ll,l ...
  • 互聯網概述 WWW:萬維網 FTP:文件傳輸協議 E-MAIL:電子郵件 WWW 典型的C/S架構 URL:統一資源定位 協議+功能變數名稱或IP:埠+網頁路徑+網頁名 http://www.xxx.com:80/index.php .com 一級功能變數名稱,由功能變數名稱分配商分配的 xxxx 二級功能變數名稱,個人、企業向 ...
  • 數據完整性 1.域完整性: 匹配完整性:非空、預設 欄位/列 2.實體完整性: 匹配完整性:主鍵、唯一鍵 記錄/行 3.引用完整性: 匹配完整性:外鍵 表與表之間 約束:constraint MySQL中的約束分類 主鍵:primary key 唯一鍵:unique 非空:not null 預設:d ...
一周排行
    -Advertisement-
    Play Games
  • 新改進提供的Taurus Rpc 功能,可以簡化微服務間的調用,同時可以不用再手動輸出模塊名稱,或調用路徑,包括負載均衡,這一切,由框架實現並提供了。新的Taurus Rpc 功能,將使得服務間的調用,更加輕鬆、簡約、高效。 ...
  • 本章將和大家分享ES的數據同步方案和ES集群相關知識。廢話不多說,下麵我們直接進入主題。 一、ES數據同步 1、數據同步問題 Elasticsearch中的酒店數據來自於mysql資料庫,因此mysql數據發生改變時,Elasticsearch也必須跟著改變,這個就是Elasticsearch與my ...
  • 引言 在我們之前的文章中介紹過使用Bogus生成模擬測試數據,今天來講解一下功能更加強大自動生成測試數據的工具的庫"AutoFixture"。 什麼是AutoFixture? AutoFixture 是一個針對 .NET 的開源庫,旨在最大程度地減少單元測試中的“安排(Arrange)”階段,以提高 ...
  • 經過前面幾個部分學習,相信學過的同學已經能夠掌握 .NET Emit 這種中間語言,並能使得它來編寫一些應用,以提高程式的性能。隨著 IL 指令篇的結束,本系列也已經接近尾聲,在這接近結束的最後,會提供幾個可供直接使用的示例,以供大伙分析或使用在項目中。 ...
  • 當從不同來源導入Excel數據時,可能存在重覆的記錄。為了確保數據的準確性,通常需要刪除這些重覆的行。手動查找並刪除可能會非常耗費時間,而通過編程腳本則可以實現在短時間內處理大量數據。本文將提供一個使用C# 快速查找並刪除Excel重覆項的免費解決方案。 以下是實現步驟: 1. 首先安裝免費.NET ...
  • C++ 異常處理 C++ 異常處理機制允許程式在運行時處理錯誤或意外情況。它提供了捕獲和處理錯誤的一種結構化方式,使程式更加健壯和可靠。 異常處理的基本概念: 異常: 程式在運行時發生的錯誤或意外情況。 拋出異常: 使用 throw 關鍵字將異常傳遞給調用堆棧。 捕獲異常: 使用 try-catch ...
  • 優秀且經驗豐富的Java開發人員的特征之一是對API的廣泛瞭解,包括JDK和第三方庫。 我花了很多時間來學習API,尤其是在閱讀了Effective Java 3rd Edition之後 ,Joshua Bloch建議在Java 3rd Edition中使用現有的API進行開發,而不是為常見的東西編 ...
  • 框架 · 使用laravel框架,原因:tp的框架路由和orm沒有laravel好用 · 使用強制路由,方便介面多時,分多版本,分文件夾等操作 介面 · 介面開發註意欄位類型,欄位是int,查詢成功失敗都要返回int(對接java等強類型語言方便) · 查詢介面用GET、其他用POST 代碼 · 所 ...
  • 正文 下午找企業的人去鎮上做貸後。 車上聽同事跟那個司機對罵,火星子都快出來了。司機跟那同事更熟一些,連我在內一共就三個人,同事那一手指桑罵槐給我都聽愣了。司機也是老社會人了,馬上聽出來了,為那個無辜的企業經辦人辯護,實際上是為自己辯護。 “這個事情你不能怪企業。”“但他們總不能讓銀行的人全權負責, ...
  • 1. JUnit 最佳實踐指南 原文: https://howtodoinjava.com/best-practices/unit-testing-best-practices-junit-reference-guide/ 我假設您瞭解 JUnit 的基礎知識。 如果您沒有基礎知識,請首先閱讀(已針 ...