【Flink入門修煉】1-3 Flink WordCount 入門實現

来源:https://www.cnblogs.com/shuofxz/p/18011142
-Advertisement-
Play Games

本篇文章將帶大家運行 Flink 最簡單的程式 WordCount。先實踐後理論,對其基本輸入輸出、編程代碼有初步瞭解,後續篇章再對 Flink 的各種概念和架構進行介紹。 下麵將從創建項目開始,介紹如何創建出一個 Flink 項目;然後從 DataStream 流處理和 FlinkSQL 執行兩種... ...


本篇文章將帶大家運行 Flink 最簡單的程式 WordCount。先實踐後理論,對其基本輸入輸出、編程代碼有初步瞭解,後續篇章再對 Flink 的各種概念和架構進行介紹。
下麵將從創建項目開始,介紹如何創建出一個 Flink 項目;然後從 DataStream 流處理和 FlinkSQL 執行兩種方式來帶大家學習 WordCount 程式的開發。
Flink 各版本之間變化較多,之前版本的函數在後續版本可能不再支持。跟隨學習時,請儘量選擇和筆者同版本的 Flink。本文使用的 Flink 版本是 1.13.2。

一、創建項目

在很多其他教程中,會看到如下來創建 Flink 程式的方式。雖然簡單方便,但對初學者來說,不知道初始化項目的時候做了什麼,如果報錯了也不知道該如何排查。

mvn archetype:generate
-DarchetypeGroupId=org.apache.flink
-DarchetypeArtifactId=flink-quickstart-java
-DarchetypeVersion=1.13.2
通過指定 Maven 工程的三要素,即 GroupId、ArtifactId、Version 來創建一個新的工程。同時 Flink 給我提供了更為方便的創建 Flink 工程的方法:
curl https://flink.apache.org/q/quickstart.sh | bash -s 1.13.2

因此,我們手動來創建一個 Maven 項目,看看到底如何創建出一個 Flink 項目。
1、通過 IDEA 創建一個 Maven 項目
image.png

2、pom.xml 添加:
這裡我們選擇的是 Flink 1.13.2 版本(Flink 1.14 之後部分類和函數有變化,可自行探索)。

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <flink.version>1.13.2</flink.version> <!-- 1.14 之後部分類和函數有變化,可自行探索 -->
        <target.java.version>1.8</target.java.version>
        <scala.binary.version>2.12</scala.binary.version>
        <maven.compiler.source>${target.java.version}</maven.compiler.source>
        <maven.compiler.target>${target.java.version}</maven.compiler.target>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
    </dependencies>

二、DataStream WordCount

一)編寫程式

基礎項目環境已經搞好了,接下來我們模仿一個流式環境,監聽本地的 Socket 埠,使用 Flink 統計流入的不同單詞個數。

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;


public class SocketTextStreamWordCount {
    public static void main(String[] args) throws Exception {
        //參數檢查
        if (args.length != 2) {
            // System.err.println("USAGE:\nSocketTextStreamWordCount <hostname> <port>");
            // return;
            args = new String[]{"127.0.0.1", "9000"};
        }

        String hostname = args[0];
        Integer port = Integer.parseInt(args[1]);


        // 創建 streaming execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 獲取數據
        DataStreamSource<String> stream = env.socketTextStream(hostname, port);

        // 計數
        SingleOutputStreamOperator<Tuple2<String, Integer>> sum = stream.flatMap(new LineSplitter())
                .keyBy(0)
                .sum(1);

        sum.print();

        env.execute("Java WordCount from SocketTextStream Example");
    }

    public static final class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) {
            String[] tokens = s.toLowerCase().split("\\W+");

            for (String token: tokens) {
                if (token.length() > 0) {
                    collector.collect(new Tuple2<String, Integer>(token, 1));
                }
            }
        }
    }
}

二)測試

接下來我們進行程式測試。
我們在本地使用 netcat 命令啟動一個埠:

nc -l 9000

然後啟動程式,能看到控制台一些輸出:
image.png

接下來,在 nc 中輸入:

$ nc -l 9000
hello world
flink flink flink

回到我們的程式,能看到統計的輸出:

3> (hello,1)
6> (world,1)
8> (flink,1)
8> (flink,2)
8> (flink,3)

image.png

三)如果有報錯

如果出現執行報錯:

Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/flink/api/java/io/TextInputFormat
	at com.shuofxz.SocketTextStreamWordCount.main(SocketTextStreamWordCount.java:25)
Caused by: java.lang.ClassNotFoundException: org.apache.flink.api.java.io.TextInputFormat
	at java.net.URLClassLoader.findClass(URLClassLoader.java:387)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:419)
	at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:352)
	... 1 more

在 IDE 中把 「Add dependencies with "Provided" scope to classpath」勾選上:
image.png

一)介紹 FlinkSQL

Flink SQL 是 Flink 實時計算為簡化計算模型,降低用戶使用實時計算門檻而設計的一套符合標準 SQL 語義的開發語言。
上面單詞統計的邏輯可以轉化為下麵的 SQL。
直接來看這個 SQL:

select word as word, sum(frequency) as frequency from WordCount group by word
  • WordCount 是要進行單詞統計的表,我們會先做一些處理,將輸入的單詞都存放到這個表中
  • 表我們定義為兩列(word, frequency),初始轉化輸入每個單詞占一行,frequency 都是 1
  • 然後,就可以按照 SQL 的邏輯來進行統計聚合了。

其中,WordCount 表數據如下:

word frequency
hello 1
world 1
flink 1
flink 1
flink 1

那麼接下來我們看,如何寫一個 FlinkSQL 的程式。

二)環境和程式

首先,添加 FlinkSQL 需要的依賴:

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-scala-bridge_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>

程式如下:

public class SQLWordCount {
    public static void main(String[] args) throws Exception {
        // 創建上下文環境
        ExecutionEnvironment fbEnv = ExecutionEnvironment.getExecutionEnvironment();
        BatchTableEnvironment fbTableEnv = BatchTableEnvironment.create(fbEnv);

        // 讀取一行模擬數據作為輸入
        String words = "hello world flink flink flink";
        String[] split = words.split("\\W+");

        ArrayList<WC> list = new ArrayList<>();

        for (String word : split) {
            WC wc = new WC(word, 1);
            list.add(wc);
        }

        DataSource<WC> input = fbEnv.fromCollection(list);

        // DataSet 轉 SQL,指定欄位名
        Table table = fbTableEnv.fromDataSet(input, "word,frequency");
        table.printSchema();

        // 註冊為一個表
        fbTableEnv.createTemporaryView("WordCount", table);

        Table table1 = fbTableEnv.sqlQuery("select word as word, sum(frequency) as frequency from WordCount group by word");

        DataSet<WC> ds1 = fbTableEnv.toDataSet(table1, WC.class);
        ds1.printToErr();
    }

    public static class WC {
        public String word;
        public long frequency;

        public WC() {}

        public WC(String word, long frequency) {
            this.word = word;
            this.frequency = frequency;
        }

        @Override
        public String toString() {
            return  word + ", " + frequency;
        }
    }
}

執行,結果輸出:

(
  `word` STRING,
  `frequency` BIGINT
)
flink, 3
world, 1
hello, 1

image.png

四、小結

本篇手把手的帶大家搭建起 Flink Maven 項目,然後使用 DataStream 和 FlinkSQL 兩種方式來學習 WordCount 單詞計數這一最簡單最經典的 Flink 程式開發。跟著步驟一步步執行下來,大家應該對 Flink 程式基本執行流程有個初步的瞭解,為後續的學習打下了基礎。


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 主機發現 arp-scan -l ┌──(root㉿kali)-[~] └─# arp-scan -l Interface: eth0, type: EN10MB, MAC: 00:0c:29:6b:ed:27, IPv4: 192.168.100.251 Starting arp-scan 1.1 ...
  • 大家好,我是痞子衡,是正經搞技術的痞子。今天痞子衡給大家介紹的是SRC_SBMRx寄存器對於定位i.MXRT1xxx離線無法啟動問題的意義。 最近有一位開源社區大佬在使能 RT1050 BEE 加密過程中遇到無法啟動問題,折騰到一度崩潰,甚至想要棄坑。痞子衡哪能讓這位“老鄉”跑掉,連忙給予緊急支持, ...
  • 一直知道 OpenWrt 經常拿來做軟路由軟體。最近買了個二手小主機升騰 C92 來做旁路由伺服器,其被歸為瘦客戶機一類,感覺和工控機差不多,現價百元以內。想就著這個機會,瞭解體驗一下 OpenWrt。 ...
  • 封裝WINDOWS10系統 一、準備工具: 1、安裝NTLite。下載鏈接:https://www.ntlite.com/download/ 2、準備一個ISO的PE例如:FirPE。下載鏈接https://firpe.cn/page-247 3、安裝VMware 4、下載封裝工具Easy Sysp ...
  • 瀏覽器每次打開都有個煩人的提示要獲取將來的 microsoft edge 更新,需要 macos 10.15 或更高版本,找了很久也沒有解決辦法,有 windows 端的解決方案,有禁止更新的解決方案,就是沒有 Mac 上如何避免這個告警的方案,於是走上 Edge 定製化之旅。 效果 使用前後對比 ...
  • 在 Linux 中,你可以使用 alias 命令來設置別名。別名允許你為常用的命令創建自定義的縮寫或快捷方式。這些別名會存在於當前會話中,如果你希望使別名永久生效,可以將其添加到你的 shell 配置文件中(例如 .bashrc、.bash_profile、.zshrc 等)。 1、查看已有別名 通 ...
  • 硬體開發板:STM32G0B1RET6 軟體平臺:cubemax+keil+VScode 內容原著聲明 代碼借鑒學習於以下文章: STM32 使用硬體IIC驅動0.96寸4針IOLED顯示器(HAL庫) 1 新建cubemax工程 1.1 配置系統時鐘RCC 1.2 配置引腳 1.3 導出工程 略. ...
  • Java 迴圈 迴圈可以執行一個代碼塊,只要達到指定的條件。迴圈很方便,因為它們節省時間,減少錯誤,並使代碼更易讀。 Java While 迴圈 while 迴圈會迴圈執行一個代碼塊,只要指定的條件為真: 語法 while (condition) { // 要執行的代碼塊 } 在下麵的示例中,只要變 ...
一周排行
    -Advertisement-
    Play Games
  • 1、預覽地址:http://139.155.137.144:9012 2、qq群:801913255 一、前言 隨著網路的發展,企業對於信息系統數據的保密工作愈發重視,不同身份、角色對於數據的訪問許可權都應該大相徑庭。 列如 1、不同登錄人員對一個數據列表的可見度是不一樣的,如數據列、數據行、數據按鈕 ...
  • 前言 上一篇文章寫瞭如何使用RabbitMQ做個簡單的發送郵件項目,然後評論也是比較多,也是準備去學習一下如何確保RabbitMQ的消息可靠性,但是由於時間原因,先來說說設計模式中的簡單工廠模式吧! 在瞭解簡單工廠模式之前,我們要知道C#是一款面向對象的高級程式語言。它有3大特性,封裝、繼承、多態。 ...
  • Nodify學習 一:介紹與使用 - 可樂_加冰 - 博客園 (cnblogs.com) Nodify學習 二:添加節點 - 可樂_加冰 - 博客園 (cnblogs.com) 介紹 Nodify是一個WPF基於節點的編輯器控制項,其中包含一系列節點、連接和連接器組件,旨在簡化構建基於節點的工具的過程 ...
  • 創建一個webapi項目做測試使用。 創建新控制器,搭建一個基礎框架,包括獲取當天日期、wiki的請求地址等 創建一個Http請求幫助類以及方法,用於獲取指定URL的信息 使用http請求訪問指定url,先運行一下,看看返回的內容。內容如圖右邊所示,實際上是一個Json數據。我們主要解析 大事記 部 ...
  • 最近在不少自媒體上看到有關.NET與C#的資訊與評價,感覺大家對.NET與C#還是不太瞭解,尤其是對2016年6月發佈的跨平臺.NET Core 1.0,更是知之甚少。在考慮一番之後,還是決定寫點東西總結一下,也回顧一下.NET的發展歷史。 首先,你沒看錯,.NET是跨平臺的,可以在Windows、 ...
  • Nodify學習 一:介紹與使用 - 可樂_加冰 - 博客園 (cnblogs.com) Nodify學習 二:添加節點 - 可樂_加冰 - 博客園 (cnblogs.com) 添加節點(nodes) 通過上一篇我們已經創建好了編輯器實例現在我們為編輯器添加一個節點 添加model和viewmode ...
  • 前言 資料庫併發,數據審計和軟刪除一直是數據持久化方面的經典問題。早些時候,這些工作需要手寫複雜的SQL或者通過存儲過程和觸發器實現。手寫複雜SQL對軟體可維護性構成了相當大的挑戰,隨著SQL字數的變多,用到的嵌套和複雜語法增加,可讀性和可維護性的難度是幾何級暴漲。因此如何在實現功能的同時控制這些S ...
  • 類型檢查和轉換:當你需要檢查對象是否為特定類型,並且希望在同一時間內將其轉換為那個類型時,模式匹配提供了一種更簡潔的方式來完成這一任務,避免了使用傳統的as和is操作符後還需要進行額外的null檢查。 複雜條件邏輯:在處理複雜的條件邏輯時,特別是涉及到多個條件和類型的情況下,使用模式匹配可以使代碼更 ...
  • 在日常開發中,我們經常需要和文件打交道,特別是桌面開發,有時候就會需要載入大批量的文件,而且可能還會存在部分文件缺失的情況,那麼如何才能快速的判斷文件是否存在呢?如果處理不當的,且文件數量比較多的時候,可能會造成卡頓等情況,進而影響程式的使用體驗。今天就以一個簡單的小例子,簡述兩種不同的判斷文件是否... ...
  • 前言 資料庫併發,數據審計和軟刪除一直是數據持久化方面的經典問題。早些時候,這些工作需要手寫複雜的SQL或者通過存儲過程和觸發器實現。手寫複雜SQL對軟體可維護性構成了相當大的挑戰,隨著SQL字數的變多,用到的嵌套和複雜語法增加,可讀性和可維護性的難度是幾何級暴漲。因此如何在實現功能的同時控制這些S ...