Spark Streaming Spark Streaming 是Spark為了用戶實現流式計算的模型。 數據源包括Kafka,Flume,HDFS等。 DStream 離散化流(discretized stream), Spark Streaming 使用DStream作為抽象表示。是隨時間推移而 ...
Spark Streaming
Spark Streaming 是Spark為了用戶實現流式計算的模型。
數據源包括Kafka,Flume,HDFS等。
DStream 離散化流(discretized stream), Spark Streaming 使用DStream作為抽象表示。是隨時間推移而收到的數據的序列。DStream內部的數據都是RDD形式存儲, DStream是由這些RDD所組成的離散序列。
編寫Streaming步驟:
1.創建StreamingContext
// Create a local StreamingContext with two working thread and batch interval of 5 second.
// The master requires 2 cores to prevent from a starvation scenario.
val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
val ssc = new StreamingContext(conf, Seconds(5))
創建本地化StreamingContext, 需要至少2個工作線程。一個是receiver,一個是計算節點。
2.定義輸入源,創建輸入DStream
// Create a DStream that will connect to hostname:port, like localhost:9999
val lines = ssc.socketTextStream("node1", 9999)
3.定義流的計算過程,使用transformation和output operation DStream
// Split each line into words val words = lines.flatMap(_.split(" ")) // Count each word in each batch val pairs = words.map(word => (word, 1)) val wordCounts = pairs.reduceByKey(_ + _) // Print the first ten elements of each RDD generated in this DStream to the console wordCounts.print()
4.開始接收數據及處理數據,使用streamingContext.start()
ssc.start() // Start the computation
5.等待批處理被終止,使用streamingContext.awaitTermination()
ssc.awaitTermination() // Wait for the computation to terminate
6.可以手工停止批處理,使用streamingContext.stop()
數據源
數據源分為兩種
1.基本源
text,HDFS等
2.高級源
Flume,Kafka等
DStream支持兩種操作
一、轉化操作(transformation)
無狀態轉化(stateless):每個批次的處理不依賴於之前批次的數據
有狀態轉化(stateful):跨時間區間跟蹤數據的操作;一些先前批次的數據被用來在新的批次中參與運算。
- 滑動視窗:
- 追蹤狀態變化:updateStateByKey()
transform函數
transform操作允許任意RDD-to-RDD函數被應用在一個DStream中.比如在DStream中的RDD可以和DStream外部的RDD進行Join操作。通常用來過濾垃圾郵件等操作。
不屬於DStream的RDD,在每個批間隔都被調用;允許你做隨時間變化的RDD操作。RDD操作,partitions的數量,廣播變數的值,可以變化在不同的批次中。
例子:
import org.apache.spark.SparkConf import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream} import org.apache.spark.streaming.{Seconds, StreamingContext} /** * Created by Edward on 2016/9/16. */ object TransformOperation { def main(args: Array[String]) { val conf: SparkConf = new SparkConf().setMaster("local[2]").setAppName("TransformOperation") val ssc = new StreamingContext(conf, Seconds(5)) val textStream: ReceiverInputDStream[String] = ssc.socketTextStream("node1",9999) //黑名單過濾功能,可以將數據存到redis或者資料庫,每批次間隔都會重新取數據並參與運算,保證數據可以動態載入進來。 val blackList=Array(Tuple2("Tom", true)) val listRDD = ssc.sparkContext.parallelize(blackList).persist() //創建RDD val map = textStream.map(x=>(x.split(" ")(1),x)) //通過transform將DStream中的RDD進行過濾操作 val dStream = map.transform(rdd =>{ //listRDD.collect() //println(listRDD.collect.length) //通過RDD的左鏈接及過濾函數,對數據進行處理,生成新的RDD rdd.leftOuterJoin(listRDD).filter(x =>{ //使用transform操作DStream中的rdd rdd左鏈接listRDD, 併進行過濾操作 if(!x._2._2.isEmpty && x._2._2.get)// if(x._2._2.getOrElse(false)) //如果沒取到值則結果為false false else{ true } }) }) dStream.print() ssc.start() ssc.awaitTermination() } }
視窗函數
二、輸出操作(output operation)