在進行Spark Streaming的開發時,我們常常需要將DStream轉為DataFrame來進行進一步的處理, 共有兩種方式,方式一: 利用map運算元和tuple來完成,一般的場景下採用這種方式即可。 但是有的時候我們會遇到列數大於22的情況,這個時候會受到scala的tuple數不能超過22 ...
在進行Spark Streaming的開發時,我們常常需要將DStream轉為DataFrame來進行進一步的處理,
共有兩種方式,方式一:
val spark = SparkSession.builder()
.appName("Test")
.getOrCreate()
import spark.implicits._
dStream.foreachRDD{ rdd =>
val df = rdd.map(_.split(" "))
.map(t => (t(1),t(2),t(3)))
.toDF("col1","col2","col3")
// 業務邏輯
}
利用map運算元和tuple來完成,一般的場景下採用這種方式即可。
但是有的時候我們會遇到列數大於22的情況,這個時候會受到scala的tuple數不能超過22的影響。這時可以採用方式二:
val spark = SparkSession.builder()
.appName("Test")
.getOrCreate()
dStream.foreachRDD{ rdd =>
val res:RDD[Row] = rdd.map{ row =>
val buffer = ArrayBuffer.empty[Any]
val fields: Array[String] = row.split("\\|~\\|")
buffer.append(fields(0))
buffer.append(fields(1))
buffer.append(fields(2))
// 省略
buffer.append(fields(25))
Row.fromSeq(buffer)
}
val schema = StructType(Seq(
StructField("col1", StringType, false),
StructField("col2", StringType, false),
StructField("col3", StringType, false),
// 省略
StructField("col26", StringType, false)
))
val df: DataFrame = spark.createDataFrame(result, schema)
// 業務邏輯
}