大數據Hadoop之——Flink中的Window API+時間語義+Watermark

来源:https://www.cnblogs.com/liugp/archive/2022/05/10/16255682.html
-Advertisement-
Play Games

一、window 概念 視窗(window)是處理無限流的核心。視窗將流分割成有限大小的“桶”,我們可以在桶上應用計算。本文檔重點介紹如何在Flink中執行視窗操作,以及程式員如何從其提供的功能中獲得最大的好處。 一個有視窗的Flink程式的一般結構如下所示。第一個片段指的是鍵控流,而第二個片段指的 ...


目錄

一、window 概念

視窗(window)是處理無限流的核心。視窗將流分割成有限大小的“桶”,我們可以在桶上應用計算。本文檔重點介紹如何在Flink中執行視窗操作,以及程式員如何從其提供的功能中獲得最大的好處。

一個有視窗的Flink程式的一般結構如下所示。第一個片段指的是鍵控流,而第二個片段指的是非鍵控流。可以看到,唯一的區別是keyBy(…)調用鍵流而window(…)調用非鍵流的windowwall(…)。這也將作為頁面其餘部分的路標。

Keyed Windows

stream
       .keyBy(...)               <-  keyed versus non-keyed windows
       .window(...)              <-  required: "assigner"
      [.trigger(...)]            <-  optional: "trigger" (else default trigger)
      [.evictor(...)]            <-  optional: "evictor" (else no evictor)
      [.allowedLateness(...)]    <-  optional: "lateness" (else zero)
      [.sideOutputLateData(...)] <-  optional: "output tag" (else no side output for late data)
       .reduce/aggregate/apply()      <-  required: "function"
      [.getSideOutput(...)]      <-  optional: "output tag"

Non-Keyed Windows

stream
       .windowAll(...)           <-  required: "assigner"
      [.trigger(...)]            <-  optional: "trigger" (else default trigger)
      [.evictor(...)]            <-  optional: "evictor" (else no evictor)
      [.allowedLateness(...)]    <-  optional: "lateness" (else zero)
      [.sideOutputLateData(...)] <-  optional: "output tag" (else no side output for late data)
       .reduce/aggregate/apply()      <-  required: "function"
      [.getSideOutput(...)]      <-  optional: "output tag"

一般真實的流都是無界的,怎樣處理無界的數據?

在自然環境中,數據的產生原本就是流式的。無論是來自 Web 伺服器的事件數據,證券交易所的交易數據,還是來自工廠車間機器上的感測器數據,其數據都是流式的。但是當你 分析數據時,可以圍繞 有界流(bounded)或 無界流(unbounded)兩種模型來組織處理數據,當然,選擇不同的模型,程式的執行和處理方式也都會不同。

上面圖片來源:https://nightlies.apache.org/flink/flink-docs-release-1.14/zh/docs/learn-flink/overview/

  • 可以把無限的數據流進行切分,得到有限的數據集進行處理 —— 也
    就是得到有界流
  • 視窗(window)就是將無限流切割為有限流的一種方式,它會將流
    數據分發到有限大小的桶(bucket)中進行分析

二、 時間視窗(Time Window)

官方文檔

1)滾動視窗(Tumbling Windows)

翻轉視窗賦值器將每個元素賦值給一個指定視窗大小的視窗。滾動的視窗有固定的尺寸,而且不重疊。例如,如果您指定一個大小為5分鐘的滾動視窗,則當前視窗將被評估,並每5分鐘啟動一個新視窗,如下圖所示:

【特點】

  • 將數據依據固定的視窗長度對數據進行切分
  • 時間對齊,視窗長度固定,沒有重疊

【示例代碼】

TumblingEventTimeWindows:滾動事件時間視窗
TumblingProcessingTimeWindows:滾動處理時間視窗

val input: DataStream[T] = ...

// tumbling event-time windows
input
    .keyBy(<key selector>)
    .window(TumblingEventTimeWindows.of(Time.seconds(5)))
    .<windowed transformation>(<window function>)

// tumbling processing-time windows
input
    .keyBy(<key selector>)
    .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
    .<windowed transformation>(<window function>)

// daily tumbling event-time windows offset by -8 hours.
input
    .keyBy(<key selector>)
    .window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8)))
    .<windowed transformation>(<window function>)

2)滑動視窗(Sliding Windows)

滑動視窗賦值器將元素賦值給固定長度的視窗。類似於滾動視窗賦值器,視窗的大小由視窗大小參數配置。另外一個視窗滑動參數控制滑動視窗啟動的頻率。因此,如果滑動視窗小於視窗大小,則滑動視窗可以重疊。在這種情況下,元素被分配給多個視窗。

例如,您可以將大小為10分鐘的視窗滑動5分鐘。這樣,每隔5分鐘就會出現一個視窗,其中包含在最後10分鐘內到達的事件,如下圖所示:

【特點】

  • 滑動視窗是固定視窗的更廣義的一種形式,滑動視窗由固定的視窗
    長度和滑動間隔組成
  • 視窗長度固定,可以有重疊

【示例代碼】

SlidingEventTimeWindows:滑動事件時間視窗
SlidingProcessingTimeWindows:滑動處理時間視窗

val input: DataStream[T] = ...

// sliding event-time windows
input
    .keyBy(<key selector>)
    .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
    .<windowed transformation>(<window function>)

// sliding processing-time windows
input
    .keyBy(<key selector>)
    .window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))
    .<windowed transformation>(<window function>)

// sliding processing-time windows offset by -8 hours
input
    .keyBy(<key selector>)
    .window(SlidingProcessingTimeWindows.of(Time.hours(12), Time.hours(1), Time.hours(-8)))
    .<windowed transformation>(<window function>)

3)會話視窗(Session Windows)

會話視窗分配器根據活動的會話對元素進行分組。與滑動視窗不同,會話視窗沒有重疊,也沒有固定的開始和結束時間。相反,當會話視窗在一段時間內沒有接收到元素時,即當一個不活動間隙發生時,會話視窗將關閉。會話視窗分配器可以配置一個靜態會話間隙,也可以配置一個會話間隙提取器函數,該函數定義了不活動的時間長度。當這段時間到期時,當前會話關閉,隨後的元素被分配到一個新的會話視窗。

【特點】

  • 由一系列事件組合一個指定時間長度的 timeout 間隙組成,也就是
    一段時間沒有接收到新數據就會生成新的視窗
  • 時間無對齊
  • 視窗長度不固定,也不會重疊

【示例代碼】

EventTimeSessionWindows:會話事件時間視窗
SlidingProcessingTimeWindows:會話處理時間視窗

val input: DataStream[T] = ...

// event-time session windows with static gap
input
    .keyBy(<key selector>)
    .window(EventTimeSessionWindows.withGap(Time.minutes(10)))
    .<windowed transformation>(<window function>)

// event-time session windows with dynamic gap
input
    .keyBy(<key selector>)
    .window(EventTimeSessionWindows.withDynamicGap(new SessionWindowTimeGapExtractor[String] {
      override def extract(element: String): Long = {
        // determine and return session gap
      }
    }))
    .<windowed transformation>(<window function>)

// processing-time session windows with static gap
input
    .keyBy(<key selector>)
    .window(ProcessingTimeSessionWindows.withGap(Time.minutes(10)))
    .<windowed transformation>(<window function>)


// processing-time session windows with dynamic gap
input
    .keyBy(<key selector>)
    .window(DynamicProcessingTimeSessionWindows.withDynamicGap(new SessionWindowTimeGapExtractor[String] {
      override def extract(element: String): Long = {
        // determine and return session gap
      }
    }))
    .<windowed transformation>(<window function>)

三、window API

視窗分配器 —— window() 方法

  • 我們可以用 .window() 來定義一個視窗,然後基於這個 window 去做一些聚
    合或者其它處理操作。註意 window () 方法必須在 keyBy 之後才能用
  • Flink 提供了更加簡單的三種類型時間視窗用於定義時
    間視窗,也提供了countWindowAll來定義計數視窗

TumblingEventTimeWindows:滾動事件時間視窗
TumblingProcessingTimeWindows:滾動處理時間視窗
SlidingEventTimeWindows:滑動事件時間視窗
SlidingProcessingTimeWindows:滑動處理時間視窗
EventTimeSessionWindows:會話事件時間視窗
SlidingProcessingTimeWindows:會話處理時間視窗

四、視窗分配器(window assigner)

window function 定義了要對視窗中收集的數據做的計算操作。可以分為兩類。

1)增量聚合函數(incremental aggregation functions)

  • 每條數據到來就進行計算,保持一個簡單的狀態
  • ReduceFunction
val input: DataStream[(String, Long)] = ...

input
    .keyBy(<key selector>)
    .window(<window assigner>)
    .reduce { (v1, v2) => (v1._1, v1._2 + v2._2) }
  • AggregateFunction
val input: DataStream[(String, Long)] = ...

input
    .keyBy(<key selector>)
    .window(<window assigner>)
    .aggregate(new AverageAggregate)

2)全視窗函數(full window functions)

  • 先把視窗所有數據收集起來,等到計算的時候會遍歷所有數據
  • ProcessWindowFunction

一個ProcessWindowFunction可以這樣定義和使用:

val input: DataStream[(String, Long)] = ...

input
  .keyBy(_._1)
  .window(TumblingEventTimeWindows.of(Time.minutes(5)))
  .process(new MyProcessWindowFunction())

/* ... */

class MyProcessWindowFunction extends ProcessWindowFunction[(String, Long), String, String, TimeWindow] {

  def process(key: String, context: Context, input: Iterable[(String, Long)], out: Collector[String]) = {
    var count = 0L
    for (in <- input) {
      count = count + 1
    }
    out.collect(s"Window ${context.window} count: $count")
  }
}

3)其它可選window API

  • .trigger() —— 觸發器,定義 window 什麼時候關閉,觸發計算並輸出結果
  • .evictor() —— 移除器,定義移除某些數據的邏輯
  • .allowedLateness() —— 允許處理遲到的數據
  • .sideOutputLateData() —— 將遲到的數據放入側輸出流
  • .getSideOutput() —— 獲取側輸出流

官方文檔
Flink 明確支持以下三種時間語義:

  • 事件時間(event time): 事件產生的時間,記錄的是設備生產(或者存儲)事件的時間

  • 攝取時間(ingestion time): 數據進入Flink的時間,Flink 讀取事件時記錄的時間

  • 處理時間(processing time):執行操作運算元的本地系統時間,與機器相關

上面圖片來源:https://nightlies.apache.org/flink/flink-docs-release-1.14/zh/docs/concepts/time/

六、設置 Event Time

我們可以直接在代碼中,對執行環境調用 setStreamTimeCharacteristic
方法,設置流的時間特性,具體的時間,還需要從數據中提取時間戳(timestamp)

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
var env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

七、水位線(Watermark)

官方文檔

1)為什麼需要水位線(Watermark)

當 Flink 以 Event Time 模式處理數據流時,它會根據數據里的時間戳來
處理基於時間的運算元,由於網路、分散式等原因,會導致亂序數據的產生,亂序數據會讓視窗計算不准確。Watermark正是處理亂序數據而來的。

2)如何利用Watermark處理亂序數據問題?

遇到一個時間戳達到了視窗關閉時間,不應該立刻觸發視窗計算,而是等
待一段時間,等遲到的數據來了再關閉視窗。

  • Watermark 是一種衡量 Event Time 進展的機制,可以設定延遲觸發
  • Watermark 是用於處理亂序事件的,而正確的處理亂序事件,通常
    Watermark 機制結合 window 來實現
  • 數據流中的 Watermark 用於表示 timestamp 小於 Watermark 的數據,
    都已經到達了,因此,window 的執行也是由 Watermark 觸發的;
  • watermark 用來讓程式自己平衡延遲和結果正確性。

3)watermark 的特點

  • watermark 是一條特殊的數據記錄
  • watermark 必須單調遞增,以確保任務的事件時間時鐘在向前推進,而不
    是在後退
  • watermark 與數據的時間戳相關

4)watermark 的傳遞

5)watermark 策略與應用

1)Watermark 策略簡介

時間戳的分配與 watermark 的生成是齊頭併進的,其可以告訴 Flink 應用程式事件時間的進度。其可以通過指定 WatermarkGenerator 來配置 watermark 的生成方式。

使用 Flink API 時需要設置一個同時包含 TimestampAssigner 和 WatermarkGenerator 的 WatermarkStrategy。WatermarkStrategy 工具類中也提供了許多常用的 watermark 策略,並且用戶也可以在某些必要場景下構建自己的 watermark 策略。WatermarkStrategy 介面如下:

public interface WatermarkStrategy<T> 
    extends TimestampAssignerSupplier<T>, WatermarkGeneratorSupplier<T>{

    /**
     * 根據策略實例化一個可分配時間戳的 {@link TimestampAssigner}。
     */
    @Override
    TimestampAssigner<T> createTimestampAssigner(TimestampAssignerSupplier.Context context);

    /**
     * 根據策略實例化一個 watermark 生成器。
     */
    @Override
    WatermarkGenerator<T> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context);
}

通常情況下,你不用實現此介面,而是可以使用 WatermarkStrategy 工具類中通用的 watermark 策略,或者可以使用這個工具類將自定義的 TimestampAssignerWatermarkGenerator 進行綁定。

【例如】你想要要使用有界無序(bounded-out-of-orderness)watermark 生成器和一個 lambda 表達式作為時間戳分配器,那麼可以按照如下方式實現:

WatermarkStrategy
  .forBoundedOutOfOrderness[(Long, String)](Duration.ofSeconds(20))
  .withTimestampAssigner(new SerializableTimestampAssigner[(Long, String)] {
    override def extractTimestamp(element: (Long, String), recordTimestamp: Long): Long = element._1
  })

【溫馨提示】其中 TimestampAssigner 的設置與否是可選的,大多數情況下,可以不用去特別指定。

2)使用 Watermark 策略應用

WatermarkStrategy 可以在 Flink 應用程式中的兩處使用:

  • 第一種是直接在數據源上使用
  • 第二種是直接在非數據源的操作之後使用。

【溫馨提示】第一種方式相比會更好,因為數據源可以利用 watermark 生成邏輯中有關分片/分區(shards/partitions/splits)的信息。使用這種方式,數據源通常可以更精準地跟蹤 watermark,整體 watermark 生成將更精確。

【示例】僅當無法直接在數據源上設置策略時,才應該使用第二種方式(在任意轉換操作之後設置 WatermarkStrategy):

val env = StreamExecutionEnvironment.getExecutionEnvironment

val stream: DataStream[MyEvent] = env.readFile(
         myFormat, myFilePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 100,
         FilePathFilter.createDefaultFilter())

val withTimestampsAndWatermarks: DataStream[MyEvent] = stream
        .filter( _.severity == WARNING )
        .assignTimestampsAndWatermarks(<watermark strategy>)

withTimestampsAndWatermarks
        .keyBy( _.getGroup )
        .window(TumblingEventTimeWindows.of(Time.seconds(10)))
        .reduce( (a, b) => a.add(b) )
        .addSink(...)

【示例】處理空閑數據源

如果數據源中的某一個分區/分片在一段時間內未發送事件數據,則意味著 WatermarkGenerator 也不會獲得任何新數據去生成 watermark。我們稱這類數據源為空閑輸入或空閑源。在這種情況下,當某些其他分區仍然發送事件數據的時候就會出現問題。由於下游運算元 watermark 的計算方式是取所有不同的上游並行數據源 watermark 的最小值,則其 watermark 將不會發生變化。

WatermarkStrategy
  .forBoundedOutOfOrderness[(Long, String)](Duration.ofSeconds(20))
  .withIdleness(Duration.ofMinutes(1))

3)使用場景

  • 對於排好序的數據,不需要延遲觸發,可以只指定時間戳就行了。
// 註意時間是毫秒,所以根據時間戳不同,可能需要乘以1000
dataStream.assignAscendingTimestamps(_.timestamp * 1000)
  • Flink 暴露了 TimestampAssigner 介面供我們實現,使我們可以自定義如
    何從事件數據中抽取時間戳和生成watermark。
// MyAssigner 可以有兩種類型,都繼承自 TimestampAssigner
dataStream.assignAscendingTimestamps(new MyAssigner())

4)TimestampAssigner

定義了抽取時間戳,以及生成 watermark 的方法,有兩種類型

1、AssignerWithPeriodicWatermarks

  • 周期性的生成 watermark:系統會周期性的將 watermark 插入到流中
  • 預設周期是200毫秒,可以使用 ExecutionConfig.setAutoWatermarkInterval()
    方法進行設置
  • 升序和前面亂序的處理 BoundedOutOfOrderness ,都是基於周期性
    watermark 的。

2、AssignerWithPunctuatedWatermarks

  • 沒有時間周期規律,可打斷的生成 watermark

可以棄用 AssignerWithPeriodicWatermarks 和 AssignerWithPunctuatedWatermarks 了

在 Flink 新的 WatermarkStrategyTimestampAssignerWatermarkGenerator 的抽象介面之前,Flink 使用的是 AssignerWithPeriodicWatermarks 和 AssignerWithPunctuatedWatermarks。你仍可以在 API 中看到它們,但建議使用新介面,因為其對時間戳和 watermark 等重點的抽象和分離很清晰,並且還統一了周期性和標記形式的 watermark 生成方式。

5)WatermarkStrategy(重點)

flink1.11版本後 建議用WatermarkStrategy(Watermark生成策略)生成Watermark,當創建DataStream對象後,使用如下方法指定策略:assignTimestampsAndWatermarks(WatermarkStrategy<T>)

通常情況下,你不用實現此介面,而是可以使用 WatermarkStrategy 工具類中通用的 watermark 策略,或者可以使用這個工具類將自定義的 TimestampAssigner 與 WatermarkGenerator 進行綁定。

1、固定亂序長度策略(forBoundedOutOfOrderness)

通過調用WatermarkStrategy對象上的forBoundedOutOfOrderness方法來實現,接收一個Duration類型的參數作為最大亂序(out of order)長度。WatermarkStrategy對象上的withTimestampAssigner方法為從事件數據中提取時間戳提供了介面。

【示例】

  • ForBoundedOutOfOrderness.java
package com.com.streaming.watermarkstrategy;

import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.time.Duration;
import java.time.LocalDateTime;

//在assignTimestampsAndWatermarks中用WatermarkStrategy.forBoundedOutOfOrderness方法抽取Timestamp和生成周期性水位線示例
public class ForBoundedOutOfOrderness {

    public static void main(String[] args) throws  Exception{
        //創建流處理環境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //設置EventTime語義
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        //設置周期生成Watermark間隔(10毫秒)
        env.getConfig().setAutoWatermarkInterval(10L);
        //並行度1
        env.setParallelism(1);
        //演示數據
        DataStreamSource<ClickEvent> mySource = env.fromElements(
                new ClickEvent(LocalDateTime.now(), "user1", 1L, 1),
                new ClickEvent(LocalDateTime.now(), "user1", 2L, 2),
                new ClickEvent(LocalDateTime.now(), "user1", 3L, 3),
                new ClickEvent(LocalDateTime.now(), "user1", 4L, 4),
                new ClickEvent(LocalDateTime.now(), "user1", 5L, 5),
                new ClickEvent(LocalDateTime.now(), "user1", 6L, 6),
                new ClickEvent(LocalDateTime.now(), "user1", 7L, 7),
                new ClickEvent(LocalDateTime.now(), "user1", 8L, 8)
        );
        //WatermarkStrategy.forBoundedOutOfOrderness周期性生成水位線
        //可更好處理延遲數據
        //BoundedOutOfOrdernessWatermarks<T>實現WatermarkGenerator<T>
        SingleOutputStreamOperator<ClickEvent> streamTS = mySource.assignTimestampsAndWatermarks(
                //指定Watermark生成策略,最大延遲長度5毫秒
                WatermarkStrategy.<ClickEvent>forBoundedOutOfOrderness(Duration.ofMillis(5))
                        .withTimestampAssigner(
                                //SerializableTimestampAssigner介面中實現了extractTimestamp方法來指定如何從事件數據中抽取時間戳
                                new SerializableTimestampAssigner<ClickEvent>() {
                                    @Override
                                    public long extractTimestamp(ClickEvent event, long recordTimestamp) {
                                        return event.getDateTime(event.getEventTime());
                                    }
                                })
        );
        //結果列印
        streamTS.print();
        env.execute();
    }
}

  • ClickEvent.java
package com.com.streaming.watermarkstrategy;

import java.time.LocalDateTime;
import java.time.ZoneOffset;

public class ClickEvent {
    private String user;
    private long l;
    private int i;
    private LocalDateTime eventTime;

    public ClickEvent(LocalDateTime eventTime, String user, long l, int i) {
        this.eventTime = eventTime;
        this.user = user;
        this.l = l;
        this.i = i;
    }

    public LocalDateTime getEventTime() {
        return eventTime;
    }

    public void setEventTime(LocalDateTime eventTime) {
        this.eventTime = eventTime;
    }

    public String getUser() {
        return user;
    }

    public void setUser(String user) {
        this.user = user;
    }

    public long getL() {
        return l;
    }

    public void setL(long l) {
        this.l = l;
    }

    public int getI() {
        return i;
    }

    public void setI(int i) {
        this.i = i;
    }

    public long getDateTime(LocalDateTime dt) {
        ZoneOffset zoneOffset8 = ZoneOffset.of("+8");
        return dt.toInstant(zoneOffset8).toEpochMilli();
    }
}

2、單調遞增策略(forMonotonousTimestamps)

通過調用WatermarkStrategy對象上的forMonotonousTimestamps方法來實現,無需任何參數,相當於將forBoundedOutOfOrderness策略的最大亂序長度outOfOrdernessMillis設置為0

  • ForMonotonousTimestamps.java
package com.com.streaming.watermarkstrategy;

import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.time.Duration;
import java.time.LocalDateTime;

public class ForMonotonousTimestamps {
    public static void main(String[] args) throws  Exception{
        //創建流處理環境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //設置EventTime語義
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        //設置周期生成Watermark間隔(10毫秒)
        env.getConfig().setAutoWatermarkInterval(10L);
        //並行度1
        env.setParallelism(1);
        //演示數據
        DataStreamSource<ClickEvent> mySource = env.fromElements(
                new ClickEvent(LocalDateTime.now(), "user1", 1L, 1),
                new ClickEvent(LocalDateTime.now(), "user1", 2L, 2),
                new ClickEvent(LocalDateTime.now(), "user1", 3L, 3),
                new ClickEvent(LocalDateTime.now(), "user1", 4L, 4),
                new ClickEvent(LocalDateTime.now(), "user1", 5L, 5),
                new ClickEvent(LocalDateTime.now(), "user1", 6L, 6),
                new ClickEvent(LocalDateTime.now(), "user1", 7L, 7),
                new ClickEvent(LocalDateTime.now(), "user1", 8L, 8)
        );
        //WatermarkStrategy.forMonotonousTimestamps周期性生成水位線
        //相當於延遲outOfOrdernessMillis=0
        //繼承自BoundedOutOfOrdernessWatermarks<T>

        SingleOutputStreamOperator<ClickEvent> streamTS = mySource.assignTimestampsAndWatermarks(
                WatermarkStrategy.<ClickEvent>forMonotonousTimestamps()
                        .withTimestampAssigner((event, recordTimestamp) -> event.getDateTime(event.getEventTime()))
        );
        //結果列印
        streamTS.print();
        env.execute();
    }
}

3、不生成策略(noWatermarks)

WatermarkStrategy.noWatermarks()

  • 當一個運算元從多個上游運算元中獲取數據時,會取上游最小的Watermark作為自身的Watermark,並檢測是否滿足視窗觸發條件。當達不到觸發條件,視窗會在記憶體中緩存大量視窗數據,導致記憶體不足等問題
  • flink提供了設置流狀態為空閑的withIdleness方法。在設置的超時時間內,當某個數據流一直沒有事件數據到達,就標記這個流為空閑。下游運算元不需要等待這條數據流產生的Watermark,而取其他上游激活狀態的Watermark,來決定是否需要觸發視窗計算。

上面代碼設置超時時間5毫秒,超過這個時間,沒有生成Watermark,將流狀態設置空閑,當下次有新的Watermark生成併發送到下游時,重新設置為活躍。
WatermarkStrategy.withIdleness(Duration.ofMillis(5))

未完待續~


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 背景 昨天,咱們的《知識星球:Java技術棧》裡面有粉絲向我提問: 問題大概就是: Spring Boot 定時任務開啟後,怎麼符合條件自動停止? 當時我有空,雖然已經給出了參考答案,但可能還有一些細節地方要註意的,另外,我也覺得這個問題特別有意思,現在特別拿出來整理下,分享下給大家。 1、自定義任 ...
  • 一個工作了 5年的程式員,在私信裡面不斷向我訴苦。 他說,他用了Mybatis這麼久,怎麼滴也算是精通Mybatis了吧。 結果竟然在Mybatis這個面試題上翻車了! 真的好煩! 好吧,我們今天來看看“Mybatis裡面的緩存機制”,普通人和高手的回答。 普通人: 嗯。。。。。。。。。 高手: 這 ...
  • 前言 用過VueRouter路由組件的應該都知道,VueRouter有hash和history兩種模式。hash模式會在url中插入#,history模式下url則看上去更加簡潔美觀。如果想要支持history模式則必須要後端服務進行配合。 常用後端伺服器配置方式請參考 後端配置例子 後端配置例子 ...
  • 1、WebFirst框架描述 WebFirst 是果糖大數據團隊開發的新一代 高性能 代碼生成器&資料庫設計工具,由.net core + sqlsugar 開發 導入1000個表只要1-2秒,用法簡單,功能強大,支持多種資料庫 ,具體功能如下: 一、 建庫、CodeFirst方式線上建表,沒用到C ...
  • 一、docker安裝 centos安裝docker sudo yum update sudo yum install -y yum-utils device-mapper-persistent-data 1vm2 sudo yum-config-manager --add-repo http://m ...
  • 鏡像下載、功能變數名稱解析、時間同步請點擊 阿裡雲開源鏡像站 畢設要開始做Apollo相關的課題,自己在這方面完全就是一個小白,光是安裝apollo就已經花了不少功夫,也走了一些彎路,所以在這裡記錄一下,一方面做個總結,另一方面也希望可以幫到和我一樣的朋友,最近在做Apollo的朋友也可以點個關註,大家一起 ...
  • 一、單個文件上傳 前端代碼可參考elementUI,後端代碼主要上傳一個文件MultipartFile multipartFile @PostMapping("/upload") public ObjectRestResponse uploadKnowledge(@RequestParam(valu ...
  • 本文例子參考《STM32單片機開發實例——基於Proteus虛擬模擬與HAL/LL庫》 源代碼:https://github.com/LanLinnet/STM33F103R6 項目要求 通過定時器中斷的方式,實現流水燈的效果。 硬體設計 在第一節的基礎上,在Proteus中添加電路如下圖所示。 在 ...
一周排行
    -Advertisement-
    Play Games
  • 1. 說明 /* Performs operations on System.String instances that contain file or directory path information. These operations are performed in a cross-pla ...
  • 視頻地址:【WebApi+Vue3從0到1搭建《許可權管理系統》系列視頻:搭建JWT系統鑒權-嗶哩嗶哩】 https://b23.tv/R6cOcDO qq群:801913255 一、在appsettings.json中設置鑒權屬性 /*jwt鑒權*/ "JwtSetting": { "Issuer" ...
  • 引言 集成測試可在包含應用支持基礎結構(如資料庫、文件系統和網路)的級別上確保應用組件功能正常。 ASP.NET Core 通過將單元測試框架與測試 Web 主機和記憶體中測試伺服器結合使用來支持集成測試。 簡介 集成測試與單元測試相比,能夠在更廣泛的級別上評估應用的組件,確認多個組件一起工作以生成預 ...
  • 在.NET Emit編程中,我們探討了運算操作指令的重要性和應用。這些指令包括各種數學運算、位操作和比較操作,能夠在動態生成的代碼中實現對數據的處理和操作。通過這些指令,開發人員可以靈活地進行算術運算、邏輯運算和比較操作,從而實現各種複雜的演算法和邏輯......本篇之後,將進入第七部分:實戰項目 ...
  • 前言 多表頭表格是一個常見的業務需求,然而WPF中卻沒有預設實現這個功能,得益於WPF強大的控制項模板設計,我們可以通過修改控制項模板的方式自己實現它。 一、需求分析 下圖為一個典型的統計表格,統計1-12月的數據。 此時我們有一個需求,需要將月份按季度劃分,以便能夠直觀地看到季度統計數據,以下為該需求 ...
  • 如何將 ASP.NET Core MVC 項目的視圖分離到另一個項目 在當下這個年代 SPA 已是主流,人們早已忘記了 MVC 以及 Razor 的故事。但是在某些場景下 SSR 還是有意想不到效果。比如某些靜態頁面,比如追求首屏載入速度的時候。最近在項目中回歸傳統效果還是不錯。 有的時候我們希望將 ...
  • System.AggregateException: 發生一個或多個錯誤。 > Microsoft.WebTools.Shared.Exceptions.WebToolsException: 生成失敗。檢查輸出視窗瞭解更多詳細信息。 內部異常堆棧跟蹤的結尾 > (內部異常 #0) Microsoft ...
  • 引言 在上一章節我們實戰了在Asp.Net Core中的項目實戰,這一章節講解一下如何測試Asp.Net Core的中間件。 TestServer 還記得我們在集成測試中提供的TestServer嗎? TestServer 是由 Microsoft.AspNetCore.TestHost 包提供的。 ...
  • 在發現結果為真的WHEN子句時,CASE表達式的真假值判斷會終止,剩餘的WHEN子句會被忽略: CASE WHEN col_1 IN ('a', 'b') THEN '第一' WHEN col_1 IN ('a') THEN '第二' ELSE '其他' END 註意: 統一各分支返回的數據類型. ...
  • 在C#編程世界中,語法的精妙之處往往體現在那些看似微小卻極具影響力的符號與結構之中。其中,“_ =” 這一組合突然出現還真不知道什麼意思。本文將深入剖析“_ =” 的含義、工作原理及其在實際編程中的廣泛應用,揭示其作為C#語法奇兵的重要角色。 一、下劃線 _:神秘的棄元符號 下劃線 _ 在C#中並非 ...