案例需求: 假設用戶需要每隔1秒鐘統計一次4秒鐘視窗中數據的量,然後對統計的結果值進行checkpoint處理 ##### 數據規劃 使用自定義運算元每秒鐘產生大約10000條數據 產生的數據為一個四元組(Long,String,String,Integer)-- (id,name,info,count) ...
案例需求:
假設用戶需要每隔1秒鐘統計一次4秒鐘視窗中數據的量,然後對統計的結果值進行checkpoint處理
數據規劃
使用自定義運算元每秒鐘產生大約10000條數據
產生的數據為一個四元組(Long,String,String,Integer)-- (id,name,info,count)
數據經統計後,統計結果列印到終端輸出
列印輸出的結果為Long類型的數據
開發自定義數據源:
代碼實現:
// ** Custom data source **
// Step 1: the record type carried through the stream.
/**
 * A single record emitted by the custom source.
 *
 * @param id   record identifier
 * @param name record name
 * @param info free-form payload text
 * @param cout per-record count value (field name kept as-is for compatibility with callers)
 */
case class Msg(
  id: Long,
  name: String,
  info: String,
  cout: Int
)
// Step 2: custom source built on RichSourceFunction.
/**
 * Source that emits a batch of 10000 [[Msg]] records, sleeps for one second and
 * repeats until [[cancel]] is called.
 *
 * Every record has id `1L`, name `"name_<i>"` (with `i` in `0 until 10000`),
 * info `"test_info"` and count `1`.
 */
class MySourceFunction extends RichSourceFunction[Msg] {

  // Written by cancel() and read by the run() loop. Flink invokes cancel() from a
  // different thread than run(), so the flag must be @volatile for the write to be
  // guaranteed visible to the emitting loop.
  @volatile var isRunning = true

  // Step 3: emit 10000 records into the stream once per second.
  /**
   * Emits records until the source is cancelled.
   *
   * @param ctx source context used to collect the generated records
   */
  override def run(ctx: SourceFunction.SourceContext[Msg]): Unit = {
    while (isRunning) {
      // The guard stops the batch early if the source is cancelled mid-batch.
      for (i <- 0 until 10000 if isRunning) {
        ctx.collect(Msg(1L, "name_" + i, "test_info", 1))
      }
      // Pause for one second between batches.
      TimeUnit.SECONDS.sleep(1)
    }
  }

  /** Signals the emitting loop in [[run]] to stop. */
  override def cancel(): Unit = {
    isRunning = false
  }
}
開發自定義的狀態
代碼實現:
// ** Custom state **
/**
 * Serializable holder for a single `Long` counter; used as the checkpointed
 * state object.
 */
class UDFState extends Serializable {

  // The stored counter value; starts at zero.
  private var count = 0L

  /** Replaces the stored counter value with `s`. */
  def setState(s: Long): Unit = {
    count = s
  }

  /** Returns the stored counter value. */
  def getState: Long = {
    count
  }
}
開發自定義Window和檢查點
代碼實現:
/**
 * Window function that emits the number of elements in each fired window and
 * keeps a running total of all counted elements, which is saved and restored
 * through `ListCheckpointed`.
 */
class MyWindowAndCheckpoint extends WindowFunction[Msg, Long, Tuple, TimeWindow] with ListCheckpointed[UDFState] {

  // Running sum of the element counts of every window processed so far.
  var total = 0L

  /**
   * Counts the elements of the fired window, adds the count to `total` and
   * emits the per-window count.
   *
   * @param key    key of the window
   * @param window the window being evaluated
   * @param input  elements contained in the window
   * @param out    collector receiving the per-window count
   */
  override def apply(key: Tuple, window: TimeWindow, input: Iterable[Msg], out: Collector[Long]): Unit = {
    // Fold into a Long so the count does not pass through Int.
    val count = input.foldLeft(0L)((acc, _) => acc + 1L)
    total += count
    out.collect(count)
  }

  /**
   * Snapshots the running total as a single-element state list.
   *
   * @param checkpointId id of the checkpoint (unused)
   * @param timestamp    checkpoint timestamp (unused)
   * @return list holding one [[UDFState]] that carries `total`
   */
  override def snapshotState(checkpointId: Long, timestamp: Long): util.List[UDFState] = {
    val snapshot = new UDFState
    snapshot.setState(total)
    val states = new util.ArrayList[UDFState]()
    states.add(snapshot)
    states
  }

  /**
   * Restores the running total from checkpointed sub-states.
   *
   * All sub-states are summed instead of reading only index 0: an empty list
   * then restores 0 rather than throwing `IndexOutOfBoundsException`, and if
   * several sub-states are handed to this instance none of them is dropped.
   *
   * @param state sub-states to restore from
   */
  override def restoreState(state: util.List[UDFState]): Unit = {
    var restored = 0L
    val it = state.iterator()
    while (it.hasNext) {
      restored += it.next().getState
    }
    total = restored
  }
}
開發主業務
代碼實現
/**
 * Job entry point: builds and runs the streaming pipeline
 * source -> timestamps/watermarks -> keyBy -> sliding window -> count -> print.
 *
 * @param args command-line arguments (unused)
 */
def main(args: Array[String]): Unit = {
  // Scala API implicits needed by the DataStream operations below.
  import org.apache.flink.api.scala._

  // Execution environment with checkpointing every 6000 ms, stored on the local file system.
  val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
  streamEnv.enableCheckpointing(6000)
  streamEnv.setStateBackend(new FsStateBackend("file:///E:/itcast_zz_test/maven_flink/flink-base/src/dev_checkpoint"))
  // Use event time as the stream time characteristic.
  streamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

  // Assigner that stamps each element, and the watermark, with the current wall-clock time.
  val wallClockAssigner: AssignerWithPeriodicWatermarks[Msg] = new AssignerWithPeriodicWatermarks[Msg]() {
    override def getCurrentWatermark: Watermark = new Watermark(System.currentTimeMillis())

    override def extractTimestamp(element: Msg, previousElementTimestamp: Long): Long =
      System.currentTimeMillis()
  }

  // Source -> watermarks -> key by the first field -> 4 s window sliding every 1 s -> count.
  val windowCounts: DataStream[Long] = streamEnv
    .addSource(new MySourceFunction)
    .assignTimestampsAndWatermarks(wallClockAssigner)
    .keyBy(0)
    .timeWindow(Time.seconds(4), Time.seconds(1))
    .apply(new MyWindowAndCheckpoint)

  // Print each per-window count, then submit the job.
  windowCounts.print()
  streamEnv.execute()
}
引用的包
package com.wanghao
import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.runtime.state.filesystem.FsStateBackend
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.checkpoint.ListCheckpointed
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
import org.apache.flink.streaming.api.functions.source.{RichSourceFunction, SourceFunction}
import org.apache.flink.streaming.api.scala.{DataStream, KeyedStream, StreamExecutionEnvironment, WindowedStream}
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector
import java.util
import java.util.concurrent.TimeUnit