In this article, we will implement a Flink word count program in Java.
Code Development
Environment Setup
Import the Flink 1.9 POM dependencies:
<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>1.9.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.11</artifactId>
        <version>1.9.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.commons</groupId>
        <artifactId>commons-lang3</artifactId>
        <version>3.7</version>
    </dependency>
</dependencies>
Build the Flink streaming execution environment:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Custom Source
Generate one line of text per second:
DataStreamSource<String> wordLineDS = env.addSource(new RichSourceFunction<String>() {
    // volatile so that cancel(), called from another thread, is visible to run()
    private volatile boolean isCancel = false;

    private String[] words = {
            "important oracle jdk license update",
            "the oracle jdk license has changed for releases starting april 16 2019",
            "the new oracle technology network license agreement for oracle java se is substantially different from prior oracle jdk licenses the new license permits certain uses such as ",
            "personal use and development use at no cost but other uses authorized under prior oracle jdk licenses may no longer be available please review the terms carefully before ",
            "downloading and using this product an faq is available here ",
            "commercial license and support is available with a low cost java se subscription",
            "oracle also provides the latest openjdk release under the open source gpl license at jdk java net"
    };

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        // Emit one random line of text per second
        while (!isCancel) {
            int randomIndex = RandomUtils.nextInt(0, words.length);
            ctx.collect(words[randomIndex]);
            Thread.sleep(1000);
        }
    }

    @Override
    public void cancel() {
        isCancel = true;
    }
});
Word Count
// 3. Word count
// 3.1 Split each line of text into individual words
SingleOutputStreamOperator<String> wordsDS = wordLineDS.flatMap((String line, Collector<String> ctx) -> {
    // Split the line into words
    Arrays.stream(line.split(" ")).forEach(word -> ctx.collect(word));
}).returns(Types.STRING);

// 3.2 Convert each word into a tuple
SingleOutputStreamOperator<Tuple2<String, Integer>> tupleDS = wordsDS
        .map(word -> Tuple2.of(word, 1))
        .returns(Types.TUPLE(Types.STRING, Types.INT));

// 3.3 Group by word
KeyedStream<Tuple2<String, Integer>, String> keyedDS = tupleDS.keyBy(tuple -> tuple.f0);

// 3.4 Accumulate the count for each word within each window
SingleOutputStreamOperator<Tuple2<String, Integer>> resultDS = keyedDS
        .timeWindow(Time.seconds(3))
        .reduce((t1, t2) -> Tuple2.of(t1.f0, t1.f1 + t2.f1));

resultDS.print();
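As a side note, the split → tuple → reduce pipeline above performs the same aggregation that a plain-Java batch word count would. The following standalone sketch (no Flink dependency; the class and method names are our own, for illustration only) mirrors those steps on a fixed batch of lines:

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical standalone sketch mirroring the Flink pipeline on one batch:
// split lines into words, pair each word with 1, then sum per key.
public class WordCountSketch {

    static Map<String, Integer> count(String... lines) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        for (String line : lines) {
            // split -> (word, 1) -> keyed sum, collapsed into merge()
            Arrays.stream(line.split(" "))
                  .forEach(word -> counts.merge(word, 1, Integer::sum));
        }
        return counts;
    }

    public static void main(String[] args) {
        Map<String, Integer> result = count(
                "important oracle jdk license update",
                "the oracle jdk license has changed");
        System.out.println(result.get("oracle")); // 2
        System.out.println(result.get("update")); // 1
    }
}
```

The difference in Flink is only that the "batch" is whatever arrived within each three-second window, and the summation runs in parallel per key.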
Reference Code
import java.util.Arrays;

import org.apache.commons.lang3.RandomUtils;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class WordCount {
    public static void main(String[] args) throws Exception {
        // 1. Build the Flink streaming execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 2. Custom source - emit one line of text per second
        DataStreamSource<String> wordLineDS = env.addSource(new RichSourceFunction<String>() {
            // volatile so that cancel(), called from another thread, is visible to run()
            private volatile boolean isCancel = false;

            private String[] words = {
                    "important oracle jdk license update",
                    "the oracle jdk license has changed for releases starting april 16 2019",
                    "the new oracle technology network license agreement for oracle java se is substantially different from prior oracle jdk licenses the new license permits certain uses such as ",
                    "personal use and development use at no cost but other uses authorized under prior oracle jdk licenses may no longer be available please review the terms carefully before ",
                    "downloading and using this product an faq is available here ",
                    "commercial license and support is available with a low cost java se subscription",
                    "oracle also provides the latest openjdk release under the open source gpl license at jdk java net"
            };

            @Override
            public void run(SourceContext<String> ctx) throws Exception {
                // Emit one random line of text per second
                while (!isCancel) {
                    int randomIndex = RandomUtils.nextInt(0, words.length);
                    ctx.collect(words[randomIndex]);
                    Thread.sleep(1000);
                }
            }

            @Override
            public void cancel() {
                isCancel = true;
            }
        });

        // 3. Word count
        // 3.1 Split each line of text into individual words
        SingleOutputStreamOperator<String> wordsDS = wordLineDS.flatMap((String line, Collector<String> ctx) -> {
            // Split the line into words
            Arrays.stream(line.split(" ")).forEach(word -> ctx.collect(word));
        }).returns(Types.STRING);

        // 3.2 Convert each word into a tuple
        SingleOutputStreamOperator<Tuple2<String, Integer>> tupleDS = wordsDS
                .map(word -> Tuple2.of(word, 1))
                .returns(Types.TUPLE(Types.STRING, Types.INT));

        // 3.3 Group by word
        KeyedStream<Tuple2<String, Integer>, String> keyedDS = tupleDS.keyBy(tuple -> tuple.f0);

        // 3.4 Accumulate the count for each word within each window
        SingleOutputStreamOperator<Tuple2<String, Integer>> resultDS = keyedDS
                .timeWindow(Time.seconds(3))
                .reduce((t1, t2) -> Tuple2.of(t1.f0, t1.f1 + t2.f1));

        resultDS.print();

        env.execute("app");
    }
}
Flink's Support for Java Lambda Expressions
Flink supports the use of lambda expressions for all operators of the Java API. However, when a lambda expression involves Java generics, the type information must be declared explicitly.
Let's take a look at this piece of code from above:
SingleOutputStreamOperator<String> wordsDS = wordLineDS.flatMap((String line, Collector<String> ctx) -> {
    // Split the line into words
    Arrays.stream(line.split(" ")).forEach(word -> ctx.collect(word));
}).returns(Types.STRING);
The reason all of the type information is specified explicitly here is that Flink cannot automatically infer the generic type parameter carried by the Collector. Let's look at the source code of FlatMapFunction:
@Public
@FunctionalInterface
public interface FlatMapFunction<T, O> extends Function, Serializable {
/**
* The core method of the FlatMapFunction. Takes an element from the input data set and transforms
* it into zero, one, or more elements.
*
* @param value The input value.
* @param out The collector for returning result values.
*
* @throws Exception This method may throw exceptions. Throwing an exception will cause the operation
* to fail and may trigger recovery.
*/
void flatMap(T value, Collector<O> out) throws Exception;
}
We can see that the second parameter of flatMap is Collector<O>, a parameterized generic type. When the Java compiler compiles this code, it performs type erasure on the generic parameters, so the compiled signature becomes:
void flatMap(T value, Collector out)
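This erasure can be observed outside of Flink with plain reflection. In the following standalone sketch (the class and method names are our own, for illustration only), the synthetic class that the JVM generates for a lambda implements only the raw, erased interface, so no generic type arguments are recoverable from it:

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.function.Function;

// Hypothetical demo: inspect what type information survives for a lambda.
public class LambdaErasureDemo {

    // True if the object's first implemented interface still carries its
    // generic type arguments (i.e. appears as a ParameterizedType).
    static boolean keepsTypeArguments(Object fn) {
        Type[] ifaces = fn.getClass().getGenericInterfaces();
        return ifaces.length > 0 && ifaces[0] instanceof ParameterizedType;
    }

    public static void main(String[] args) {
        Function<String, Integer> lambda = s -> s.length();
        // The lambda's class implements the raw, erased Function interface,
        // so the String/Integer type arguments are gone at runtime.
        System.out.println(keepsTypeArguments(lambda)); // false
    }
}
```

This is exactly the information Flink's type extraction needs and cannot find for a lambda.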
In this situation, Flink cannot automatically infer the type information. If we do not explicitly provide the type information, the following error occurs:
org.apache.flink.api.common.functions.InvalidTypesException: The generic type parameters of 'Collector' are missing. In many cases lambda methods don't provide enough information for automatic type extraction when Java generics are involved. An easy workaround is to use an (anonymous) class instead that implements the 'org.apache.flink.api.common.functions.FlatMapFunction' interface. Otherwise the type has to be specified explicitly using type information.
In this case, the type information must be specified explicitly; otherwise, the output is treated as type Object, which prevents Flink from serializing it correctly.
Therefore, we need to explicitly specify the parameter types of the lambda expression, and explicitly declare its output type via the returns method.
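As the error message above suggests, the alternative to returns(...) is an anonymous class, whose generic type arguments do survive compilation (they are recorded in the class file's Signature attribute). The following standalone sketch uses simplified stand-in interfaces of our own, not Flink's actual classes, to show why a type-extraction-style lookup works on an anonymous class:

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;

// Simplified stand-ins for Flink's interfaces (assumptions, for illustration only).
interface Collector<T> { void collect(T record); }
interface FlatMapFunction<T, O> { void flatMap(T value, Collector<O> out); }

public class AnonymousClassDemo {

    // Recover the output type O of an anonymous FlatMapFunction implementation,
    // similar in spirit to what Flink's type extraction does.
    static Type outputTypeOf(FlatMapFunction<?, ?> fn) {
        ParameterizedType iface =
                (ParameterizedType) fn.getClass().getGenericInterfaces()[0];
        return iface.getActualTypeArguments()[1]; // second type parameter: O
    }

    public static void main(String[] args) {
        FlatMapFunction<String, String> splitter = new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String line, Collector<String> out) {
                for (String word : line.split(" ")) {
                    out.collect(word);
                }
            }
        };
        // The anonymous class retains FlatMapFunction<String, String>:
        System.out.println(outputTypeOf(splitter)); // class java.lang.String
    }
}
```

Whether to prefer the lambda plus returns(...) or the more verbose anonymous class is a matter of style; both give Flink the same type information.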
Let's look at another piece of code:
SingleOutputStreamOperator<Tuple2<String, Integer>> tupleDS = wordsDS
        .map(word -> Tuple2.of(word, 1))
        .returns(Types.TUPLE(Types.STRING, Types.INT));
Why does the type also need to be specified after map?
Because map here returns a Tuple2, which carries generic parameters; those generic parameters are likewise erased at compile time, so Flink cannot correctly infer the output type.
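The same erasure can be seen directly on a functional interface's method: the declared return type after compilation is the erased Object, not the concrete tuple type. A small standalone sketch, using the JDK's Function as a stand-in for Flink's MapFunction (plain JDK, no Flink dependency):

```java
import java.lang.reflect.Method;
import java.util.function.Function;

// Demo: the generic return type R of Function.apply is erased to Object.
public class ReturnTypeErasureDemo {
    public static void main(String[] args) throws Exception {
        // R apply(T t) compiles down to Object apply(Object t)
        Method apply = Function.class.getMethod("apply", Object.class);
        System.out.println(apply.getReturnType().getName()); // java.lang.Object
    }
}
```

So from the compiled map lambda alone, Flink can only see that something of type Object is returned, which is why returns(Types.TUPLE(...)) must supply the Tuple2<String, Integer> type by hand.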
For more on Flink's support for Java lambda expressions, see the official documentation: https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/java_lambdas.html