【Flink入門修煉】1-3 Flink WordCount 入門實現

来源:https://www.cnblogs.com/shuofxz/p/18011142
-Advertisement-
Play Games

本篇文章將帶大家運行 Flink 最簡單的程式 WordCount。先實踐後理論,對其基本輸入輸出、編程代碼有初步瞭解,後續篇章再對 Flink 的各種概念和架構進行介紹。 下麵將從創建項目開始,介紹如何創建出一個 Flink 項目;然後從 DataStream 流處理和 FlinkSQL 執行兩種... ...


本篇文章將帶大家運行 Flink 最簡單的程式 WordCount。先實踐後理論,對其基本輸入輸出、編程代碼有初步瞭解,後續篇章再對 Flink 的各種概念和架構進行介紹。
下麵將從創建項目開始,介紹如何創建出一個 Flink 項目;然後從 DataStream 流處理和 FlinkSQL 執行兩種方式來帶大家學習 WordCount 程式的開發。
Flink 各版本之間變化較多,之前版本的函數在後續版本可能不再支持。跟隨學習時,請儘量選擇和筆者同版本的 Flink。本文使用的 Flink 版本是 1.13.2。

一、創建項目

在很多其他教程中,會看到如下來創建 Flink 程式的方式。雖然簡單方便,但對初學者來說,不知道初始化項目的時候做了什麼,如果報錯了也不知道該如何排查。

mvn archetype:generate
-DarchetypeGroupId=org.apache.flink
-DarchetypeArtifactId=flink-quickstart-java
-DarchetypeVersion=1.13.2
通過指定 Maven 工程的三要素,即 GroupId、ArtifactId、Version 來創建一個新的工程。同時 Flink 給我提供了更為方便的創建 Flink 工程的方法:
curl https://flink.apache.org/q/quickstart.sh | bash -s 1.13.2

因此,我們手動來創建一個 Maven 項目,看看到底如何創建出一個 Flink 項目。
1、通過 IDEA 創建一個 Maven 項目
image.png

2、pom.xml 添加:
這裡我們選擇的是 Flink 1.13.2 版本(Flink 1.14 之後部分類和函數有變化,可自行探索)。

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <flink.version>1.13.2</flink.version> <!-- 1.14 之後部分類和函數有變化,可自行探索 -->
        <target.java.version>1.8</target.java.version>
        <scala.binary.version>2.12</scala.binary.version>
        <maven.compiler.source>${target.java.version}</maven.compiler.source>
        <maven.compiler.target>${target.java.version}</maven.compiler.target>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
    </dependencies>

二、DataStream WordCount

一)編寫程式

基礎項目環境已經搞好了,接下來我們模仿一個流式環境,監聽本地的 Socket 埠,使用 Flink 統計流入的不同單詞個數。

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;


public class SocketTextStreamWordCount {
    public static void main(String[] args) throws Exception {
        //參數檢查
        if (args.length != 2) {
            // System.err.println("USAGE:\nSocketTextStreamWordCount <hostname> <port>");
            // return;
            args = new String[]{"127.0.0.1", "9000"};
        }

        String hostname = args[0];
        Integer port = Integer.parseInt(args[1]);


        // 創建 streaming execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 獲取數據
        DataStreamSource<String> stream = env.socketTextStream(hostname, port);

        // 計數
        SingleOutputStreamOperator<Tuple2<String, Integer>> sum = stream.flatMap(new LineSplitter())
                .keyBy(0)
                .sum(1);

        sum.print();

        env.execute("Java WordCount from SocketTextStream Example");
    }

    public static final class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) {
            String[] tokens = s.toLowerCase().split("\\W+");

            for (String token: tokens) {
                if (token.length() > 0) {
                    collector.collect(new Tuple2<String, Integer>(token, 1));
                }
            }
        }
    }
}

二)測試

接下來我們進行程式測試。
我們在本地使用 netcat 命令啟動一個埠:

nc -l 9000

然後啟動程式,能看到控制台一些輸出:
image.png

接下來,在 nc 中輸入:

$ nc -l 9000
hello world
flink flink flink

回到我們的程式,能看到統計的輸出:

3> (hello,1)
6> (world,1)
8> (flink,1)
8> (flink,2)
8> (flink,3)

image.png

三)如果有報錯

如果出現執行報錯:

Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/flink/api/java/io/TextInputFormat
	at com.shuofxz.SocketTextStreamWordCount.main(SocketTextStreamWordCount.java:25)
Caused by: java.lang.ClassNotFoundException: org.apache.flink.api.java.io.TextInputFormat
	at java.net.URLClassLoader.findClass(URLClassLoader.java:387)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:419)
	at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:352)
	... 1 more

在 IDE 中把 「Add dependencies with "Provided" scope to classpath」勾選上:
image.png

一)介紹 FlinkSQL

Flink SQL 是 Flink 實時計算為簡化計算模型,降低用戶使用實時計算門檻而設計的一套符合標準 SQL 語義的開發語言。
上面單詞統計的邏輯可以轉化為下麵的 SQL。
直接來看這個 SQL:

select word as word, sum(frequency) as frequency from WordCount group by word
  • WordCount 是要進行單詞統計的表,我們會先做一些處理,將輸入的單詞都存放到這個表中
  • 表我們定義為兩列(word, frequency),初始轉化輸入每個單詞占一行,frequency 都是 1
  • 然後,就可以按照 SQL 的邏輯來進行統計聚合了。

其中,WordCount 表數據如下:

word frequency
hello 1
world 1
flink 1
flink 1
flink 1

那麼接下來我們看,如何寫一個 FlinkSQL 的程式。

二)環境和程式

首先,添加 FlinkSQL 需要的依賴:

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-scala-bridge_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>

程式如下:

public class SQLWordCount {
    public static void main(String[] args) throws Exception {
        // 創建上下文環境
        ExecutionEnvironment fbEnv = ExecutionEnvironment.getExecutionEnvironment();
        BatchTableEnvironment fbTableEnv = BatchTableEnvironment.create(fbEnv);

        // 讀取一行模擬數據作為輸入
        String words = "hello world flink flink flink";
        String[] split = words.split("\\W+");

        ArrayList<WC> list = new ArrayList<>();

        for (String word : split) {
            WC wc = new WC(word, 1);
            list.add(wc);
        }

        DataSource<WC> input = fbEnv.fromCollection(list);

        // DataSet 轉 SQL,指定欄位名
        Table table = fbTableEnv.fromDataSet(input, "word,frequency");
        table.printSchema();

        // 註冊為一個表
        fbTableEnv.createTemporaryView("WordCount", table);

        Table table1 = fbTableEnv.sqlQuery("select word as word, sum(frequency) as frequency from WordCount group by word");

        DataSet<WC> ds1 = fbTableEnv.toDataSet(table1, WC.class);
        ds1.printToErr();
    }

    public static class WC {
        public String word;
        public long frequency;

        public WC() {}

        public WC(String word, long frequency) {
            this.word = word;
            this.frequency = frequency;
        }

        @Override
        public String toString() {
            return  word + ", " + frequency;
        }
    }
}

執行,結果輸出:

(
  `word` STRING,
  `frequency` BIGINT
)
flink, 3
world, 1
hello, 1

image.png

四、小結

本篇手把手的帶大家搭建起 Flink Maven 項目,然後使用 DataStream 和 FlinkSQL 兩種方式來學習 WordCount 單詞計數這一最簡單最經典的 Flink 程式開發。跟著步驟一步步執行下來,大家應該對 Flink 程式基本執行流程有個初步的瞭解,為後續的學習打下了基礎。


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 主機發現 arp-scan -l ┌──(root㉿kali)-[~] └─# arp-scan -l Interface: eth0, type: EN10MB, MAC: 00:0c:29:6b:ed:27, IPv4: 192.168.100.251 Starting arp-scan 1.1 ...
  • 大家好,我是痞子衡,是正經搞技術的痞子。今天痞子衡給大家介紹的是SRC_SBMRx寄存器對於定位i.MXRT1xxx離線無法啟動問題的意義。 最近有一位開源社區大佬在使能 RT1050 BEE 加密過程中遇到無法啟動問題,折騰到一度崩潰,甚至想要棄坑。痞子衡哪能讓這位“老鄉”跑掉,連忙給予緊急支持, ...
  • 一直知道 OpenWrt 經常拿來做軟路由軟體。最近買了個二手小主機升騰 C92 來做旁路由伺服器,其被歸為瘦客戶機一類,感覺和工控機差不多,現價百元以內。想就著這個機會,瞭解體驗一下 OpenWrt。 ...
  • 封裝WINDOWS10系統 一、準備工具: 1、安裝NTLite。下載鏈接:https://www.ntlite.com/download/ 2、準備一個ISO的PE例如:FirPE。下載鏈接https://firpe.cn/page-247 3、安裝VMware 4、下載封裝工具Easy Sysp ...
  • 瀏覽器每次打開都有個煩人的提示要獲取將來的 microsoft edge 更新,需要 macos 10.15 或更高版本,找了很久也沒有解決辦法,有 windows 端的解決方案,有禁止更新的解決方案,就是沒有 Mac 上如何避免這個告警的方案,於是走上 Edge 定製化之旅。 效果 使用前後對比 ...
  • 在 Linux 中,你可以使用 alias 命令來設置別名。別名允許你為常用的命令創建自定義的縮寫或快捷方式。這些別名會存在於當前會話中,如果你希望使別名永久生效,可以將其添加到你的 shell 配置文件中(例如 .bashrc、.bash_profile、.zshrc 等)。 1、查看已有別名 通 ...
  • 硬體開發板:STM32G0B1RET6 軟體平臺:cubemax+keil+VScode 內容原著聲明 代碼借鑒學習於以下文章: STM32 使用硬體IIC驅動0.96寸4針IOLED顯示器(HAL庫) 1 新建cubemax工程 1.1 配置系統時鐘RCC 1.2 配置引腳 1.3 導出工程 略. ...
  • Java 迴圈 迴圈可以執行一個代碼塊,只要達到指定的條件。迴圈很方便,因為它們節省時間,減少錯誤,並使代碼更易讀。 Java While 迴圈 while 迴圈會迴圈執行一個代碼塊,只要指定的條件為真: 語法 while (condition) { // 要執行的代碼塊 } 在下麵的示例中,只要變 ...
一周排行
    -Advertisement-
    Play Games
  • 隨著Aspire發佈preview5的發佈,Microsoft.Extensions.ServiceDiscovery隨之更新, 服務註冊發現這個屬於老掉牙的話題解決什麼問題就不贅述了,這裡主要講講Microsoft.Extensions.ServiceDiscovery(preview5)以及如何 ...
  • 概述:通過使用`SemaphoreSlim`,可以簡單而有效地限制非同步HTTP請求的併發量,確保在任何給定時間內不超過20個網頁同時下載。`ParallelOptions`不適用於非同步操作,但可考慮使用`Parallel.ForEach`,儘管在非同步場景中謹慎使用。 對於併發非同步 I/O 操作的數量 ...
  • 1.Linux上安裝Docken 伺服器系統版本以及內核版本:cat /etc/redhat-release 查看伺服器內核版本:uname -r 安裝依賴包:yum install -y yum-utils device-mapper-persistent-data lvm2 設置阿裡雲鏡像源:y ...
  • 概述:WPF界面綁定和渲染大量數據可能導致性能問題。通過啟用UI虛擬化、非同步載入和數據分頁,可以有效提高界面響應性能。以下是簡單示例演示這些優化方法。 在WPF中,當你嘗試綁定和渲染大量的數據項時,性能問題可能出現。以下是一些可能導致性能慢的原因以及優化方法: UI 虛擬化: WPF提供了虛擬化技術 ...
  • 引言 上一章節介紹了 TDD 的三大法則,今天我們講一下在單元測試中模擬對象的使用。 Fake Fake - Fake 是一個通用術語,可用於描述 stub或 mock 對象。 它是 stub 還是 mock 取決於使用它的上下文。 也就是說,Fake 可以是 stub 或 mock Mock - ...
  • 為.net6在CentOS7上面做準備,先在vmware虛擬機安裝CentOS 7.9 新建CentOS764位的系統 因為CentOS8不更新了,所以安裝7;簡單就一筆帶過了 選擇下載好的操作系統的iso文件,下載地址https://mirrors.aliyun.com/centos/7.9.20 ...
  • 經過前面幾篇的學習,我們瞭解到指令的大概分類,如:參數載入指令,該載入指令以 Ld 開頭,將參數載入到棧中,以便於後續執行操作命令。參數存儲指令,其指令以 St 開頭,將棧中的數據,存儲到指定的變數中,以方便後續使用。創建實例指令,其指令以 New 開頭,用於在運行時動態生成並初始化對象。方法調用指... ...
  • LiteDB 是一個輕量級的嵌入式 NoSQL 資料庫,其設計理念與 MongoDB 類似,但它是完全使用 C# 開發的,因此與 C# 應用程式的集成非常順暢。與 SQLite 相比,LiteDB 提供了 NoSQL(即鍵值對)的數據存儲方式,並且是一個開源且免費的項目。它適用於桌面、移動以及 We ...
  • 1 開源解析和拆分文檔 第三方的工具去對文件解析拆分,去將我們的文件內容給提取出來,並將我們的文檔內容去拆分成一個小的chunk。常見的PDF word mark down, JSON、HTML。都可以有很好的一些模塊去把這些文件去進行一個東西去提取。 優勢 支持豐富的文檔類型 每種文檔多樣化選擇 ...
  • OOM是什麼?英文全稱為 OutOfMemoryError(記憶體溢出錯誤)。當程式發生OOM時,如何去定位導致異常的代碼還是挺麻煩的。 要檢查OOM發生的原因,首先需要瞭解各種OOM情況下會報的異常信息。這樣能縮小排查範圍,再結合異常堆棧、heapDump文件、JVM分析工具和業務代碼來判斷具體是哪 ...