「Flink」Flink中的時間類型

来源:https://www.cnblogs.com/ilovezihan/archive/2020/02/05/12262245.html
-Advertisement-
Play Games

Flink中的時間類型和視窗是非常重要概念,是學習Flink必須要掌握的兩個知識點。Flink中的時間類型時間類型介紹Flink流式處理中支持不同類型的時間。分為以下幾種:處理時間Flink程式執行對應操作的系統時間。所有基於時間的操作(例如:時間視窗)都將使用運行相應operator的系統時間。例... ...


Flink中的時間類型和視窗是非常重要概念,是學習Flink必須要掌握的兩個知識點。

Flink中的時間類型

時間類型介紹

Flink流式處理中支持不同類型的時間。分為以下幾種:

  1. 處理時間
    • Flink程式執行對應操作的系統時間。所有基於時間的操作(例如:時間視窗)都將使用運行相應operator的系統時間。例如:每個小時的處理時間視窗包括在系統時間範圍內所有operator接收到的記錄。例如:如果應用程式在09:15開始運行,則第一個滾動時間視窗將包括:09:15 – 10:00 之間的處理事件,下一個視窗包括上午10:00 – 11:00之間的處理事件
    • 這種處理時間方式實時性是最好的,但數據未必準確
  2. 事件時間
    • 每個事件發生的時間。這個時間一般是在進入到Flink之前就包含在事件中
    • 針對Eventtime,事件被處理的時間以來與事件本身
    • Eventtime必須要指定如何生成Eventtime Watermark(水印)
    • 理想情況,不管事件何時到達或者順序如何,事件時間處理能夠得到完整一致地結果。
    • 事件處理在等待亂序事件時,會產生一些延遲。這樣會對Eventtime的應用性能有一定的影響
  3. 攝入時間
    • 攝入時間是事件進入Flink的時間
    • 在source operator中,每個記錄以時間戳的形式獲取源的當前時間
    • 它在概念是處於事件時間和處理時間中間
    • 攝入時間不能處理亂序問題或者延遲數據,攝入時間可以由流式系統自動生成水印

Flink支持的這幾種時間剛好和我們上一篇播客中的內容相對應。

https://www.cnblogs.com/ilovezihan/p/12254479.html

應用一張Flink官網的圖。

image

Flink代碼中設置時間類型

通常,我們在Flink初始化流式運行環境時,就會設置流處理時間特性。這個設置很重要,它決定了數據流的行為方式。(例如:是否需要給事件分配時間戳),以及視窗操作應該使用什麼樣的時間類型。例如:KeyedStream.timeWindow(Time.seconds(30))。


我們接下來通過實現一個每5秒中進行一次單詞計數的案例,來說明Flink中如何指定時間類型。

public class WordCountWindow {
    public static void main(String[] args) throws Exception {
        // 1. 初始化流式運行環境
        Configuration conf = new Configuration();
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);

        // 2. 設置時間處理類型,這裡設置的方式處理時間
        env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);

        // 3. 定義數據源,每秒發送一個hadoop單詞
        DataStreamSource<String> wordDS = env.addSource(new RichSourceFunction<String>() {

            private boolean isCanaled = false;

            @Override
            public void run(SourceContext<String> ctx) throws Exception {
                while (!isCanaled) {
                    ctx.collect("hadooop");
                    Thread.sleep(1000);
                }
            }

            @Override
            public void cancel() {
                isCanaled = true;
            }
        });

        // 4. 每5秒進行一次,分組統計
        // 4.1 轉換為元組
        wordDS.map(word -> Tuple2.of(word, 1))
                // 指定返回類型
                .returns(Types.TUPLE(Types.STRING, Types.INT))
                // 按照單詞進行分組
                .keyBy(t -> t.f0)
                // 滾動視窗,3秒計算一次
                .timeWindow(Time.seconds(3))
                .reduce(new ReduceFunction<Tuple2<String, Integer>>() {
                    @Override
                    public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) throws Exception {
                        return Tuple2.of(value1.f0, value1.f1 + value2.f1);
                    }
                }, new RichWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String, TimeWindow>() {
                    @Override
                    public void apply(String word, TimeWindow window, Iterable<Tuple2<String, Integer>> input, Collector<Tuple2<String, Integer>> out) throws Exception {

                        // 列印視窗開始、結束時間
                        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                        System.out.println("視窗開始時間:" + sdf.format(window.getStart())
                                + " 視窗結束時間:" + sdf.format(window.getEnd())
                                + " 視窗計算時間:" + sdf.format(System.currentTimeMillis()));

                        int sum = 0;
                        Iterator<Tuple2<String, Integer>> iterator = input.iterator();
                        while(iterator.hasNext()) {
                            Integer count = iterator.next().f1;
                            sum += count;
                        }
                        out.collect(Tuple2.of(word, sum));
                    }
                }).print();

        env.execute("app");
    }
}

視窗開始時間:2020-02-05 00:22:21 視窗結束時間:2020-02-05 00:22:24 視窗計算時間:2020-02-05 00:22:24
4> (hadooop,2)
視窗開始時間:2020-02-05 00:22:24 視窗結束時間:2020-02-05 00:22:27 視窗計算時間:2020-02-05 00:22:27
4> (hadooop,3)
視窗開始時間:2020-02-05 00:22:27 視窗結束時間:2020-02-05 00:22:30 視窗計算時間:2020-02-05 00:22:30
4> (hadooop,3)
視窗開始時間:2020-02-05 00:22:30 視窗結束時間:2020-02-05 00:22:33 視窗計算時間:2020-02-05 00:22:33
4> (hadooop,3)
視窗開始時間:2020-02-05 00:22:33 視窗結束時間:2020-02-05 00:22:36 視窗計算時間:2020-02-05 00:22:36
4> (hadooop,3)
視窗開始時間:2020-02-05 00:22:36 視窗結束時間:2020-02-05 00:22:39 視窗計算時間:2020-02-05 00:22:39

我們可以看到,這個滾動視窗,每3秒計算一次,是按照系統時間來計算的。

我們再把時間視窗設置為1分鐘,再試試。

視窗開始時間:2020-02-05 00:27:00 視窗結束時間:2020-02-05 00:28:00 視窗計算時間:2020-02-05 00:28:00
4> (hadooop,32)

視窗開始時間:2020-02-05 00:28:00 視窗結束時間:2020-02-05 00:29:00 視窗計算時間:2020-02-05 00:29:00
4> (hadooop,60)

剛好在 00:27:00 – 00:28:00之間。


參考文件:

https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/event_time.html


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 內容控制項不僅包括基本控制項,如標簽、按鈕以及工具提示;它們還包含特殊容器,這些容器可用於構造用戶界面中比較大的部分區域。 首先介紹ScrollViewer控制項,該控制項直接繼承自ContentControl類,提供了虛擬界面,允許用戶圍繞更大的元素滾動。與所有內容控制項一樣,ScrollViewer只能包 ...
  • 在Sublime Text3安裝目錄下新建一個文件 sublime_addright.inf 文件內容: [Version] Signature="$Windows NT$" [DefaultInstall] AddReg=SublimeText3 [SublimeText3] hkcr,"*\\s ...
  • " 返回《C 併發編程》" "1. 初始化共用資源" "2. Rx延遲求值" "3. 非同步數據綁定" "4. 非同步構造" "5. 非同步屬性" 1. 初始化共用資源 不管同時有多少線程調用 ,這個工廠委托只會運行一次,並且所有線程都等待同一個實例。 + 實例在創建後會被緩存起來,以後所有對 Value ...
  • 根據磁碟IO告警,找到占用磁碟IO (util)讀寫很高的進程。 ...
  • 硬鏈接(Hard Link)和軟鏈接也稱為符號鏈接(Symbolic Link)的目的是為瞭解決文件的共用使用問題。要闡明其原理,必須先理解Linux的文件存儲方式。 索引結點 Linux是一個UNIX類操作系統,所有類型的UNIX文件都是由操作系統通過索引節點來管理的。 索引節點是一個控制結構,包 ...
  • 當前情況下,經常會有需要到公司電腦進行一些操作,比如連接內網OA,資料庫或者提交文檔。為了減少外出,將使用frp進行內網穿透的方法進行一個說明。 前提條件 1. 一臺擁有公網 IP 的設備(如果沒有,伺服器可以使用https://diannaobos.com/frp/ 提供的免費伺服器) 2、需要遠 ...
  • 今天要分享的這篇就是2013年痞子衡剛入職飛思卡爾半導體MCU軟體團隊時為了學習C編碼規範所翻譯的(外企嘛,各種資料都是洋文),當時飛思卡爾剛成立MCU軟體團隊不久,那時候Kinetis SDK也還沒有正式推出,整個團隊必須要有一個統一且良好的編碼風格,這樣寫出來的SDK才符合大廠身份。 ...
  • 業界一直在尋求取代SRAM。其中之一包括自旋轉移力矩MRAM(STT-MRAM)。新的存儲器帶來了一些大膽的主張。例如STT-MRAM具有SRAM的速度和快閃記憶體的無波動性,具有無限的耐用性。 圖1.STT-MRAM的MJT細胞 everspin已經為SSD提供SST-MRAM設備。此外一些晶元製造商正 ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...