Flink中Periodic水印和Punctuated水印實現原理(源碼分析)

来源:https://www.cnblogs.com/ljygz/archive/2019/08/30/11435243.html
-Advertisement-
Play Games

在用戶代碼中,我們設置生成水印和事件時間的方法assignTimestampsAndWatermarks()中這裡有個方法的重載 我們傳入的對象分為兩種 AssignerWithPunctuatedWatermarks(可以理解為每條數據都會產生水印,如果不想產生水印,返回一個null的水印) As ...


在用戶代碼中,我們設置生成水印和事件時間的方法assignTimestampsAndWatermarks()中這裡有個方法的重載

我們傳入的對象分為兩種

AssignerWithPunctuatedWatermarks(可以理解為每條數據都會產生水印,如果不想產生水印,返回一個null的水印)

AssignerWithPeriodicWatermarks(周期性的生成水印)

來看一下源碼中是如何實現這兩種水印的

二話不說打開org.apache.flink.streaming.runtime.operators.TimestampsAndPunctuatedWatermarksOperator.java

這個類的processElement方法

 

看到源碼這裡這段邏輯就 非常的清晰了

先通過用戶的代碼獲取到事件時間,註入到element裡面就直接往下個opeartor發送了

然後通過用戶代碼獲取水印,這裡會判斷水印是否為null

不為null的就直接往下游emit 了

現在看一下AssignerWithPeriodicWatermarks如何周期的發送生成的水印

直接打開TimestampsAndPeriodicWatermarksOperator.java這個類

這裡先不看processElement()方法,先看open方法

 

可以看到它將  當前時間其實就是System.currentTimeMillis()+ watermarkInterval水印間隔 註冊作為了一個timer定時器

這樣就知道了,當他過了這個水印間隔時間以後肯定會觸發操作

來看一下這個間隔時間以後觸發了什麼操作

 

可以看到,他先是獲取了當前的水印時間,然後直接emit出去了????

Periodic模式明明是在接收數據的processElement()發送水印的

然後又再次註冊了一個 當前時間+間隔的 timer,這樣就無限的觸發下去了

既然他在這裡發送了水印,來看下他的processElement方法

 

果然他周期性的發送水印以後,接收數據的processElement()方法裡面就沒有發送水印了

只有獲取事件時間的邏輯了


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 由於沒有安卓機,想要測試一些東西,所以選擇了安卓模擬器,可是一運行模擬器就導致電腦藍屏,試了 N 次都不行。 於是在網上尋找解決方案,瞭解到導致藍屏的原因都是因為虛擬化技術,我的系統是 Windows10 1903,加上之前開啟了 Hyper V 虛擬機,和 Windows 沙盒,再加上 Win10 ...
  • RPM包管理: RPM(RedHat Package Manager),早期是在RedHat發行版下,由於比較火,所以慢慢運行於各個發行版(如suse,centos等)。 它生成具有.RPM擴展名的文件,類似windows的setup.exe。 【查詢】 =》查詢已安裝的rpm列表 rpm -qa| ...
  • 要求:關閉VMware虛擬網路編輯器中自身的DHCP服務 1、掛在本地鏡像源本配置Yum倉庫,安裝DHCP服務 2、配置DHCP服務 [root@NoneOs ~]# systemctl restart dhcpd[root@NoneOs ~]# systemctl enable dhcpdCrea ...
  • Summary: in this tutorial, we will show you how to install PostgreSQL on your local system for learning and practicing PostgreSQL. PostgreSQL was deve ...
  • 1. 我的版本是 mysql-5.7.26.0 ,因為據說 mysql-8 的性能雖然強悍,但是相容性還是有問題,而且發佈時間不長,沒有多少人用,就暫時用著5.7版本。 2. 接受許可協議。 3. 選擇安裝類型,選擇自定義。 4. 選擇安裝的位數(和系統匹配),然後設置安裝路徑。 選擇安裝位置 5. ...
  • 1.測試驗證環境 伺服器角色 機器名 IP SQL Server Ver 主體伺服器 WIN-TestDB4O 172.83.XXX.XXX SQL Server 2012 - 11.0.5058.0 (X64) 鏡像伺服器 WIN-TestDB5O 172.73.XXX.XXX SQL Serve ...
  • 前幾天在社區群上,有人問了一個問題 既然上游最小水印會決定視窗觸發,那如果我上游其中一條流突然沒有了數據,我的視窗還會繼續觸發嗎? 看到這個問題,我蒙了???? 對哈,因為我是選擇上游所有流中水印最小的一條作為當前水印時間,那萬一最小水印的那條流突然裡面沒有數據了 那我的最小水印不就一直不往前走了, ...
  • YARN基礎庫是其他一切模塊的基礎,它的設計直接決定了YARN的穩定性和擴展性,YARN借用了MRV1的一些底層基礎庫,比如RPC庫等,但因為引入了很多新的軟體設計方式,所以它的基礎庫更多,包括直接使用了開源序列化框架Protocol Buffers和Apache Avro,自定義的服務庫、事件庫和 ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...