Spark--SQL

来源:http://www.cnblogs.com/one--way/archive/2016/09/06/5845971.html
-Advertisement-
Play Games

SQL 程式中SQL執行的結果返回的是DataFrame, DataFrames DataFrames是分散式數據集,由帶名字的列組成。類似關係型資料庫的結構。 DataFrames的數據來源包括:結構化數據文件,Hive表,RDDs,外部資料庫;結構化數據文件包括json,parquet. Dat ...


 

SQL

程式中SQL執行的結果返回的是DataFrame,

 

DataFrames

DataFrames是分散式數據集,由帶名字的列組成。類似關係型資料庫的結構。

DataFrames的數據來源包括:結構化數據文件,Hive表,RDDs,外部資料庫;json是半結構化文件.

 

DataFrames的操作

import org.apache.spark.sql.{Column, DataFrame, SQLContext}
import org.apache.spark.{SparkContext, SparkConf}

/**
  * Created by Edward on 2016/9/6.
  */
object DFTest {
  def main(args: Array[String]) {

    val conf: SparkConf = new SparkConf().setAppName("DF").setMaster("local")
    val sc: SparkContext = new SparkContext(conf)

    val sqlContext: SQLContext = new SQLContext(sc)

    val df: DataFrame = sqlContext.read.json("D:\\documents\\Spark\\MyDemo\\Test\\res\\people.json")
    
// Show the content of the DataFrame df.show()
// age name // null Michael // 30 Andy // 19 Justin // Print the schema in a tree format df.printSchema() // root // |-- age: long (nullable = true) // |-- name: string (nullable = true) // Select only the "name" column df.select("name").show() // name // Michael // Andy // Justin // Select everybody, but increment the age by 1 df.select(df("name"), df("age") + 1).show() // name (age + 1) // Michael null // Andy 31 // Justin 20 // Select people older than 21 df.filter(df("age") > 21).show() // age name // 30 Andy // Count people by age df.groupBy("age").count().show() // age count // null 1 // 19 1 // 30 1 sc.stop() } }

讀取的Json的文件內容:

{"name":"Michael"}
{"name":"Andy", "age":30}
{"name":"Justin", "age":19}

更多操作參考:http://spark.apache.org/docs/1.6.0/api/scala/index.html#org.apache.spark.sql.DataFrame

 

兩種方式將RDDs轉換成DataFrames

如果是json和parquet直接可以轉換成DF,如果是普通的數據文件需要將讀取的文件數據結構RDDs轉換成DataFrames。

1.反射  (簡潔,需要指定表結構類型)

import org.apache.spark.{SparkContext, SparkConf}

/**
  * Created by Edward on 2016/9/6.
  */
object Reflection {

  // Define the schema using a case class.
  // Note: Case classes in Scala 2.10 can support only up to 22 fields. To work around this limit,
  // you can use custom classes that implement the Product interface.
  case class Person(name: String, age: Int)

  def main(args: Array[String]) {

    val conf: SparkConf = new SparkConf().setAppName("Reflection").setMaster("local")

    val sc: SparkContext = new SparkContext(conf)

    // sc is an existing SparkContext.
    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    // this is used to implicitly convert an RDD to a DataFrame.
    import sqlContext.implicits._

    // Create an RDD of Person objects and register it as a table.
    val people = sc.textFile("res/people.txt").map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt)).toDF()
    people.registerTempTable("people")

    // SQL statements can be run by using the sql methods provided by sqlContext.
    val teenagers = sqlContext.sql("SELECT name, age FROM people WHERE age >= 13 AND age <= 19")

    // The results of SQL queries are DataFrames and support all the normal RDD operations.
    // The columns of a row in the result can be accessed by field index:
    teenagers.map(t => "Name: " + t(0)).collect().foreach(println)

    // or by field name:
    teenagers.map(t => "Name: " + t.getAs[String]("name")).collect().foreach(println)

    // row.getValuesMap[T] retrieves multiple columns at once into a Map[String, T]
    teenagers.map(_.getValuesMap[Any](List("name", "age"))).collect().foreach(println)
    // Map("name" -> "Justin", "age" -> 19)

    sc.stop()
  }
}

 

2.動態

import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkContext, SparkConf}

/**
  * Created by Edward on 2016/9/6.
  */
object programmatic {

  def main(args: Array[String]) {

    val conf = new SparkConf().setAppName("Programmatic").setMaster("local")

    val sc = new SparkContext(conf)

    val sqlContext = new SQLContext(sc)

    // Create an RDD
    val people = sc.textFile("res/people.txt")

    // The schema is encoded in a string
    val schemaString = "name age"

    // Import Row.
    import org.apache.spark.sql.Row;

    // Import Spark SQL data types
    import org.apache.spark.sql.types.{StructType, StructField, StringType};

    // Generate the schema based on the string of schema
    val schema =
      StructType(
        schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, true)))

    // Convert records of the RDD (people) to Rows.
    val rowRDD = people.map(_.split(",")).map(p => Row(p(0), p(1).trim))

    // Apply the schema to the RDD.
    val peopleDataFrame = sqlContext.createDataFrame(rowRDD, schema)

    // Register the DataFrames as a table.
    peopleDataFrame.registerTempTable("people")

    // SQL statements can be run by using the sql methods provided by sqlContext.
    val results = sqlContext.sql("SELECT name FROM people")

    // The results of SQL queries are DataFrames and support all the normal RDD operations.
    // The columns of a row in the result can be accessed by field index or by field name.
    results.map(t => "Name: " + t(0)).collect().foreach(println)

    sc.stop()

  }
}

 

數據源

預設的數據源是parquet

import org.apache.spark.sql.{SaveMode, DataFrame, SQLContext}
import org.apache.spark.{SparkContext, SparkConf}


/**
  * Created by Edward on 2016/9/7.
  */
object Parquet {
  def main(args: Array[String]) {

    val sparkConf: SparkConf = new SparkConf().setAppName("Parquet").setMaster("local")
    val sc = new SparkContext(sparkConf)

    val sqlContext  = new SQLContext(sc)

    //val parquet = sqlContext.read.parquet("res/users.parquet")
    //val parquet = sqlContext.read.load("res/users.parquet")  //預設數據源是parquet,可以配置spark.sql.sources.default修改
    //val parquet  = sqlContext.read.parquet("new.parquet/*.parquet")  //可以使用模糊匹配
    //val json = sqlContext.read.format("json").load("res/people.json") //讀取json文件,通過format指定文件格式
    val json = sqlContext.read.json("res/people.json") //通過json方法直接讀取json文件

    //json.select("name","age").write.mode(SaveMode.Overwrite).save("new.parquet") //預設保存為parquet文件
    json.select("name","age").write.mode(SaveMode.Overwrite).format("json").save("jsonfile") //保存為json文件 jsonfile為指定目錄
    json.show()
    //parquet.show()

    sc.stop()
  }
}

 


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 註意事項 : 激活碼以及 防止SQL Server 和vs版本衝突 開發版(Developer): PTTFM-X467G-P7RH2-3Q6CG-4DMYB 企業版(Enterprise): JD8Y6-HQG69-P9H84-XDTPG-34MBB 防止衝突 需要先安裝SQL Server 20 ...
  • 使用“Ctrl + R”組合鍵快速打開cmd視窗,並輸入“cmd”命令,打開cmd視窗。 使用“Ctrl + R”組合鍵快速打開cmd視窗,並輸入“cmd”命令,打開cmd視窗。 使用“mysql -uroot -proot”命令可以連接到本地的mysql服務。 使用“mysql -uroot -p ...
  • 前一篇文章,已經介紹了BMR的基礎用法,再結合Spark和Scala的文檔,我想應該是可以開始你的數據分析之路的.這一篇文章,著重進行一些簡單的思路上的引導和分析.如果你分析招聘數據時,卡在了某個環節,可以試著閱讀本文. 在繼續下麵的各種分析前,請確保已經讀完了本系列文章的第三篇,並正確配置了BMR... ...
  • 最近在使用mybatis,然後用到了小於等於,直接在XML中使用了 sum(case when p.pool_year 猜想可能是由於特殊字元的緣故,於是用了轉義字元進行了替換了,如下: sum(case when p.pool_year <= '2014' then p.pool_rmb e ...
  • 說到資料庫,我們首先要瞭解的應該是它的歷史或者說前身。細緻的說,資料庫的發展主要經歷了三個階段:1、人工管理階段 2、文件系統階段 3、資料庫系統階段。如何來理解呢?實質上,資料庫,顧名思義,就是管理數據的倉庫。那麼顯然,這不是一個真實的倉庫,而只是一個電子倉庫。為什麼我們要有這麼一個電子的倉庫呢? ...
  • 一:Oracle11g的安裝過程(Windows版本)很簡單,步驟為: 1. 首先從Oracle官方網站上下載Oracle11g資料庫,大約為1.7G。解壓後,setup.ext就可以開始安裝 2. 在安裝開始的時候,會要求填寫一些信息。需要註意兩項: 1)SID。這應該是安裝的資料庫的Servic ...
  • 備份資料庫(進入Mysql bin目錄下) 備份表結構及數據 mysqldump -hlocalhost -uroot -proot db_dev>d:\db_dev.sql 備份表結構 mysqldump -hlocalhost -uroot -proot -d db_dev>d:\db_dev. ...
  • 最近在抓取一些社交網站的數據,抓下來的數據用MySql存儲。問我為什麼用MySql,那自然是入門簡單,並且我當時只熟悉MySql。可是,隨著數據量越來越大,有一個問題始終困擾著我,那就是 社交關係的存儲 。 就以新浪微博舉例,一個大V少則十幾萬,多則幾千萬的粉絲,這些關註關係要怎麼存呢?在MySql ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...