1. Basic API Concepts
Flink programs apply transformations to distributed collections (for example: filtering, mapping, updating state, joining, grouping, defining windows, aggregating).
Collections are initially created from sources (for example, by reading from files, Kafka topics, or local in-memory collections).
Results are returned via sinks, which may, for example, write the data to (distributed) files or to standard output (for example, the command-line terminal).
Depending on the type of data source (bounded or unbounded), you write either a batch program or a streaming program: the DataSet API is used for batch processing and the DataStream API for stream processing.
Flink has the special classes DataSet and DataStream to represent data in a program. In the case of a DataSet the data is finite, while for a DataStream the number of elements can be unbounded.
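As a minimal sketch of that difference (the element values below are made up for illustration), the same handful of strings can be represented either as a finite DataSet or as a DataStream:

// Batch: a finite DataSet built with the DataSet API
ExecutionEnvironment batchEnv = ExecutionEnvironment.getExecutionEnvironment();
DataSet<String> batchWords = batchEnv.fromElements("flink", "dataset", "api");

// Streaming: a DataStream built with the DataStream API; a real streaming job
// would usually read from an unbounded source such as a Kafka topic or a socket
StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> streamWords = streamEnv.fromElements("flink", "datastream", "api");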
Flink programs look like regular programs that transform collections of data. Each program consists of the same basic parts (a minimal skeleton tying these parts together is sketched right after the list):
- Obtain an execution environment
- Load/create the initial data
- Specify transformations on this data
- Specify where to put the results of the computation
- Trigger the program execution
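To make these five parts concrete, here is a minimal sketch (the socket host/port and the job name are placeholders, not taken from the text above):

// 1. Obtain an execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// 2. Load/create the initial data (placeholder source: a local socket)
DataStream<String> lines = env.socketTextStream("localhost", 9999);

// 3. Specify transformations on the data
DataStream<String> longLines = lines.filter(line -> line.length() > 5);

// 4. Specify where to put the results
longLines.print();

// 5. Trigger the program execution
env.execute("five-part skeleton");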
For demonstration purposes, let's first create a project. One option is to generate it from the Maven archetype, for example:
mvn archetype:generate \
    -DarchetypeGroupId=org.apache.flink \
    -DarchetypeArtifactId=flink-quickstart-java \
    -DarchetypeVersion=1.10.0 \
    -DgroupId=com.cjs.example \
    -DartifactId=flink-quickstart \
    -Dversion=1.0.0-SNAPSHOT \
    -Dpackage=com.cjs.example.flink \
    -DinteractiveMode=false
Alternatively, you can create a Spring Boot project directly and add the dependencies yourself:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>1.10.0</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.11</artifactId>
    <version>1.10.0</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.10_2.11</artifactId>
    <version>1.10.0</version>
</dependency>
The StreamExecutionEnvironment is the basis of all Flink programs. You can obtain one using the following static methods on StreamExecutionEnvironment:
getExecutionEnvironment()
createLocalEnvironment()
createRemoteEnvironment(String host, int port, String... jarFiles)
Typically, you only need getExecutionEnvironment(), since this method automatically determines the right execution environment based on the context.
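For completeness, here is a sketch of the other two factory methods (the host, port, and jar path are placeholder values):

// Runs the job in the current JVM; convenient for testing inside an IDE
StreamExecutionEnvironment localEnv = StreamExecutionEnvironment.createLocalEnvironment();

// Submits the job to a running remote cluster, shipping the given jar(s) with it
StreamExecutionEnvironment remoteEnv = StreamExecutionEnvironment.createRemoteEnvironment(
        "jobmanager-host", 8081, "/path/to/flink-job.jar");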
Read data from a file, for example:
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> text = env.readTextFile("file:///path/to/file");
Apply transformations on the DataStream, for example:
DataStream<String> input = ...;

DataStream<Integer> parsed = input.map(new MapFunction<String, Integer>() {
    @Override
    public Integer map(String value) {
        return Integer.parseInt(value);
    }
});
Output the results by creating a sink, for example via one of the following methods (a short usage sketch follows):
writeAsText(String path)
print()
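Continuing from the parsed stream above, a minimal usage sketch (the output path is a placeholder):

// Write the results as text files under the given path
parsed.writeAsText("file:///path/to/output");

// Or print them to standard output
parsed.print();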
Finally, call execute() on the StreamExecutionEnvironment to run the program:
// Triggers the program execution
env.execute();

// Triggers the program execution asynchronously
final JobClient jobClient = env.executeAsync();
final JobExecutionResult jobExecutionResult = jobClient.getJobExecutionResult(userClassloader).get();
The following word-count example should help cement this flow; the WordCount program is to big data what HelloWorld is to Java.
package com.cjs.example.flink;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

/**
 * MapReduce style: group first, then sum
 * @author ChengJianSheng
 * @date 2020-05-26
 */
public class WordCount {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSet<String> text = env.readTextFile("/Users/asdf/Desktop/input.txt");

        DataSet<Tuple2<String, Integer>> counts =
                // split up the lines in pairs (2-tuples) containing: (word, 1)
                text.flatMap(new Tokenizer())
                    // group by the tuple field "0" and sum up tuple field "1"
                    .groupBy(0)
                    .sum(1);

        counts.writeAsCsv("/Users/asdf/Desktop/aaa", "\n", " ");

        env.execute();
    }

    static class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
            // normalize and split the line
            String[] tokens = value.toLowerCase().split("\\W+");
            // emit the pairs
            for (String token : tokens) {
                if (token.length() > 0) {
                    out.collect(new Tuple2<>(token, 1));
                }
            }
        }
    }
}
Defining keys for Tuples
(Python also has a tuple type.) For example:
DataStream<Tuple3<Integer, String, Long>> input = // [...]
KeyedStream<Tuple3<Integer, String, Long>, Tuple> keyed = input.keyBy(0);
Here the tuples are grouped on the first field (the field of Integer type).
Keys can also be defined using the fields of a POJO, for example:
// some ordinary POJO (Plain Old Java Object)
public class WC {
    public String word;
    public int count;
}

DataStream<WC> words = // [...]
DataStream<WC> wordCounts = words.keyBy("word").window(/*window specification*/);
Let's take a quick look at KeyedStream: keyBy turns a DataStream into a KeyedStream, and one of its overloads accepts a KeySelector, so the key can also be defined with a custom KeySelector function, for example:
// some ordinary POJO
public class WC {
    public String word;
    public int count;
}

DataStream<WC> words = // [...]
KeyedStream<WC, String> keyed = words
        .keyBy(new KeySelector<WC, String>() {
            public String getKey(WC wc) {
                return wc.word;
            }
        });
How do you specify these transformation functions?
Option 1: as an anonymous inner class
data.map(new MapFunction<String, Integer>() {
    public Integer map(String value) {
        return Integer.parseInt(value);
    }
});
Option 2: as a lambda expression (note the type-hint caveat after the examples)
data.filter(s -> s.startsWith("http://"));
data.reduce((i1,i2) -> i1 + i2);
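One caveat worth noting (not from the original text): because of Java type erasure, a lambda whose result involves a generic type such as Tuple2 usually needs an explicit type hint via returns(...). Assuming data is a DataStream<String>:

// Without the returns(...) hint Flink cannot infer Tuple2<String, Integer> from the lambda
DataStream<Tuple2<String, Integer>> pairs = data
        .map(word -> new Tuple2<>(word, 1))
        .returns(Types.TUPLE(Types.STRING, Types.INT));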
2. DataStream API
The following example counts the words coming in over a socket, in 10-second windows.
package com.cjs.example.flink;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class WindowWordCount {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Tuple2<String, Integer>> dataStream = env
                .socketTextStream("localhost", 9999)
                .flatMap(new Splitter())
                .keyBy(0)
                .timeWindow(Time.seconds(10))
                .sum(1);

        dataStream.print();

        env.execute("Window WordCount");
    }

    static class Splitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
            String[] words = value.split("\\W+");
            for (String word : words) {
                out.collect(new Tuple2<String, Integer>(word, 1));
            }
        }
    }
}
To run this program, first start a listener in a terminal:
nc -lk 9999
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/datastream_api.html