SQL
SQL statements executed from a program return their results as a DataFrame.
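A minimal sketch of that point (it assumes a temp table named "people" has already been registered, as in the examples further down):

// Sketch only: "people" must already be registered as a temp table.
val result: DataFrame = sqlContext.sql("SELECT name, age FROM people")
result.show()   // the result is a DataFrame and supports all the usual DataFrame operations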
DataFrames
A DataFrame is a distributed collection of data organized into named columns, conceptually similar to a table in a relational database.
DataFrames can be built from many sources: structured data files, Hive tables, RDDs, and external databases. JSON is a semi-structured file format.
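A rough sketch of those input paths (the file paths and the JDBC URL below are placeholders, not files used elsewhere in this post):

// Placeholder paths/URL; only meant to show the shape of each call.
val fromJson    = sqlContext.read.json("res/people.json")        // semi-structured file
val fromParquet = sqlContext.read.parquet("res/users.parquet")   // structured file
val fromJdbc    = sqlContext.read.jdbc(                          // external database
  "jdbc:mysql://localhost:3306/test", "people", new java.util.Properties())
// Hive tables require a HiveContext; converting RDDs is covered later in this post.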
DataFrame operations
import org.apache.spark.sql.{Column, DataFrame, SQLContext}
import org.apache.spark.{SparkContext, SparkConf}

/**
  * Created by Edward on 2016/9/6.
  */
object DFTest {
  def main(args: Array[String]) {

    val conf: SparkConf = new SparkConf().setAppName("DF").setMaster("local")
    val sc: SparkContext = new SparkContext(conf)
    val sqlContext: SQLContext = new SQLContext(sc)

    val df: DataFrame = sqlContext.read.json("D:\\documents\\Spark\\MyDemo\\Test\\res\\people.json")

    // Show the content of the DataFrame
    df.show()
    // age  name
    // null Michael
    // 30   Andy
    // 19   Justin

    // Print the schema in a tree format
    df.printSchema()
    // root
    // |-- age: long (nullable = true)
    // |-- name: string (nullable = true)

    // Select only the "name" column
    df.select("name").show()
    // name
    // Michael
    // Andy
    // Justin

    // Select everybody, but increment the age by 1
    df.select(df("name"), df("age") + 1).show()
    // name    (age + 1)
    // Michael null
    // Andy    31
    // Justin  20

    // Select people older than 21
    df.filter(df("age") > 21).show()
    // age name
    // 30  Andy

    // Count people by age
    df.groupBy("age").count().show()
    // age  count
    // null 1
    // 19   1
    // 30   1

    sc.stop()
  }
}
Contents of the JSON file being read:
{"name":"Michael"}
{"name":"Andy", "age":30}
{"name":"Justin", "age":19}
For more operations, see: http://spark.apache.org/docs/1.6.0/api/scala/index.html#org.apache.spark.sql.DataFrame
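A few more of those operations, as a quick non-exhaustive sketch run against the same df created above:

// Assumes the df built from people.json above.
df.sort(df("age").desc).show()        // order by age, descending
df.agg(Map("age" -> "avg")).show()    // average age across all rows
df.na.drop().show()                   // drop rows containing null values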
Two ways to convert RDDs into DataFrames
JSON and parquet files can be turned into DataFrames directly; for plain data files, the RDD produced by reading the file must be converted into a DataFrame.
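Both examples below read res/people.txt; the file is assumed here to contain plain comma-separated name,age records, e.g.:

Michael, 29
Andy, 30
Justin, 19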
1. Reflection (concise; the table schema type must be specified, here via a case class)
import org.apache.spark.{SparkContext, SparkConf}

/**
  * Created by Edward on 2016/9/6.
  */
object Reflection {

  // Define the schema using a case class.
  // Note: Case classes in Scala 2.10 can support only up to 22 fields. To work around this limit,
  // you can use custom classes that implement the Product interface.
  case class Person(name: String, age: Int)

  def main(args: Array[String]) {

    val conf: SparkConf = new SparkConf().setAppName("Reflection").setMaster("local")
    val sc: SparkContext = new SparkContext(conf)

    // sc is an existing SparkContext.
    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    // this is used to implicitly convert an RDD to a DataFrame.
    import sqlContext.implicits._

    // Create an RDD of Person objects and register it as a table.
    val people = sc.textFile("res/people.txt").map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt)).toDF()
    people.registerTempTable("people")

    // SQL statements can be run by using the sql methods provided by sqlContext.
    val teenagers = sqlContext.sql("SELECT name, age FROM people WHERE age >= 13 AND age <= 19")

    // The results of SQL queries are DataFrames and support all the normal RDD operations.
    // The columns of a row in the result can be accessed by field index:
    teenagers.map(t => "Name: " + t(0)).collect().foreach(println)

    // or by field name:
    teenagers.map(t => "Name: " + t.getAs[String]("name")).collect().foreach(println)

    // row.getValuesMap[T] retrieves multiple columns at once into a Map[String, T]
    teenagers.map(_.getValuesMap[Any](List("name", "age"))).collect().foreach(println)
    // Map("name" -> "Justin", "age" -> 19)

    sc.stop()
  }
}
2. Dynamic (programmatically specifying the schema)
import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkContext, SparkConf}

/**
  * Created by Edward on 2016/9/6.
  */
object programmatic {

  def main(args: Array[String]) {

    val conf = new SparkConf().setAppName("Programmatic").setMaster("local")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)

    // Create an RDD
    val people = sc.textFile("res/people.txt")

    // The schema is encoded in a string
    val schemaString = "name age"

    // Import Row.
    import org.apache.spark.sql.Row;

    // Import Spark SQL data types
    import org.apache.spark.sql.types.{StructType, StructField, StringType};

    // Generate the schema based on the string of schema
    val schema =
      StructType(
        schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, true)))

    // Convert records of the RDD (people) to Rows.
    val rowRDD = people.map(_.split(",")).map(p => Row(p(0), p(1).trim))

    // Apply the schema to the RDD.
    val peopleDataFrame = sqlContext.createDataFrame(rowRDD, schema)

    // Register the DataFrames as a table.
    peopleDataFrame.registerTempTable("people")

    // SQL statements can be run by using the sql methods provided by sqlContext.
    val results = sqlContext.sql("SELECT name FROM people")

    // The results of SQL queries are DataFrames and support all the normal RDD operations.
    // The columns of a row in the result can be accessed by field index or by field name.
    results.map(t => "Name: " + t(0)).collect().foreach(println)

    sc.stop()
  }
}
Data sources
The default data source is parquet.
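The default can be changed through the spark.sql.sources.default setting; a small sketch, assuming you want generic load()/save() calls to mean JSON instead:

// Sketch only: switch the default data source from parquet to json.
sqlContext.setConf("spark.sql.sources.default", "json")
val df = sqlContext.read.load("res/people.json")   // now read as JSON without calling format("json")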
import org.apache.spark.sql.{SaveMode, DataFrame, SQLContext}
import org.apache.spark.{SparkContext, SparkConf}

/**
  * Created by Edward on 2016/9/7.
  */
object Parquet {
  def main(args: Array[String]) {

    val sparkConf: SparkConf = new SparkConf().setAppName("Parquet").setMaster("local")
    val sc = new SparkContext(sparkConf)
    val sqlContext = new SQLContext(sc)

    //val parquet = sqlContext.read.parquet("res/users.parquet")
    //val parquet = sqlContext.read.load("res/users.parquet")          // the default data source is parquet; change it via spark.sql.sources.default
    //val parquet = sqlContext.read.parquet("new.parquet/*.parquet")   // glob patterns can be used
    //val json = sqlContext.read.format("json").load("res/people.json") // read a json file, specifying the format with format()
    val json = sqlContext.read.json("res/people.json")                  // read the json file directly with the json() method

    //json.select("name","age").write.mode(SaveMode.Overwrite).save("new.parquet") // saved as parquet by default
    json.select("name","age").write.mode(SaveMode.Overwrite).format("json").save("jsonfile") // save as json; "jsonfile" is the target directory

    json.show()
    //parquet.show()

    sc.stop()
  }
}
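As a quick round-trip check (a sketch, assuming the jsonfile directory written by the code above), the saved output can be read back the same way:

// Read the directory written by save("jsonfile") back into a DataFrame.
val reloaded = sqlContext.read.json("jsonfile")
reloaded.printSchema()   // should show the name and age columns
reloaded.show()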