前幾天在社區群上,有人問了一個問題 既然上游最小水印會決定視窗觸發,那如果我上游其中一條流突然沒有了數據,我的視窗還會繼續觸發嗎? 看到這個問題,我蒙了???? 對哈,因為我是選擇上游所有流中水印最小的一條作為當前水印時間,那萬一最小水印的那條流突然裡面沒有數據了 那我的最小水印不就一直不往前走了, ...
前幾天在社區群上,有人問了一個問題
既然上游最小水印會決定視窗觸發,那如果我上游其中一條流突然沒有了數據,我的視窗還會繼續觸發嗎?
看到這個問題,我蒙了????
對哈,因為我是選擇上游所有流中水印最小的一條作為當前水印時間,那萬一最小水印的那條流突然裡面沒有數據了
那我的最小水印不就一直不往前走了,一直是那個沒有數據流的水印了嗎,因為它的水印最小,而且一直不會更新了
????然後視窗再也不觸發????
思考了一下,發現好像也對,當我有一個上游的水印沒來的時候,我就等著唄,誰知道他是不是延遲了
但是!!!
萬一他真的就是正常的,出現這種hash極端數據傾斜的情況怎麼辦呢,MQ的一個partation就是沒有數據
那難不成我還真不計算了,一直等著?
懷著這個疑問
首先我想到的是,難道是在生成水印的時候,這條流沒有數據了,我為了不讓流停下來,就算沒數據也周期性的發送水印?
於是有了這篇文章 Flink中Periodic水印和Punctuated水印實現原理(源碼分析)
但是,無果!!!
那想要流不停下計算只能在source端實現了,於是看了下源碼
看到sourceFunction.java介面的這個方法時,便解開了我的疑惑
上面就是說事件時間處理時,可以把流標記為 idle停滯的,就是說這個流不會再發送數據和水印了
且允許下游任務推進
ok 找到了那現在來看一下它是如何實現的,看下具體實現類
這裡看到這個streamStatus 的停滯idle狀態會被emit廣播往下游發送
既然往下發了,看下下游接收到這個status是做了什麼
打開StreamInputProcessor.java的processInput()方法 (這裡是task端運行job的邏輯以後隨緣更新到會細講)
這裡接收到了某上游流的狀態改變了,這裡毫無疑問就是更新stream的狀態
修改了stream和channel的狀態為idle 停滯 以後呢
來到水印更新的邏輯 (這裡不瞭解的可以看看這裡 Flink中watermark為什麼選擇最小一條(源碼分析))
前面就是說如果是來自已經是idle停滯的流的水印,那我就忽略這條水印
然後來看看,來自沒有停滯idle的流的水印,是如何更新當前水印的 findAndOutputNewMinWatermarkAcrossAlignedChannels方法
註意到這裡
會先判斷這個channel是否是idel的!!!!
也就是說當某一個上游的流沒有數據停滯了,他是不會參與水印更新邏輯的
真相大白,水印還是會繼續往前推進不會停下,計算不會停下
這裡就引出了一個思考也是自己在思考的
這裡暴露的介面其實是留給我們source源自己實現的,什麼時候我們認為流變成了停滯的,我們想他繼續強
制推進,繼續計算,應該都是要我們自己去決定的,就是說,我是等著數據來才計算呢,還是我繼續強制流繼續
執行呢,其實是根據自己對source的設計來的,這也是自己的一個思考,自己也沒有細研究以後會研究一下主流
source的設計,看能不能解開自己的疑惑
五分鐘以後 這!!!FlinkKafkaConsumerBase.java
難道沒有offset就停滯了,這麼簡單嗎