Spark--SQL

来源:http://www.cnblogs.com/one--way/archive/2016/09/06/5845971.html
-Advertisement-
Play Games

SQL 程式中SQL執行的結果返回的是DataFrame, DataFrames DataFrames是分散式數據集,由帶名字的列組成。類似關係型資料庫的結構。 DataFrames的數據來源包括:結構化數據文件,Hive表,RDDs,外部資料庫;結構化數據文件包括json,parquet. Dat ...


 

SQL

程式中SQL執行的結果返回的是DataFrame,

 

DataFrames

DataFrames是分散式數據集,由帶名字的列組成。類似關係型資料庫的結構。

DataFrames的數據來源包括:結構化數據文件,Hive表,RDDs,外部資料庫;json是半結構化文件.

 

DataFrames的操作

import org.apache.spark.sql.{Column, DataFrame, SQLContext}
import org.apache.spark.{SparkContext, SparkConf}

/**
  * Created by Edward on 2016/9/6.
  */
object DFTest {
  def main(args: Array[String]) {

    val conf: SparkConf = new SparkConf().setAppName("DF").setMaster("local")
    val sc: SparkContext = new SparkContext(conf)

    val sqlContext: SQLContext = new SQLContext(sc)

    val df: DataFrame = sqlContext.read.json("D:\\documents\\Spark\\MyDemo\\Test\\res\\people.json")
    
// Show the content of the DataFrame df.show()
// age name // null Michael // 30 Andy // 19 Justin // Print the schema in a tree format df.printSchema() // root // |-- age: long (nullable = true) // |-- name: string (nullable = true) // Select only the "name" column df.select("name").show() // name // Michael // Andy // Justin // Select everybody, but increment the age by 1 df.select(df("name"), df("age") + 1).show() // name (age + 1) // Michael null // Andy 31 // Justin 20 // Select people older than 21 df.filter(df("age") > 21).show() // age name // 30 Andy // Count people by age df.groupBy("age").count().show() // age count // null 1 // 19 1 // 30 1 sc.stop() } }

讀取的Json的文件內容:

{"name":"Michael"}
{"name":"Andy", "age":30}
{"name":"Justin", "age":19}

更多操作參考:http://spark.apache.org/docs/1.6.0/api/scala/index.html#org.apache.spark.sql.DataFrame

 

兩種方式將RDDs轉換成DataFrames

如果是json和parquet直接可以轉換成DF,如果是普通的數據文件需要將讀取的文件數據結構RDDs轉換成DataFrames。

1.反射  (簡潔,需要指定表結構類型)

import org.apache.spark.{SparkContext, SparkConf}

/**
  * Created by Edward on 2016/9/6.
  */
object Reflection {

  // Define the schema using a case class.
  // Note: Case classes in Scala 2.10 can support only up to 22 fields. To work around this limit,
  // you can use custom classes that implement the Product interface.
  case class Person(name: String, age: Int)

  def main(args: Array[String]) {

    val conf: SparkConf = new SparkConf().setAppName("Reflection").setMaster("local")

    val sc: SparkContext = new SparkContext(conf)

    // sc is an existing SparkContext.
    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    // this is used to implicitly convert an RDD to a DataFrame.
    import sqlContext.implicits._

    // Create an RDD of Person objects and register it as a table.
    val people = sc.textFile("res/people.txt").map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt)).toDF()
    people.registerTempTable("people")

    // SQL statements can be run by using the sql methods provided by sqlContext.
    val teenagers = sqlContext.sql("SELECT name, age FROM people WHERE age >= 13 AND age <= 19")

    // The results of SQL queries are DataFrames and support all the normal RDD operations.
    // The columns of a row in the result can be accessed by field index:
    teenagers.map(t => "Name: " + t(0)).collect().foreach(println)

    // or by field name:
    teenagers.map(t => "Name: " + t.getAs[String]("name")).collect().foreach(println)

    // row.getValuesMap[T] retrieves multiple columns at once into a Map[String, T]
    teenagers.map(_.getValuesMap[Any](List("name", "age"))).collect().foreach(println)
    // Map("name" -> "Justin", "age" -> 19)

    sc.stop()
  }
}

 

2.動態

import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkContext, SparkConf}

/**
  * Created by Edward on 2016/9/6.
  */
object programmatic {

  def main(args: Array[String]) {

    val conf = new SparkConf().setAppName("Programmatic").setMaster("local")

    val sc = new SparkContext(conf)

    val sqlContext = new SQLContext(sc)

    // Create an RDD
    val people = sc.textFile("res/people.txt")

    // The schema is encoded in a string
    val schemaString = "name age"

    // Import Row.
    import org.apache.spark.sql.Row;

    // Import Spark SQL data types
    import org.apache.spark.sql.types.{StructType, StructField, StringType};

    // Generate the schema based on the string of schema
    val schema =
      StructType(
        schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, true)))

    // Convert records of the RDD (people) to Rows.
    val rowRDD = people.map(_.split(",")).map(p => Row(p(0), p(1).trim))

    // Apply the schema to the RDD.
    val peopleDataFrame = sqlContext.createDataFrame(rowRDD, schema)

    // Register the DataFrames as a table.
    peopleDataFrame.registerTempTable("people")

    // SQL statements can be run by using the sql methods provided by sqlContext.
    val results = sqlContext.sql("SELECT name FROM people")

    // The results of SQL queries are DataFrames and support all the normal RDD operations.
    // The columns of a row in the result can be accessed by field index or by field name.
    results.map(t => "Name: " + t(0)).collect().foreach(println)

    sc.stop()

  }
}

 

數據源

預設的數據源是parquet

import org.apache.spark.sql.{SaveMode, DataFrame, SQLContext}
import org.apache.spark.{SparkContext, SparkConf}


/**
  * Created by Edward on 2016/9/7.
  */
object Parquet {
  def main(args: Array[String]) {

    val sparkConf: SparkConf = new SparkConf().setAppName("Parquet").setMaster("local")
    val sc = new SparkContext(sparkConf)

    val sqlContext  = new SQLContext(sc)

    //val parquet = sqlContext.read.parquet("res/users.parquet")
    //val parquet = sqlContext.read.load("res/users.parquet")  //預設數據源是parquet,可以配置spark.sql.sources.default修改
    //val parquet  = sqlContext.read.parquet("new.parquet/*.parquet")  //可以使用模糊匹配
    //val json = sqlContext.read.format("json").load("res/people.json") //讀取json文件,通過format指定文件格式
    val json = sqlContext.read.json("res/people.json") //通過json方法直接讀取json文件

    //json.select("name","age").write.mode(SaveMode.Overwrite).save("new.parquet") //預設保存為parquet文件
    json.select("name","age").write.mode(SaveMode.Overwrite).format("json").save("jsonfile") //保存為json文件 jsonfile為指定目錄
    json.show()
    //parquet.show()

    sc.stop()
  }
}

 


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 註意事項 : 激活碼以及 防止SQL Server 和vs版本衝突 開發版(Developer): PTTFM-X467G-P7RH2-3Q6CG-4DMYB 企業版(Enterprise): JD8Y6-HQG69-P9H84-XDTPG-34MBB 防止衝突 需要先安裝SQL Server 20 ...
  • 使用“Ctrl + R”組合鍵快速打開cmd視窗,並輸入“cmd”命令,打開cmd視窗。 使用“Ctrl + R”組合鍵快速打開cmd視窗,並輸入“cmd”命令,打開cmd視窗。 使用“mysql -uroot -proot”命令可以連接到本地的mysql服務。 使用“mysql -uroot -p ...
  • 前一篇文章,已經介紹了BMR的基礎用法,再結合Spark和Scala的文檔,我想應該是可以開始你的數據分析之路的.這一篇文章,著重進行一些簡單的思路上的引導和分析.如果你分析招聘數據時,卡在了某個環節,可以試著閱讀本文. 在繼續下麵的各種分析前,請確保已經讀完了本系列文章的第三篇,並正確配置了BMR... ...
  • 最近在使用mybatis,然後用到了小於等於,直接在XML中使用了 sum(case when p.pool_year 猜想可能是由於特殊字元的緣故,於是用了轉義字元進行了替換了,如下: sum(case when p.pool_year <= '2014' then p.pool_rmb e ...
  • 說到資料庫,我們首先要瞭解的應該是它的歷史或者說前身。細緻的說,資料庫的發展主要經歷了三個階段:1、人工管理階段 2、文件系統階段 3、資料庫系統階段。如何來理解呢?實質上,資料庫,顧名思義,就是管理數據的倉庫。那麼顯然,這不是一個真實的倉庫,而只是一個電子倉庫。為什麼我們要有這麼一個電子的倉庫呢? ...
  • 一:Oracle11g的安裝過程(Windows版本)很簡單,步驟為: 1. 首先從Oracle官方網站上下載Oracle11g資料庫,大約為1.7G。解壓後,setup.ext就可以開始安裝 2. 在安裝開始的時候,會要求填寫一些信息。需要註意兩項: 1)SID。這應該是安裝的資料庫的Servic ...
  • 備份資料庫(進入Mysql bin目錄下) 備份表結構及數據 mysqldump -hlocalhost -uroot -proot db_dev>d:\db_dev.sql 備份表結構 mysqldump -hlocalhost -uroot -proot -d db_dev>d:\db_dev. ...
  • 最近在抓取一些社交網站的數據,抓下來的數據用MySql存儲。問我為什麼用MySql,那自然是入門簡單,並且我當時只熟悉MySql。可是,隨著數據量越來越大,有一個問題始終困擾著我,那就是 社交關係的存儲 。 就以新浪微博舉例,一個大V少則十幾萬,多則幾千萬的粉絲,這些關註關係要怎麼存呢?在MySql ...
一周排行
    -Advertisement-
    Play Games
  • 前言 本文介紹一款使用 C# 與 WPF 開發的音頻播放器,其界面簡潔大方,操作體驗流暢。該播放器支持多種音頻格式(如 MP4、WMA、OGG、FLAC 等),並具備標記、實時歌詞顯示等功能。 另外,還支持換膚及多語言(中英文)切換。核心音頻處理採用 FFmpeg 組件,獲得了廣泛認可,目前 Git ...
  • OAuth2.0授權驗證-gitee授權碼模式 本文主要介紹如何筆者自己是如何使用gitee提供的OAuth2.0協議完成授權驗證並登錄到自己的系統,完整模式如圖 1、創建應用 打開gitee個人中心->第三方應用->創建應用 創建應用後在我的應用界面,查看已創建應用的Client ID和Clien ...
  • 解決了這個問題:《winForm下,fastReport.net 從.net framework 升級到.net5遇到的錯誤“Operation is not supported on this platform.”》 本文內容轉載自:https://www.fcnsoft.com/Home/Sho ...
  • 國內文章 WPF 從裸 Win 32 的 WM_Pointer 消息獲取觸摸點繪製筆跡 https://www.cnblogs.com/lindexi/p/18390983 本文將告訴大家如何在 WPF 裡面,接收裸 Win 32 的 WM_Pointer 消息,從消息裡面獲取觸摸點信息,使用觸摸點 ...
  • 前言 給大家推薦一個專為新零售快消行業打造了一套高效的進銷存管理系統。 系統不僅具備強大的庫存管理功能,還集成了高性能的輕量級 POS 解決方案,確保頁面載入速度極快,提供良好的用戶體驗。 項目介紹 Dorisoy.POS 是一款基於 .NET 7 和 Angular 4 開發的新零售快消進銷存管理 ...
  • ABP CLI常用的代碼分享 一、確保環境配置正確 安裝.NET CLI: ABP CLI是基於.NET Core或.NET 5/6/7等更高版本構建的,因此首先需要在你的開發環境中安裝.NET CLI。這可以通過訪問Microsoft官網下載並安裝相應版本的.NET SDK來實現。 安裝ABP ...
  • 問題 問題是這樣的:第三方的webapi,需要先調用登陸介面獲取Cookie,訪問其它介面時攜帶Cookie信息。 但使用HttpClient類調用登陸介面,返回的Headers中沒有找到Cookie信息。 分析 首先,使用Postman測試該登陸介面,正常返回Cookie信息,說明是HttpCli ...
  • 國內文章 關於.NET在中國為什麼工資低的分析 https://www.cnblogs.com/thinkingmore/p/18406244 .NET在中國開發者的薪資偏低,主要因市場需求、技術棧選擇和企業文化等因素所致。歷史上,.NET曾因微軟的閉源策略發展受限,儘管後來推出了跨平臺的.NET ...
  • 在WPF開發應用中,動畫不僅可以引起用戶的註意與興趣,而且還使軟體更加便於使用。前面幾篇文章講解了畫筆(Brush),形狀(Shape),幾何圖形(Geometry),變換(Transform)等相關內容,今天繼續講解動畫相關內容和知識點,僅供學習分享使用,如有不足之處,還請指正。 ...
  • 什麼是委托? 委托可以說是把一個方法代入另一個方法執行,相當於指向函數的指針;事件就相當於保存委托的數組; 1.實例化委托的方式: 方式1:通過new創建實例: public delegate void ShowDelegate(); 或者 public delegate string ShowDe ...