在用戶代碼中,我們設置生成水印和事件時間的方法assignTimestampsAndWatermarks()中這裡有個方法的重載 我們傳入的對象分為兩種 AssignerWithPunctuatedWatermarks(可以理解為每條數據都會產生水印,如果不想產生水印,返回一個null的水印) As ...
在用戶代碼中,我們設置生成水印和事件時間的方法assignTimestampsAndWatermarks()中這裡有個方法的重載
我們傳入的對象分為兩種
AssignerWithPunctuatedWatermarks(可以理解為每條數據都會產生水印,如果不想產生水印,返回一個null的水印)
AssignerWithPeriodicWatermarks(周期性的生成水印)
來看一下源碼中是如何實現這兩種水印的
二話不說打開org.apache.flink.streaming.runtime.operators.TimestampsAndPunctuatedWatermarksOperator.java
這個類的processElement方法
看到源碼這裡這段邏輯就 非常的清晰了
先通過用戶的代碼獲取到事件時間,註入到element裡面就直接往下個opeartor發送了
然後通過用戶代碼獲取水印,這裡會判斷水印是否為null
不為null的就直接往下游emit 了
現在看一下AssignerWithPeriodicWatermarks如何周期的發送生成的水印
直接打開TimestampsAndPeriodicWatermarksOperator.java這個類
這裡先不看processElement()方法,先看open方法
可以看到它將 當前時間其實就是System.currentTimeMillis()+ watermarkInterval水印間隔 註冊作為了一個timer定時器
這樣就知道了,當他過了這個水印間隔時間以後肯定會觸發操作
來看一下這個間隔時間以後觸發了什麼操作
可以看到,他先是獲取了當前的水印時間,然後直接emit出去了????
Periodic模式明明是在接收數據的processElement()發送水印的
然後又再次註冊了一個 當前時間+間隔的 timer,這樣就無限的觸發下去了
既然他在這裡發送了水印,來看下他的processElement方法
果然他周期性的發送水印以後,接收數據的processElement()方法裡面就沒有發送水印了
只有獲取事件時間的邏輯了