通過前面的文章瞭解到 Driver將用戶代碼轉換成streamGraph再轉換成Jobgraph後向Jobmanager端提交 JobManager啟動以後會在Dispatcher.java起來RPC方法submitJob(jobGraph),用於接收來自Driver轉化得到的JobGraph來啟動 ...
通過前面的文章瞭解到
Driver將用戶代碼轉換成streamGraph再轉換成Jobgraph後向Jobmanager端提交
JobManager啟動以後會在Dispatcher.java起來RPC方法submitJob(jobGraph),用於接收來自Driver轉化得到的JobGraph來啟動任務
具體來看jobGraph提交到JobManager的submitJob方法
前面都是一些調用鏈沒有什麼好講的,最後到createJobManager( )方法這裡
先看一下1,創建了一個jobmanagerRunner並且將中Driver端得到的JobGraph傳遞了進去
在創建JobManagerRunner的過程中它調用了
這裡主要是為了創建一個jobMaster,在jobMaster的構造方法中
在這裡它先是create傳入了jobgraph然後又通過createAndRestoreExecutionGraph()方法轉換得到executionGraph
這個executionGraph就可以用來調度啟動任務了
具體看一下他的轉化邏輯
可以看到它從createExecutionGraph方法中得到了executionGraph
並且通過getCheckpointCoordinator()方法得到了一個coordinator(主要是用於周期性觸發checkpoint,調用對應TaskManager的rpc生成barriers往下游發送)
繼續看一下他的轉化邏輯
在createExecutionGraph中通過ExecutionGraphBuilder.buildGraph()返回了一個executionGraph
在buildGraph()方法中
創建了一個executionGraph
為executionGraph設置一些基礎信息,包括調度方式等(這裡stream是eager的調度方法)
然後
1處得到了一個的拓撲圖包含了所有jobGraph的所有jobVertex節點
2處就是具體遍歷所有jobGraph的jobVertex生成executionGraph的頂點ExecutionJobVertex
遍歷所有jobGraph的頂點jobVertex
在這裡就具體生成了ExecutionJobVertex中的每一個ExecutionVertex[] taskVertices
當然這裡還會配置很多ExecutionGraph的信息,就不一一列舉了
配置了一些ExecutionGraph的屬性以後
調用了
可以看到我的註釋,就是說這個地方其實是和coordinator的創建有關,在這個方法中
創建了一個coordinator對象
在這裡註冊了一個JobStatus的監聽
來看一下這個監聽的作用
可以看到源碼上的註解就是說用於監聽job狀態的改變,具體監聽
看到這裡就非常明顯了
當監聽到jobstutes的狀態改變時
當jobstatus變成Running時調用了coordinator的.startCheckpointScheduler()方法其中
這裡可以看到創建了一個周期的調度線程
看下線程的run方法
這裡就真相大白了,調用了triggerCheckpoint方法觸發一次checkpoint(觸發checkpoint的邏輯以後隨緣更新到再講)
註意,前面說到只是註冊了一個監聽,也就是說這個coordinator現在其實還沒有啟動起來的!!要到監聽到jobStatus變成running才會啟動
回到最開始的這裡
1處轉化成executionGraph以後
2處具體看一下這個startJobManagerRunner()方法
把jobManager啟動了起來
在其中
啟動了這個jobMasterService
在這裡開啟了jobmaster的一些RPC,像什麼cancel job的stop job 的還有register TM的
然後startJobExecution()方法中
這裡其實會向jobManager中啟動的resourceManager的RPC請求solt信息初始化自己的的soltPool這裡不細講了,我還沒有研究
後面
這個地方就是修改job狀態和調度運行了
其中調用了scheduleExecutionGraph(),在其中又調用了
這個地方比較重要,在其中先
這裡它就通過CAS修改了jobStatue從Created變成了Running
修改完了以後還沒完,還通過這個方法notifyJobStatusChange(),這個方法裡面具體看一看
他遍歷了所有的listener,也就是說會觸發我們前面註冊的那個coordinator的監聽監聽到job狀態改變為running
這裡coordinator就啟動完成了
繼續往下,在修改完job狀態以後
因為流模式這裡是用的EAGER,flink批處理我不熟這裡就不展開了
在這個schduleEager方法中
然後
看到這裡它創建了一個TaskDeploymentDescriptor一個用於調度TaskManager端任務的tdd對象
看過前面幾篇博客的同學,就應該有印象了,在TaskManager啟動會啟動很多的RPC介面
其中有一個
一目瞭然了,這個東西是用來發送給TaskManager用於啟動TaskManager端任務的!!!!
到這裡jobManager端的job啟動任務就差不多完成了
接下來就是TaskManager端的任務了,隨緣更新的時候在說一下真正TaskManager節點是如何啟動我們job任務的