前面說到了 Flink的TaskManager啟動(源碼分析) 啟動了TaskManager 然後 Flink的Job啟動JobManager端(源碼分析) 說到JobManager會將轉化得到的TDD發送到TaskManager的RPC 這篇主要就講一下,Job在TaskManager端是如何啟動 ...
前面說到了 Flink的TaskManager啟動(源碼分析) 啟動了TaskManager
然後 Flink的Job啟動JobManager端(源碼分析) 說到JobManager會將轉化得到的TDD發送到TaskManager的RPC
這篇主要就講一下,Job在TaskManager端是如何啟動的
先來看一下,TaskManager端用來接收JobManager發送過來的TDD對象的RPC介面
在TaskExecutor.java中
這個方法用於接收了一個TaskDeploymentDescriptor對象用於啟動任務(上一篇知道這裡executionGraph的每一個並行度都會調用deploy方法生成一個TDD)
來看一下具體接收到以後做了什麼
創建了一個Task並且將其內部的一個線程啟動起來了
註意這裡從TDD中得到了InputGate,Partition的信息,用於創建InputGate,ResultPartition
InputGate用於對接上游產生的數據(消費)
ResultPartition用於往下游發送自己產生的數據(生產)
來看一下Task創建,在Task的構造方法中
這裡看到創建了對應往下游發送數據的ResultPartition
ResultPartition中創建的SubPartition具體分為
可以看到就是說三個參數分別對應
PIPELINED 可以邊消費邊生產,是有背壓的,這個partition沒有buffer數量的限制
(因為背壓的控制是通過,接收數據端公用同一個指定大小的bufferPool,以後背壓的時候講)
其他同理
這裡看一下不同類型的ResultPartitionType是創建的什麼subpartitions
BLOCKING 這種創建了一個SpillableSubpartition並且傳進去了一個ioManager(這個ioManager以後io管理細講)
大致看了一下就是說這種Subpartition是會落盤的
PIPELINED 而這種方式是完全基於記憶體的
根據上游的信息創建好ResultPartition以後
接著創建了InputGate用於接收上游的數據,並且在create方法中
會根據partition的位置創建對應的channel,這裡可以分為
Local 就是說下游和自己是在同一臺機器
Remote 下游是需要通過網路發送的
並且在這裡將inputGate和它所有的inputChannels關聯了起來
創建完inputGate以後Task就初始化完了,然後會被start()起來,來看下Task的run方法
在run方法中
這個地方會為初始化inputGate與ResultPartition的bufferPool(以後講到反壓在講)
繼續
這裡通過反射創建了一個StreamTask的實例
並且
調用了他的invoke()方法,這裡也是Job開始的邏輯,來看一下invoke方法
在invoke方法中
只要知道這裡會初始化OperatorChain這裡包含了我們用戶運算元的邏輯(這裡不細講,隨緣講到Task操作責任鏈的時候講)
然後得到了operatorChain的頭headoperator其實這裡的頭就包含了用戶的第一個運算元邏輯在裡面
然後init()方法中用上面的headoperator初始化了一個inputProcess對象並且關聯上了上面創建的inputGate(也是留到責任鏈講)
接著
這裡就是上面在init方法中創建的inputProcess,並且調用了他的processInput方法
重頭戲來了,來看一下processInput方法
這裡有個while(true)也就是說這裡會一直迴圈下去
來看一下他迴圈做什麼
這裡!!!!這個streamOperator就是上面構造inputProcess時傳入的headOperator
這個processElement方法裡面就是調用用戶的方法啦
也就是不停的從上游接收到數據以後,調用用戶具體的處理邏輯
這裡job就啟動完成了
註意這個while迴圈內既然開始走我們用戶的邏輯,那肯定會先從inputGate關聯到的上游獲取數據
這裡就非常重要了,因為接收數據就包含了很多的機制的實現
包含了watermark處理的邏輯,水印對齊的邏輯,水印更新的邏輯,如下
以及idle停滯流邏輯,流狀態更新邏輯
以及如何接收數據邏輯,接收端反壓的邏輯,barriers對齊的邏輯,checkpoint觸發的邏輯
所以這個StreamInputProcessor.processInput()方法是一個非常重要的方法,以後隨緣更新各種機制的時候也會經常看到