「Flink」使用Java lambda表達式實現Flink WordCount

来源:https://www.cnblogs.com/ilovezihan/archive/2020/01/31/12245067.html
-Advertisement-
Play Games

本篇我們將使用Java語言來實現Flink的單詞統計。代碼開發環境準備導入Flink 1.9 pom依賴<dependencies> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-java</artifactId ...


本篇我們將使用Java語言來實現Flink的單詞統計。

代碼開發

環境準備

導入Flink 1.9 pom依賴

<dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>1.9.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.11</artifactId>
            <version>1.9.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
            <version>3.7</version>
        </dependency>
    </dependencies>

構建Flink流處理環境

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

自定義source

每秒生成一行文本

DataStreamSource<String> wordLineDS = env.addSource(new RichSourceFunction<String>() {
            private boolean isCanal = false;
            private String[] words = {
                    "important oracle jdk license update",
                    "the oracle jdk license has changed for releases starting april 16 2019",
                    "the new oracle technology network license agreement for oracle java se is substantially different from prior oracle jdk licenses the new license permits certain uses such as ",
                    "personal use and development use at no cost but other uses authorized under prior oracle jdk licenses may no longer be available please review the terms carefully before ",
                    "downloading and using this product an faq is available here ",
                    "commercial license and support is available with a low cost java se subscription",
                    "oracle also provides the latest openjdk release under the open source gpl license at jdk java net"
            };

            @Override
            public void run(SourceContext<String> ctx) throws Exception {
                // 每秒發送一行文本
                while (!isCanal) {
                    int randomIndex = RandomUtils.nextInt(0, words.length);
                    ctx.collect(words[randomIndex]);
                    Thread.sleep(1000);
                }
            }

            @Override
            public void cancel() {
                isCanal = true;
            }
        });

單詞計算

// 3. 單詞統計
        // 3.1 將文本行切分成一個個的單詞
        SingleOutputStreamOperator<String> wordsDS = wordLineDS.flatMap((String line, Collector<String> ctx) -> {
            // 切分單詞
            Arrays.stream(line.split(" ")).forEach(word -> {
                ctx.collect(word);
            });
        }).returns(Types.STRING);

        //3.2 將單詞轉換為一個個的元組
        SingleOutputStreamOperator<Tuple2<String, Integer>> tupleDS = wordsDS
                .map(word -> Tuple2.of(word, 1))
                .returns(Types.TUPLE(Types.STRING, Types.INT));

        // 3.3 按照單詞進行分組
        KeyedStream<Tuple2<String, Integer>, String> keyedDS = tupleDS.keyBy(tuple -> tuple.f0);

        // 3.4 對每組單詞數量進行累加
        SingleOutputStreamOperator<Tuple2<String, Integer>> resultDS = keyedDS
                .timeWindow(Time.seconds(3))
                .reduce((t1, t2) -> Tuple2.of(t1.f0, t1.f1 + t2.f1));

        resultDS.print();

參考代碼

public class WordCount {
    public static void main(String[] args) throws Exception {
        // 1. 構建Flink流式初始化環境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 2. 自定義source - 每秒發送一行文本
        DataStreamSource<String> wordLineDS = env.addSource(new RichSourceFunction<String>() {
            private boolean isCanal = false;
            private String[] words = {
                    "important oracle jdk license update",
                    "the oracle jdk license has changed for releases starting april 16 2019",
                    "the new oracle technology network license agreement for oracle java se is substantially different from prior oracle jdk licenses the new license permits certain uses such as ",
                    "personal use and development use at no cost but other uses authorized under prior oracle jdk licenses may no longer be available please review the terms carefully before ",
                    "downloading and using this product an faq is available here ",
                    "commercial license and support is available with a low cost java se subscription",
                    "oracle also provides the latest openjdk release under the open source gpl license at jdk java net"
            };

            @Override
            public void run(SourceContext<String> ctx) throws Exception {
                // 每秒發送一行文本
                while (!isCanal) {
                    int randomIndex = RandomUtils.nextInt(0, words.length);
                    ctx.collect(words[randomIndex]);
                    Thread.sleep(1000);
                }
            }

            @Override
            public void cancel() {
                isCanal = true;
            }
        });

        // 3. 單詞統計
        // 3.1 將文本行切分成一個個的單詞
        SingleOutputStreamOperator<String> wordsDS = wordLineDS.flatMap((String line, Collector<String> ctx) -> {
            // 切分單詞
            Arrays.stream(line.split(" ")).forEach(word -> {
                ctx.collect(word);
            });
        }).returns(Types.STRING);

        //3.2 將單詞轉換為一個個的元組
        SingleOutputStreamOperator<Tuple2<String, Integer>> tupleDS = wordsDS
                .map(word -> Tuple2.of(word, 1))
                .returns(Types.TUPLE(Types.STRING, Types.INT));

        // 3.3 按照單詞進行分組
        KeyedStream<Tuple2<String, Integer>, String> keyedDS = tupleDS.keyBy(tuple -> tuple.f0);

        // 3.4 對每組單詞數量進行累加
        SingleOutputStreamOperator<Tuple2<String, Integer>> resultDS = keyedDS
                .timeWindow(Time.seconds(3))
                .reduce((t1, t2) -> Tuple2.of(t1.f0, t1.f1 + t2.f1));

        resultDS.print();

        env.execute("app");
    }
}

Flink對Java Lambda表達式支持情況

Flink支持Java API所有操作符使用Lambda表達式。但是,但Lambda表達式使用Java泛型時,就需要聲明類型信息。

我們來看下上述的這段代碼:

SingleOutputStreamOperator<String> wordsDS = wordLineDS.flatMap((String line, Collector<String> ctx) -> {
            // 切分單詞
            Arrays.stream(line.split(" ")).forEach(word -> {
                ctx.collect(word);
            });
        }).returns(Types.STRING);

之所以這裡將所有的類型信息,因為Flink無法正確自動推斷出來Collector中帶的泛型。我們來看一下FlatMapFuntion的源代碼

@Public
@FunctionalInterface
public interface FlatMapFunction<T, O> extends Function, Serializable {

/**
* The core method of the FlatMapFunction. Takes an element from the input data set and transforms
* it into zero, one, or more elements.
*
* @param value The input value.
* @param out The collector for returning result values.
*
* @throws Exception This method may throw exceptions. Throwing an exception will cause the operation
* to fail and may trigger recovery.
*/
void flatMap(T value, Collector<O> out) throws Exception;
}

我們發現 flatMap的第二個參數是Collector<O>,是一個帶參數的泛型。Java編譯器編譯該代碼時會進行參數類型擦除,所以Java編譯器會變成成:

void flatMap(T value, Collector out)

這種情況,Flink將無法自動推斷類型信息。如果我們沒有顯示地提供類型信息,將會出現以下錯誤:

org.apache.flink.api.common.functions.InvalidTypesException: The generic type parameters of 'Collector' are missing.
    In many cases lambda methods don't provide enough information for automatic type extraction when Java generics are involved.
    An easy workaround is to use an (anonymous) class instead that implements the 'org.apache.flink.api.common.functions.FlatMapFunction' interface.
    Otherwise the type has to be specified explicitly using type information.

這種情況下,必須要顯示指定類型信息,否則輸出將返回值視為Object類型,這將導致Flink無法正確序列化。

所以,我們需要顯示地指定Lambda表達式的參數類型信息,並通過returns方法顯示指定輸出的類型信息

我們再看一段代碼:

SingleOutputStreamOperator<Tuple2<String, Integer>> tupleDS = wordsDS
                .map(word -> Tuple2.of(word, 1))
                .returns(Types.TUPLE(Types.STRING, Types.INT));

為什麼map後面也需要指定類型呢?

因為此處map返回的是Tuple2類型,Tuple2是帶有泛型參數,在編譯的時候同樣會被查出泛型參數信息,導致Flink無法正確推斷。

更多關於對Java Lambda表達式的支持請參考官網:https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/java_lambdas.html


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 這是Serilog系列的第三篇文章。 1. "第1部分 使用Serilog RequestLogging減少日誌詳細程度" 2. "第2部分 使用Serilog記錄所選的終結點屬性" 3. 第3部分 使用Serilog.AspNetCore記錄MVC屬性(本文) 4. 第4部分 從Serilog請 ...
  • WPF視窗充滿了各種元素,但這些元素中只有一部分是控制項。在WPF領域,控制項通常被描述為與用戶交互的元素——能接收焦點並接受鍵盤或滑鼠輸入的元素。明顯的例子包括文本框和按鈕。然而,這個區別有時有些模糊。將工具提示視為控制項,因為它根據用戶滑鼠的移動顯示或消失。將標簽視為控制項,因為它支持記憶碼(mnemo ...
  • 多點觸控(multi-touch)是通過觸摸屏幕與應用程式進行交互的一種方式。多點觸控輸入和更傳統的基於筆(pen-based)的輸入的區別是多點觸控識別手勢(gesture)——用戶可移動多根手指以執行常見操作的特殊方式。例如,在觸摸屏上放置兩根手指並同時移動他們,這通常意味著“放大",而以一根手 ...
  • " 返回《C 併發編程》" "1. 線程池的由來" "1.1. 線程池出現前" "1.2. 線程池的誕生" "1.3. CLR線程池工作過程" "2. 線程池解決的問題" "2.1. 非同步調用方法" "2.2. 按時間間隔調用方法" "3. 當單個內核對象接收到信號通知時調用方法" "3.1. 註冊 ...
  • EF對數據做什麼樣的操作,是根據EF的上下文實體狀態決定,實體狀態有以下5種狀態,下麵我們就分別看下這5種狀態 數據準備,我們看到學生表裡有20000名學生記錄,最後1位學生的學生編號為0000020000 1、Detached--實體跟上下文壓根沒關係 我們看到我新創建了名學生,學號為000002 ...
  • 很多時候,我們在IDE中編寫Flink代碼,我們希望能夠查看到Web UI,從而來瞭解Flink程式的運行情況。按照以下步驟操作即可,親測有效。1、添加Maven依賴<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink- ...
  • 先上代碼:public class WordCountKeyedState { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvi ...
  • 在Flink中的每個函數和運算符都是有狀態的。在處理過程中可以用狀態來存儲數據,這樣可以利用狀態來構建複雜操作。為了讓狀態容錯,Flink需要設置checkpoint狀態。Flink程式是通過checkpoint來保證容錯,通過checkpoint機制,Flink可恢復作業的狀態和計算位置。chec ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...