【Flink入門修煉】1-3 Flink WordCount 入門實現

来源:https://www.cnblogs.com/shuofxz/p/18011142
-Advertisement-
Play Games

本篇文章將帶大家運行 Flink 最簡單的程式 WordCount。先實踐後理論,對其基本輸入輸出、編程代碼有初步瞭解,後續篇章再對 Flink 的各種概念和架構進行介紹。 下麵將從創建項目開始,介紹如何創建出一個 Flink 項目;然後從 DataStream 流處理和 FlinkSQL 執行兩種... ...


本篇文章將帶大家運行 Flink 最簡單的程式 WordCount。先實踐後理論,對其基本輸入輸出、編程代碼有初步瞭解,後續篇章再對 Flink 的各種概念和架構進行介紹。
下麵將從創建項目開始,介紹如何創建出一個 Flink 項目;然後從 DataStream 流處理和 FlinkSQL 執行兩種方式來帶大家學習 WordCount 程式的開發。
Flink 各版本之間變化較多,之前版本的函數在後續版本可能不再支持。跟隨學習時,請儘量選擇和筆者同版本的 Flink。本文使用的 Flink 版本是 1.13.2。

一、創建項目

在很多其他教程中,會看到如下來創建 Flink 程式的方式。雖然簡單方便,但對初學者來說,不知道初始化項目的時候做了什麼,如果報錯了也不知道該如何排查。

mvn archetype:generate
-DarchetypeGroupId=org.apache.flink
-DarchetypeArtifactId=flink-quickstart-java
-DarchetypeVersion=1.13.2
通過指定 Maven 工程的三要素,即 GroupId、ArtifactId、Version 來創建一個新的工程。同時 Flink 給我提供了更為方便的創建 Flink 工程的方法:
curl https://flink.apache.org/q/quickstart.sh | bash -s 1.13.2

因此,我們手動來創建一個 Maven 項目,看看到底如何創建出一個 Flink 項目。
1、通過 IDEA 創建一個 Maven 項目
image.png

2、pom.xml 添加:
這裡我們選擇的是 Flink 1.13.2 版本(Flink 1.14 之後部分類和函數有變化,可自行探索)。

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <flink.version>1.13.2</flink.version> <!-- 1.14 之後部分類和函數有變化,可自行探索 -->
        <target.java.version>1.8</target.java.version>
        <scala.binary.version>2.12</scala.binary.version>
        <maven.compiler.source>${target.java.version}</maven.compiler.source>
        <maven.compiler.target>${target.java.version}</maven.compiler.target>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
    </dependencies>

二、DataStream WordCount

一)編寫程式

基礎項目環境已經搞好了,接下來我們模仿一個流式環境,監聽本地的 Socket 埠,使用 Flink 統計流入的不同單詞個數。

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;


public class SocketTextStreamWordCount {
    public static void main(String[] args) throws Exception {
        //參數檢查
        if (args.length != 2) {
            // System.err.println("USAGE:\nSocketTextStreamWordCount <hostname> <port>");
            // return;
            args = new String[]{"127.0.0.1", "9000"};
        }

        String hostname = args[0];
        Integer port = Integer.parseInt(args[1]);


        // 創建 streaming execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 獲取數據
        DataStreamSource<String> stream = env.socketTextStream(hostname, port);

        // 計數
        SingleOutputStreamOperator<Tuple2<String, Integer>> sum = stream.flatMap(new LineSplitter())
                .keyBy(0)
                .sum(1);

        sum.print();

        env.execute("Java WordCount from SocketTextStream Example");
    }

    public static final class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) {
            String[] tokens = s.toLowerCase().split("\\W+");

            for (String token: tokens) {
                if (token.length() > 0) {
                    collector.collect(new Tuple2<String, Integer>(token, 1));
                }
            }
        }
    }
}

二)測試

接下來我們進行程式測試。
我們在本地使用 netcat 命令啟動一個埠:

nc -l 9000

然後啟動程式,能看到控制台一些輸出:
image.png

接下來,在 nc 中輸入:

$ nc -l 9000
hello world
flink flink flink

回到我們的程式,能看到統計的輸出:

3> (hello,1)
6> (world,1)
8> (flink,1)
8> (flink,2)
8> (flink,3)

image.png

三)如果有報錯

如果出現執行報錯:

Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/flink/api/java/io/TextInputFormat
	at com.shuofxz.SocketTextStreamWordCount.main(SocketTextStreamWordCount.java:25)
Caused by: java.lang.ClassNotFoundException: org.apache.flink.api.java.io.TextInputFormat
	at java.net.URLClassLoader.findClass(URLClassLoader.java:387)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:419)
	at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:352)
	... 1 more

在 IDE 中把 「Add dependencies with "Provided" scope to classpath」勾選上:
image.png

一)介紹 FlinkSQL

Flink SQL 是 Flink 實時計算為簡化計算模型,降低用戶使用實時計算門檻而設計的一套符合標準 SQL 語義的開發語言。
上面單詞統計的邏輯可以轉化為下麵的 SQL。
直接來看這個 SQL:

select word as word, sum(frequency) as frequency from WordCount group by word
  • WordCount 是要進行單詞統計的表,我們會先做一些處理,將輸入的單詞都存放到這個表中
  • 表我們定義為兩列(word, frequency),初始轉化輸入每個單詞占一行,frequency 都是 1
  • 然後,就可以按照 SQL 的邏輯來進行統計聚合了。

其中,WordCount 表數據如下:

word frequency
hello 1
world 1
flink 1
flink 1
flink 1

那麼接下來我們看,如何寫一個 FlinkSQL 的程式。

二)環境和程式

首先,添加 FlinkSQL 需要的依賴:

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-scala-bridge_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>

程式如下:

public class SQLWordCount {
    public static void main(String[] args) throws Exception {
        // 創建上下文環境
        ExecutionEnvironment fbEnv = ExecutionEnvironment.getExecutionEnvironment();
        BatchTableEnvironment fbTableEnv = BatchTableEnvironment.create(fbEnv);

        // 讀取一行模擬數據作為輸入
        String words = "hello world flink flink flink";
        String[] split = words.split("\\W+");

        ArrayList<WC> list = new ArrayList<>();

        for (String word : split) {
            WC wc = new WC(word, 1);
            list.add(wc);
        }

        DataSource<WC> input = fbEnv.fromCollection(list);

        // DataSet 轉 SQL,指定欄位名
        Table table = fbTableEnv.fromDataSet(input, "word,frequency");
        table.printSchema();

        // 註冊為一個表
        fbTableEnv.createTemporaryView("WordCount", table);

        Table table1 = fbTableEnv.sqlQuery("select word as word, sum(frequency) as frequency from WordCount group by word");

        DataSet<WC> ds1 = fbTableEnv.toDataSet(table1, WC.class);
        ds1.printToErr();
    }

    public static class WC {
        public String word;
        public long frequency;

        public WC() {}

        public WC(String word, long frequency) {
            this.word = word;
            this.frequency = frequency;
        }

        @Override
        public String toString() {
            return  word + ", " + frequency;
        }
    }
}

執行,結果輸出:

(
  `word` STRING,
  `frequency` BIGINT
)
flink, 3
world, 1
hello, 1

image.png

四、小結

本篇手把手的帶大家搭建起 Flink Maven 項目,然後使用 DataStream 和 FlinkSQL 兩種方式來學習 WordCount 單詞計數這一最簡單最經典的 Flink 程式開發。跟著步驟一步步執行下來,大家應該對 Flink 程式基本執行流程有個初步的瞭解,為後續的學習打下了基礎。


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 主機發現 arp-scan -l ┌──(root㉿kali)-[~] └─# arp-scan -l Interface: eth0, type: EN10MB, MAC: 00:0c:29:6b:ed:27, IPv4: 192.168.100.251 Starting arp-scan 1.1 ...
  • 大家好,我是痞子衡,是正經搞技術的痞子。今天痞子衡給大家介紹的是SRC_SBMRx寄存器對於定位i.MXRT1xxx離線無法啟動問題的意義。 最近有一位開源社區大佬在使能 RT1050 BEE 加密過程中遇到無法啟動問題,折騰到一度崩潰,甚至想要棄坑。痞子衡哪能讓這位“老鄉”跑掉,連忙給予緊急支持, ...
  • 一直知道 OpenWrt 經常拿來做軟路由軟體。最近買了個二手小主機升騰 C92 來做旁路由伺服器,其被歸為瘦客戶機一類,感覺和工控機差不多,現價百元以內。想就著這個機會,瞭解體驗一下 OpenWrt。 ...
  • 封裝WINDOWS10系統 一、準備工具: 1、安裝NTLite。下載鏈接:https://www.ntlite.com/download/ 2、準備一個ISO的PE例如:FirPE。下載鏈接https://firpe.cn/page-247 3、安裝VMware 4、下載封裝工具Easy Sysp ...
  • 瀏覽器每次打開都有個煩人的提示要獲取將來的 microsoft edge 更新,需要 macos 10.15 或更高版本,找了很久也沒有解決辦法,有 windows 端的解決方案,有禁止更新的解決方案,就是沒有 Mac 上如何避免這個告警的方案,於是走上 Edge 定製化之旅。 效果 使用前後對比 ...
  • 在 Linux 中,你可以使用 alias 命令來設置別名。別名允許你為常用的命令創建自定義的縮寫或快捷方式。這些別名會存在於當前會話中,如果你希望使別名永久生效,可以將其添加到你的 shell 配置文件中(例如 .bashrc、.bash_profile、.zshrc 等)。 1、查看已有別名 通 ...
  • 硬體開發板:STM32G0B1RET6 軟體平臺:cubemax+keil+VScode 內容原著聲明 代碼借鑒學習於以下文章: STM32 使用硬體IIC驅動0.96寸4針IOLED顯示器(HAL庫) 1 新建cubemax工程 1.1 配置系統時鐘RCC 1.2 配置引腳 1.3 導出工程 略. ...
  • Java 迴圈 迴圈可以執行一個代碼塊,只要達到指定的條件。迴圈很方便,因為它們節省時間,減少錯誤,並使代碼更易讀。 Java While 迴圈 while 迴圈會迴圈執行一個代碼塊,只要指定的條件為真: 語法 while (condition) { // 要執行的代碼塊 } 在下麵的示例中,只要變 ...
一周排行
    -Advertisement-
    Play Games
  • 前言 微服務架構已經成為搭建高效、可擴展系統的關鍵技術之一,然而,現有許多微服務框架往往過於複雜,使得我們普通開發者難以快速上手並體驗到微服務帶了的便利。為瞭解決這一問題,於是作者精心打造了一款最接地氣的 .NET 微服務框架,幫助我們輕鬆構建和管理微服務應用。 本框架不僅支持 Consul 服務註 ...
  • 先看一下效果吧: 如果不會寫動畫或者懶得寫動畫,就直接交給Blend來做吧; 其實Blend操作起來很簡單,有點類似於在操作PS,我們只需要設置關鍵幀,滑鼠點來點去就可以了,Blend會自動幫我們生成我們想要的動畫效果. 第一步:要創建一個空的WPF項目 第二步:右鍵我們的項目,在最下方有一個,在B ...
  • Prism:框架介紹與安裝 什麼是Prism? Prism是一個用於在 WPF、Xamarin Form、Uno 平臺和 WinUI 中構建鬆散耦合、可維護和可測試的 XAML 應用程式框架 Github https://github.com/PrismLibrary/Prism NuGet htt ...
  • 在WPF中,屏幕上的所有內容,都是通過畫筆(Brush)畫上去的。如按鈕的背景色,邊框,文本框的前景和形狀填充。藉助畫筆,可以繪製頁面上的所有UI對象。不同畫筆具有不同類型的輸出( 如:某些畫筆使用純色繪製區域,其他畫筆使用漸變、圖案、圖像或繪圖)。 ...
  • 前言 嗨,大家好!推薦一個基於 .NET 8 的高併發微服務電商系統,涵蓋了商品、訂單、會員、服務、財務等50多種實用功能。 項目不僅使用了 .NET 8 的最新特性,還集成了AutoFac、DotLiquid、HangFire、Nlog、Jwt、LayUIAdmin、SqlSugar、MySQL、 ...
  • 本文主要介紹攝像頭(相機)如何採集數據,用於類似攝像頭本地顯示軟體,以及流媒體數據傳輸場景如傳屏、視訊會議等。 攝像頭採集有多種方案,如AForge.NET、WPFMediaKit、OpenCvSharp、EmguCv、DirectShow.NET、MediaCaptre(UWP),網上一些文章以及 ...
  • 前言 Seal-Report 是一款.NET 開源報表工具,擁有 1.4K Star。它提供了一個完整的框架,使用 C# 編寫,最新的版本採用的是 .NET 8.0 。 它能夠高效地從各種資料庫或 NoSQL 數據源生成日常報表,並支持執行複雜的報表任務。 其簡單易用的安裝過程和直觀的設計界面,我們 ...
  • 背景需求: 系統需要對接到XXX官方的API,但因此官方對接以及管理都十分嚴格。而本人部門的系統中包含諸多子系統,系統間為了穩定,程式間多數固定Token+特殊驗證進行調用,且後期還要提供給其他兄弟部門系統共同調用。 原則上:每套系統都必須單獨接入到官方,但官方的接入複雜,還要官方指定機構認證的證書 ...
  • 本文介紹下電腦設備關機的情況下如何通過網路喚醒設備,之前電源S狀態 電腦Power電源狀態- 唐宋元明清2188 - 博客園 (cnblogs.com) 有介紹過遠程喚醒設備,後面這倆天瞭解多了點所以單獨加個隨筆 設備關機的情況下,使用網路喚醒的前提條件: 1. 被喚醒設備需要支持這WakeOnL ...
  • 前言 大家好,推薦一個.NET 8.0 為核心,結合前端 Vue 框架,實現了前後端完全分離的設計理念。它不僅提供了強大的基礎功能支持,如許可權管理、代碼生成器等,還通過採用主流技術和最佳實踐,顯著降低了開發難度,加快了項目交付速度。 如果你需要一個高效的開發解決方案,本框架能幫助大家輕鬆應對挑戰,實 ...