Spark Structured Streaming框架(4)之視窗管理詳解

来源:http://www.cnblogs.com/huliangwen/archive/2017/09/03/7470737.html
-Advertisement-
Play Games

1. 結構 1.1 概述 Structured Streaming組件滑動視窗功能由三個參數決定其功能:視窗時間、滑動步長和觸發時間. 視窗時間:是指確定數據操作的長度; 滑動步長:是指視窗每次向前移動的時間長度; 觸發時間:是指Structured Streaming將數據寫入外部DataStre ...


1. 結構

1.1 概述

  Structured Streaming組件滑動視窗功能由三個參數決定其功能:視窗時間、滑動步長和觸發時間.

  • 視窗時間:是指確定數據操作的長度;
  • 滑動步長:是指視窗每次向前移動的時間長度;
  • 觸發時間:是指Structured Streaming將數據寫入外部DataStreamWriter的時間間隔。

圖 11

1.2 API

  用戶管理Structured Streaming的視窗功能,可以分為兩步完成:

1) 定義視窗和滑動步長

  API是通過一個全局的window方法來設置,如下所示是其Spark實現細節:

def window(timeColumn:Column, windowDuratiion:String, slideDuration:String):Column ={

window(timeColumn, windowDuration, slideDuration, "0" second)

}

  • timecolumn:具有時間戳的列;
  • windowDuration:為視窗的時間長度;
  • slideDuration:為滑動的步長;
  • return:返回的數據類型是Column
2) 設置

  Structured Streaming在通過readStream對象的load方法載入數據後,悔返回一個DataFrame對象(Dataset[T]類型)。所以用戶將上述定義的Column對象傳遞給DataFrame對象,從而就實現了視窗功能的設置。

  由於window方法返回的數據類型是Column,所以只要DataFrame對象方法中具有columnl類型的參數就可以進行設置。如Dataset的select和groupBy方法。如下是Spark源碼中select和groupBy方法的實現細節:

def select (cols:Column*):DataFrame = withPlan{

Project(cols.map(_.named),logicalPlan)

}

def groupBy(cols:Column*):RelationGroupedDataset={

RelationGroupedDataset(toDF(), cols.map(_.expr), RelationGroupedDataset.GroupByType)

}

1.3 類型

  如上述介紹的Structured Streaming API,根據Dataset提供的方法,我們可以將其分為兩類:

  1. 聚合操作:是指具有對數據進行組合操作的方法,如groupBy方法;
  2. 非聚合操作:是指普通的數據操作方法,如select方法

PS:

    兩類操作都有明確的輸出形式(outputMode),不能混用。

2. 聚合操作

2.1 操作方法

  聚合操作是指接收到的數據DataFrame先進行groupBy等操作,器操作的特征是返回RelationGroupedDataset類型的數據。若Structured Streaming存在的聚合操作,那麼輸出形式必須為"complete",否則程式會出現異常。

如下所示的聚合操作示例:

Import spark.implicits._

Val words = … // streaming DataFrame of schema{timestamp:timestamp, word:String}

val windowedCounts = words.groupBy(

window($"timestamp","10 minutes","5 minutes"),

$"word"

).count()

2.2 example

  本例是Spark程式自帶的example,其功能是接收socket數據,在接受socket數據,在接受完數據後將數據按空格" "進行分割;然後統計每個單詞出現的次數;最後按時間戳排序輸出。

如下具體程式內容:

package org.apache.spark.examples.sql.streaming

 

import java.sql.Timestamp

 

import org.apache.spark.sql.SparkSession

import org.apache.spark.sql.functions._

 

/**

* Counts words in UTF8 encoded, '\n' delimited text received from the network over a

* sliding window of configurable duration. Each line from the network is tagged

* with a timestamp that is used to determine the windows into which it falls.

*

* Usage: StructuredNetworkWordCountWindowed <hostname> <port> <window duration>

* [<slide duration>]

* <hostname> and <port> describe the TCP server that Structured Streaming

* would connect to receive data.

* <window duration> gives the size of window, specified as integer number of seconds

* <slide duration> gives the amount of time successive windows are offset from one another,

* given in the same units as above. <slide duration> should be less than or equal to

* <window duration>. If the two are equal, successive windows have no overlap. If

* <slide duration> is not provided, it defaults to <window duration>.

*

* To run this on your local machine, you need to first run a Netcat server

* `$ nc -lk 9999`

* and then run the example

* `$ bin/run-example sql.streaming.StructuredNetworkWordCountWindowed

* localhost 9999 <window duration in seconds> [<slide duration in seconds>]`

*

* One recommended <window duration>, <slide duration> pair is 10, 5

*/

object StructuredNetworkWordCountWindowed {

 

def main(args: Array[String]) {

if (args.length < 3) {

System.err.println("Usage: StructuredNetworkWordCountWindowed <hostname> <port>" +

" <window duration in seconds> [<slide duration in seconds>]")

System.exit(1)

}

 

val host = args(0)

val port = args(1).toInt

val windowSize = args(2).toInt

val slideSize = if (args.length == 3) windowSize else args(3).toInt

if (slideSize > windowSize) {

System.err.println("<slide duration> must be less than or equal to <window duration>")

}

val windowDuration = s"$windowSize seconds"

val slideDuration = s"$slideSize seconds"

 

val spark = SparkSession

.builder

.appName("StructuredNetworkWordCountWindowed")

.getOrCreate()

 

import spark.implicits._

 

// Create DataFrame representing the stream of input lines from connection to host:port

val lines = spark.readStream

.format("socket")

.option("host", host)

.option("port", port)

.option("includeTimestamp", true) //輸出內容包括時間戳

.load()

 

// Split the lines into words, retaining timestamps

val words = lines.as[(String, Timestamp)].flatMap(line =>

line._1.split(" ").map(word => (word, line._2))

).toDF("word", "timestamp")

 

// Group the data by window and word and compute the count of each group

//設置視窗大小和滑動視窗步長

val windowedCounts = words.groupBy(

window($"timestamp", windowDuration, slideDuration), $"word"

).count().orderBy("window")

 

// Start running the query that prints the windowed word counts to the console

//由於採用聚合操作,所以需要指定"complete"輸出形式。指定"truncate"只是為了在控制台輸出時,不進行列寬度自動縮小。

val query = windowedCounts.writeStream

.outputMode("complete")

.format("console")

.option("truncate", "false")

.start()

 

query.awaitTermination()

}

}

 

3. 非聚合操作

3.1 操作方法

  非聚合操作是指接收到的數據DataFrame進行select等操作,其操作的特征是返回Dataset類型的數據。若Structured Streaming進行非聚合操作,那麼輸出形式必須為"append",否則程式會出現異常。若spark 2.1.1 版本則輸出形式開可以是"update"。

3.2 example

  本例功能只是簡單地將接收到的數據保持原樣輸出,不進行任何其它操作。只是為了觀察Structured Streaming的視窗功能。如下所示:

object StructuredNetworkWordCountWindowed {

 

def main(args: Array[String]) {

if (args.length < 3) {

System.err.println("Usage: StructuredNetworkWordCountWindowed <hostname> <port>" +

" <window duration in seconds> [<slide duration in seconds>]")

System.exit(1)

}

 

val host = args(0)

val port = args(1).toInt

val windowSize = args(2).toInt

val slideSize = if (args.length == 3) windowSize else args(3).toInt

    val triggerTime = args(4).toInt

if (slideSize > windowSize) {

System.err.println("<slide duration> must be less than or equal to <window duration>")

}

val windowDuration = s"$windowSize seconds"

val slideDuration = s"$slideSize seconds"

 

val spark = SparkSession

.builder

.appName("StructuredNetworkWordCountWindowed")

.getOrCreate()

 

import spark.implicits._

 

// Create DataFrame representing the stream of input lines from connection to host:port

val lines = spark.readStream

.format("socket")

.option("host", host)

.option("port", port)

.option("includeTimestamp", true)

.load()

 

    val wordCounts:DataFrame = lines.select(window($"timestamp",windowDuration,slideDuration),$"value")

 

 

// Start running the query that prints the windowed word counts to the console

val query = wordCounts.writeStream

.outputMode("append")

.format("console")

     .trigger(ProcessingTime(s"$triggerTime seconds"))

.option("truncate", "false")

.start()

 

query.awaitTermination()

}

}

#nc –lk 9999

1

2

3

4

5

6

#spark-submit –class structuredNetWordCount ./sparkStreaming.jar localhost 9999 3 2 1

輸出:

Batch:0

+---------------------------------------+-----+

|window |value|

|[2017-05-16 11:14:15.0,2017-05-16 11:14:19.0]|1 |

|[2017-05-16 11:14:15.0,2017-05-16 11:14:19.0]|2 |

+---------------------------------------+-----+

 

Batch:1

+---------------------------------------+-----+

|window |value|

|[2017-05-16 11:14:15.0,2017-05-16 11:14:19.0]|3 |

|[2017-05-16 11:14:18.0,2017-05-16 11:14:22.0]|3 |

|[2017-05-16 11:14:18.0,2017-05-16 11:14:22.0]|4 |

+---------------------------------------+-----+

 

Batch:2

+---------------------------------------+-----+

|window |value|

|[2017-05-16 11:14:18.0,2017-05-16 11:14:22.0]|5 |

|[2017-05-16 11:14:18.0,2017-05-16 11:14:22.0]|6 |

|[2017-05-16 11:14:21.0,2017-05-16 11:14:25.0]|6 |

+---------------------------------------+-----+

 

4. 參考文獻

[1]. Structured Streaming Programming Guide. [2]. Kafka Integration Guide.

 


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 用途說明 在執行Linux命令時,我們可以把輸出重定向到文件中,比如 ls >a.txt,這時我們就不能看到輸出了,如果我們既想把輸出保存到文件中,又想在屏幕上看到輸出內容,就可以使用tee命令了。tee命令讀取標準輸入,把這些內容同時輸出到標準輸出和(多個)文件中。要註意的是:在使用管道線時,前一 ...
  • 本文就linux中的壓縮工具gzip、bzip2和xz以及打包工具tar做出介紹,基本上能夠滿足工作中的需求和日常的使用。 ...
  • 當在Windows刪除文件時出現找不到該項目或者顯示該文件不在磁碟中,可以嘗試以下方法: 在要刪除文件的同級目錄下 新建一文本文檔,將下列代碼複製到文檔中,將文檔保存為尾碼名為.bat的文檔(名字隨意), DEL /F /A /Q \\?\%1 RD /S /Q \\?\%1 把你想刪除的文件或文件 ...
  • Linux系統擴容方法彙總 相信很多朋友都有過這樣的經歷,本想裝個Ubantu玩玩,沒想到玩久了反而不習慣Windows了,然而開始裝系統的時候只分配了非常小的空間,那應該怎樣擴展我們的ubantu呢?下麵我為大家總結幾種方法(僅在ubantu下測試過) 一、通過系統整體遷移 首先:進入Window ...
  • Cobbler(補鞋匠)是通過將DHCP、TFTP、DNS、HTTP等服務進行集成,創建一個中央管理節點,其可以實現的功能有配置服務,創建存儲庫,解壓縮操作系統媒介,代理或集成一個配置管理系統,控制電源管理等。 Cobbler的最終目的是實現無需進行人工干預即可安裝機器。 pxe概述 預啟動執行環境 ...
  • boot分區是系統啟動中最重要的部分,如果伺服器由於病毒攻擊又或者被管理員誤刪除了boot分區。那麼就會存在潛在的風險。為什麼說是潛在的風險?因為boot分區被刪除後系統仍在繼續運行,看似無狀況但是在執行關機操作後就會無法啟動。 大致步驟 恢復過程 1.首先查看系統的磁碟情況,根目錄在邏輯捲,boo ...
  • 在windows系統中個,每個進程擁有自己獨立的虛擬地址空間(Virtual Address Space)。這一地址空間的大小與電腦硬體、操作系統以及應用程式都有關係。 對於32位程式來說,最多能使用2GB空間(0x00010000-0x7FFEFFFF)。為了獲得3GB的地址空間,在不同的win ...
  • 1 文件{ ls -rtl # 按時間倒敘列出所有目錄和文件 ll -rt touch file # 創建空白文件 rm -rf 目錄名 # 不提示刪除非空目錄(-r:遞歸刪除 -f強制) dos2unix # windows文本轉linux文本 unix2dos # linux文本轉windows ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...