Spark SQL支持兩種RDDs轉換為DataFrames的方式 使用反射獲取RDD內的Schema 當已知類的Schema的時候,使用這種基於反射的方法會讓代碼更加簡潔而且效果也很好。 通過編程介面指定Schema 通過Spark SQL的介面創建RDD的Schema,這種方式會讓代碼比較冗長。 ...
Spark SQL支持兩種RDDs轉換為DataFrames的方式 使用反射獲取RDD內的Schema 當已知類的Schema的時候,使用這種基於反射的方法會讓代碼更加簡潔而且效果也很好。 通過編程介面指定Schema 通過Spark SQL的介面創建RDD的Schema,這種方式會讓代碼比較冗長。 這種方法的好處是,在運行時才知道數據的列以及列的類型的情況下,可以動態生成Schema。
原文和作者一起討論:http://www.cnblogs.com/intsmaze/p/6613755.html
微信:intsmaze
使用反射獲取Schema(Inferring the Schema Using Reflection)import org.apache.spark.sql.{DataFrameReader, SQLContext}
import org.apache.spark.{SparkConf, SparkContext}
object InferringSchema {
def main(args: Array[String]) {
//創建SparkConf()並設置App名稱
val conf = new SparkConf().setAppName("SQL-intsmaze")
//SQLContext要依賴SparkContext
val sc = new SparkContext(conf)
//創建SQLContext
val sqlContext = new SQLContext(sc)
//從指定的地址創建RDD
val lineRDD = sc.textFile("hdfs://192.168.19.131:9000/person.tzt").map(_.split(","))
//創建case class
//將RDD和case class關聯
val personRDD = lineRDD.map(x => Person(x(0).toInt, x(1), x(2).toInt))
//導入隱式轉換,如果不導入無法將RDD轉換成DataFrame
//將RDD轉換成DataFrame
import sqlContext.implicits._
val personDF = personRDD.toDF
//註冊表
personDF.registerTempTable("intsmaze")
//傳入SQL
val df = sqlContext.sql("select * from intsmaze order by age desc limit 2")
//將結果以JSON的方式存儲到指定位置
df.write.json("hdfs://192.168.19.131:9000/personresult")
//停止Spark Context
sc.stop()
}
}
//case class一定要放到外面
case class Person(id: Int, name: String, age: Int)
spark shell中不需要導入sqlContext.implicits._是因為spark shell預設已經自動導入了。
打包提交到yarn集群:
/home/hadoop/app/spark/bin/spark-submit --class InferringSchema \
--master yarn \
--deploy-mode cluster \
--driver-memory 512m \
--executor-memory 512m \
--executor-cores 2 \
--queue default \
/home/hadoop/sparksql-1.0-SNAPSHOT.jar
通過編程介面指定Schema(Programmatically Specifying the Schema)
當JavaBean不能被預先定義的時候,編程創建DataFrame分為三步:
從原來的RDD創建一個Row格式的RDD.
創建與RDD中Rows結構匹配的StructType,通過該StructType創建表示RDD的Schema.
通過SQLContext提供的createDataFrame方法創建DataFrame,方法參數為RDD的Schema.
import org.apache.spark.sql.{Row, SQLContext} import org.apache.spark.sql.types._ import org.apache.spark.{SparkContext, SparkConf} object SpecifyingSchema { def main(args: Array[String]) { //創建SparkConf()並設置App名稱 val conf = new SparkConf().setAppName("SQL-intsmaze") //SQLContext要依賴SparkContext val sc = new SparkContext(conf) //創建SQLContext val sqlContext = new SQLContext(sc) //從指定的地址創建RDD val personRDD = sc.textFile(args(0)).map(_.split(",")) //通過StructType直接指定每個欄位的schema val schema = StructType( List( StructField("id", IntegerType, true), StructField("name", StringType, true), StructField("age", IntegerType, true) ) ) //將RDD映射到rowRDD val rowRDD = personRDD.map(p => Row(p(0).toInt, p(1).trim, p(2).toInt)) //將schema信息應用到rowRDD上 val personDataFrame = sqlContext.createDataFrame(rowRDD, schema) //註冊表 personDataFrame.registerTempTable("intsmaze") //執行SQL val df = sqlContext.sql("select * from intsmaze order by age desc ") //將結果以JSON的方式存儲到指定位置 df.write.json(args(1)) //停止Spark Context sc.stop() } }將程式打成jar包,上傳到spark集群,提交Spark任務
/home/hadoop/app/spark/bin/spark-submit --class SpecifyingSchema \
--master yarn \
--deploy-mode cluster \
--driver-memory 512m \
--executor-memory 512m \
--executor-cores 2 \
--queue default \
/home/hadoop/sparksql-1.0-SNAPSHOT.jar \
hdfs://192.168.19.131:9000/person.txt hdfs://192.168.19.131:9000/intsmazeresult
/home/hadoop/app/spark/bin/spark-submit --class SpecifyingSchema \
--master yarn \
--deploy-mode client \
--driver-memory 512m \
--executor-memory 512m \
--executor-cores 2 \
--queue default \
/home/hadoop/sparksql-1.0-SNAPSHOT.jar \
hdfs://192.168.19.131:9000/person.txt hdfs://192.168.19.131:9000/intsmazeresult
在maven項目的pom.xml中添加Spark SQL的依賴
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.10</artifactId>
<version>1.6.2</version>
</dependency>