TaskManager接收到來自JobManager的jobGraph轉換得到的TDD對象,啟動了任務,在StreamInputProcessor類的processInput()方法中 通過一個while(true)中不停的拉取上游的數據,然後調用streamOperator.processElem ...
TaskManager接收到來自JobManager的jobGraph轉換得到的TDD對象,啟動了任務,在StreamInputProcessor類的processInput()方法中
通過一個while(true)中不停的拉取上游的數據,然後調用streamOperator.processElement(record)調用用戶實現的方法去處理數據拉取的數據
首先先來看下這個operator對象
然後看看OneInputStreamOperator類的UML
這裡所有的實現類沒有全部列出,只列了一些代表
看到這裡,寫過Flink的streamAPI的同學,肯定感覺到很熟悉!!!!!!
這裡!不就是我們常寫flink代碼的那些運算元嘛
對沒有錯,我們程式中實現的那些運算元邏輯,最後都會被封裝成一個OneInputStreamOperator,這裡具體看一個最熟悉的Fliter
來看一下StreamFilter的processElement方法
!!!這裡傳入一個數據後,這個userFunction調用了filter方法並且把數據放進去了
當返回true通過這個output.collect發送出去了
這不就對應了我們用戶自己實現的filter運算元嘛,沒錯這個方法其實就是客戶端的filter方法,這個userFunction包含了用戶實現filter運算元的邏輯
(!!!!!就是說這個processElement方法會調用用戶的邏輯)
(所以這個userFunction可以帶上client的方法實現,這對我們很重要,特別是對flink源碼修改,為clientApi添加新功能方法,運行時可以通過這裡拿到)
繼續
來看看這個output.collect()方法
然後
看到這個,等等等等
我不是從這個processElement()方法進來的嗎,怎麼又開始調processElement()方法了
難道遞歸了? 不對不對
這裡operator不是上一個operator了,而是這個output對象的(這裡是chainOutPut)
看下這個output對象
看下UML類圖,也是只列舉了重要的
先看chainingOutPut的屬性
發現了又出現了OneInputStreamOperator對象
看到這個實現類的名字!chain聯想起了什麼
Flink會將可以chain在一起的運算元在streamGraph轉換成jobGraph的時候根據條件chain在一起
一驚!
來分別看一下ChainingOutPut和RecordWriterOutput的collect()方法有什麼區別
在chain中
在RecordWriter中
這裡chain的ouput,又繼續調用了下一個operator的processElement方法,然後又在processElement方法中又調用output.collect( ),collect中又調用了下一個operator的processElement方法
整個過程就是個無限的迴圈,直到,某一個operator的ouput不為ChainingOutPut,當變為RecordWriterOutput時
上面看到RecordWriterOutput的processElement直接emit發送出去了這個數據,再也沒有繼續調用processElement方法了
這裡也就對應了,flink中的責任鏈,chain在一起的運算元會一個接著一個執行,直到無法chain,就會往下游發送emit了
來看一下UML類圖幫助理解
里中有我,我中有你,一直相互調用直到無法chain,然後emit往下游發送(這裡肯定就有發送端的反壓邏輯,以後隨緣更新)
那這裡的迴圈調用理解了就會想,那如何確定第一個operator調用,然後進入整個調用鏈呢
回到TaskManager接收到JobManager的TDD以後初始化整個任務的時候
StreamTask.java中invoke方法中
先是初始化了一個OperatorChain,裡面其實就是一個數組StreamOperator
在他初始化的時候,其實就是為我們所有的streamOutputs設置了他的output以及會根據jobManager發送過來的TDD(包含信息)
設置成對應的ChainingOutPut還是RecordWriterOutput,chainOutput會設置他的的operator
然後獲取了getHeadOperator()其實就是獲取了他調用連中的第一個
然後在
將這個第一個operator關聯到了inputProcessor對象裡面
後面就簡單了在inputProcessor.processInput中就進入了while(true)迴圈拉取上游數據的邏輯
然後
在這裡調用的第一個processElement方法就是我們的那個headOperator
這樣整個調用責任鏈就開始從第一個Operator運行起來了