Spark Streaming筆記——技術點彙總

来源:http://www.cnblogs.com/netoxi/archive/2017/08/04/7223414.html
-Advertisement-
Play Games

目錄 · 概況 · 原理 · API · DStream · WordCount示例 · Input DStream · Transformation Operation · Output Operation · 緩存與持久化 · Checkpoint · 性能調優 · 降低批次處理時間 · 設置合 ...


目錄

· 概況

· 原理

· API

    · DStream

    · WordCount示例

    · Input DStream

    · Transformation Operation

    · Output Operation

    · 緩存與持久化

    · Checkpoint

· 性能調優

    · 降低批次處理時間

    · 設置合理批次時間間隔

    · 記憶體調優


 

概況

1. Spark Streaming支持實時數據流的可擴展(scalable)、高吞吐(high-throughput)、容錯(fault-tolerant)的流處理(stream processing)。

2. 架構圖

3. 特性

    a) 可線性伸縮至超過數百個節點;

    b) 實現亞秒級延遲處理;

    c) 可與Spark批處理和互動式處理無縫集成;

    d) 提供簡單的API實現複雜演算法;

    e) 更多的流方式支持,包括KafkaFlumeKinesisTwitterZeroMQ等。

原理

Spark在接收到實時輸入數據流後,將數據劃分成批次(divides the data into batches),然後轉給Spark Engine處理,按批次生成最後的結果流(generate the final stream of results in batches)。 

API

DStream

1. DStreamDiscretized Stream,離散流)是Spark Stream提供的高級抽象連續數據流。

2. 組成:一個DStream可看作一個RDDs序列。

3. 核心思想:將計算作為一系列較小時間間隔的、狀態無關的、確定批次的任務,每個時間間隔內接收的輸入數據被可靠存儲在集群中,作為一個輸入數據集。

4. 特性:一個高層次的函數式編程API、強一致性以及高校的故障恢復。

5. 應用程式模板

    a) 模板1

 1 import org.apache.spark.SparkConf
 2 import org.apache.spark.SparkContext
 3 import org.apache.spark.streaming.Seconds
 4 import org.apache.spark.streaming.StreamingContext
 5 
 6 object Test {
 7   def main(args: Array[String]): Unit = {
 8     val conf = new SparkConf().setAppName("Test")
 9     val sc = new SparkContext(conf)
10     val ssc = new StreamingContext(sc, Seconds(1))
11     
12     // ...
13   }
14 }

    b) 模板2

 1 import org.apache.spark.SparkConf
 2 import org.apache.spark.streaming.Seconds
 3 import org.apache.spark.streaming.StreamingContext
 4 
 5 object Test {
 6   def main(args: Array[String]): Unit = {
 7     val conf = new SparkConf().setAppName("Test")
 8     val ssc = new StreamingContext(conf, Seconds(1))
 9     
10     // ...
11   }
12 }

WordCount示例

 1 import org.apache.spark.SparkConf
 2 import org.apache.spark.storage.StorageLevel
 3 import org.apache.spark.streaming.Seconds
 4 import org.apache.spark.streaming.StreamingContext
 5 import org.apache.spark.streaming.dstream.DStream.toPairDStreamFunctions
 6 
 7 object Test {
 8   def main(args: Array[String]): Unit = {
 9     val conf = new SparkConf().setAppName("Test")
10     val ssc = new StreamingContext(conf, Seconds(1))
11     
12     val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_AND_DISK_SER)
13     val wordCounts = lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
14     wordCounts.print
15     
16     ssc.start
17     ssc.awaitTermination
18   }
19 }

Input DStream

1. Input DStream是一種從流式數據源獲取原始數據流的DStream,分為基本輸入源(文件系統、SocketAkka Actor、自定義數據源)和高級輸入源(KafkaFlume等)。

2. Receiver

    a) 每個Input DStream(文件流除外)都會對應一個單一的Receiver對象,負責從數據源接收數據並存入Spark記憶體進行處理。應用程式中可創建多個Input DStream並行接收多個數據流。

    b) 每個Receiver是一個長期運行在Worker或者Executor上的Task,所以會占用該應用程式的一個核(core)。如果分配給Spark Streaming應用程式的核數小於或等於Input DStream個數(即Receiver個數),則只能接收數據,卻沒有能力全部處理(文件流除外,因為無需Receiver)。

3. Spark Streaming已封裝各種數據源,需要時參考官方文檔。

Transformation Operation

1. 常用Transformation

名稱

說明

map(func)

Return a new DStream by passing each element of the source DStream through a function func.

flatMap(func)

Similar to map, but each input item can be mapped to 0 or more output items.

filter(func)

Return a new DStream by selecting only the records of the source DStream on which func returns true.

repartition(numPartitions)

Changes the level of parallelism in this DStream by creating more or fewer partitions.

union(otherStream)

Return a new DStream that contains the union of the elements in the source DStream and otherDStream.

count()

Return a new DStream of single-element RDDs by counting the number of elements in each RDD of the source DStream.

reduce(func)

Return a new DStream of single-element RDDs by aggregating the elements in each RDD of the source DStream using a function func (which takes two arguments and returns one). The function should be associative so that it can be computed in parallel.

countByValue()

When called on a DStream of elements of type K, return a new DStream of (K, Long) pairs where the value of each key is its frequency in each RDD of the source DStream.

reduceByKey(func, [numTasks])

When called on a DStream of (K, V) pairs, return a new DStream of (K, V) pairs where the values for each key are aggregated using the given reduce function. Note: By default, this uses Spark's default number of parallel tasks (2 for local mode, and in cluster mode the number is determined by the config property spark.default.parallelism) to do the grouping. You can pass an optional numTasks argument to set a different number of tasks.

join(otherStream, [numTasks])

When called on two DStreams of (K, V) and (K, W) pairs, return a new DStream of (K, (V, W)) pairs with all pairs of elements for each key.

cogroup(otherStream, [numTasks])

When called on a DStream of (K, V) and (K, W) pairs, return a new DStream of (K, Seq[V], Seq[W]) tuples.

transform(func)

Return a new DStream by applying a RDD-to-RDD function to every RDD of the source DStream. This can be used to do arbitrary RDD operations on the DStream.

updateStateByKey(func)

Return a new "state" DStream where the state for each key is updated by applying the given function on the previous state of the key and the new values for the key. This can be used to maintain arbitrary state data for each key.

2. updateStateByKey(func)

    a) updateStateByKey可對DStream中的數據按key做reduce,然後對各批次數據累加。

    b) WordCount的updateStateByKey版本

 1 import org.apache.spark.SparkConf
 2 import org.apache.spark.storage.StorageLevel
 3 import org.apache.spark.streaming.Seconds
 4 import org.apache.spark.streaming.StreamingContext
 5 import org.apache.spark.streaming.dstream.DStream.toPairDStreamFunctions
 6 
 7 object Test {
 8   def main(args: Array[String]): Unit = {
 9     val conf = new SparkConf().setAppName("Test")
10     val ssc = new StreamingContext(conf, Seconds(1))
11     
12     // updateStateByKey前需設置checkpoint
13     ssc.checkpoint("/spark/checkpoint")
14     
15     val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_AND_DISK_SER)
16     val addFunc = (currValues: Seq[Int], prevValueState: Option[Int]) => {
17       // 當前批次單詞的總數
18       val currCount = currValues.sum
19       // 已累加的值
20       val prevCount = prevValueState.getOrElse(0)
21       // 返回累加後的結果,是一個Option[Int]類型
22       Some(currCount + prevCount)
23     }
24     val wordCounts = lines.flatMap(_.split(" ")).map((_, 1)).updateStateByKey[Int](addFunc)
25     wordCounts.print
26     
27     ssc.start
28     ssc.awaitTermination
29   }
30 }

3. transform(func)

    a) 通過對原DStream的每個RDD應用轉換函數,創建一個新的DStream。

    b) 官方文檔代碼舉例

1 val spamInfoRDD = ssc.sparkContext.newAPIHadoopRDD(...) // RDD containing spam information
2 
3 val cleanedDStream = wordCounts.transform(rdd => {
4   rdd.join(spamInfoRDD).filter(...) // join data stream with spam information to do data cleaning
5   ...
6 })

4. Window operations

    a) 視窗操作:基於window對數據transformation(個人認為與Storm的tick相似,但功能更強大)。

    b) 參數:視窗長度(window length)和滑動時間間隔(slide interval)必須是源DStream批次間隔的倍數。

    c) 舉例說明:視窗長度為3,滑動時間間隔為2;上一行是原始DStream,下一行是視窗化的DStream。

    d) 常見window operation

名稱

說明

window(windowLength, slideInterval)

Return a new DStream which is computed based on windowed batches of the source DStream.

countByWindow(windowLength, slideInterval)

Return a sliding window count of elements in the stream.

reduceByWindow(func, windowLength, slideInterval)

Return a new single-element stream, created by aggregating elements in the stream over a sliding interval using func. The function should be associative so that it can be computed correctly in parallel.

reduceByKeyAndWindow(func, windowLength, slideInterval, [numTasks])

When called on a DStream of (K, V) pairs, returns a new DStream of (K, V) pairs where the values for each key are aggregated using the given reduce function func over batches in a sliding window. Note: By default, this uses Spark's default number of parallel tasks (2 for local mode, and in cluster mode the number is determined by the config property spark.default.parallelism) to do the grouping. You can pass an optional numTasks argument to set a different number of tasks.

reduceByKeyAndWindow(func, invFunc, windowLength, slideInterval, [numTasks])

A more efficient version of the above reduceByKeyAndWindow() where the reduce value of each window is calculated incrementally using the reduce values of the previous window. This is done by reducing the new data that enters the sliding window, and “inverse reducing” the old data that leaves the window. An example would be that of “adding” and “subtracting” counts of keys as the window slides. However, it is applicable only to “invertible reduce functions”, that is, those reduce functions which have a corresponding “inverse reduce” function (taken as parameter invFunc). Like in reduceByKeyAndWindow, the number of reduce tasks is configurable through an optional argument. Note that checkpointing must be enabled for using this operation.

countByValueAndWindow(windowLength, slideInterval, [numTasks])

When called on a DStream of (K, V) pairs, returns a new DStream of (K, Long) pairs where the value of each key is its frequency within a sliding window. Like in reduceByKeyAndWindow, the number of reduce tasks is configurable through an optional argument.

    e) 官方文檔代碼舉例 

1 // window operations前需設置checkpoint
2 ssc.checkpoint("/spark/checkpoint")
3 
4 // Reduce last 30 seconds of data, every 10 seconds
5 val windowedWordCounts = pairs.reduceByKeyAndWindow((a:Int,b:Int) => (a + b), Seconds(30), Seconds(10))

5. join(otherStream, [numTasks])

    a) 連接數據流

    b) 官方文檔代碼舉例1

1 val stream1: DStream[String, String] = ...
2 val stream2: DStream[String, String] = ...
3 val joinedStream = stream1.join(stream2)

    c) 官方文檔代碼舉例2

1 val windowedStream1 = stream1.window(Seconds(20))
2 val windowedStream2 = stream2.window(Minutes(1))
3 val joinedStream = windowedStream1.join(windowedStream2)

Output Operation

名稱

說明

print()

Prints the first ten elements of every batch of data in a DStream on the driver node running the streaming application. This is useful for development and debugging.

saveAsTextFiles(prefix, [suffix])

Save this DStream's contents as text files. The file name at each batch interval is generated based on prefix and suffix: "prefix-TIME_IN_MS[.suffix]".

saveAsObjectFiles(prefix, [suffix])

Save this DStream's contents as SequenceFiles of serialized Java objects. The file name at each batch interval is generated based on prefix and suffix: "prefix-TIME_IN_MS[.suffix]".

saveAsHadoopFiles(prefix, [suffix])

Save this DStream's contents as Hadoop files. The file name at each batch interval is generated based on prefix and suffix: "prefix-TIME_IN_MS[.suffix]".

foreachRDD(func)

The most generic output operator that applies a function, func, to each RDD generated from the stream. This function should push the data in each RDD to an external system, such as saving the RDD to files, or writing it over the network to a database. Note that the function func is executed in the driver process running the streaming application, and will usually have RDD actions in it that will force the computation of the streaming RDDs.

緩存與持久化

1. 通過persist()DStream中每個RDD存儲在記憶體。

2. Window operations會自動持久化在記憶體,無需顯示調用persist()

3. 通過網路接收的數據流(如KafkaFlumeSocketZeroMQRocketMQ等)執行persist()時,預設在兩個節點上持久化序列化後的數據,實現容錯。

Checkpoint

1. 用途:Spark基於容錯存儲系統(如HDFSS3)進行故障恢復。

2. 分類

    a) 元數據檢查點:保存流式計算信息用於Driver運行節點的故障恢復,包括創建應用程式的配置、應用程式定義的DStream operations、已入隊但未完成的批次。

    b) 數據檢查點:保存生成的RDD。由於stateful transformation需要合併多個批次的數據,即生成的RDD依賴於前幾個批次RDD的數據(dependency chain),為縮短dependency chain從而減少故障恢復時間,需將中間RDD定期保存至可靠存儲(如HDFS)。

3. 使用時機:

    a) Stateful transformationupdateStateByKey()以及window operations

    b) 需要Driver故障恢復的應用程式。

4. 使用方法

    a) Stateful transformation

streamingContext.checkpoint(checkpointDirectory)

    b) 需要Driver故障恢復的應用程式(以WordCount舉例):如果checkpoint目錄存在,則根據checkpoint數據創建新StreamingContext;否則(如首次運行)新建StreamingContext。

 1 import org.apache.spark.SparkConf
 2 import org.apache.spark.storage.StorageLevel
 3 import org.apache.spark.streaming.Seconds
 4 import org.apache.spark.streaming.StreamingContext
 5 import org.apache.spark.streaming.dstream.DStream.toPairDStreamFunctions
 6 
 7 object Test {
 8   def main(args: Array[String]): Unit = {
 9     val checkpointDir = "/spark/checkpoint"
10     def createContextFunc(): StreamingContext = {
11       val conf = new SparkConf().setAppName("Test")
12       val ssc = new StreamingContext(conf, Seconds(1))
13       ssc.checkpoint(checkpointDir)
14       ssc
15     }
16     val ssc = StreamingContext.getOrCreate(checkpointDir, createContextFunc _)
17     
18     val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_AND_DISK_SER)
19     val wordCounts = lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
20     wordCounts.print
21     
22     ssc.start
23     ssc.awaitTermination
24   }
25 }

5. checkpoint時間間隔

    a) 方法

dstream.checkpoint(checkpointInterval)

    b) 原則:一般設置為滑動時間間隔的5-10倍。

    c) 分析:checkpoint會增加存儲開銷、增加批次處理時間。當批次間隔較小(如1秒)時,checkpoint可能會減小operation吞吐量;反之,checkpoint時間間隔較大會導致lineage和task數量增長。

性能調優

降低批次處理時間

1. 數據接收並行度

    a) 增加DStream:接收網路數據(如Kafka、Flume、Socket等)時會對數據反序列化再存儲在Spark,由於一個DStream只有Receiver對象,如果成為瓶頸可考慮增加DStream。

1 val numStreams = 5
2 val kafkaStreams = (1 to numStreams).map { i => KafkaUtils.createStream(...) }
3 val unifiedStream = streamingContext.union(kafkaStreams)
4 unifiedStream.print()

    b) 設置“spark.streaming.blockInterval”參數:接收的數據被存儲在Spark記憶體前,會被合併成block,而block數量決定了Task數量;舉例,當批次時間間隔為2秒且block時間間隔為200毫秒時,Task數量約為10;如果Task數量過低,則浪費了CPU資源;推薦的最小block時間間隔為50毫秒。

    c) 顯式對Input DStream重新分區:在進行更深層次處理前,先對輸入數據重新分區。

inputStream.repartition(<number of partitions>)

2. 數據處理並行度:reduceByKey、reduceByKeyAndWindow等operation可通過設置“spark.default.parallelism”參數或顯式設置並行度方法參數控制。

3. 數據序列化:可配置更高效的Kryo序列化。

設置合理批次時間間隔

1. 原則:處理數據的速度應大於或等於數據輸入的速度,即批次處理時間大於或等於批次時間間隔。

2. 方法:

    a) 先設置批次時間間隔為5-10秒以降低數據輸入速度;

    b) 再通過查看log4j日誌中的“Total delay”,逐步調整批次時間間隔,保證“Total delay”小於批次時間間隔。

記憶體調優

1. 持久化級別:開啟壓縮,設置參數“spark.rdd.compress”。

2. GC策略:在Driver和Executor上開啟CMS。

 

作者:netoxi
出處:http://www.cnblogs.com/netoxi
本文版權歸作者和博客園共有,歡迎轉載,未經同意須保留此段聲明,且在文章頁面明顯位置給出原文連接。歡迎指正與交流。

 


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • mysql 命令導出 Select *from 表名 into outfile "地址";導入 Source D:/mydb.sql /*將外部的sql文件導入到資料庫中*/ dos 命令mysqldump -uroot –p123456 test > c:/a.sql 在dos 視窗下,輸入該命令 ...
  • 原因:中間存在回車符或者換行符,所以要先將此符號替換掉; LTRIM(RTRIM(REPLACE(REPLACE( A,char(13),''),char(10),'') )) LTRIM(A) 去換左邊空格 RTRIM(A) 去換右邊空格 REPLACE( A,char(13),'') 將回車符替 ...
  • 轉載微信公眾號“ 架構師之路“文章 其實看完我還是有些地方不明白,先留著以後慢慢消化~~ 本文將以“好友中心”為例,介紹“多對多”類業務,隨著數據量的逐步增大,資料庫性能顯著降低,資料庫水平切分相關的架構實踐。 一、什麼是多對多關係 所謂的“多對多”,來自資料庫設計中的“實體-關係”ER模型,用來描 ...
  • 1.啟動mysql:1、net start mysql(停止mysql:net stop mysql 其中,mysql是安裝mysql時服務的名字) 2.登錄mysql:cmd中輸入:mysql -h localhost -u username -p 輸入密碼。<-h -u -p 後面的參數緊跟> ...
  • (1)mysql是一個小型關係型資料庫管理系統。 (2)mysql是一個快速、多線程、多用戶、健壯的SQL資料庫伺服器。與其他資料庫管理系統比,mysql有以下的優勢: mysql是一個關係資料庫管理系統。 mysql是開源的。 mysql伺服器是一個快速的、可靠和易使用的資料庫伺服器。 mysql ...
  • 啟動和停止SQL Server服務三種形式 電腦—>右鍵—>管理—>服務和應用程式—>服務—>sql server(MSSQLSERVER) 開始—>安裝路徑—>配置工具—>sql server配置管理器 Windows(鍵)+r 啟動SQL Server服務:net start mssqlser ...
  • 一、sql中的group by 用法解析: Group By語句從英文的字面意義上理解就是“根據(by)一定的規則進行分組(Group)”。 作用:通過一定的規則將一個數據集劃分成若幹個小的區域,然後針對若幹個小區域進行數據處理。 註意:group by 是先排序後分組! 舉例說明:如果要用到gro ...
  • 序:StreamId是storm中實現DAG有向無環圖的重要一個特性,但是從實際生產環境來看,這個功能其實蠻影響生產環境的穩定性的,我們系統在迭代時會帶來整體服務的不可用。 StreamId是storm中實現DAG有向無環圖的重要一個特性,官方也提供對應的介面實現讓開發者自己靈活化構造自己的ADG圖 ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...