2.sparkSQL--DataFrames與RDDs的相互轉換

来源:http://www.cnblogs.com/intsmaze/archive/2017/03/25/6613755.html
-Advertisement-
Play Games

Spark SQL支持兩種RDDs轉換為DataFrames的方式 使用反射獲取RDD內的Schema 當已知類的Schema的時候,使用這種基於反射的方法會讓代碼更加簡潔而且效果也很好。 通過編程介面指定Schema 通過Spark SQL的介面創建RDD的Schema,這種方式會讓代碼比較冗長。 ...


Spark SQL支持兩種RDDs轉換為DataFrames的方式 使用反射獲取RDD內的Schema     當已知類的Schema的時候,使用這種基於反射的方法會讓代碼更加簡潔而且效果也很好。 通過編程介面指定Schema     通過Spark SQL的介面創建RDD的Schema,這種方式會讓代碼比較冗長。     這種方法的好處是,在運行時才知道數據的列以及列的類型的情況下,可以動態生成Schema。

原文和作者一起討論:http://www.cnblogs.com/intsmaze/p/6613755.html

微信:intsmaze

使用反射獲取Schema(Inferring the Schema Using Reflection)
import org.apache.spark.sql.{DataFrameReader, SQLContext}
import org.apache.spark.{SparkConf, SparkContext}

object InferringSchema {
  def main(args: Array[String]) {

    //創建SparkConf()並設置App名稱
    val conf = new SparkConf().setAppName("SQL-intsmaze")

    //SQLContext要依賴SparkContext
    val sc = new SparkContext(conf)
    //創建SQLContext
    val sqlContext = new SQLContext(sc)

    //從指定的地址創建RDD
    val lineRDD = sc.textFile("hdfs://192.168.19.131:9000/person.tzt").map(_.split(","))

    //創建case class
    //將RDD和case class關聯
    val personRDD = lineRDD.map(x => Person(x(0).toInt, x(1), x(2).toInt))

    //導入隱式轉換,如果不導入無法將RDD轉換成DataFrame
    //將RDD轉換成DataFrame
    import sqlContext.implicits._
    val personDF = personRDD.toDF

    //註冊表
    personDF.registerTempTable("intsmaze")
    //傳入SQL
    val df = sqlContext.sql("select * from intsmaze order by age desc limit 2")

    //將結果以JSON的方式存儲到指定位置
    df.write.json("hdfs://192.168.19.131:9000/personresult")

    //停止Spark Context
    sc.stop()
  }
}
//case class一定要放到外面
case class Person(id: Int, name: String, age: Int)
spark shell中不需要導入sqlContext.implicits._是因為spark shell預設已經自動導入了。 打包提交到yarn集群:
/home/hadoop/app/spark/bin/spark-submit --class InferringSchema \
--master yarn \
--deploy-mode cluster \
--driver-memory 512m \
--executor-memory 512m \
--executor-cores 2 \
--queue default \
/home/hadoop/sparksql-1.0-SNAPSHOT.jar 

 

通過編程介面指定Schema(Programmatically Specifying the Schema)

當JavaBean不能被預先定義的時候,編程創建DataFrame分為三步:

從原來的RDD創建一個Row格式的RDD.

創建與RDD中Rows結構匹配的StructType,通過該StructType創建表示RDD的Schema.

通過SQLContext提供的createDataFrame方法創建DataFrame,方法參數為RDD的Schema.

 

import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.types._
import org.apache.spark.{SparkContext, SparkConf}

object SpecifyingSchema {
  def main(args: Array[String]) {
    //創建SparkConf()並設置App名稱
    val conf = new SparkConf().setAppName("SQL-intsmaze")
    //SQLContext要依賴SparkContext
    val sc = new SparkContext(conf)
    //創建SQLContext
    val sqlContext = new SQLContext(sc)

    //從指定的地址創建RDD
    val personRDD = sc.textFile(args(0)).map(_.split(","))

    //通過StructType直接指定每個欄位的schema
    val schema = StructType(
      List(
        StructField("id", IntegerType, true),
        StructField("name", StringType, true),
        StructField("age", IntegerType, true)
      )
    )

    //將RDD映射到rowRDD
    val rowRDD = personRDD.map(p => Row(p(0).toInt, p(1).trim, p(2).toInt))

    //將schema信息應用到rowRDD上
    val personDataFrame = sqlContext.createDataFrame(rowRDD, schema)

    //註冊表
    personDataFrame.registerTempTable("intsmaze")
    //執行SQL
    val df = sqlContext.sql("select * from intsmaze order by age desc ")
    //將結果以JSON的方式存儲到指定位置
    df.write.json(args(1))
    //停止Spark Context
    sc.stop()
  }
}
將程式打成jar包,上傳到spark集群,提交Spark任務
/home/hadoop/app/spark/bin/spark-submit --class SpecifyingSchema \
--master yarn \
--deploy-mode cluster \
--driver-memory 512m \
--executor-memory 512m \
--executor-cores 2 \
--queue default \
/home/hadoop/sparksql-1.0-SNAPSHOT.jar \
hdfs://192.168.19.131:9000/person.txt hdfs://192.168.19.131:9000/intsmazeresult

 

/home/hadoop/app/spark/bin/spark-submit --class SpecifyingSchema \
--master yarn \
--deploy-mode client \
--driver-memory 512m \
--executor-memory 512m \
--executor-cores 2 \
--queue default \
/home/hadoop/sparksql-1.0-SNAPSHOT.jar \
hdfs://192.168.19.131:9000/person.txt hdfs://192.168.19.131:9000/intsmazeresult

maven項目的pom.xml中添加Spark SQL的依賴

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-sql_2.10</artifactId>
  <version>1.6.2</version>
</dependency>

 


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 本文出處:http://www.cnblogs.com/wy123/p/6617700.html SQLServer中沒有函數索引,在某些場景下查詢的時候要根據欄位的某一部分做查詢或者經過某種計算之後做查詢,如果使用函數或者其他方式作用在欄位上之後,就會限制到索引的使用,不過我們可以間接地實現類似於 ...
  • org.springframework.beans.factory.BeanDefinitionStoreException異常 ...
  • 運行Pig時出現這個錯誤: 解決的辦法是把${HADOOP_HOME}/share/hadoop/yarn/lib下的jline-2.1.1.jar刪除掉,再重啟Hadoop就可以了。 ...
  • 多表查詢 1. 分類: * 合併結果集(瞭解) * 連接查詢 * 子查詢合併結果集 * 要求被合併的表中,列的類型和列數相同 * UNION,去除重覆行 * UNION ALL,不去除重覆行 連接查詢 1. 分類 * 內連接 * 外連接 > 左外連接 > 右外連接 > 全外連接(MySQL不支持) ...
  • 1.刪除註冊表:在HKEY_LOCAL_MACHINE\SYSTEM\CurrentControlSet\Control\Session Manager中找到 PendingFileRenameOperations 項目,並刪除它。 ...
  • mysql 常用指令及中文亂碼解決 Mysql 系統管理指令 登陸本地 Mysql mysql -u username -p # 回車輸入密碼 或者 mysql -u username -p passswd; 登陸遠程 Mysql mysql -h address -u username -p # ...
  • 轉載自http://www.jb51.net/article/30811.htm 謝謝! 方法一: 1、打開查詢分析器,輸入命令 BACKUP LOG database_name WITH NO_LOG 2、再打開企業管理器--右鍵要壓縮的資料庫--所有任務--收縮資料庫--收縮文件--選擇日誌文件 ...
  • 安裝 啟動Mysql服務 設置開機啟動 ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...