Flink-使用流批一體API統計單詞數量

来源:https://www.cnblogs.com/luxh/archive/2022/06/29/16424238.html
-Advertisement-
Play Games

The DataStream API gets its name from the special DataStream class that is used to represent a collection of data in a Flink program. You can think of ...


The DataStream API gets its name from the special DataStream class that is used to represent a collection of data in a Flink program. You can think of them as immutable collections of data that can contain duplicates. This data can either be finite or unbounded, the API that you use to work on them is the same.

執行模式(流/批)

DataStream API 支持不同的運行時執行模式,你可以根據你的用例需要和作業特點進行選擇。

DataStream API 有一種”經典“的執行行為,我們稱之為流(STREAMING)執行模式。這種模式適用於需要連續增量處理,而且預計無限期保持線上的無邊界作業。

此外,還有一種批式執行模式,我們稱之為批(BATCH)執行模式。這種執行作業的方式更容易讓人聯想到批處理框架,比如 MapReduce。這種執行模式適用於有一個已知的固定輸入,而且不會連續運行的有邊界作業。

Apache Flink 對流處理和批處理統一方法,意味著無論配置何種執行模式,在有界輸入上執行的 DataStream 應用都會產生相同的最終 結果。重要的是要註意最終 在這裡是什麼意思:一個在流模式執行的作業可能會產生增量更新(想想資料庫中的插入(upsert)操作),而批作業只在最後產生一個最終結果。儘管計算方法不同,只要呈現方式得當,最終結果會是相同的。

通過啟用批執行,我們允許 Flink 應用只有在我們知道輸入是有邊界的時侯才會使用到的額外的優化。例如,可以使用不同的關聯(join)/ 聚合(aggregation)策略,允許實現更高效的任務調度和故障恢復行為的不同 shuffle。下麵我們將介紹一些執行行為的細節。

什麼時候可以/應該使用批處理模式?

批執行模式只能用於 有邊界 的作業/Flink 程式。邊界是數據源的一個屬性,告訴我們在執行前,來自該數據源的所有輸入是否都是已知的,或者是否會有新的數據出現,可能是無限的。而對一個作業來說,如果它的所有源都是有邊界的,則它就是有邊界的,否則就是無邊界的。

而流執行模式,既可用於有邊界任務,也可用於無邊界任務。

一般來說,在你的程式是有邊界的時候,你應該使用批執行模式,因為這樣做會更高效。當你的程式是無邊界的時候,你必須使用流執行模式,因為只有這種模式足夠通用,能夠處理連續的數據流。

一個明顯的例外是當你想使用一個有邊界作業去自展一些作業狀態,並將狀態使用在之後的無邊界作業的時候。例如,通過流模式運行一個有邊界作業,取一個 savepoint,然後在一個無邊界作業上恢復這個 savepoint。這是一個非常特殊的用例,當我們允許將 savepoint 作為批執行作業的附加輸出時,這個用例可能很快就會過時。

另一個你可能會使用流模式運行有邊界作業的情況是當你為最終會在無邊界數據源寫測試代碼的時候。對於測試來說,在這些情況下使用有邊界數據源可能更自然。

配置批執行模式

執行模式可以通過 execute.runtime-mode 設置來配置。有三種可選的值:

  • STREAMING: 經典 DataStream 執行模式(預設)
  • BATCH: 在 DataStream API 上進行批量式執行
  • AUTOMATIC: 讓系統根據數據源的邊界性來決定

這可以通過 bin/flink run ... 的命令行參數進行配置,或者在創建/配置 StreamExecutionEnvironment 時寫進程式。

下麵是如何通過命令行配置執行模式:

$ bin/flink run -Dexecution.runtime-mode=BATCH examples/streaming/WordCount.jar 

這個例子展示瞭如何在代碼中配置執行模式:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.BATCH);

我們不建議用戶在程式中設置運行模式,而是在提交應用程式時使用命令行進行設置。保持應用程式代碼的免配置可以讓程式更加靈活,因為同一個應用程式可能在任何執行模式下執行。

統計單詞案例

以批處理方式進行統計
  1. 流程

  2. 核心代碼

        ParameterTool parameterFromArgs = ParameterTool.fromArgs(args);
        String input = parameterFromArgs.getRequired("input");

        // 初始化環境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.BATCH);

        // 載入數據源
        DataStreamSource<String> wordSource = env.readTextFile(input, "UTF-8");

        // 數據轉換
        SingleOutputStreamOperator<Word> wordStreamOperator = wordSource.flatMap(new TokenizerFunction());

        // 按單詞分組
        KeyedStream<Word, String> wordKeyedStream = wordStreamOperator.keyBy(new KeySelector<Word, String>() {
            @Override
            public String getKey(Word word) throws Exception {
                return word.getWord();
            }
        });

        // 求和
        SingleOutputStreamOperator<Word> sumStream = wordKeyedStream.sum("frequency");

        sumStream.print();

        env.execute("WordCountBatch");
  1. 在IDE中運行時,需指定-input參數,輸入文件地址
以流處理方式進行統計
  1. 流程

  2. 核心代碼

 // 初始化環境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 定義kafka數據源
        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers("192.168.0.192:9092")
                .setTopics("TOPIC_WORD")
                .setGroupId("TEST_GROUP")
                .setStartingOffsets(OffsetsInitializer.latest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        // 載入數據源
        DataStreamSource<String> kafkaWordSource = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Word Source");

        // 數據轉換
        SingleOutputStreamOperator<Word> wordStreamOperator = kafkaWordSource.flatMap(new TokenizerFunction());

        // 按單詞分組
        KeyedStream<Word, String> wordKeyedStream = wordStreamOperator.keyBy(new KeySelector<Word, String>() {
            @Override
            public String getKey(Word word) throws Exception {
                return word.getWord();
            }
        });

        // 求和
        SingleOutputStreamOperator<Word> sumStream = wordKeyedStream.sum("frequency");
        sumStream.print();


        env.execute("WordCountStream");

完整代碼地址

https://github.com/Mr-LuXiaoHua/study-flink

com.example.datastream.wordcount.DataStreamApiWordCountBatch  --從文件讀取數據進行單詞統計
com.example.datastream.wordcount.DataStreamApiWordCountStream --從Kafka消費數據進行單詞統計

提交到flink集群執行:
bin/flink run -m 127.0.0.1:8081 -c com.example.datastream.wordcount.DataStreamApiWordCountBatch -input /mnt/data/words.txt /opt/apps/study-flink-1.0.jar
-input 指定輸入文件路徑


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 前言 在瞭解原型和原型鏈之前,我們先瞭解一部分概念,constructor,prototype,proto。 constructor 在之前判斷數據類型的文章: javaScript常見數據類型檢查校驗 有提到過關於構造函數的屬性constructor constructor 的是返回創建實例對象的 ...
  • 前言 我們可以使用SpringCloud框架中Feign組完成微服務之間的遠程調用; 但是Feign組件底層基於HTTP協議,HTTP協議的特點是請求同步,而且既需要請求也需要響應,屬於同步遠程調用; 微服務架構在同步遠程調用的場景下,如果服務提供者一直沒有響應服務消費者,很容易造成服務雪崩; 如果 ...
  • BUAA_OO_U4_Summary 一 / 架構設計 1.0> 題目解析 實現UML類圖的分析。 1.1> HW13 1.1.1> 關於UML 從JML到UML,對於從模型到實現的能力訓練,此單元從上一單元的Java Modeling Language升級到了Unified Modeling La ...
  • 目錄 一.簡介 二.效果演示 三.源碼下載 四.猜你喜歡 零基礎 OpenGL (ES) 學習路線推薦 : OpenGL (ES) 學習目錄 >> OpenGL ES 基礎 零基礎 OpenGL (ES) 學習路線推薦 : OpenGL (ES) 學習目錄 >> OpenGL ES 轉場 零基礎 O ...
  • 眾所周知,所有權是Rust區別於其他語言的一大特色,只要代碼滿足了所有權規則,我們就不用擔心記憶體的泄露的問題。 讓代碼在編譯階段就解決記憶體的問題,而不是在運行崩潰後再調試。 Rust中,所有權和借用的規則其實並不複雜,所有權有3條規則,借用只有2條規則。 所有權的規則 1. Rust中的每一個值都有 ...
  • 昨晚我正在床上睡得著著的,突然來了一條簡訊。 啥,線上MySQL死鎖了,我趕緊登錄線上系統,查看業務日誌。 ...
  • 打開源代碼發現了個./time.php?source 於是打開點進去 <?php #error_reporting(0); class HelloPhp { public $a; public $b; public function __construct(){ $this->a = "Y-m-d ...
  • 來源:blog.csdn.net/qq_29879799/article/details/105146415 java的stream編程給調試帶來了極大的不便,idea 推出了streamtrace功能,可以詳細看到每一步操作的關係、結果,非常方便進行調試。 初遇StreamTrace 這裡簡單將字 ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...