今天主要介紹一下如何將 Spark dataframe 的數據轉成 json 數據。用到的是 scala 提供的 json 處理的 api。 用過 Spark SQL 應該知道,Spark dataframe 本身有提供一個 api 可以供我們將數據轉成一個 JsonArray,我們可以在 spar ...
今天主要介紹一下如何將 Spark dataframe 的數據轉成 json 數據。用到的是 scala 提供的 json 處理的 api。
用過 Spark SQL 應該知道,Spark dataframe 本身有提供一個 api 可以供我們將數據轉成一個 JsonArray,我們可以在 spark-shell 裡頭舉個慄子來看一下。
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder().master("master").appName("test").config("spark.sql.warehouse.dir", warehouseLocation).enableHiveSupport().getOrCreate();
//提供隱式轉換功能,比如將 Rdd 轉為 dataframe
import spark.implicits._
val df:DataFrame = sc.parallelize(Array(("abc",2),("efg",4))).toDF()
df.show()
/*-------------show -----------
+---+---+
| _1| _2|
+---+---+
|abc| 2|
|efg| 4|
+---+---+
*/
//這裡使用 dataframe Api 轉換成 jsonArray
val jsonStr:String = a.toJSON.collectAsList.toString
/*--------------- json String-------------
[{"_1":"abc","_2":2}, {"_1":"efg","_2":4}]
*/
可以發現,我們可以使用 dataframe 提供的 api 直接將 dataframe 轉換成 jsonArray 的形式,但這樣子卻有些冗餘。以上面的例子來說,很多時候我要的不是這樣的形式。
[{"_1":"abc","_2":2}, {"_1":"efg","_2":4}]
而是下麵這種形式。
[{"abc":2}, {"efg":4}]
這才是我們通常會使用到的 json 格式。以 dataframe 的 api 轉換而成的 json 明顯太過冗餘。為此,我們需要藉助一些 json 處理的包,本著能懶則懶的原則,直接使用 scala 提供的 json 處理包。
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder().master("master").appName("test").config("spark.sql.warehouse.dir", warehouseLocation).enableHiveSupport().getOrCreate();
//提供隱式轉換功能,比如將 Rdd 轉為 dataframe
import spark.implicits._
val df:DataFrame = sc.parallelize(Array(("abc",2),("efg",4))).toDF()
df.show()
/*-------------show -----------
+---+---+
| _1| _2|
+---+---+
|abc| 2|
|efg| 4|
+---+---+
*/
//接下來不一樣了
val df2Array:Array[Tuple2[String,Int]] = df.collect().map{case org.apache.spark.sql.Row(x:String,y:Int) => (x,y)}
val jsonData:Array[JSONObject] = aM.map{ i =>
new JSONObject(Map(i._1 -> i._2))
}
val jsonArray:JSONArray = new JSONArray(jsonData.toList)
/*-----------jsonArray------------
[{"abc" : 2}, {"efg" : 4}]
*/
大概說明一下上述的代碼,首先我們要先將 df 變數進行 collect 操作,將它轉換成 Array ,但是要生成 jsonObject 得是 Array[Tuple2[T,T]] 的格式,所以我們需要再進一步轉換成對應格式。這裡的 map 是函數式編程裡面的 map 。
然後也是用 map 操作生成 Array[JSONObject],最後再轉換成 JSONArray 就可以。
將數據轉換成 json 的格式通常不能太大,一般用在 spark 跑出數據結果後寫入到其他資料庫的時候會用到,比如 Mysql 。
以上~~
歡迎關註公眾號哈爾的數據城堡,裡面有數據,代碼,以及深度的思考。