理解Spark SQL(二)—— SQLContext和HiveContext

来源:https://www.cnblogs.com/roushi17/archive/2019/11/21/sqlcontext_hivecontext.html
-Advertisement-
Play Games

使用Spark SQL,除了使用之前介紹的方法,實際上還可以使用SQLContext或者HiveContext通過編程的方式實現。前者支持SQL語法解析器(SQL-92語法),後者支持SQL語法解析器和HiveSQL語法解析器,預設為HiveSQL語法解析器,用戶可以通過配置切換成SQL語法解析器來 ...


使用Spark SQL,除了使用之前介紹的方法,實際上還可以使用SQLContext或者HiveContext通過編程的方式實現。前者支持SQL語法解析器(SQL-92語法),後者支持SQL語法解析器和HiveSQL語法解析器,預設為HiveSQL語法解析器,用戶可以通過配置切換成SQL語法解析器來運行HiveQL不支持的語法,如:select 1。實際上HiveContext是SQLContext的子類,因此在HiveContext運行過程中除了override的函數和變數,可以使用和SQLContext一樣的函數和變數。

因為spark-shell工具實際就是運行的scala程式片段,為了方便,下麵採用spark-shell進行演示。

首先來看SQLContext,因為是標準SQL,可以不依賴於Hive的metastore,比如下麵的例子(沒有啟動hive metastore):

[root@BruceCentOS4 ~]# $SPARK_HOME/bin/spark-shell --master yarn --conf spark.sql.catalogImplementation=in-memory

 

 scala> case class offices(office:Int,city:String,region:String,mgr:Int,target:Double,sales:Double)
defined class offices

scala> val rddOffices=sc.textFile("/user/hive/warehouse/orderdb.db/offices/offices.txt").map(_.split("\t")).map(p=>offices(p(0).trim.toInt,p(1),p(2),p(3).trim.toInt,p(4).trim.toDouble,p(5).trim.toDouble))
rddOffices: org.apache.spark.rdd.RDD[offices] = MapPartitionsRDD[3] at map at <console>:26

scala> val officesDataFrame = spark.createDataFrame(rddOffices)
officesDataFrame: org.apache.spark.sql.DataFrame = [office: int, city: string ... 4 more fields]

scala> officesDataFrame.createOrReplaceTempView("offices")

scala> spark.sql("select city from offices where region='Eastern'").map(t=>"City: " + t(0)).collect.foreach(println)
City: NewYork                                                                   
City: Chicago
City: Atlanta

scala>

 執行上面的命令後,實際上在yarn集群中啟動了一個yarn client模式的Spark Application,然後在scala>提示符後輸入的語句會生成RDD的transformation,最後一條命令中的collect會生成RDD的action,即會觸發Job的提交和程式的執行。

命令行中之所以加上--conf spark.sql.catalogImplementation=in-memory選項,是因為spark-shell中的預設啟動的SparkSession對象spark是預設支持Hive的,不帶這個選項啟動的話,程式就會去連接hive metastore,因為這裡並沒有啟動hive metastore,因此程式在執行createDataFrame函數時會報錯。

程式中的第一行是1個case class語句,這裡是定義後面的數據文件的模式的(定義模式除了這個方法,其實還有另外一種方法,後面再介紹)。第二行從hdfs中讀取一個文本文件,並工通過map映射到了模式上面。第三行基於第二行的RDD生成DataFrame,第四行基於第三行的DataFrame註冊了一個邏輯上的臨時表,最後一行就可以通過SparkSession的sql函數來執行sql語句了。

實際上,SQLContext是Spark 1.x中的SQL入口,在Spark 2.x中,使用SparkSession作為SQL的入口,但是為了向後相容,Spark 2.x仍然支持SQLContext來操作SQL,不過會提示deprecated,所以上面的例子是採用Spark 2.x中的寫法。

實際上還有另外一種方法來操作SQL,針對同樣的數據,例如:

scala> import org.apache.spark.sql._
import org.apache.spark.sql._

scala> import org.apache.spark.sql.types._
import org.apache.spark.sql.types._

scala> val schema = new StructType(Array(StructField("office", IntegerType, false), StructField("city", StringType, false), StructField("region", StringType, false), StructField("mgr", IntegerType, true), StructField("target", DoubleType, true), StructField("sales", DoubleType, false)))
schema: org.apache.spark.sql.types.StructType = StructType(StructField(office,IntegerType,false), StructField(city,StringType,false), StructField(region,StringType,false), StructField(mgr,IntegerType,true), StructField(target,DoubleType,true), StructField(sales,DoubleType,false))

scala> val rowRDD = sc.textFile("/user/hive/warehouse/orderdb.db/offices/offices.txt").map(_.split("\t")).map(p => Row(p(0).trim.toInt,p(1),p(2),p(3).trim.toInt,p(4).trim.toDouble,p(5).trim.toDouble))
rowRDD: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[3] at map at <console>:30

scala> val dataFrame = spark.createDataFrame(rowRDD, schema)
dataFrame: org.apache.spark.sql.DataFrame = [office: int, city: string ... 4 more fields]

scala> dataFrame.createOrReplaceTempView("offices")

scala> spark.sql("select city from offices where region='Eastern'").map(t=>"City: " + t(0)).collect.foreach(println)
City: NewYork                                                                   
City: Chicago
City: Atlanta

這個例子與之前的例子有一些不同,主要的地方有3個:

1. 之前的例子是採用case class定義模式,Spark採用反射來推斷Schema;而這個例子採用StructType類型的對象來定義模式,它接收一個數組,數組成員是StructField對象,代表一個欄位的定義,每個欄位的定義由欄位名稱、欄位類型和是否允許為空組成;

2. 對於代表數據的RDD,之前的例子是直接用case class定義的類型來分割欄位,而這個例子是用的Row類型;

3. 在使用createDataFrame函數生成DataFrame時,該函數的參數不一樣,之前的例子只要傳入RDD對象即可(對象中隱含了模式),而這個例子需要同時傳入RDD和定義的schema;

實際編程中建議採用第二種方法,因為其更加靈活,schema信息可以不必是寫死的,而是可以在程式運行的過程中生成。

 

下麵接著來看HiveContext的用法,使用HiveContext之前需要確保:

  • 使用的Spark是支持Hive的;
  • Hive的配置文件hive-site.xml已經在Spark的conf目錄下;
  • hive metastore已經啟動;

舉例說明:

首先啟動hive metastore:

[root@BruceCentOS ~]# nohup hive --service metastore &

然後仍然通過spark-shell來舉例說明,啟動spark-shell,如下所示:

[root@BruceCentOS4 ~]# $SPARK_HOME/bin/spark-shell --master yarn

scala> spark.sql("show databases").collect.foreach(println)
[default]
[orderdb]

scala> spark.sql("use orderdb")
res2: org.apache.spark.sql.DataFrame = []

scala> spark.sql("show tables").collect.foreach(println)
[orderdb,customers,false]
[orderdb,offices,false]
[orderdb,orders,false]
[orderdb,products,false]
[orderdb,salesreps,false]

scala> spark.sql("select city from offices where region='Eastern'").map(t=>"City: " + t(0)).collect.foreach(println)
City: NewYork                                                                   
City: Chicago
City: Atlanta

scala>

可以看到這次啟動spark-shell沒有帶上最後那個選項,這是因為這裡我們打算用HiveContext來操作Hive中的數據,需要支持Hive。前面說過spark-shell是預設開啟了Hive支持的。同SQLContext類似,Spark 2.x中也不需要再用HiveContext對象來操作SQL了,直接用SparkSession對象來操作就好了。可以看到這裡可以直接操作表,不用再定義schema,這是因為schema是由外部的hive metastore定義的,spark通過連接到hive metastore來讀取表的schema信息,因此這裡能直接操作SQL。

 

另外,除了上面的使用SQLContext操作普通文件(需要額外定義模式)和使用HiveContext操作Hive表數據(需要開啟hive metastore)之外,SQLContext還能操作JSON、PARQUET等文件,由於這兩種數據文件自己帶了模式信息,因此可以直接基於文件創建DataFrame,例如:

scala> val df = spark.read.json("file:///opt/spark/examples/src/main/resources/people.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]                

scala> df.createOrReplaceTempView("people")

scala> spark.sql("select name,age from people where age>19").map(t=>"Name :" + t(0) + ", Age: " + t(1)).collect.foreach(println)
Name :Andy, Age: 30    

 

最後來看下DataFrame的另一種叫做DSL(Domain Specific Language)的用法。

scala> val df = spark.read.json("file:///opt/spark/examples/src/main/resources/people.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]                

scala> df.show()
+----+-------+                                                                  
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+


scala> df.select("name").show()
+-------+                                                                       
|   name|
+-------+
|Michael|
|   Andy|
| Justin|
+-------+


scala> df.select(df("name"), df("age") + 1).show()
+-------+---------+                                                             
|   name|(age + 1)|
+-------+---------+
|Michael|     null|
|   Andy|       31|
| Justin|       20|
+-------+---------+


scala> df.filter(df("age") > 21).show()
+---+----+
|age|name|
+---+----+
| 30|Andy|
+---+----+


scala> df.groupBy("age").count().show()
+----+-----+                                                                    
| age|count|
+----+-----+
|  19|    1|
|null|    1|
|  30|    1|
+----+-----+


scala>

以上是對Spark SQL的SQLContext和HiveContext基本用法的一些總結,都是採用spark-shell工具舉的例子。實際上由於spark-shell是運行scala程式片段的工具,上述例子完全可以改成獨立的應用程式。我將在下一篇博文當中嘗試使用Scala、Java和Python來編寫獨立的程式來操作上面的示例hive資料庫orderdb,可以適當使用一些較為複雜的SQL來統計分析數據。

 

 

 

 


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 1、ifconfig命令臨時配置IP地址 2、setup工具永久配置IP地址 3、修改網路配置文件 4、圖形界面配置IP地址 ifconfig命令臨時配置IP地址 主要的作用是查看網路信息,也可以臨時設置網卡IP地址 輸出的第一行信息: 首先標明瞭是乙太網和當前電腦的網卡的MAC地址 第二行信息: ...
  • OSI/ISO七層模型和TCP/IP四層模型 網路層協議和IP劃分 OSI的七層框架 物理層:設備之間的比特流的傳輸、物理介面、電氣特性等。 數據鏈路層:成幀、用MAC地址訪問媒介、錯誤檢測與修正。 網路層:提供邏輯地址、選路。 傳輸層:可靠與不可靠的傳輸、傳輸前的錯誤檢測、流量控。 會話層:對應用 ...
  • Ctrl + Alt + Fx Linux多用戶、多任務切換 ...
  • 一 Pod生命周期管理 1.1 Pod生命周期 Pod在整個生命周期過程中被系統定義瞭如下各種狀態。 狀態值 描述 Pending API Server已經創建該Pod,且Pod內還有一個或多個容器的鏡像沒有創建,包括正在下載鏡像的過程。 Running Pod內所有容器均已創建,且至少有一個容器處 ...
  • 最近學習了linux關於進程間通信的相關知識,所以決定藉助進程和共用記憶體,並按照生產者消費者模型來創建一個簡易聊天程式。下麵簡單的說一下程式的思路。 首先是服務端程式,服務端會創建兩個進程,進程1負責接收客戶端傳送過來的消息,並存儲起來。進程2負責讀取進程1存取的消息。這裡使用到了生產者和消費者編程 ...
  • glibc是gnu發佈的libc庫,即c運行庫,glibc是linux系統中最底層的api,幾乎其它任何運行庫都會依賴於glibc。glibc除了封裝linux操作系統所提供的系統服務外,它本身也提供了許多其它一些必要功能服務的實現。很多linux的基本命令,比如ls,mv,cp, rm, ll,l ...
  • 互聯網概述 WWW:萬維網 FTP:文件傳輸協議 E-MAIL:電子郵件 WWW 典型的C/S架構 URL:統一資源定位 協議+功能變數名稱或IP:埠+網頁路徑+網頁名 http://www.xxx.com:80/index.php .com 一級功能變數名稱,由功能變數名稱分配商分配的 xxxx 二級功能變數名稱,個人、企業向 ...
  • 數據完整性 1.域完整性: 匹配完整性:非空、預設 欄位/列 2.實體完整性: 匹配完整性:主鍵、唯一鍵 記錄/行 3.引用完整性: 匹配完整性:外鍵 表與表之間 約束:constraint MySQL中的約束分類 主鍵:primary key 唯一鍵:unique 非空:not null 預設:d ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...