Spark Struntured Streaming是Spark 2.1.0版本後新增加的流計算引擎,本博將通過幾篇博文詳細介紹這個框架。這篇是介紹Spark Structured Streaming的基本開發方法。以Spark 自帶的example進行測試和介紹,其為"StructuredNetw ...
Spark Struntured Streaming是Spark 2.1.0版本後新增加的流計算引擎,本博將通過幾篇博文詳細介紹這個框架。這篇是介紹Spark Structured Streaming的基本開發方法。以Spark 自帶的example進行測試和介紹,其為"StructuredNetworkWordcount.scala"文件。
1. Quick Example
由於我們是在單機上進行測試,所以需要修單機運行模型,修改後的程式如下:
package org.apache.spark.examples.sql.streaming
import org.apache.spark.sql.SparkSession
/** * Counts words in UTF8 encoded, '\n' delimited text received from the network. * * Usage: StructuredNetworkWordCount <hostname> <port> * <hostname> and <port> describe the TCP server that Structured Streaming * would connect to receive data. * * To run this on your local machine, you need to first run a Netcat server * `$ nc -lk 9999` * and then run the example * `$ bin/run-example sql.streaming.StructuredNetworkWordCount * localhost 9999` */ object StructuredNetworkWordCount { def main(args: Array[String]) { if (args.length < 2) { System.err.println("Usage: StructuredNetworkWordCount <hostname> <port>") System.exit(1) }
val host = args(0) val port = args(1).toInt
val spark = SparkSession .builder .appName("StructuredNetworkWordCount") .master("local[*]") .getOrCreate()
import spark.implicits._
// Create DataFrame representing the stream of input lines from connection to host:port val lines = spark.readStream .format("socket") .option("host", host) .option("port", port) .load()
// Split the lines into words val words = lines.as[String].flatMap(_.split(" "))
// Generate running word count val wordCounts = words.groupBy("value").count()
// Start running the query that prints the running counts to the console val query = wordCounts.writeStream .outputMode("complete") .format("console") .start()
query.awaitTermination() } } |
2. 剖析
對於上述所示的程式,進行如下的解讀和分析:
2.1 數據輸入
在創建SparkSessiion對象之後,需要設置數據源的類型,及數據源的配置。然後就會數據源中源源不斷的接收數據,接收到的數據以DataFrame對象存在,該類型與Spark SQL中定義類型一樣,內部由Dataset數組組成。
如下程式所示,設置輸入源的類型為socket,並配置socket源的IP地址和埠號。接收到的數據流存儲到lines對象中,其類型為DataFarme。
// Create DataFrame representing the stream of input lines from connection to host:port val lines = spark.readStream .format("socket") .option("host", host) .option("port", port) .load() |
2.2 單詞統計
如下程式所示,首先將接受到的數據流lines轉換為String類型的序列;接著每一批數據都以空格分隔為獨立的單詞;最後再對每個單詞進行分組並統計次數。
// Split the lines into words val words = lines.as[String].flatMap(_.split(" "))
// Generate running word count val wordCounts = words.groupBy("value").count() |
2.3 數據輸出
通過DataFrame對象的writeStream方法獲取DataStreamWrite對象,DataStreamWrite類定義了一些數據輸出的方式。Quick example程式將數據輸出到控制終端。註意只有在調用start()方法後,才開始執行Streaming進程,start()方法會返回一個StreamingQuery對象,用戶可以使用該對象來管理Streaming進程。如上述程式調用awaitTermination()方法阻塞接收所有數據。
3. 異常
3.1 拒絕連接
當直接提交編譯後的架包時,即沒有啟動"nc –lk 9999"時,會出現圖 11所示的錯誤。解決該異常,只需在提交(spark-submit)程式之前,先啟動"nc"命令即可解決,且不能使用"nc –lk localhost 9999".
圖 11
3.2 NoSuchMethodError
當通過mvn打包程式後,在命令行通過spark-submit提交架包,能夠正常執行,而通過IDEA執行時會出現圖 12所示的錯誤。
圖 12
出現這個異常,是由於項目中依賴的Scala版本與Spark編譯的版本不一致,從而導致出現這種錯誤。圖 13和圖 14所示,Spark 2.10是由Scala 2.10版本編譯而成的,而項目依賴的Scala版本是2.11.8,從而導致出現了錯誤。
圖 13
圖 14
解決該問題,只需在項目的pom.xml文件中指定與spark編譯的版本一致,即可解決該問題。如圖 15所示的執行結果。
圖 15
4. 參考文獻
[1]. Structured Streaming Programming Guide.