理解Spark SQL(三)—— Spark SQL程式舉例

来源:https://www.cnblogs.com/roushi17/archive/2019/11/26/spark_sql_examples.html
-Advertisement-
Play Games

上一篇說到,在Spark 2.x當中,實際上SQLContext和HiveContext是過時的,相反是採用SparkSession對象的sql函數來操作SQL語句的。使用這個函數執行SQL語句前需要先調用DataFrame的createOrReplaceTempView註冊一個臨時表,所以關鍵是先 ...


上一篇說到,在Spark 2.x當中,實際上SQLContext和HiveContext是過時的,相反是採用SparkSession對象的sql函數來操作SQL語句的。使用這個函數執行SQL語句前需要先調用DataFrame的createOrReplaceTempView註冊一個臨時表,所以關鍵是先要將RDD轉換成DataFrame。實際上,在Spark中實際聲明瞭

type DataFrame = Dataset[Row]

所以,DataFrame是Dataset[Row]的別名。RDD是提供面向低層次的API,而DataFrame/Dataset提供面向高層次的API(適合於SQL等面向結構化數據的場合)。

下麵提供一些Spark SQL程式的例子。

例子一:SparkSQLExam.scala

 1 package bruce.bigdata.spark.example
 2 
 3 import org.apache.spark.sql.Row
 4 import org.apache.spark.sql.SparkSession
 5 import org.apache.spark.sql.types._
 6 
 7 object SparkSQLExam {
 8 
 9     case class offices(office:Int,city:String,region:String,mgr:Int,target:Double,sales:Double)
10     
11     def main(args: Array[String]) {
12 
13         val spark = SparkSession
14           .builder
15           .appName("SparkSQLExam")
16           .getOrCreate()
17         
18         runSparkSQLExam1(spark)
19         runSparkSQLExam2(spark)
20         
21         spark.stop()
22     
23     }
24     
25     
26     private def runSparkSQLExam1(spark: SparkSession): Unit = {
27     
28         import spark.implicits._
29         
30         val rddOffices=spark.sparkContext.textFile("/user/hive/warehouse/orderdb.db/offices/offices.txt").map(_.split("\t")).map(p=>offices(p(0).trim.toInt,p(1),p(2),p(3).trim.toInt,p(4).trim.toDouble,p(5).trim.toDouble))
31         val officesDataFrame = spark.createDataFrame(rddOffices)
32         
33         officesDataFrame.createOrReplaceTempView("offices")
34         spark.sql("select city from offices where region='Eastern'").map(t=>"City: " + t(0)).collect.foreach(println)
35         
36     
37     }
38     
39     private def runSparkSQLExam2(spark: SparkSession): Unit = {
40     
41          import spark.implicits._
42          import org.apache.spark.sql._
43          import org.apache.spark.sql.types._
44         
45          val schema = new StructType(Array(StructField("office", IntegerType, false), StructField("city", StringType, false), StructField("region", StringType, false), StructField("mgr", IntegerType, true), StructField("target", DoubleType, true), StructField("sales", DoubleType, false)))
46          val rowRDD = spark.sparkContext.textFile("/user/hive/warehouse/orderdb.db/offices/offices.txt").map(_.split("\t")).map(p => Row(p(0).trim.toInt,p(1),p(2),p(3).trim.toInt,p(4).trim.toDouble,p(5).trim.toDouble))
47          val dataFrame = spark.createDataFrame(rowRDD, schema)
48          
49          dataFrame.createOrReplaceTempView("offices2")        
50          spark.sql("select city from offices2 where region='Western'").map(t=>"City: " + t(0)).collect.foreach(println)
51         
52     }
53     
54 }

使用下麵的命令進行編譯:

[root@BruceCentOS4 scala]# scalac SparkSQLExam.scala

在編譯之前,需要在CLASSPATH中增加路徑:

export CLASSPATH=$CLASSPATH:$SPARK_HOME/jars/*:$(/opt/hadoop/bin/hadoop classpath)

然後打包成jar文件:

[root@BruceCentOS4 scala]# jar -cvf spark_exam_scala.jar bruce

然後通過spark-submit提交程式到yarn集群執行,為了方便從客戶端查看結果,這裡採用yarn cient模式運行。

[root@BruceCentOS4 scala]# $SPARK_HOME/bin/spark-submit --class bruce.bigdata.spark.example.SparkSQLExam --master yarn --deploy-mode client spark_exam_scala.jar

運行結果截圖:

 

例子二:SparkSQLExam.scala(需要啟動hive metastore)

 1 package  bruce.bigdata.spark.example
 2 
 3 import org.apache.spark.sql.{SaveMode, SparkSession}
 4 
 5 object SparkHiveExam {
 6 
 7     def main(args: Array[String]) {
 8         
 9         val spark = SparkSession
10           .builder()
11           .appName("Spark Hive Exam")
12           .config("spark.sql.warehouse.dir", "/user/hive/warehouse")
13           .enableHiveSupport()
14           .getOrCreate()
15        
16         import spark.implicits._
17         
18         //使用hql查看hive數據
19         spark.sql("show databases").collect.foreach(println)
20         spark.sql("use orderdb")
21         spark.sql("show tables").collect.foreach(println)
22         spark.sql("select city from offices where region='Eastern'").map(t=>"City: " + t(0)).collect.foreach(println)
23         
24         //將hql查詢出的數據保存到另外一張新建的hive表
25         //找出訂單金額超過1萬美元的產品
26         spark.sql("""create table products_high_sales(mfr_id string,product_id string,description string) 
27                    ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' LINES TERMINATED BY '\n' STORED AS TEXTFILE""")
28         spark.sql("""select mfr_id,product_id,description
29                    from products a inner join orders b
30                    on a.mfr_id=b.mfr and a.product_id=b.product
31                    where b.amount>10000""").write.mode(SaveMode.Overwrite).saveAsTable("products_high_sales")
32         
33         //將HDFS文件數據導入到hive表中            
34         spark.sql("""CREATE TABLE IF NOT EXISTS offices2 (office int,city string,region string,mgr int,target double,sales double ) 
35                    ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' LINES TERMINATED BY '\n' STORED AS TEXTFILE""")
36         spark.sql("LOAD DATA INPATH '/user/hive/warehouse/orderdb.db/offices/offices.txt' INTO TABLE offices2")
37         
38         spark.stop()
39     }
40 }

使用下麵的命令進行編譯:

[root@BruceCentOS4 scala]# scalac SparkHiveExam.scala

使用下麵的命令打包:

[root@BruceCentOS4 scala]# jar -cvf spark_exam_scala.jar bruce

使用下麵的命令運行:

[root@BruceCentOS4 scala]# $SPARK_HOME/bin/spark-submit --class bruce.bigdata.spark.example.SparkHiveExam --master yarn --deploy-mode client spark_exam_scala.jar

程式運行結果:

 

另外上述程式運行後,hive中多了2張表:

 

 

例子三:spark_sql_exam.py

 1 from __future__ import print_function
 2 
 3 from pyspark.sql import SparkSession
 4 from pyspark.sql.types import *
 5 
 6 
 7 if __name__ == "__main__":
 8     spark = SparkSession \
 9         .builder \
10         .appName("Python Spark SQL exam") \
11         .config("spark.some.config.option", "some-value") \
12         .getOrCreate()
13 
14     schema = StructType([StructField("office", IntegerType(), False), StructField("city", StringType(), False), 
15         StructField("region", StringType(), False), StructField("mgr", IntegerType(), True), 
16         StructField("Target", DoubleType(), True), StructField("sales", DoubleType(), False)])
17         
18     rowRDD = spark.sparkContext.textFile("/user/hive/warehouse/orderdb.db/offices/offices.txt").map(lambda p: p.split("\t")) \
19         .map(lambda p: (int(p[0].strip()), p[1], p[2], int(p[3].strip()), float(p[4].strip()), float(p[5].strip())))
20             
21     dataFrame = spark.createDataFrame(rowRDD, schema)
22     dataFrame.createOrReplaceTempView("offices")
23     spark.sql("select city from offices where region='Eastern'").show()
24     
25     spark.stop()

 執行命令運行程式:

[root@BruceCentOS4 spark]# $SPARK_HOME/bin/spark-submit --master yarn --deploy-mode client spark_sql_exam.py

程式運行結果:

 

例子四:JavaSparkSQLExam.java

 1 package bruce.bigdata.spark.example;
 2 
 3 import java.util.ArrayList;
 4 import java.util.List;
 5 
 6 import org.apache.spark.api.java.JavaRDD;
 7 import org.apache.spark.api.java.function.Function;
 8 import org.apache.spark.api.java.function.MapFunction;
 9 import org.apache.spark.sql.Dataset;
10 import org.apache.spark.sql.Row;
11 import org.apache.spark.sql.RowFactory;
12 import org.apache.spark.sql.SparkSession;
13 import org.apache.spark.sql.types.DataTypes;
14 import org.apache.spark.sql.types.StructField;
15 import org.apache.spark.sql.types.StructType;
16 import org.apache.spark.sql.AnalysisException;
17 
18 
19 public class JavaSparkSQLExam {
20     public static void main(String[] args) throws AnalysisException {
21         SparkSession spark = SparkSession
22           .builder()
23           .appName("Java Spark SQL exam")
24           .config("spark.some.config.option", "some-value")
25           .getOrCreate();    
26         
27         List<StructField> fields = new ArrayList<>();
28         fields.add(DataTypes.createStructField("office", DataTypes.IntegerType, false));
29         fields.add(DataTypes.createStructField("city", DataTypes.StringType, false));
30         fields.add(DataTypes.createStructField("region", DataTypes.StringType, false));
31         fields.add(DataTypes.createStructField("mgr", DataTypes.IntegerType, true));
32         fields.add(DataTypes.createStructField("target", DataTypes.DoubleType, true));
33         fields.add(DataTypes.createStructField("sales", DataTypes.DoubleType, false));
34         
35         StructType schema = DataTypes.createStructType(fields);
36         
37         
38         JavaRDD<String> officesRDD = spark.sparkContext()
39           .textFile("/user/hive/warehouse/orderdb.db/offices/offices.txt", 1)
40           .toJavaRDD();
41         
42         JavaRDD<Row> rowRDD = officesRDD.map((Function<String, Row>) record -> {
43           String[] attributes = record.split("\t");
44           return RowFactory.create(Integer.valueOf(attributes[0].trim()), attributes[1], attributes[2], Integer.valueOf(attributes[3].trim()), Double.valueOf(attributes[4].trim()), Double.valueOf(attributes[5].trim()));
45         });
46         
47         Dataset<Row> dataFrame = spark.createDataFrame(rowRDD, schema);
48         
49         dataFrame.createOrReplaceTempView("offices");
50         Dataset<Row> results = spark.sql("select city from offices where region='Eastern'");
51         results.collectAsList().forEach(r -> System.out.println(r));
52         
53         spark.stop();
54     }
55 }

編譯打包後通過如下命令執行:

[root@BruceCentOS4 spark]# $SPARK_HOME/bin/spark-submit --class bruce.bigdata.spark.example.JavaSparkSQLExam --master yarn --deploy-mode client spark_exam_java.jar

運行結果:

 

上面是一些關於Spark SQL程式的一些例子,分別採用了Scala/Python/Java來編寫的。另外除了這三種語言,Spark還支持R語言編寫程式,因為我自己也不熟悉,就不舉例了。不管用什麼語言,其實API都是基本一致的,主要是採用DataFrame和Dataset的高層次API來調用和執行SQL。使用這些API,可以輕鬆的將結構化數據轉化成SQL來操作,同時也能夠方便的操作Hive中的數據。

 

 

 

 

 

 

 

 

 

 

 


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • # 定時刷新 nvidia-smi 顯示的結果 nvidia-smi -l 1 # 以 1 秒的頻率進行刷新 nvidia-smi -lms 1 #以 1 毫秒的頻率進行刷新 #保持更新,更多內容請關註 cnblogs.com/xuyaowen; 相關鏈接: https://www.cnblogs. ...
  • 目錄: 1、Zabbix介紹 2、LAMP/LNMP介紹 3、Zabbix安裝與部署 1.Zabbix介紹 zabbix是一個基於WEB界面的提供分散式系統監視以及網路監視功能的企業級的開源解決方案。 zabbix能監視各種網路參數,保證伺服器系統的安全運營;並提供靈活的通知機制以讓系統管理員快速定 ...
  • watch 能間歇地執行程式,並將輸出結果以全屏的方式顯示,預設時2s執行一次; watch -n 5 ping -c 1 www.baidu.com # 進行迴圈5秒鐘,發送一次ping包; 使用範例: To watch for mail, you might do watch -n 60 fro ...
  • 一 前期準備 1.1 前置條件 集群部署:Kubernetes集群部署參考003——019。 glusterfs-Kubernetes部署:參考《附010.Kubernetes永久存儲之GlusterFS超融合部署》。 1.2 部署規劃 本實驗使用StatefulSet部署MongoDB集群,同時每 ...
  • 問題現象: 表示連接管道已經斷開 解決方法: 方法一:客戶端配置 在客戶端的 ~/.ssh/ config文件(如不存在請自行創建)中添加下麵內容: ServerAliveInterval 60 方法二:伺服器端配置 在伺服器的 /etc/ssh/sshd_config 中添加如下的配置: Clie ...
  • 修改nginx的配置文件,添加client_max_body_size 欄位 註:client_max_body_size 必須要放在server下的server_name下,而不是放在location欄位下麵 ...
  • https://sqlserver.code.blog/2019/11/25/you-may-fail-to-backup-log-or-restore-log-after-tde-certification-key-rotation/ ...
  • https://sqlserver.code.blog/2019/11/25/password-required-when-you-trying-to-add-a-database-having-a-master-key-to-ag-group/ ...
一周排行
    -Advertisement-
    Play Games
  • 基於.NET Framework 4.8 開發的深度學習模型部署測試平臺,提供了YOLO框架的主流系列模型,包括YOLOv8~v9,以及其系列下的Det、Seg、Pose、Obb、Cls等應用場景,同時支持圖像與視頻檢測。模型部署引擎使用的是OpenVINO™、TensorRT、ONNX runti... ...
  • 十年沉澱,重啟開發之路 十年前,我沉浸在開發的海洋中,每日與代碼為伍,與演算法共舞。那時的我,滿懷激情,對技術的追求近乎狂熱。然而,隨著歲月的流逝,生活的忙碌逐漸占據了我的大部分時間,讓我無暇顧及技術的沉澱與積累。 十年間,我經歷了職業生涯的起伏和變遷。從初出茅廬的菜鳥到逐漸嶄露頭角的開發者,我見證了 ...
  • C# 是一種簡單、現代、面向對象和類型安全的編程語言。.NET 是由 Microsoft 創建的開發平臺,平臺包含了語言規範、工具、運行,支持開發各種應用,如Web、移動、桌面等。.NET框架有多個實現,如.NET Framework、.NET Core(及後續的.NET 5+版本),以及社區版本M... ...
  • 前言 本文介紹瞭如何使用三菱提供的MX Component插件實現對三菱PLC軟元件數據的讀寫,記錄了使用電腦模擬,模擬PLC,直至完成測試的詳細流程,並重點介紹了在這個過程中的易錯點,供參考。 用到的軟體: 1. PLC開發編程環境GX Works2,GX Works2下載鏈接 https:// ...
  • 前言 整理這個官方翻譯的系列,原因是網上大部分的 tomcat 版本比較舊,此版本為 v11 最新的版本。 開源項目 從零手寫實現 tomcat minicat 別稱【嗅虎】心有猛虎,輕嗅薔薇。 系列文章 web server apache tomcat11-01-官方文檔入門介紹 web serv ...
  • 1、jQuery介紹 jQuery是什麼 jQuery是一個快速、簡潔的JavaScript框架,是繼Prototype之後又一個優秀的JavaScript代碼庫(或JavaScript框架)。jQuery設計的宗旨是“write Less,Do More”,即倡導寫更少的代碼,做更多的事情。它封裝 ...
  • 前言 之前的文章把js引擎(aardio封裝庫) 微軟開源的js引擎(ChakraCore))寫好了,這篇文章整點js代碼來測一下bug。測試網站:https://fanyi.youdao.com/index.html#/ 逆向思路 逆向思路可以看有道翻譯js逆向(MD5加密,AES加密)附完整源碼 ...
  • 引言 現代的操作系統(Windows,Linux,Mac OS)等都可以同時打開多個軟體(任務),這些軟體在我們的感知上是同時運行的,例如我們可以一邊瀏覽網頁,一邊聽音樂。而CPU執行代碼同一時間只能執行一條,但即使我們的電腦是單核CPU也可以同時運行多個任務,如下圖所示,這是因為我們的 CPU 的 ...
  • 掌握使用Python進行文本英文統計的基本方法,並瞭解如何進一步優化和擴展這些方法,以應對更複雜的文本分析任務。 ...
  • 背景 Redis多數據源常見的場景: 分區數據處理:當數據量增長時,單個Redis實例可能無法處理所有的數據。通過使用多個Redis數據源,可以將數據分區存儲在不同的實例中,使得數據處理更加高效。 多租戶應用程式:對於多租戶應用程式,每個租戶可以擁有自己的Redis數據源,以確保數據隔離和安全性。 ...