Spark Structured Streaming框架(1)之基本用法

来源:http://www.cnblogs.com/huliangwen/archive/2017/09/03/7470599.html
-Advertisement-
Play Games

Spark Struntured Streaming是Spark 2.1.0版本後新增加的流計算引擎,本博將通過幾篇博文詳細介紹這個框架。這篇是介紹Spark Structured Streaming的基本開發方法。以Spark 自帶的example進行測試和介紹,其為"StructuredNetw ...


   Spark Struntured Streaming是Spark 2.1.0版本後新增加的流計算引擎,本博將通過幾篇博文詳細介紹這個框架。這篇是介紹Spark Structured Streaming的基本開發方法。以Spark 自帶的example進行測試和介紹,其為"StructuredNetworkWordcount.scala"文件。

1. Quick Example

  由於我們是在單機上進行測試,所以需要修單機運行模型,修改後的程式如下:

package org.apache.spark.examples.sql.streaming

 

import org.apache.spark.sql.SparkSession

 

/**

* Counts words in UTF8 encoded, '\n' delimited text received from the network.

*

* Usage: StructuredNetworkWordCount <hostname> <port>

* <hostname> and <port> describe the TCP server that Structured Streaming

* would connect to receive data.

*

* To run this on your local machine, you need to first run a Netcat server

* `$ nc -lk 9999`

* and then run the example

* `$ bin/run-example sql.streaming.StructuredNetworkWordCount

* localhost 9999`

*/

object StructuredNetworkWordCount {

def main(args: Array[String]) {

if (args.length < 2) {

System.err.println("Usage: StructuredNetworkWordCount <hostname> <port>")

System.exit(1)

}

 

val host = args(0)

val port = args(1).toInt

 

val spark = SparkSession

.builder

.appName("StructuredNetworkWordCount")

.master("local[*]")

.getOrCreate()

 

import spark.implicits._

 

// Create DataFrame representing the stream of input lines from connection to host:port

val lines = spark.readStream

.format("socket")

.option("host", host)

.option("port", port)

.load()

 

// Split the lines into words

val words = lines.as[String].flatMap(_.split(" "))

 

// Generate running word count

val wordCounts = words.groupBy("value").count()

 

// Start running the query that prints the running counts to the console

val query = wordCounts.writeStream

.outputMode("complete")

.format("console")

.start()

 

query.awaitTermination()

}

}

 

2. 剖析

  對於上述所示的程式,進行如下的解讀和分析:

2.1 數據輸入

  在創建SparkSessiion對象之後,需要設置數據源的類型,及數據源的配置。然後就會數據源中源源不斷的接收數據,接收到的數據以DataFrame對象存在,該類型與Spark SQL中定義類型一樣,內部由Dataset數組組成。

如下程式所示,設置輸入源的類型為socket,並配置socket源的IP地址和埠號。接收到的數據流存儲到lines對象中,其類型為DataFarme。

// Create DataFrame representing the stream of input lines from connection to host:port

val lines = spark.readStream

.format("socket")

.option("host", host)

.option("port", port)

.load()

 

2.2 單詞統計

  如下程式所示,首先將接受到的數據流lines轉換為String類型的序列;接著每一批數據都以空格分隔為獨立的單詞;最後再對每個單詞進行分組並統計次數。

// Split the lines into words

val words = lines.as[String].flatMap(_.split(" "))

 

// Generate running word count

val wordCounts = words.groupBy("value").count()

 

2.3 數據輸出

通過DataFrame對象的writeStream方法獲取DataStreamWrite對象,DataStreamWrite類定義了一些數據輸出的方式。Quick example程式將數據輸出到控制終端。註意只有在調用start()方法後,才開始執行Streaming進程,start()方法會返回一個StreamingQuery對象,用戶可以使用該對象來管理Streaming進程。如上述程式調用awaitTermination()方法阻塞接收所有數據。

 

3. 異常

3.1 拒絕連接

  當直接提交編譯後的架包時,即沒有啟動"nc –lk 9999"時,會出現圖 11所示的錯誤。解決該異常,只需在提交(spark-submit)程式之前,先啟動"nc"命令即可解決,且不能使用"nc –lk localhost 9999".

圖 11

3.2 NoSuchMethodError

  當通過mvn打包程式後,在命令行通過spark-submit提交架包,能夠正常執行,而通過IDEA執行時會出現圖 12所示的錯誤。

圖 12

  出現這個異常,是由於項目中依賴的Scala版本與Spark編譯的版本不一致,從而導致出現這種錯誤。圖 13和圖 14所示,Spark 2.10是由Scala 2.10版本編譯而成的,而項目依賴的Scala版本是2.11.8,從而導致出現了錯誤。

圖 13

 

圖 14

  解決該問題,只需在項目的pom.xml文件中指定與spark編譯的版本一致,即可解決該問題。如圖 15所示的執行結果。

圖 15

4. 參考文獻

[1]. Structured Streaming Programming Guide.

 


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 本文就linux中的壓縮工具gzip、bzip2和xz以及打包工具tar做出介紹,基本上能夠滿足工作中的需求和日常的使用。 ...
  • 當在Windows刪除文件時出現找不到該項目或者顯示該文件不在磁碟中,可以嘗試以下方法: 在要刪除文件的同級目錄下 新建一文本文檔,將下列代碼複製到文檔中,將文檔保存為尾碼名為.bat的文檔(名字隨意), DEL /F /A /Q \\?\%1 RD /S /Q \\?\%1 把你想刪除的文件或文件 ...
  • Linux系統擴容方法彙總 相信很多朋友都有過這樣的經歷,本想裝個Ubantu玩玩,沒想到玩久了反而不習慣Windows了,然而開始裝系統的時候只分配了非常小的空間,那應該怎樣擴展我們的ubantu呢?下麵我為大家總結幾種方法(僅在ubantu下測試過) 一、通過系統整體遷移 首先:進入Window ...
  • Cobbler(補鞋匠)是通過將DHCP、TFTP、DNS、HTTP等服務進行集成,創建一個中央管理節點,其可以實現的功能有配置服務,創建存儲庫,解壓縮操作系統媒介,代理或集成一個配置管理系統,控制電源管理等。 Cobbler的最終目的是實現無需進行人工干預即可安裝機器。 pxe概述 預啟動執行環境 ...
  • boot分區是系統啟動中最重要的部分,如果伺服器由於病毒攻擊又或者被管理員誤刪除了boot分區。那麼就會存在潛在的風險。為什麼說是潛在的風險?因為boot分區被刪除後系統仍在繼續運行,看似無狀況但是在執行關機操作後就會無法啟動。 大致步驟 恢復過程 1.首先查看系統的磁碟情況,根目錄在邏輯捲,boo ...
  • 在windows系統中個,每個進程擁有自己獨立的虛擬地址空間(Virtual Address Space)。這一地址空間的大小與電腦硬體、操作系統以及應用程式都有關係。 對於32位程式來說,最多能使用2GB空間(0x00010000-0x7FFEFFFF)。為了獲得3GB的地址空間,在不同的win ...
  • 1 文件{ ls -rtl # 按時間倒敘列出所有目錄和文件 ll -rt touch file # 創建空白文件 rm -rf 目錄名 # 不提示刪除非空目錄(-r:遞歸刪除 -f強制) dos2unix # windows文本轉linux文本 unix2dos # linux文本轉windows ...
  • 1. 結構 1.1 概述 Structured Streaming組件滑動視窗功能由三個參數決定其功能:視窗時間、滑動步長和觸發時間. 視窗時間:是指確定數據操作的長度; 滑動步長:是指視窗每次向前移動的時間長度; 觸發時間:是指Structured Streaming將數據寫入外部DataStre ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...