本篇文章將帶大家運行 Flink 最簡單的程式 WordCount。先實踐後理論,對其基本輸入輸出、編程代碼有初步瞭解,後續篇章再對 Flink 的各種概念和架構進行介紹。 下麵將從創建項目開始,介紹如何創建出一個 Flink 項目;然後從 DataStream 流處理和 FlinkSQL 執行兩種... ...
本篇文章將帶大家運行 Flink 最簡單的程式 WordCount。先實踐後理論,對其基本輸入輸出、編程代碼有初步瞭解,後續篇章再對 Flink 的各種概念和架構進行介紹。
下麵將從創建項目開始,介紹如何創建出一個 Flink 項目;然後從 DataStream 流處理和 FlinkSQL 執行兩種方式來帶大家學習 WordCount 程式的開發。
Flink 各版本之間變化較多,之前版本的函數在後續版本可能不再支持。跟隨學習時,請儘量選擇和筆者同版本的 Flink。本文使用的 Flink 版本是 1.13.2。
一、創建項目
在很多其他教程中,會看到如下來創建 Flink 程式的方式。雖然簡單方便,但對初學者來說,不知道初始化項目的時候做了什麼,如果報錯了也不知道該如何排查。
mvn archetype:generate
-DarchetypeGroupId=org.apache.flink
-DarchetypeArtifactId=flink-quickstart-java
-DarchetypeVersion=1.13.2
通過指定 Maven 工程的三要素,即 GroupId、ArtifactId、Version 來創建一個新的工程。同時 Flink 給我提供了更為方便的創建 Flink 工程的方法:
curl https://flink.apache.org/q/quickstart.sh | bash -s 1.13.2
因此,我們手動來創建一個 Maven 項目,看看到底如何創建出一個 Flink 項目。
1、通過 IDEA 創建一個 Maven 項目
2、pom.xml
添加:
這裡我們選擇的是 Flink 1.13.2 版本(Flink 1.14 之後部分類和函數有變化,可自行探索)。
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<flink.version>1.13.2</flink.version> <!-- 1.14 之後部分類和函數有變化,可自行探索 -->
<target.java.version>1.8</target.java.version>
<scala.binary.version>2.12</scala.binary.version>
<maven.compiler.source>${target.java.version}</maven.compiler.source>
<maven.compiler.target>${target.java.version}</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
</dependencies>
二、DataStream WordCount
一)編寫程式
基礎項目環境已經搞好了,接下來我們模仿一個流式環境,監聽本地的 Socket 埠,使用 Flink 統計流入的不同單詞個數。
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
public class SocketTextStreamWordCount {
public static void main(String[] args) throws Exception {
//參數檢查
if (args.length != 2) {
// System.err.println("USAGE:\nSocketTextStreamWordCount <hostname> <port>");
// return;
args = new String[]{"127.0.0.1", "9000"};
}
String hostname = args[0];
Integer port = Integer.parseInt(args[1]);
// 創建 streaming execution environment
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 獲取數據
DataStreamSource<String> stream = env.socketTextStream(hostname, port);
// 計數
SingleOutputStreamOperator<Tuple2<String, Integer>> sum = stream.flatMap(new LineSplitter())
.keyBy(0)
.sum(1);
sum.print();
env.execute("Java WordCount from SocketTextStream Example");
}
public static final class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
@Override
public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) {
String[] tokens = s.toLowerCase().split("\\W+");
for (String token: tokens) {
if (token.length() > 0) {
collector.collect(new Tuple2<String, Integer>(token, 1));
}
}
}
}
}
二)測試
接下來我們進行程式測試。
我們在本地使用 netcat 命令啟動一個埠:
nc -l 9000
然後啟動程式,能看到控制台一些輸出:
接下來,在 nc 中輸入:
$ nc -l 9000
hello world
flink flink flink
回到我們的程式,能看到統計的輸出:
3> (hello,1)
6> (world,1)
8> (flink,1)
8> (flink,2)
8> (flink,3)
三)如果有報錯
如果出現執行報錯:
Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/flink/api/java/io/TextInputFormat
at com.shuofxz.SocketTextStreamWordCount.main(SocketTextStreamWordCount.java:25)
Caused by: java.lang.ClassNotFoundException: org.apache.flink.api.java.io.TextInputFormat
at java.net.URLClassLoader.findClass(URLClassLoader.java:387)
at java.lang.ClassLoader.loadClass(ClassLoader.java:419)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)
at java.lang.ClassLoader.loadClass(ClassLoader.java:352)
... 1 more
在 IDE 中把 「Add dependencies with "Provided" scope to classpath」勾選上:
三、Flink Table & SQL WordCount
一)介紹 FlinkSQL
Flink SQL 是 Flink 實時計算為簡化計算模型,降低用戶使用實時計算門檻而設計的一套符合標準 SQL 語義的開發語言。
上面單詞統計的邏輯可以轉化為下麵的 SQL。
直接來看這個 SQL:
select word as word, sum(frequency) as frequency from WordCount group by word
WordCount
是要進行單詞統計的表,我們會先做一些處理,將輸入的單詞都存放到這個表中- 表我們定義為兩列
(word, frequency)
,初始轉化輸入每個單詞占一行,frequency 都是 1 - 然後,就可以按照 SQL 的邏輯來進行統計聚合了。
其中,WordCount
表數據如下:
word | frequency |
---|---|
hello | 1 |
world | 1 |
flink | 1 |
flink | 1 |
flink | 1 |
那麼接下來我們看,如何寫一個 FlinkSQL 的程式。
二)環境和程式
首先,添加 FlinkSQL 需要的依賴:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-scala-bridge_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
程式如下:
public class SQLWordCount {
public static void main(String[] args) throws Exception {
// 創建上下文環境
ExecutionEnvironment fbEnv = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment fbTableEnv = BatchTableEnvironment.create(fbEnv);
// 讀取一行模擬數據作為輸入
String words = "hello world flink flink flink";
String[] split = words.split("\\W+");
ArrayList<WC> list = new ArrayList<>();
for (String word : split) {
WC wc = new WC(word, 1);
list.add(wc);
}
DataSource<WC> input = fbEnv.fromCollection(list);
// DataSet 轉 SQL,指定欄位名
Table table = fbTableEnv.fromDataSet(input, "word,frequency");
table.printSchema();
// 註冊為一個表
fbTableEnv.createTemporaryView("WordCount", table);
Table table1 = fbTableEnv.sqlQuery("select word as word, sum(frequency) as frequency from WordCount group by word");
DataSet<WC> ds1 = fbTableEnv.toDataSet(table1, WC.class);
ds1.printToErr();
}
public static class WC {
public String word;
public long frequency;
public WC() {}
public WC(String word, long frequency) {
this.word = word;
this.frequency = frequency;
}
@Override
public String toString() {
return word + ", " + frequency;
}
}
}
執行,結果輸出:
(
`word` STRING,
`frequency` BIGINT
)
flink, 3
world, 1
hello, 1
四、小結
本篇手把手的帶大家搭建起 Flink Maven 項目,然後使用 DataStream 和 FlinkSQL 兩種方式來學習 WordCount 單詞計數這一最簡單最經典的 Flink 程式開發。跟著步驟一步步執行下來,大家應該對 Flink 程式基本執行流程有個初步的瞭解,為後續的學習打下了基礎。