Flink中的時間類型和視窗是非常重要概念,是學習Flink必須要掌握的兩個知識點。Flink中的時間類型時間類型介紹Flink流式處理中支持不同類型的時間。分為以下幾種:處理時間Flink程式執行對應操作的系統時間。所有基於時間的操作(例如:時間視窗)都將使用運行相應operator的系統時間。例... ...
Flink中的時間類型和視窗是非常重要概念,是學習Flink必須要掌握的兩個知識點。
Flink中的時間類型
時間類型介紹
Flink流式處理中支持不同類型的時間。分為以下幾種:
- 處理時間
- Flink程式執行對應操作的系統時間。所有基於時間的操作(例如:時間視窗)都將使用運行相應operator的系統時間。例如:每個小時的處理時間視窗包括在系統時間範圍內所有operator接收到的記錄。例如:如果應用程式在09:15開始運行,則第一個滾動時間視窗將包括:09:15 – 10:00 之間的處理事件,下一個視窗包括上午10:00 – 11:00之間的處理事件
- 這種處理時間方式實時性是最好的,但數據未必準確
- 事件時間
- 每個事件發生的時間。這個時間一般是在進入到Flink之前就包含在事件中
- 針對Eventtime,事件被處理的時間以來與事件本身
- Eventtime必須要指定如何生成Eventtime Watermark(水印)
- 理想情況,不管事件何時到達或者順序如何,事件時間處理能夠得到完整一致地結果。
- 事件處理在等待亂序事件時,會產生一些延遲。這樣會對Eventtime的應用性能有一定的影響
- 攝入時間
- 攝入時間是事件進入Flink的時間
- 在source operator中,每個記錄以時間戳的形式獲取源的當前時間
- 它在概念是處於事件時間和處理時間中間
- 攝入時間不能處理亂序問題或者延遲數據,攝入時間可以由流式系統自動生成水印
Flink支持的這幾種時間剛好和我們上一篇播客中的內容相對應。
https://www.cnblogs.com/ilovezihan/p/12254479.html
應用一張Flink官網的圖。
Flink代碼中設置時間類型
通常,我們在Flink初始化流式運行環境時,就會設置流處理時間特性。這個設置很重要,它決定了數據流的行為方式。(例如:是否需要給事件分配時間戳),以及視窗操作應該使用什麼樣的時間類型。例如:KeyedStream.timeWindow(Time.seconds(30))。
我們接下來通過實現一個每5秒中進行一次單詞計數的案例,來說明Flink中如何指定時間類型。
public class WordCountWindow { public static void main(String[] args) throws Exception { // 1. 初始化流式運行環境 Configuration conf = new Configuration(); StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf); // 2. 設置時間處理類型,這裡設置的方式處理時間 env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); // 3. 定義數據源,每秒發送一個hadoop單詞 DataStreamSource<String> wordDS = env.addSource(new RichSourceFunction<String>() { private boolean isCanaled = false; @Override public void run(SourceContext<String> ctx) throws Exception { while (!isCanaled) { ctx.collect("hadooop"); Thread.sleep(1000); } } @Override public void cancel() { isCanaled = true; } }); // 4. 每5秒進行一次,分組統計 // 4.1 轉換為元組 wordDS.map(word -> Tuple2.of(word, 1)) // 指定返回類型 .returns(Types.TUPLE(Types.STRING, Types.INT)) // 按照單詞進行分組 .keyBy(t -> t.f0) // 滾動視窗,3秒計算一次 .timeWindow(Time.seconds(3)) .reduce(new ReduceFunction<Tuple2<String, Integer>>() { @Override public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) throws Exception { return Tuple2.of(value1.f0, value1.f1 + value2.f1); } }, new RichWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String, TimeWindow>() { @Override public void apply(String word, TimeWindow window, Iterable<Tuple2<String, Integer>> input, Collector<Tuple2<String, Integer>> out) throws Exception { // 列印視窗開始、結束時間 SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); System.out.println("視窗開始時間:" + sdf.format(window.getStart()) + " 視窗結束時間:" + sdf.format(window.getEnd()) + " 視窗計算時間:" + sdf.format(System.currentTimeMillis())); int sum = 0; Iterator<Tuple2<String, Integer>> iterator = input.iterator(); while(iterator.hasNext()) { Integer count = iterator.next().f1; sum += count; } out.collect(Tuple2.of(word, sum)); } }).print(); env.execute("app"); } }
視窗開始時間:2020-02-05 00:22:21 視窗結束時間:2020-02-05 00:22:24 視窗計算時間:2020-02-05 00:22:24
4> (hadooop,2)
視窗開始時間:2020-02-05 00:22:24 視窗結束時間:2020-02-05 00:22:27 視窗計算時間:2020-02-05 00:22:27
4> (hadooop,3)
視窗開始時間:2020-02-05 00:22:27 視窗結束時間:2020-02-05 00:22:30 視窗計算時間:2020-02-05 00:22:30
4> (hadooop,3)
視窗開始時間:2020-02-05 00:22:30 視窗結束時間:2020-02-05 00:22:33 視窗計算時間:2020-02-05 00:22:33
4> (hadooop,3)
視窗開始時間:2020-02-05 00:22:33 視窗結束時間:2020-02-05 00:22:36 視窗計算時間:2020-02-05 00:22:36
4> (hadooop,3)
視窗開始時間:2020-02-05 00:22:36 視窗結束時間:2020-02-05 00:22:39 視窗計算時間:2020-02-05 00:22:39
我們可以看到,這個滾動視窗,每3秒計算一次,是按照系統時間來計算的。
我們再把時間視窗設置為1分鐘,再試試。
視窗開始時間:2020-02-05 00:27:00 視窗結束時間:2020-02-05 00:28:00 視窗計算時間:2020-02-05 00:28:00
4> (hadooop,32)視窗開始時間:2020-02-05 00:28:00 視窗結束時間:2020-02-05 00:29:00 視窗計算時間:2020-02-05 00:29:00
4> (hadooop,60)
剛好在 00:27:00 – 00:28:00之間。
參考文件:
https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/event_time.html