1. Structure
1.1 Overview
The sliding-window functionality of the Structured Streaming component is governed by three parameters, illustrated in the sketch below: the window duration, the slide duration, and the trigger interval.
- Window duration: the length of time over which each data operation is performed;
- Slide duration: the amount of time by which the window advances on each step;
- Trigger interval: the interval at which Structured Streaming writes data out to the external DataStreamWriter.
Figure 1-1
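A minimal sketch of where each parameter is set (the duration values, the DataFrame name df, and the console sink are illustrative assumptions, not taken from the text above): the window duration and slide duration are passed to the window function, while the trigger interval is configured on the DataStreamWriter returned by writeStream.

import org.apache.spark.sql.functions.window
import org.apache.spark.sql.streaming.ProcessingTime
import spark.implicits._  // assumes an existing SparkSession named spark

// df: a streaming DataFrame that carries a "timestamp" column (assumed for illustration)
val windowedCounts = df
  .groupBy(window($"timestamp", "10 minutes", "5 minutes"))  // window duration, slide duration
  .count()

val query = windowedCounts.writeStream
  .outputMode("complete")
  .format("console")
  .trigger(ProcessingTime("1 minute"))  // trigger interval
  .start()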
1.2 API
Configuring the window functionality of Structured Streaming is done in two steps:
1) Define the window and the slide duration. This is set through the global window function; its implementation in the Spark source is shown below:
def window(timeColumn: Column, windowDuration: String, slideDuration: String): Column = {
  window(timeColumn, windowDuration, slideDuration, "0 second")
}
- timeColumn: the column carrying the timestamp;
- windowDuration: the duration of the window;
- slideDuration: the slide duration;
- return: the return type is Column.
2) After Structured Streaming loads data through the readStream object's load method, it returns a DataFrame (of type Dataset[T]). The user then passes the Column object defined above to that DataFrame, which sets up the window functionality.
Since the window method returns a Column, it can be supplied to any DataFrame method that takes Column-typed parameters, such as Dataset's select and groupBy methods; a brief usage sketch follows the signatures. Below are the implementations of select and groupBy from the Spark source:
def select(cols: Column*): DataFrame = withPlan {
  Project(cols.map(_.named), logicalPlan)
}
def groupBy(cols: Column*): RelationalGroupedDataset = {
  RelationalGroupedDataset(toDF(), cols.map(_.expr), RelationalGroupedDataset.GroupByType)
}
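A minimal usage sketch (the DataFrame name df and its timestamp and word columns are assumptions for illustration): the Column returned by window can be handed to either method.

// assuming the imports from the sketch above (functions.window, spark.implicits._)

// select: tag each row with the window it falls into
val selected = df.select(window($"timestamp", "10 minutes", "5 minutes"), $"word")

// groupBy: group rows by window (and word) before aggregating
val grouped = df.groupBy(window($"timestamp", "10 minutes", "5 minutes"), $"word").count()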
1.3 Types
Based on the Structured Streaming API introduced above and the methods Dataset provides, the operations can be divided into two categories:
- Aggregation operations: methods that group and combine data, such as groupBy;
- Non-aggregation operations: ordinary data operations, such as select.
Note:
Each category requires its own output mode (outputMode); the two cannot be mixed (a short sketch of the difference follows below).
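As a rough sketch of this constraint (the DataFrame name words and the console sink are assumptions; complete examples follow in sections 2 and 3), the output mode is chosen on the DataStreamWriter to match the category of the query:

// Aggregation (groupBy + count): requires the "complete" output mode
words.groupBy(window($"timestamp", "10 minutes", "5 minutes"), $"word")
  .count()
  .writeStream.outputMode("complete").format("console").start()

// Non-aggregation (plain select): requires the "append" output mode
words.select(window($"timestamp", "10 minutes", "5 minutes"), $"word")
  .writeStream.outputMode("append").format("console").start()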
2. Aggregation operations
2.1 Usage
An aggregation operation is one in which the received DataFrame is first processed with methods such as groupBy; its defining characteristic is that it returns a RelationalGroupedDataset. If a Structured Streaming query contains such an aggregation, the output mode must be "complete", otherwise the program throws an exception.
An aggregation example is shown below:
import spark.implicits._

val words = ... // streaming DataFrame of schema { timestamp: Timestamp, word: String }

val windowedCounts = words.groupBy(
  window($"timestamp", "10 minutes", "5 minutes"),
  $"word"
).count()
2.2 Example
This example ships with Spark. It receives data from a socket, splits each received line on spaces (" "), counts the occurrences of each word, and finally outputs the results ordered by time window.
The full program is shown below:
package org.apache.spark.examples.sql.streaming

import java.sql.Timestamp

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

/**
 * Counts words in UTF8 encoded, '\n' delimited text received from the network over a
 * sliding window of configurable duration. Each line from the network is tagged
 * with a timestamp that is used to determine the windows into which it falls.
 *
 * Usage: StructuredNetworkWordCountWindowed <hostname> <port> <window duration>
 *   [<slide duration>]
 * <hostname> and <port> describe the TCP server that Structured Streaming
 * would connect to receive data.
 * <window duration> gives the size of window, specified as integer number of seconds
 * <slide duration> gives the amount of time successive windows are offset from one another,
 * given in the same units as above. <slide duration> should be less than or equal to
 * <window duration>. If the two are equal, successive windows have no overlap. If
 * <slide duration> is not provided, it defaults to <window duration>.
 *
 * To run this on your local machine, you need to first run a Netcat server
 *    `$ nc -lk 9999`
 * and then run the example
 *    `$ bin/run-example sql.streaming.StructuredNetworkWordCountWindowed
 *    localhost 9999 <window duration in seconds> [<slide duration in seconds>]`
 *
 * One recommended <window duration>, <slide duration> pair is 10, 5
 */
object StructuredNetworkWordCountWindowed {

  def main(args: Array[String]) {
    if (args.length < 3) {
      System.err.println("Usage: StructuredNetworkWordCountWindowed <hostname> <port>" +
        " <window duration in seconds> [<slide duration in seconds>]")
      System.exit(1)
    }

    val host = args(0)
    val port = args(1).toInt
    val windowSize = args(2).toInt
    val slideSize = if (args.length == 3) windowSize else args(3).toInt
    if (slideSize > windowSize) {
      System.err.println("<slide duration> must be less than or equal to <window duration>")
    }
    val windowDuration = s"$windowSize seconds"
    val slideDuration = s"$slideSize seconds"

    val spark = SparkSession
      .builder
      .appName("StructuredNetworkWordCountWindowed")
      .getOrCreate()

    import spark.implicits._

    // Create DataFrame representing the stream of input lines from connection to host:port
    val lines = spark.readStream
      .format("socket")
      .option("host", host)
      .option("port", port)
      .option("includeTimestamp", true)  // include the arrival timestamp in the output
      .load()

    // Split the lines into words, retaining timestamps
    val words = lines.as[(String, Timestamp)].flatMap(line =>
      line._1.split(" ").map(word => (word, line._2))
    ).toDF("word", "timestamp")

    // Group the data by window and word and compute the count of each group
    // (this is where the window duration and slide duration take effect)
    val windowedCounts = words.groupBy(
      window($"timestamp", windowDuration, slideDuration), $"word"
    ).count().orderBy("window")

    // Start running the query that prints the windowed word counts to the console.
    // Because this is an aggregation, the "complete" output mode is required;
    // "truncate" = false only keeps the console from shrinking wide columns.
    val query = windowedCounts.writeStream
      .outputMode("complete")
      .format("console")
      .option("truncate", "false")
      .start()

    query.awaitTermination()
  }
}
3. Non-aggregation operations
3.1 Usage
A non-aggregation operation is one in which the received DataFrame is processed with methods such as select; its defining characteristic is that it returns a Dataset. If a Structured Streaming query performs only non-aggregation operations, the output mode must be "append", otherwise the program throws an exception. From Spark 2.1.1 onward, the output mode may also be "update".
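As a small sketch only (assuming Spark 2.1.1 or later and a non-aggregating streaming DataFrame named selected), switching to "update" is just a different setting on the DataStreamWriter; for a query without aggregation it behaves like "append":

val query = selected.writeStream
  .outputMode("update")  // available for non-aggregating queries from Spark 2.1.1
  .format("console")
  .start()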
3.2 Example
This example simply echoes the received data unchanged, without any further processing, purely to observe the window behaviour of Structured Streaming:
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.ProcessingTime

object StructuredNetworkWordCountWindowed {

  def main(args: Array[String]) {
    if (args.length < 5) {
      System.err.println("Usage: StructuredNetworkWordCountWindowed <hostname> <port>" +
        " <window duration in seconds> <slide duration in seconds> <trigger interval in seconds>")
      System.exit(1)
    }

    val host = args(0)
    val port = args(1).toInt
    val windowSize = args(2).toInt
    val slideSize = args(3).toInt
    val triggerTime = args(4).toInt
    if (slideSize > windowSize) {
      System.err.println("<slide duration> must be less than or equal to <window duration>")
    }
    val windowDuration = s"$windowSize seconds"
    val slideDuration = s"$slideSize seconds"

    val spark = SparkSession
      .builder
      .appName("StructuredNetworkWordCountWindowed")
      .getOrCreate()

    import spark.implicits._

    // Create DataFrame representing the stream of input lines from connection to host:port
    val lines = spark.readStream
      .format("socket")
      .option("host", host)
      .option("port", port)
      .option("includeTimestamp", true)
      .load()

    // Tag every input line with the window it falls into; no aggregation is performed
    val wordCounts: DataFrame =
      lines.select(window($"timestamp", windowDuration, slideDuration), $"value")

    // Start running the query that prints the windowed lines to the console.
    // A non-aggregating query uses the "append" output mode; the trigger interval
    // controls how often a new batch is written out.
    val query = wordCounts.writeStream
      .outputMode("append")
      .format("console")
      .trigger(ProcessingTime(s"$triggerTime seconds"))
      .option("truncate", "false")
      .start()

    query.awaitTermination()
  }
}
# nc -lk 9999
1
2
3
4
5
6
# spark-submit --class structuredNetWordCount ./sparkStreaming.jar localhost 9999 3 2 1
Output:
Batch: 0
+---------------------------------------------+-----+
|window                                       |value|
+---------------------------------------------+-----+
|[2017-05-16 11:14:15.0,2017-05-16 11:14:19.0]|1    |
|[2017-05-16 11:14:15.0,2017-05-16 11:14:19.0]|2    |
+---------------------------------------------+-----+

Batch: 1
+---------------------------------------------+-----+
|window                                       |value|
+---------------------------------------------+-----+
|[2017-05-16 11:14:15.0,2017-05-16 11:14:19.0]|3    |
|[2017-05-16 11:14:18.0,2017-05-16 11:14:22.0]|3    |
|[2017-05-16 11:14:18.0,2017-05-16 11:14:22.0]|4    |
+---------------------------------------------+-----+

Batch: 2
+---------------------------------------------+-----+
|window                                       |value|
+---------------------------------------------+-----+
|[2017-05-16 11:14:18.0,2017-05-16 11:14:22.0]|5    |
|[2017-05-16 11:14:18.0,2017-05-16 11:14:22.0]|6    |
|[2017-05-16 11:14:21.0,2017-05-16 11:14:25.0]|6    |
+---------------------------------------------+-----+