Flink中TaskManager端執行用戶邏輯過程(源碼分析)

来源:https://www.cnblogs.com/ljygz/archive/2019/09/11/11504220.html
-Advertisement-
Play Games

TaskManager接收到來自JobManager的jobGraph轉換得到的TDD對象,啟動了任務,在StreamInputProcessor類的processInput()方法中 通過一個while(true)中不停的拉取上游的數據,然後調用streamOperator.processElem ...


TaskManager接收到來自JobManager的jobGraph轉換得到的TDD對象,啟動了任務,在StreamInputProcessor類的processInput()方法中

通過一個while(true)中不停的拉取上游的數據,然後調用streamOperator.processElement(record)調用用戶實現的方法去處理數據拉取的數據

首先先來看下這個operator對象

然後看看OneInputStreamOperator類的UML

這裡所有的實現類沒有全部列出,只列了一些代表

看到這裡,寫過Flink的streamAPI的同學,肯定感覺到很熟悉!!!!!!

這裡!不就是我們常寫flink代碼的那些運算元嘛

對沒有錯,我們程式中實現的那些運算元邏輯,最後都會被封裝成一個OneInputStreamOperator,這裡具體看一個最熟悉的Fliter

來看一下StreamFilter的processElement方法

!!!這裡傳入一個數據後,這個userFunction調用了filter方法並且把數據放進去了

當返回true通過這個output.collect發送出去了

這不就對應了我們用戶自己實現的filter運算元嘛,沒錯這個方法其實就是客戶端的filter方法,這個userFunction包含了用戶實現filter運算元的邏輯

(!!!!!就是說這個processElement方法會調用用戶的邏輯)

(所以這個userFunction可以帶上client的方法實現,這對我們很重要,特別是對flink源碼修改,為clientApi添加新功能方法,運行時可以通過這裡拿到)

繼續

來看看這個output.collect()方法

然後

 

看到這個,等等等等

我不是從這個processElement()方法進來的嗎,怎麼又開始調processElement()方法了

難道遞歸了? 不對不對

這裡operator不是上一個operator了,而是這個output對象的(這裡是chainOutPut)

看下這個output對象

看下UML類圖,也是只列舉了重要的

先看chainingOutPut的屬性

 

發現了又出現了OneInputStreamOperator對象

看到這個實現類的名字!chain聯想起了什麼

Flink會將可以chain在一起的運算元在streamGraph轉換成jobGraph的時候根據條件chain在一起

一驚!

來分別看一下ChainingOutPut和RecordWriterOutput的collect()方法有什麼區別

在chain中

 在RecordWriter中

這裡chain的ouput,又繼續調用了下一個operator的processElement方法,然後又在processElement方法中又調用output.collect( ),collect中又調用了下一個operator的processElement方法

整個過程就是個無限的迴圈,直到,某一個operator的ouput不為ChainingOutPut,當變為RecordWriterOutput時

上面看到RecordWriterOutput的processElement直接emit發送出去了這個數據,再也沒有繼續調用processElement方法了

這裡也就對應了,flink中的責任鏈,chain在一起的運算元會一個接著一個執行,直到無法chain,就會往下游發送emit了

來看一下UML類圖幫助理解

 

 里中有我,我中有你,一直相互調用直到無法chain,然後emit往下游發送(這裡肯定就有發送端的反壓邏輯,以後隨緣更新)

那這裡的迴圈調用理解了就會想,那如何確定第一個operator調用,然後進入整個調用鏈呢

回到TaskManager接收到JobManager的TDD以後初始化整個任務的時候

StreamTask.java中invoke方法中

 先是初始化了一個OperatorChain,裡面其實就是一個數組StreamOperator

在他初始化的時候,其實就是為我們所有的streamOutputs設置了他的output以及會根據jobManager發送過來的TDD(包含信息)

設置成對應的ChainingOutPut還是RecordWriterOutput,chainOutput會設置他的的operator

然後獲取了getHeadOperator()其實就是獲取了他調用連中的第一個

然後在

 

將這個第一個operator關聯到了inputProcessor對象裡面

後面就簡單了在inputProcessor.processInput中就進入了while(true)迴圈拉取上游數據的邏輯

然後

在這裡調用的第一個processElement方法就是我們的那個headOperator

這樣整個調用責任鏈就開始從第一個Operator運行起來了

 


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 場景 在進行資料庫還原時提示: System.Data.SqlError:在對”“嘗試”“時,操作系統返回了錯誤5(拒絕訪問) 實現 第一種方案是修改要還原的資料庫備份文件的許可權。 找到備份文件右擊屬性--安全--組或用戶名--編輯--添加--輸入對象名稱來選擇 輸入Everyone,然後編輯Eve ...
  • 場景 在SqlServer2014企業版上怎樣進行資料庫的還原,首先你得有一個其他數據 的備份文件。 實現 打開cmd,輸入sql,打開SqlServer 2014 Management Studio 然後連接資料庫 連接成功後,右擊資料庫,點擊還原資料庫 然後選擇常規,再選擇目標資料庫以及源中點擊 ...
  • 場景 在SqlServer Management中進行資料庫還原時提示: 資料庫不能在此版本的SQL Server中啟動,因為它包含分區函數。 點擊左下角的查看詳細信息 實現 電腦上安裝的是SQL Server 2014 的Express的版本,即標準版本,而分區函數是不支持在此版本上的, 所以將當 ...
  • https://github.com/Lotharing/SDIMS 項目地址 ...
  • Linux系統中:mysql進入的命令為mysql -u root -p +你的mysql密碼。 Mysql是如何添加用戶呢? 在mysql命令行下,使用use mysql;進入mysql的資料庫中。然後插入信息到user表,就可以添加上用戶了。 例子如下: 本人是根據無情站長的博客進行學習的,原創 ...
  • 轉:https://www.cnblogs.com/wxjnew/p/9160855.html 一、下載: 下載地址: https://github.com/MicrosoftArchive/redis/releases 根據系統下載的版本:以(64位為例) 下載後一般解壓到根目錄下:如(E:\Re ...
  • 一、磁碟的選擇也是影響MySQL的性能的重大因素之一 1.使用傳統的機器硬碟讀取數據的過程 2.如何選擇傳統機器硬碟的因素 二、使用RAID增加傳統機器硬碟的性能 1.什麼是RAID技術 2.常見的RAID技術有以下四種: RAID0、RAID1、RAID5、RAID10 3.RAID0技術的磁碟利 ...
  • 雙活數據中心是為資料庫提供高可用的同城雙活比兩地三中心少了一個異地災備數據中心,oracle中用rac實現要滿足1.網路雙活 2.存儲雙活 3.應用雙活距離上有限制一般100km以上不支持 實際建議50km以內無論兩地三中心和同城雙活最終還是要結合自己業務實際需要,有的客戶業務連備份都沒做過,做什麼... ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...