【Flink入門修煉】1-3 Flink WordCount 入門實現

来源:https://www.cnblogs.com/shuofxz/p/18011142
-Advertisement-
Play Games

本篇文章將帶大家運行 Flink 最簡單的程式 WordCount。先實踐後理論,對其基本輸入輸出、編程代碼有初步瞭解,後續篇章再對 Flink 的各種概念和架構進行介紹。 下麵將從創建項目開始,介紹如何創建出一個 Flink 項目;然後從 DataStream 流處理和 FlinkSQL 執行兩種... ...


本篇文章將帶大家運行 Flink 最簡單的程式 WordCount。先實踐後理論,對其基本輸入輸出、編程代碼有初步瞭解,後續篇章再對 Flink 的各種概念和架構進行介紹。
下麵將從創建項目開始,介紹如何創建出一個 Flink 項目;然後從 DataStream 流處理和 FlinkSQL 執行兩種方式來帶大家學習 WordCount 程式的開發。
Flink 各版本之間變化較多,之前版本的函數在後續版本可能不再支持。跟隨學習時,請儘量選擇和筆者同版本的 Flink。本文使用的 Flink 版本是 1.13.2。

一、創建項目

在很多其他教程中,會看到如下來創建 Flink 程式的方式。雖然簡單方便,但對初學者來說,不知道初始化項目的時候做了什麼,如果報錯了也不知道該如何排查。

mvn archetype:generate
-DarchetypeGroupId=org.apache.flink
-DarchetypeArtifactId=flink-quickstart-java
-DarchetypeVersion=1.13.2
通過指定 Maven 工程的三要素,即 GroupId、ArtifactId、Version 來創建一個新的工程。同時 Flink 給我提供了更為方便的創建 Flink 工程的方法:
curl https://flink.apache.org/q/quickstart.sh | bash -s 1.13.2

因此,我們手動來創建一個 Maven 項目,看看到底如何創建出一個 Flink 項目。
1、通過 IDEA 創建一個 Maven 項目
image.png

2、pom.xml 添加:
這裡我們選擇的是 Flink 1.13.2 版本(Flink 1.14 之後部分類和函數有變化,可自行探索)。

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <flink.version>1.13.2</flink.version> <!-- 1.14 之後部分類和函數有變化,可自行探索 -->
        <target.java.version>1.8</target.java.version>
        <scala.binary.version>2.12</scala.binary.version>
        <maven.compiler.source>${target.java.version}</maven.compiler.source>
        <maven.compiler.target>${target.java.version}</maven.compiler.target>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
    </dependencies>

二、DataStream WordCount

一)編寫程式

基礎項目環境已經搞好了,接下來我們模仿一個流式環境,監聽本地的 Socket 埠,使用 Flink 統計流入的不同單詞個數。

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;


public class SocketTextStreamWordCount {
    public static void main(String[] args) throws Exception {
        //參數檢查
        if (args.length != 2) {
            // System.err.println("USAGE:\nSocketTextStreamWordCount <hostname> <port>");
            // return;
            args = new String[]{"127.0.0.1", "9000"};
        }

        String hostname = args[0];
        Integer port = Integer.parseInt(args[1]);


        // 創建 streaming execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 獲取數據
        DataStreamSource<String> stream = env.socketTextStream(hostname, port);

        // 計數
        SingleOutputStreamOperator<Tuple2<String, Integer>> sum = stream.flatMap(new LineSplitter())
                .keyBy(0)
                .sum(1);

        sum.print();

        env.execute("Java WordCount from SocketTextStream Example");
    }

    public static final class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) {
            String[] tokens = s.toLowerCase().split("\\W+");

            for (String token: tokens) {
                if (token.length() > 0) {
                    collector.collect(new Tuple2<String, Integer>(token, 1));
                }
            }
        }
    }
}

二)測試

接下來我們進行程式測試。
我們在本地使用 netcat 命令啟動一個埠:

nc -l 9000

然後啟動程式,能看到控制台一些輸出:
image.png

接下來,在 nc 中輸入:

$ nc -l 9000
hello world
flink flink flink

回到我們的程式,能看到統計的輸出:

3> (hello,1)
6> (world,1)
8> (flink,1)
8> (flink,2)
8> (flink,3)

image.png

三)如果有報錯

如果出現執行報錯:

Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/flink/api/java/io/TextInputFormat
	at com.shuofxz.SocketTextStreamWordCount.main(SocketTextStreamWordCount.java:25)
Caused by: java.lang.ClassNotFoundException: org.apache.flink.api.java.io.TextInputFormat
	at java.net.URLClassLoader.findClass(URLClassLoader.java:387)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:419)
	at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:352)
	... 1 more

在 IDE 中把 「Add dependencies with "Provided" scope to classpath」勾選上:
image.png

一)介紹 FlinkSQL

Flink SQL 是 Flink 實時計算為簡化計算模型,降低用戶使用實時計算門檻而設計的一套符合標準 SQL 語義的開發語言。
上面單詞統計的邏輯可以轉化為下麵的 SQL。
直接來看這個 SQL:

select word as word, sum(frequency) as frequency from WordCount group by word
  • WordCount 是要進行單詞統計的表,我們會先做一些處理,將輸入的單詞都存放到這個表中
  • 表我們定義為兩列(word, frequency),初始轉化輸入每個單詞占一行,frequency 都是 1
  • 然後,就可以按照 SQL 的邏輯來進行統計聚合了。

其中,WordCount 表數據如下:

word frequency
hello 1
world 1
flink 1
flink 1
flink 1

那麼接下來我們看,如何寫一個 FlinkSQL 的程式。

二)環境和程式

首先,添加 FlinkSQL 需要的依賴:

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-scala-bridge_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>

程式如下:

public class SQLWordCount {
    public static void main(String[] args) throws Exception {
        // 創建上下文環境
        ExecutionEnvironment fbEnv = ExecutionEnvironment.getExecutionEnvironment();
        BatchTableEnvironment fbTableEnv = BatchTableEnvironment.create(fbEnv);

        // 讀取一行模擬數據作為輸入
        String words = "hello world flink flink flink";
        String[] split = words.split("\\W+");

        ArrayList<WC> list = new ArrayList<>();

        for (String word : split) {
            WC wc = new WC(word, 1);
            list.add(wc);
        }

        DataSource<WC> input = fbEnv.fromCollection(list);

        // DataSet 轉 SQL,指定欄位名
        Table table = fbTableEnv.fromDataSet(input, "word,frequency");
        table.printSchema();

        // 註冊為一個表
        fbTableEnv.createTemporaryView("WordCount", table);

        Table table1 = fbTableEnv.sqlQuery("select word as word, sum(frequency) as frequency from WordCount group by word");

        DataSet<WC> ds1 = fbTableEnv.toDataSet(table1, WC.class);
        ds1.printToErr();
    }

    public static class WC {
        public String word;
        public long frequency;

        public WC() {}

        public WC(String word, long frequency) {
            this.word = word;
            this.frequency = frequency;
        }

        @Override
        public String toString() {
            return  word + ", " + frequency;
        }
    }
}

執行,結果輸出:

(
  `word` STRING,
  `frequency` BIGINT
)
flink, 3
world, 1
hello, 1

image.png

四、小結

本篇手把手的帶大家搭建起 Flink Maven 項目,然後使用 DataStream 和 FlinkSQL 兩種方式來學習 WordCount 單詞計數這一最簡單最經典的 Flink 程式開發。跟著步驟一步步執行下來,大家應該對 Flink 程式基本執行流程有個初步的瞭解,為後續的學習打下了基礎。


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 主機發現 arp-scan -l ┌──(root㉿kali)-[~] └─# arp-scan -l Interface: eth0, type: EN10MB, MAC: 00:0c:29:6b:ed:27, IPv4: 192.168.100.251 Starting arp-scan 1.1 ...
  • 大家好,我是痞子衡,是正經搞技術的痞子。今天痞子衡給大家介紹的是SRC_SBMRx寄存器對於定位i.MXRT1xxx離線無法啟動問題的意義。 最近有一位開源社區大佬在使能 RT1050 BEE 加密過程中遇到無法啟動問題,折騰到一度崩潰,甚至想要棄坑。痞子衡哪能讓這位“老鄉”跑掉,連忙給予緊急支持, ...
  • 一直知道 OpenWrt 經常拿來做軟路由軟體。最近買了個二手小主機升騰 C92 來做旁路由伺服器,其被歸為瘦客戶機一類,感覺和工控機差不多,現價百元以內。想就著這個機會,瞭解體驗一下 OpenWrt。 ...
  • 封裝WINDOWS10系統 一、準備工具: 1、安裝NTLite。下載鏈接:https://www.ntlite.com/download/ 2、準備一個ISO的PE例如:FirPE。下載鏈接https://firpe.cn/page-247 3、安裝VMware 4、下載封裝工具Easy Sysp ...
  • 瀏覽器每次打開都有個煩人的提示要獲取將來的 microsoft edge 更新,需要 macos 10.15 或更高版本,找了很久也沒有解決辦法,有 windows 端的解決方案,有禁止更新的解決方案,就是沒有 Mac 上如何避免這個告警的方案,於是走上 Edge 定製化之旅。 效果 使用前後對比 ...
  • 在 Linux 中,你可以使用 alias 命令來設置別名。別名允許你為常用的命令創建自定義的縮寫或快捷方式。這些別名會存在於當前會話中,如果你希望使別名永久生效,可以將其添加到你的 shell 配置文件中(例如 .bashrc、.bash_profile、.zshrc 等)。 1、查看已有別名 通 ...
  • 硬體開發板:STM32G0B1RET6 軟體平臺:cubemax+keil+VScode 內容原著聲明 代碼借鑒學習於以下文章: STM32 使用硬體IIC驅動0.96寸4針IOLED顯示器(HAL庫) 1 新建cubemax工程 1.1 配置系統時鐘RCC 1.2 配置引腳 1.3 導出工程 略. ...
  • Java 迴圈 迴圈可以執行一個代碼塊,只要達到指定的條件。迴圈很方便,因為它們節省時間,減少錯誤,並使代碼更易讀。 Java While 迴圈 while 迴圈會迴圈執行一個代碼塊,只要指定的條件為真: 語法 while (condition) { // 要執行的代碼塊 } 在下麵的示例中,只要變 ...
一周排行
    -Advertisement-
    Play Games
  • 前言 推薦一款基於.NET 8、WPF、Prism.DryIoc、MVVM設計模式、Blazor以及MySQL資料庫構建的企業級工作流系統的WPF客戶端框架-AIStudio.Wpf.AClient 6.0。 項目介紹 框架採用了 Prism 框架來實現 MVVM 模式,不僅簡化了 MVVM 的典型 ...
  • 先看一下效果吧: 我們直接通過改造一下原版的TreeView來實現上面這個效果 我們先創建一個普通的TreeView 代碼很簡單: <TreeView> <TreeViewItem Header="人事部"/> <TreeViewItem Header="技術部"> <TreeViewItem He ...
  • 1. 生成式 AI 簡介 https://imp.i384100.net/LXYmq3 2. Python 語言 https://imp.i384100.net/5gmXXo 3. 統計和 R https://youtu.be/ANMuuq502rE?si=hw9GT6JVzMhRvBbF 4. 數 ...
  • 本文為大家介紹下.NET解壓/壓縮zip文件。雖然解壓縮不是啥核心技術,但壓縮性能以及進度處理還是需要關註下,針對使用較多的zip開源組件驗證,給大家提供個技術選型參考 之前在《.NET WebSocket高併發通信阻塞問題 - 唐宋元明清2188 - 博客園 (cnblogs.com)》講過,團隊 ...
  • 之前寫過兩篇關於Roslyn源生成器生成源代碼的用例,今天使用Roslyn的代碼修複器CodeFixProvider實現一個cs文件頭部註釋的功能, 代碼修複器會同時涉及到CodeFixProvider和DiagnosticAnalyzer, 實現FileHeaderAnalyzer 首先我們知道修 ...
  • 在軟體行業,經常會聽到一句話“文不如表,表不如圖”說明瞭圖形在軟體應用中的重要性。同樣在WPF開發中,為了程式美觀或者業務需要,經常會用到各種個樣的圖形。今天以一些簡單的小例子,簡述WPF開發中幾何圖形(Geometry)相關內容,僅供學習分享使用,如有不足之處,還請指正。 ...
  • 在 C# 中使用 RabbitMQ 通過簡訊發送重置後的密碼到用戶的手機號上,你可以按照以下步驟進行 1.安裝 RabbitMQ 客戶端庫 首先,確保你已經安裝了 RabbitMQ 客戶端庫。你可以通過 NuGet 包管理器來安裝: dotnet add package RabbitMQ.Clien ...
  • 1.下載 Protocol Buffers 編譯器(protoc) 前往 Protocol Buffers GitHub Releases 頁面。在 "Assets" 下找到適合您系統的壓縮文件,通常為 protoc-{version}-win32.zip 或 protoc-{version}-wi ...
  • 簡介 在現代微服務架構中,服務發現(Service Discovery)是一項關鍵功能。它允許微服務動態地找到彼此,而無需依賴硬編碼的地址。以前如果你搜 .NET Service Discovery,大概率會搜到一大堆 Eureka,Consul 等的文章。現在微軟為我們帶來了一個官方的包:Micr ...
  • ZY樹洞 前言 ZY樹洞是一個基於.NET Core開發的簡單的評論系統,主要用於大家分享自己心中的感悟、經驗、心得、想法等。 好了,不賣關子了,這個項目其實是上班無聊的時候寫的,為什麼要寫這個項目呢?因為我單純的想吐槽一下工作中的不滿而已。 項目介紹 項目很簡單,主要功能就是提供一個簡單的評論系統 ...