Flink中Idle停滯流機制(源碼分析)

来源:https://www.cnblogs.com/ljygz/archive/2019/08/30/11436014.html
-Advertisement-
Play Games

前幾天在社區群上,有人問了一個問題 既然上游最小水印會決定視窗觸發,那如果我上游其中一條流突然沒有了數據,我的視窗還會繼續觸發嗎? 看到這個問題,我蒙了???? 對哈,因為我是選擇上游所有流中水印最小的一條作為當前水印時間,那萬一最小水印的那條流突然裡面沒有數據了 那我的最小水印不就一直不往前走了, ...


前幾天在社區群上,有人問了一個問題

  既然上游最小水印會決定視窗觸發,那如果我上游其中一條流突然沒有了數據,我的視窗還會繼續觸發嗎?

看到這個問題,我蒙了????

對哈,因為我是選擇上游所有流中水印最小的一條作為當前水印時間,那萬一最小水印的那條流突然裡面沒有數據了

那我的最小水印不就一直不往前走了,一直是那個沒有數據流的水印了嗎,因為它的水印最小,而且一直不會更新了

????然後視窗再也不觸發????

思考了一下,發現好像也對,當我有一個上游的水印沒來的時候,我就等著唄,誰知道他是不是延遲了

但是!!!

  萬一他真的就是正常的,出現這種hash極端數據傾斜的情況怎麼辦呢,MQ的一個partation就是沒有數據

那難不成我還真不計算了,一直等著?

懷著這個疑問

首先我想到的是,難道是在生成水印的時候,這條流沒有數據了,我為了不讓流停下來,就算沒數據也周期性的發送水印?

於是有了這篇文章    Flink中Periodic水印和Punctuated水印實現原理(源碼分析)

但是,無果!!! 

那想要流不停下計算只能在source端實現了,於是看了下源碼

 

看到sourceFunction.java介面的這個方法時,便解開了我的疑惑

上面就是說事件時間處理時,可以把流標記為 idle停滯的,就是說這個流不會再發送數據和水印了

且允許下游任務推進

ok 找到了那現在來看一下它是如何實現的,看下具體實現類

 

 

 

 這裡看到這個streamStatus 的停滯idle狀態會被emit廣播往下游發送

 既然往下發了,看下下游接收到這個status是做了什麼

 打開StreamInputProcessor.java的processInput()方法  (這裡是task端運行job的邏輯以後隨緣更新到會細講)

 

 這裡接收到了某上游流的狀態改變了,這裡毫無疑問就是更新stream的狀態

 

修改了stream和channel的狀態為idle 停滯 以後呢

來到水印更新的邏輯 (這裡不瞭解的可以看看這裡  Flink中watermark為什麼選擇最小一條(源碼分析)

 

前面就是說如果是來自已經是idle停滯的流的水印,那我就忽略這條水印

然後來看看,來自沒有停滯idle的流的水印,是如何更新當前水印的 findAndOutputNewMinWatermarkAcrossAlignedChannels方法

 

註意到這裡

會先判斷這個channel是否是idel的!!!!

也就是說當某一個上游的流沒有數據停滯了,他是不會參與水印更新邏輯的

真相大白,水印還是會繼續往前推進不會停下,計算不會停下

 

這裡就引出了一個思考也是自己在思考的

  這裡暴露的介面其實是留給我們source源自己實現的,什麼時候我們認為流變成了停滯的,我們想他繼續強

制推進,繼續計算,應該都是要我們自己去決定的,就是說,我是等著數據來才計算呢,還是我繼續強制流繼續

執行呢,其實是根據自己對source的設計來的,這也是自己的一個思考,自己也沒有細研究以後會研究一下主流

source的設計,看能不能解開自己的疑惑

 

五分鐘以後    這!!!FlinkKafkaConsumerBase.java

 難道沒有offset就停滯了,這麼簡單嗎


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • linux系統所有的文件都是存放在根分區中的,如果根分區容量即將耗盡,我們就需要給根分區擴容,我們可以使用lsblk命令來查看,系統的根分區實際是邏輯捲,所以想要擴展根分區只要將邏輯捲擴容就可以了。此時我的根分區容量為17G,已用12G,我想要給它擴展容量應該怎麼做呢 1.新添加一塊硬碟 2.將新添 ...
  • 如何使用 Skywalking Agent ? 如果你還不知道 Skywalking agent 是什麼, "請點擊這裡查看 Probe" 或者 "這裡查看快速瞭解agent" ,由於我這邊大部分都是 JAVA 服務,所以下文以 Java 中使用 agent 為例,提供了以下三種方式供你選擇 三種方 ...
  • 由於沒有安卓機,想要測試一些東西,所以選擇了安卓模擬器,可是一運行模擬器就導致電腦藍屏,試了 N 次都不行。 於是在網上尋找解決方案,瞭解到導致藍屏的原因都是因為虛擬化技術,我的系統是 Windows10 1903,加上之前開啟了 Hyper V 虛擬機,和 Windows 沙盒,再加上 Win10 ...
  • RPM包管理: RPM(RedHat Package Manager),早期是在RedHat發行版下,由於比較火,所以慢慢運行於各個發行版(如suse,centos等)。 它生成具有.RPM擴展名的文件,類似windows的setup.exe。 【查詢】 =》查詢已安裝的rpm列表 rpm -qa| ...
  • 要求:關閉VMware虛擬網路編輯器中自身的DHCP服務 1、掛在本地鏡像源本配置Yum倉庫,安裝DHCP服務 2、配置DHCP服務 [root@NoneOs ~]# systemctl restart dhcpd[root@NoneOs ~]# systemctl enable dhcpdCrea ...
  • Summary: in this tutorial, we will show you how to install PostgreSQL on your local system for learning and practicing PostgreSQL. PostgreSQL was deve ...
  • 1. 我的版本是 mysql-5.7.26.0 ,因為據說 mysql-8 的性能雖然強悍,但是相容性還是有問題,而且發佈時間不長,沒有多少人用,就暫時用著5.7版本。 2. 接受許可協議。 3. 選擇安裝類型,選擇自定義。 4. 選擇安裝的位數(和系統匹配),然後設置安裝路徑。 選擇安裝位置 5. ...
  • 1.測試驗證環境 伺服器角色 機器名 IP SQL Server Ver 主體伺服器 WIN-TestDB4O 172.83.XXX.XXX SQL Server 2012 - 11.0.5058.0 (X64) 鏡像伺服器 WIN-TestDB5O 172.73.XXX.XXX SQL Serve ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...