參考:https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/api_concepts.html DataSet and DataStream Flink具有特殊類DataSet和DataStream來表示程式中的數據。 你可以 ...
參考:https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/api_concepts.html
DataSet and DataStream
Flink具有特殊類DataSet和DataStream來表示程式中的數據。 你可以將它們視為可以包含重覆項的不可變數據集合。Anatomy of a Flink Program Flink程式的剖析
Flink程式看起來像是轉換數據集合的常規程式。 每個程式包含相同的基本部分:- 獲得執行環境, Obtain an execution environment,
- 載入/創建初始數據, Load/create the initial data,
- 指定此數據的轉換, Specify transformations on this data,
- 指定放置計算結果的位置,Specify where to put the results of your computations,
- 觸發程式執行 Trigger the program execution
getExecutionEnvironment()
createLocalEnvironment()
createRemoteEnvironment(String host, int port, String... jarFiles)
通常,你只需要使用getExecutionEnvironment(),因為這將根據上下文執行正確的操作:如果你在IDE中執行程式或作為常規Java程式,它將創建一個本地環境,將執行你的程式 你的本地機器。 如果你從程式中創建了一個JAR文件,並通過命令行調用它,則Flink集群管理器將執行你的main方法,getExecutionEnvironment()將返回一個執行環境,用於在集群上執行你的程式。
對於指定數據源,執行環境有幾種方法可以使用各種方法從文件中讀取:你可以逐行讀取它們,CSV文件或使用完全自定義數據輸入格式。 要將文本文件作為一系列行讀取,你可以使用:final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStream<String> text = env.readTextFile("file:///path/to/file");這將為你提供一個DataStream,然後你可以在其上應用轉換來創建新的派生DataStream。 你可以通過使用轉換函數調用DataStream上的方法來應用轉換。 例如,map轉換如下所示:
DataStream<String> input = ...; DataStream<Integer> parsed = input.map(new MapFunction<String, Integer>() { @Override public Integer map(String value) { return Integer.parseInt(value); } });
這將通過將原始集合中的每個String轉換為Integer來創建新的DataStream。
一旦有了包含最終結果的DataStream,就可以通過創建接收器(sink)將其寫入外部系統。 這些只是創建接收器的一些示例方法:writeAsText(String path)
print()
一旦指定了完整的程式,就需要通過調用StreamExecutionEnvironment上的execute()來觸發程式執行。 根據ExecutionEnvironment的類型,將在本地電腦上觸發執行或提交程式以在群集上執行。
execute()方法返回一個JobExecutionResult,它包含執行時間和累加器結果。Lazy Evaluation 惰性求值
所有Flink程式都是惰性執行:當執行程式的main方法時,數據載入和轉換不會直接發生。 而是創建每個操作並將其添加到程式的計劃中。 當執行環境上的execute()調用顯式觸發執行時,實際執行操作。 程式是在本地執行還是在集群上執行取決於執行環境的類型。 惰性求值使你可以構建Flink作為一個整體計劃單元執行的複雜程式。Specifying Keys 指定鍵
一些轉換(join,coGroup,keyBy,groupBy)要求在元素集合上定義鍵。 其他轉換(Reduce,GroupReduce,Aggregate,Windows)允許數據在應用之前在鍵上分組。 DataSet分組:DataSet<...> input = // [...] DataSet<...> reduced = input .groupBy(/*define key here*/) .reduceGroup(/*do something*/);
DataStream設置鍵:
DataStream<...> input = // [...] DataStream<...> windowed = input .keyBy(/*define key here*/) .window(/*window specification*/);
Flink的數據模型不基於鍵值對。 因此,你無需將數據集類型物理打包到鍵和值中。 鍵是“虛擬的”:它們被定義為實際數據上的函數,以指導分組運算符。
Define keys for Tuples 定義元組的鍵
最簡單的情況是在元組的一個或多個欄位上對元組進行分組:DataStream<Tuple3<Integer,String,Long>> input = // [...] KeyedStream<Tuple3<Integer,String,Long>,Tuple> keyed = input.keyBy(0)
元組在第一個欄位(整數類型)上分組。
DataStream<Tuple3<Integer,String,Long>> input = // [...] KeyedStream<Tuple3<Integer,String,Long>,Tuple> keyed = input.keyBy(0,1)
在這裡,我們將元組分組在由第一個和第二個欄位組成的複合鍵上。
註意嵌套元組:如果你有一個帶有嵌套元組的DataStream,例如:DataStream<Tuple3<Tuple2<Integer, Float>,String,Long>> ds;
指定keyBy(0)將使系統使用完整的Tuple2作為鍵(以Integer和Float為鍵)。 如果要“導航”到嵌套的Tuple2中,則必須使用下麵解釋的欄位表達式鍵。
Define keys using Field Expressions 使用欄位表達式定義鍵
你可以使用基於字元串的欄位表達式來引用嵌套欄位,並定義用於grouping, sorting, joining或coGrouping的鍵。 欄位表達式可以非常輕鬆地選擇(嵌套)複合類型中的欄位,例如Tuple和POJO類型。 在下麵的示例中,我們有一個WC POJO,其中包含兩個欄位“word”和“count”。 要按欄位分組,我們只需將其名稱傳遞給keyBy()函數。// some ordinary POJO (Plain old Java Object) public class WC { public String word; public int count; } DataStream<WC> words = // [...] DataStream<WC> wordCounts = words.keyBy("word").window(/*window specification*/);
欄位表達式語法:
- 按欄位名稱選擇POJO欄位。 例如,“user”指的是POJO類型的“user”欄位。
- 按欄位名稱或0偏移欄位索引選擇元組欄位。 例如,“f0”和“5”分別表示Java元組類型的第一和第六欄位。
- 你可以在POJO和Tuples中選擇嵌套欄位。 例如,“user.zip”指的是POJO的“zip”欄位,其存儲在POJO類型的“user”欄位中。 支持任意嵌套和混合POJO和元組,例如“f1.user.zip”或“user.f3.1.zip”。
- 你可以使用“*”通配符表達式選擇完整類型。 這也適用於非Tuple或POJO類型的類型。
public static class WC { public ComplexNestedClass complex; //nested POJO private int count; // getter / setter for private field (count) public int getCount() { return count; } public void setCount(int c) { this.count = c; } } public static class ComplexNestedClass { public Integer someNumber; public float someFloat; public Tuple3<Long, Long, String> word; public IntWritable hadoopCitizen; }
這些是上面示例代碼的有效欄位表達式:
- “count”:WC類中的count欄位。
- “complex”:遞歸選擇POJO類型ComplexNestedClass的欄位複合體的所有欄位。
- “complex.word.f2”:選擇嵌套Tuple3的最後一個欄位。
- “complex.hadoopCitizen”:選擇Hadoop IntWritable類型。
Define keys using Key Selector Functions 使用鍵選擇器函數定義鍵
定義鍵的另一種方法是“鍵選擇器”函數。 鍵選擇器函數將單個元素作為輸入並返回元素的鍵。 鍵可以是任何類型,並且可以從確定性計算中輸出。 以下示例顯示了一個鍵選擇器函數,它只返回一個對象的欄位:// some ordinary POJO public class WC {public String word; public int count;} DataStream<WC> words = // [...] KeyedStream<WC> keyed = words .keyBy(new KeySelector<WC, String>() { public String getKey(WC wc) { return wc.word; } });
Specifying Transformation Functions 指定轉換函數
大多數轉換都需要用戶定義的函數。 本節列出瞭如何指定它們的不同方法。Implementing an interface 實現介面
最基本的方法是實現一個提供的介面:class MyMapFunction implements MapFunction<String, Integer> { public Integer map(String value) { return Integer.parseInt(value); } }; data.map(new MyMapFunction());
Anonymous classes 匿名類
你可以將函數作為匿名類傳遞:
data.map(new MapFunction<String, Integer> () { public Integer map(String value) { return Integer.parseInt(value); } });
Java 8 Lambdas 表達式
Flink還支持Java API中的Java 8 Lambdas。data.filter(s -> s.startsWith("http://"));
data.reduce((i1,i2) -> i1 + i2);
Rich functions 富函數
所有需要用戶定義函數的轉換都可以將富函數作為參數。 例如,替換class MyMapFunction implements MapFunction<String, Integer> { public Integer map(String value) { return Integer.parseInt(value); } };你可以寫
class MyMapFunction extends RichMapFunction<String, Integer> { public Integer map(String value) { return Integer.parseInt(value); } };並像往常一樣將函數傳遞給map轉換:
data.map(new MyMapFunction());
富函數也可以定義為匿名類:
data.map (new RichMapFunction<String, Integer>() { public Integer map(String value) { return Integer.parseInt(value); } });除了用戶定義的函數(map,reduce等)之外,Rich函數還提供了四種方法:open,close,getRuntimeContext和setRuntimeContext。 這些用於參數化函數,創建和完成本地狀態,訪問廣播變數以及訪問運行時信息(如累加器和計數器)以及迭代信息。
Supported Data Types 支持的數據類型
Flink對DataSet或DataStream中可以包含的元素類型設置了一些限制。原因是系統分析類型以確定有效的執行策略。 有六種不同類別的數據類型:- 元組(Java Tuples and Scala Case Classes)
- Java普通對象(Java POJOs)
- 基本類型(Primitive Types)
- 常規類(Regular Classes)
- 值類型(Values)
- Hadoop可寫介面的實現(Hadoop Writables)
- 特殊類型(Special Types)
Tuples and Case Classes 元組
元組是包含固定數量的具有各種類型的欄位的複合類型。 Java API提供從Tuple1到Tuple25的類。 元組的每個欄位都可以是包含更多元組的任意Flink類型,從而產生嵌套元組。 可以使用欄位名稱tuple.f4直接訪問元組的欄位,也可以使用通用getter方法tuple.getField(int position)。 欄位索引從0開始。請註意,這與Scala元組形成對比,但它與Java的一般索引更為一致。DataStream<Tuple2<String, Integer>> wordCounts = env.fromElements( new Tuple2<String, Integer>("hello", 1), new Tuple2<String, Integer>("world", 2)); wordCounts.map(new MapFunction<Tuple2<String, Integer>, Integer>() { @Override public Integer map(Tuple2<String, Integer> value) throws Exception { return value.f1; } }); wordCounts.keyBy(0); // also valid .keyBy("f0")
POJO Java普通對象
如果滿足以下要求,則Flink將Java和Scala類視為特殊的POJO數據類型:- 類必須是公共的。
- 它必須有一個沒有參數的公共構造函數(預設構造函數)。
- 所有欄位都是公共的,或者必須通過getter和setter函數訪問。 對於名為foo的欄位,getter和setter方法必須命名為getFoo()和setFoo()。
- Flink必須支持欄位的類型。 目前,Flink使用Avro序列化任意對象(例如Date)。
public class WordWithCount { public String word; public int count; public WordWithCount() {} public WordWithCount(String word, int count) { this.word = word; this.count = count; } } DataStream<WordWithCount> wordCounts = env.fromElements( new WordWithCount("hello", 1), new WordWithCount("world", 2)); wordCounts.keyBy("word"); // key by field expression "word"
Primitive Types 基本類型
Flink支持所有Java和Scala基本類型,如Integer,String和Double。General Class Types 常規類類型
Flink支持大多數Java和Scala類(API和自定義)。 限制適用於包含無法序列化的欄位的類,如文件指針,I/O流或其他本機資源。 遵循Java Beans約定的類通常可以很好地工作。 所有未標識為POJO類型的類(請參閱上面的POJO要求)都由Flink作為常規類類型處理。 Flink將這些數據類型視為黑盒子,並且無法訪問其內容(即,用於高效排序)。 使用序列化框架Kryo對常規類型進行序列化和反序列化。Values 值類型
值類型需手動描述其序列化和反序列化。它們不是通過通用序列化框架,而是通過使用讀取和寫入方法實現org.apache.flinktypes.Value介面來為這些操作提供自定義代碼。當通用序列化效率非常低時,使用值類型是合理的。一個示例是將元素的稀疏向量實現為數組的數據類型。知道數組大部分為零,可以對非零元素使用特殊編碼,而通用序列化只需編寫所有數組元素。 org.apache.flinktypes.CopyableValue介面以類似的方式支持手動內部克隆邏輯。 Flink帶有與基本數據類型對應的預定義值類型。 (ByteValue,ShortValue,IntValue,LongValue,FloatValue,DoubleValue,StringValue,CharValue,BooleanValue)。這些值類型充當基本數據類型的可變變體:它們的值可以被更改,允許程式員重用對象並降低垃圾收集器壓力。Hadoop Writables Hadoop可寫介面的實現
你可以使用實現org.apache.hadoop.Writable介面的類型。 write()和readFields()方法中定義的序列化邏輯將用於序列化。Special Types 特殊類型
你可以使用特殊類型,包括Scala的Either,Option和Try。 Java API有自己的自定義Either實現。 與Scala的Either類似,它代表兩種可能類型的值,左或右。 兩者都可用於錯誤處理或需要輸出兩種不同類型記錄的運算符。Type Erasure & Type Inference 類型擦除和類型推斷
註意:本節僅適用於Java。 Java編譯器在編譯後拋棄了大部分泛型類型信息。這在Java中稱為類型擦除。這意味著在運行時,對象的實例不再知道其泛型類型。例如,DataStream <String>和DataStream <Long>的實例在JVM看來是一樣的。 Flink在準備執行程式時(當調用程式的main方法時)需要類型信息。 Flink Java API嘗試重建以各種方式丟棄的類型信息,並將其顯式存儲在數據集和運算符中。你可以通過DataStream.getType()檢索類型。該方法返回TypeInformation的一個實例,這是Flink表示類型的內部方式。 類型推斷有其局限性,在某些情況下需要程式員的“合作”。這方面的示例是從集合創建數據集的方法,例如ExecutionEnvironment.fromCollection(),你可以在其中傳遞描述類型的參數。但是像MapFunction<I,O>這樣的通用函數也可能需要額外的類型信息。 ResultTypeQueryable介面可以通過輸入格式和函數實現,以明確告知API其返回類型。調用函數的輸入類型通常可以通過先前操作的結果類型來推斷。Accumulators & Counters 累加器和計數器
累加器是具有增加操作(add operation)和最終累積結果(final accumulated result)的簡單構造,可在作業結束後使用。 最直接的累加器是一個計數器(counter):你可以使用Accumulator.add(V value)方法遞增它。 在工作結束時,Flink將彙總(合併)所有部分結果並將結果發送給客戶。 在調試過程中,或者如果你想快速瞭解有關數據的更多信息,累加器非常有用。 Flink目前有以下內置累加器。 它們中的每一個都實現了Accumulator介面。- IntCounter,LongCounter和DoubleCounter:請參閱下麵的使用計數器的示例。
- 直方圖(Histogram):離散數量的區間的直方圖實現。 在內部,它只是一個從Integer到Integer的映射。 你可以使用它來計算值的分佈,例如 字數統計程式的每行字數分佈。
How to use accumulators: 如何使用累加器:
首先,你必須在要使用它的用戶定義轉換函數中創建累加器對象(此處為計數器)。private IntCounter numLines = new IntCounter();其次,你必須註冊累加器對象,通常在富函數的open()方法中。 在這裡你還可以定義名稱。
getRuntimeContext().addAccumulator("num-lines", this.numLines);
你現在可以在運算符函數中的任何位置使用累加器,包括open()和close()方法。
this.numLines.add(1);
整個結果將存儲在JobExecutionResult對象中,該對象是從執行環境的execute()方法返回的(當前這僅在執行等待作業完成時才有效)。
myJobExecutionResult.getAccumulatorResult("num-lines")每個作業的所有累加器共用一個命名空間。 因此,你可以在作業的不同運算符函數中使用相同的累加器。 Flink將在內部合併所有具有相同名稱的累加器。 關於累加器和迭代的說明:目前累加器的結果僅在整個作業結束後才可用。 我們還計劃在下一次迭代中使前一次迭代的結果可用。 你可以使用聚合器來計算每次迭代統計信息,並根據此類統計信息確定迭代的終止。