Flink 從0到1學習—— Flink 不可以連續 Split(分流)?

来源:https://www.cnblogs.com/zhisheng/archive/2019/07/24/11241349.html
-Advertisement-
Play Games

前言 今天上午被 Flink 的一個運算元困惑了下,具體問題是什麼呢? 我有這麼個需求:有不同種類型的告警數據流(包含恢複數據),然後我要將這些數據流做一個拆分,拆分後的話,每種告警裡面的數據又想將告警數據和恢複數據拆分出來。 結果,這個需求用 Flink 的 Split 運算符出現了問題。 分析 需 ...


前言

今天上午被 Flink 的一個運算元困惑了下,具體問題是什麼呢?

我有這麼個需求:有不同種類型的告警數據流(包含恢複數據),然後我要將這些數據流做一個拆分,拆分後的話,每種告警裡面的數據又想將告警數據和恢複數據拆分出來。

結果,這個需求用 Flink 的 Split 運算符出現了問題。

分析

需求如下圖所示:

我是期望如上這樣將數據流進行拆分的,最後將每種告警和恢復用不同的消息模版做一個渲染,渲染後再通過各種其他的方式(釘釘群
郵件、簡訊)進行告警通知。

於是我的代碼大概的結構如下代碼所示:

//dataStream 是總的數據流

//split 是拆分後的數據流
SplitStream<AlertEvent> split = dataStream.split(new OutputSelector<AlertEvent>() {
    @Override
    public Iterable<String> select(AlertEvent value) {
        List<String> tags = new ArrayList<>();
        switch (value.getType()) {
            case MIDDLEWARE:
                tags.add(MIDDLEWARE);
                break;
            case HEALTH_CHECK:
                tags.add(HEALTH_CHECK);
                break;
            case DOCKER:
                tags.add(DOCKER);
                break;
            //...
            //當然這裡還可以很多種類型
        }
        return tags;
    }
});

//然後你想獲取每種不同的數據類型,你可以使用 select
DataStream<AlertEvent> middleware = split.select(MIDDLEWARE);   //選出中間件的數據流

//然後你又要將中間件的數據流分流成告警和恢復
SplitStream<AlertEvent> middlewareSplit = middleware.split(new OutputSelector<AlertEvent>() {
    @Override
    public Iterable<String> select(AlertEvent value) {
        List<String> tags = new ArrayList<>();
        if(value.isRecover()) {
            tags.add(RECOVER)
        } else {
            tags.add(ALERT)
        }
        return tags;
    }
});
middlewareSplit.select(ALERT).print();    
        


DataStream<AlertEvent> healthCheck = split.select(HEALTH_CHECK);   //選出健康檢查的數據流

//然後你又要將健康檢查的數據流分流成告警和恢復
SplitStream<AlertEvent> healthCheckSplit = healthCheck.split(new OutputSelector<AlertEvent>() {
    @Override
    public Iterable<String> select(AlertEvent value) {
        List<String> tags = new ArrayList<>();
        if(value.isRecover()) {
            tags.add(RECOVER)
        } else {
            tags.add(ALERT)
        }
        return tags;
    }
});
healthCheckSplit.select(ALERT).print();



DataStream<AlertEvent> docekr = split.select(DOCKER);   //選出容器的數據流

//然後你又要將容器的數據流分流成告警和恢復
SplitStream<AlertEvent> dockerSplit = docekr.split(new OutputSelector<AlertEvent>() {
    @Override
    public Iterable<String> select(AlertEvent value) {
        List<String> tags = new ArrayList<>();
        if(value.isRecover()) {
            tags.add(RECOVER)
        } else {
            tags.add(ALERT)
        }
        return tags;
    }
});
dockerSplit.select(ALERT).print();

結構我抽象後大概就長上面這樣,然後我先本地測試的時候只把容器的數據那塊代碼打開了,其他種告警的分流代碼註釋掉了,一運行,發現竟然容器告警的數據怎麼還摻雜著健康檢查的數據也一起列印出來了,一開始我以為自己出了啥問題,就再起碼運行了三遍 IDEA 才發現結果一直都是這樣的。

於是,我只好在第二步分流前將 docekr 數據流列印出來,發現是沒什麼問題,列印出來的數據都是容器相關的,沒有摻雜著其他種的數據啊。這會兒遍陷入了沉思,懵逼發呆了一會。

解決問題

於是還是開始面向 Google 編程:

發現第一條就找到答案了,簡直不要太快,點進去可以看到他也有這樣的需求:

然後這個小伙伴還掙扎了下用不同的方法(雖然結果更慘):

最後換了個姿勢就好了(果然小伙子會的姿勢挺多的):

但從這篇文章中,我找到了關聯到的兩個 Flink Issue,分別是:

1、https://issues.apache.org/jira/browse/FLINK-5031

2、https://issues.apache.org/jira/browse/FLINK-11084

然後呢,從第二個 Issue 的討論中我發現了一些很有趣的討論:

對話很有趣,但是我突然想到之前我的知識星球裡面一位很細心的小伙伴問的一個問題了:

可以發現代碼上確實是標明瞭過期了,但是註釋裡面沒寫清楚推薦用啥,幸好我看到了這個 Issue,不然腦子裡面估計這個問題一直會存著呢。

那麼這個問題解決方法是不是意味著就可以利用 Side Outputs 來解決呢?當然可以啦,官方都推薦了,還不能都話,那麼不是打臉啪啪啪的響嗎?不過這裡還是賣個關子將 Side Outputs 後面專門用一篇文章來講,感興趣的可以先看看官網介紹:https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/side_output.html

另外其實也可以通過 split + filter 組合來解決這個問題,反正關鍵就是不要連續的用 split 來分流。

用 split + filter 的方案代碼大概如下:

DataStream<AlertEvent> docekr = split.select(DOCKER);   //選出容器的數據流

//容器告警的數據流
docekr.filter(new FilterFunction<AlertEvent>() {
    @Override
    public boolean filter(AlertEvent value) throws Exception {
        return !value.isRecover();
    }
})
.print();
        
//容器恢復的數據流        
docekr.filter(new FilterFunction<AlertEvent>() {
    @Override
    public boolean filter(AlertEvent value) throws Exception {
        return value.isRecover();
    }
})
.print();        

上面這種就是多次 filter 也可以滿足需求,但是就是代碼有點啰嗦。

總結

Flink 中不支持連續的 Split/Select 分流操作,要實現連續分流也可以通過其他的方式(split + filter 或者 side output)來實現

本篇文章連接是:http://www.54tianzhisheng.cn/2019/06/12/flink-split/

關註我

微信公眾號:zhisheng

另外我自己整理了些 Flink 的學習資料,目前已經全部放到微信公眾號了。你可以加我的微信:zhisheng_tian,然後回覆關鍵字:Flink 即可無條件獲取到。

更多私密資料請加入知識星球!

Github 代碼倉庫

https://github.com/zhisheng17/flink-learning/

以後這個項目的所有代碼都將放在這個倉庫里,包含了自己學習 flink 的一些 demo 和博客。

1、Flink 從0到1學習—— Apache Flink 介紹

2、Flink 從0到1學習—— Mac 上搭建 Flink 1.6.0 環境並構建運行簡單程式入門

3、Flink 從0到1學習—— Flink 配置文件詳解

4、Flink 從0到1學習—— Data Source 介紹

5、Flink 從0到1學習—— 如何自定義 Data Source ?

6、Flink 從0到1學習—— Data Sink 介紹

7、Flink 從0到1學習—— 如何自定義 Data Sink ?

8、Flink 從0到1學習—— Flink Data transformation(轉換)

9、Flink 從0到1學習—— 介紹Flink中的Stream Windows

10、Flink 從0到1學習—— Flink 中的幾種 Time 詳解

11、Flink 從0到1學習—— Flink 寫入數據到 ElasticSearch

12、Flink 從0到1學習—— Flink 項目如何運行?

13、Flink 從0到1學習—— Flink 寫入數據到 Kafka

14、Flink 從0到1學習—— Flink JobManager 高可用性配置

15、Flink 從0到1學習—— Flink parallelism 和 Slot 介紹

16、Flink 從0到1學習—— Flink 讀取 Kafka 數據批量寫入到 MySQL

17、Flink 從0到1學習—— Flink 讀取 Kafka 數據寫入到 RabbitMQ

18、Flink 從0到1學習》—— 你上傳的 jar 包藏到哪裡去了?

19、Flink 從0到1學習 —— Flink 中如何管理配置?

源碼解析

1、Flink 源碼解析 —— 源碼編譯運行

2、Flink 源碼解析 —— 項目結構一覽

3、Flink 源碼解析—— local 模式啟動流程

4、Flink 源碼解析 —— standalonesession 模式啟動流程

5、Flink 源碼解析 —— Standalone Session Cluster 啟動流程深度分析之 Job Manager 啟動

6、Flink 源碼解析 —— Standalone Session Cluster 啟動流程深度分析之 Task Manager 啟動

7、Flink 源碼解析 —— 分析 Batch WordCount 程式的執行過程

8、Flink 源碼解析 —— 分析 Streaming WordCount 程式的執行過程

9、Flink 源碼解析 —— 如何獲取 JobGraph?

10、Flink 源碼解析 —— 如何獲取 StreamGraph?

11、Flink 源碼解析 —— Flink JobManager 有什麼作用?

12、Flink 源碼解析 —— Flink TaskManager 有什麼作用?

13、Flink 源碼解析 —— JobManager 處理 SubmitJob 的過程

14、Flink 源碼解析 —— TaskManager 處理 SubmitJob 的過程

15、Flink 源碼解析 —— 深度解析 Flink Checkpoint 機制

16、Flink 源碼解析 —— 深度解析 Flink 序列化機制

17、Flink 源碼解析 —— 深度解析 Flink 是如何管理好記憶體的?原文出處:zhisheng的博客,歡迎關註我的公眾號:zhisheng


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • --基於譚浩強老師《C++程式設計(第三版)》做簡要Summary。(2019-07-24) 一、數組與指針 1. 指針數組(type_name * array_name[length]) 一個數組,其元素均為指針類型數據,該數組稱為指針數組。 2. 數組指針 二維數組的指針訪問: 二、 const ...
  • 其實在JAVA開發中servlet配置,映射註入配置等等都可以用xml來配置 在此處的department是實體類的名字,而不是對應的資料庫表的名字 資料庫表的欄位名=#{實體類屬性名} 逆向工程生成的XML文件有查找更新等功能,但是當我們查找的時候需要返回一個類, 我們應該在開頭寫返回結果 res ...
  • 在Java開發中,我們最常見到最頻繁使用的就是HashMap和HashTable,但是線上程競爭激烈的併發場景中使用都不夠合理。 1、HashMap 眾所周知 HashMap 底層是基於數組 + 鏈表組成的,不過在 jdk1.7 與1.8 中具體實現稍有不同。 HashMap是線程不安全的,在併發( ...
  • capture的作用是: 捕獲模板輸出的數據並將其存儲到一個變數,而不是把它們輸出到頁面,任何在 {capture name="foo"}和{/capture}之間的數據將被存儲到變數$foo中,該變數由name屬性指定,在模板中通過 $smarty.capture.foo 訪問該變數,{captu ...
  • 21.閉包 1. 閉包:在嵌套函數內,使用非全局變數(且不使用本層變數) 2. 閉包的作用:1.保證數據的安全性(純潔度)。2.裝飾器使用 3. ._\_closure\_\_判斷是否是閉包 22.裝飾器一(入門) 1.一個裝飾器裝飾多個函數 開放封閉原則:擴展是開放的(增加新功能),源碼是封閉的( ...
  • 在以往的對象模型編碼時,我們需要寫一大堆的get/set以及不同的構造函數等。Lombok為我們提供了一個非常好的插件形式。 在大多數的項目中,只需要使用到以下集中Annotation就足夠了,如果需要查看更多的選項,請參考: "傳送門" 1. 2. 3. 4. 生成final 欄位的構造函數 5. ...
  • Fermat's theorem states that for any prime number p and for any integer a > 1, ap = a (mod p). That is, if we raise a to the pth power and divide by p ...
  • shiro是一個強大而且易用的安全框架(主要包括認證和授權),它比spring security更加簡單,而且它不依賴於任何容器,可以和許多框架集成。 shiro的核心是安全管理器(SecurityManagement),它主要包括四個模塊: 1.Authentication:認證模塊,主要用於驗證 ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...