一、DataStream API概述 Flink 中的 DataStream 程式是對數據流(例如過濾、更新狀態、定義視窗、聚合)進行轉換的常規程式。數據流的起始是從各種源(例如消息隊列、套接字流、文件)創建的。結果通過 sink 返回,例如可以將數據寫入文件或標準輸出(例如命令行終端)。Flink ...
目錄
一、DataStream API概述
Flink 中的 DataStream 程式是對數據流(例如過濾、更新狀態、定義視窗、聚合)進行轉換的常規程式。數據流的起始是從各種源(例如消息隊列、套接字流、文件)創建的。結果通過 sink 返回,例如可以將數據寫入文件或標準輸出(例如命令行終端)。Flink 程式可以在各種上下文中運行,可以獨立運行,也可以嵌入到其它程式中。任務執行可以運行在本地 JVM 中,也可以運行在多台機器的集群上。
二、什麼是DataStream ?
- DataStream API 得名於特殊的 DataStream 類,該類用於表示 Flink 程式中的數據集合。你可以認為 它們是可以包含重覆項的不可變數據集合。這些數據可以是有界(有限)的,也可以是無界(無限)的,但用於處理它們的API是相同的。
- DataStream 在用法上類似於常規的 Java 集合,但在某些關鍵方面卻大不相同。它們是不可變的,這意味著一旦它們被創建,你就不能添加或刪除元素。你也不能簡單地察看內部元素,而只能使用 DataStream API 操作來處理它們,DataStream API 操作也叫作轉換(transformation)。
- 你可以通過在 Flink 程式中添加 source 創建一個初始的 DataStream。然後,你可以基於 DataStream 派生新的流,並使用 map、filter 等 API 方法把 DataStream 和派生的流連接在一起。
三、DataStream 數據處理過程
1)Data Sources(數據源)
1、Data Sources 原理
官方文檔
一個數據 source 包括三個核心組件:分片(Splits)、分片枚舉器(SplitEnumerator) 以及 源閱讀器(SourceReader)。
-
分片(Split) 是對一部分 source 數據的包裝,如一個文件或者日誌分區。分片是 source 進行任務分配和數據並行讀取的基本粒度。
-
源閱讀器(SourceReader) 會請求分片併進行處理,例如讀取分片所表示的文件或日誌分區。SourceReader 在
TaskManagers
上的 SourceOperators 並行運行,並產生並行的事件流/記錄流。 -
分片枚舉器(SplitEnumerator) 會生成分片並將它們分配給 SourceReader。該組件在
JobManager
上以單並行度運行,負責對未分配的分片進行維護,並以均衡的方式將其分配給 reader。SplitEnumerator 被認為是整個 Source 的“大腦”。
2、Data Sources 實現方式
1)基於文件
Source 是你的程式從中讀取其輸入的地方。你可以用
StreamExecutionEnvironment.addSource(sourceFunction)
將一個 source 關聯到你的程式。Flink 自帶了許多預先實現的 source functions,不過你仍然可以通過實現 SourceFunction 介面編寫自定義的非並行 source,也可以通過實現 ParallelSourceFunction 介面或者繼承 RichParallelSourceFunction 類編寫自定義的並行 sources。 通過 StreamExecutionEnvironment 可以訪問多種預定義的 stream source,source 連接器,請查看連接器文檔。
readTextFile(path)
:讀取文本文件。readFile(fileInputFormat, path)
- 按照指定的文件輸入格式讀取(一次)文件。readFile(fileInputFormat, path, watchType, interval, pathFilter, typeInfo)
:這是前兩個方法內部調用的方法。它基於給定的 fileInputFormat 讀取路徑 path 上的文件。根據提供的watchType
的不同,source 可能定期(每 interval 毫秒)監控路徑上的新數據(watchType 為 FileProcessingMode.PROCESS_CONTINUOUSLY),或者處理一次當前路徑中的數據然後退出(watchType 為 FileProcessingMode.PROCESS_ONCE)。使用 pathFilter,用戶可以進一步排除正在處理的文件。
2)基於套接字
socketTextStream
:套接字讀取。元素可以由分隔符分隔。
3)基於集合
-
fromCollection(Collection)
:從 Java Java.util.Collection 創建數據流。集合中的所有元素必須屬於同一類型。 -
fromCollection(Iterator, Class)
:從迭代器創建數據流。class 參數指定迭代器返回元素的數據類型。 -
fromElements(T ...)
:從給定的對象序列中創建數據流。所有的對象必須屬於同一類型。 -
fromParallelCollection(SplittableIterator, Class)
:從迭代器並行創建數據流。class 參數指定迭代器返回元素的數據類型。 -
generateSequence(from, to)
:基於給定間隔內的數字序列並行生成數據流。
4)自定義
addSource
:關聯一個新的 source function。例如,你可以使用 addSource(new FlinkKafkaConsumer<>(...)) 來從 Apache Kafka 獲取數據。更多詳細信息見連接器。
2)DataStream Transformations(數據流轉換//處理/運算元)
【溫馨提示】是用戶通過
運算元
能將一個或多個 DataStream 轉換成新的 DataStream,在應用程式中可以將多個數據轉換運算元合併成一個複雜的數據流拓撲。這部分內容將描述 Flink DataStream API 中基本的數據轉換API,數據轉換後各種數據分區方式,以及運算元的鏈接策略。
1、數據流轉換
運算元 | 數據轉換 | 解釋 | 示例 |
---|---|---|---|
Map | DataStream → DataStream | 獲取一個元素並生成一個元素。將輸入流的值加倍的映射函數 | dataStream.map { x => x * 2 } |
FlatMap | DataStream → DataStream | 獲取一個元素並生成零個、一個或多個元素。將句子拆分為單詞的flatmap函數 | dataStream.flatMap { str => str.split(" ") } |
Filter | DataStream → DataStream | 為每個元素計算布爾函數,並保留該函數返回true的元素。過濾掉零值的過濾器 | dataStream.filter { _ != 0 } |
KeyBy | DataStream → KeyedStream | 在邏輯上將流劃分為不相交的分區。具有相同密鑰的所有記錄都被分配到同一分區。在內部,keyBy()是通過哈希分區實現的,類似於mysql裡面的group by。有不同的方法來指定鍵 | dataStream.keyBy(.someKey) dataStream.keyBy(._1) |
Reduce | KeyedStream → DataStream | 鍵控數據流上的“滾動”減少。將當前元素與上次減少的值合併,併發出新值。創建部分和流的reduce函數 | keyedStream.reduce { _ + _ } |
Window | KeyedStream → WindowedStream | 可以在已分區的KeyedStreams上定義視窗。Windows根據某些特征(例如,在過去5秒內到達的數據)對每個鍵中的數據進行分組。有關windows的完整說明,請參見windows。 | dataStream .keyBy(_._1) .window(TumblingEventTimeWindows.of(Time.seconds(5))) |
WindowAll | DataStream → AllWindowedStream | 可以在常規數據流上定義視窗。Windows根據某些特征(例如,過去5秒內到達的數據)對所有流事件進行分組。有關windows的完整說明,請參見windows。 | dataStream .windowAll(TumblingEventTimeWindows.of(Time.seconds(5))) |
Window Apply | WindowedStream → DataStream ;AllWindowedStream → DataStream | 將常規功能應用於整個視窗。下麵是一個手動求和視窗元素的函數。如果使用的是windowAll 轉換,則需要使用AllWindowFunction 。 |
windowedStream.apply { WindowFunction } // applying an AllWindowFunction on non-keyed window stream allWindowedStream.apply { AllWindowFunction } |
WindowReduce | WindowedStream → DataStream | 將reduce函數應用於視窗並返回減少的值。 | windowedStream.reduce { _ + _ } |
Union | DataStream* → DataStream | 兩個或多個數據流的合併,創建一個包含所有流中所有元素的新流。註意:如果將一個數據流與其自身合併,則在結果流中會得到兩次每個元素。 | dataStream.union(otherStream1, otherStream2, ...); |
Window Join | DataStream,DataStream → DataStream | 在給定的密鑰和公共視窗上連接兩個數據流。 | dataStream.join(otherStream) .where( .window(TumblingEventTimeWindows.of(Time.seconds(3))) .apply { ... } |
Interval Join | KeyedStream,KeyedStream → DataStream | 在給定的時間間隔內,將兩個密鑰流的兩個元素e1和e2與一個公共密鑰連接,因此 e1.timestamp + lowerBound <= e2.timestamp <= e1.timestamp + upperBound | // this will join the two streams so that // key1 == key2 && leftTs - 2 < rightTs < leftTs + 2 keyedStream.intervalJoin(otherKeyedStream) .between(Time.milliseconds(-2), Time.milliseconds(2)) // lower and upper bound .upperBoundExclusive(true) // optional .lowerBoundExclusive(true) // optional .process(new IntervalJoinFunction() {...}) |
Window CoGroup | DataStream,DataStream → DataStream | 在給定的鍵和公共視窗上對兩個數據流進行協組。 | dataStream.coGroup(otherStream) .where(0).equalTo(1) .window(TumblingEventTimeWindows.of(Time.seconds(3))) .apply {} |
Connect | DataStream,DataStream → ConnectedStream | “連接”兩個保持其類型的數據流。連接允許兩個流之間的共用狀態。 | someStream : DataStream[Int] = ... otherStream : DataStream[String] = ... val connectedStreams = someStream.connect(otherStream) |
CoMap, CoFlatMap | ConnectedStream → DataStream | 類似於連接數據流上的map和flatMap | connectedStreams.map( (_ : Int) => true, (_ : String) => false) ) connectedStreams.flatMap( (_ : Int) => true, (_ : String) => false ) |
Iterate | DataStream → IterativeStream → ConnectedStream | 通過將一個操作符的輸出重定向到前一個操作符,在流中創建一個“反饋”迴圈。這對於定義不斷更新模型的演算法特別有用。下麵的代碼從一個流開始,並連續地應用迭代體。大於0的元素被髮送回反饋通道,其餘的元素被下游轉發。 | initialStream.iterate { iteration => { val iterationBody = iteration.map {/do something/} (iterationBody.filter(_ > 0), iterationBody.filter(_ <= 0)) } } |
2、物理分區
Flink 也提供以下方法讓用戶根據需要在數據轉換完成後對數據分區進行更細粒度的配置。
分區 | 數據轉換 | 解釋 | 示例 |
---|---|---|---|
Custom Partitioning | DataStream → DataStream | 使用用戶定義的Partitioner為每個元素選擇目標任務。 | dataStream.partitionCustom(partitioner, "someKey") dataStream.partitionCustom(partitioner, 0) |
Random Partitioning | DataStream → DataStream | 根據均勻分佈隨機劃分元素。 | dataStream.shuffle() |
Rescaling | DataStream → DataStream | 迴圈地將元素分區到下游操作的一個子集。 | dataStream.rescale() |
Broadcasting | DataStream → DataStream | 將元素廣播到每個分區。 |
3、運算元鏈和資源組
將兩個運算元鏈接在一起能使得它們在同一個線程中執行,從而提升性能。Flink 預設會將能鏈接的運算元儘可能地進行鏈接(例如, 兩個 map 轉換操作)。此外, Flink 還提供了對鏈接更細粒度控制的 API 以滿足更多需求。
如果想對整個作業禁用運算元鏈,可以調用
StreamExecutionEnvironment.disableOperatorChaining()
。下列方法還提供了更細粒度的控制。需要註 意的是,這些方法只能在 DataStream 轉換操作後才能被調用
,因為它們只對前一次數據轉換生效。例如,可以 someStream.map(...).startNewChain() 這樣調用,而不能 someStream.startNewChain()這樣。
運算元鏈操作 | 解釋 | 示例 |
---|---|---|
Start New Chain | 開始一個新的鏈,從這個操作符開始。這兩個映射器將被鏈接,過濾器將不會鏈接到第一個映射器。 | someStream.filter(...).map(...).startNewChain().map(...) |
Disable Chaining | 不要鏈接map操作符。 | someStream.map(...).disableChaining() |
Set Slot Sharing Group | 設置操作的槽位共用組。Flink將把具有相同槽共用組的操作放在相同槽中,而將沒有槽共用組的操作放在其他槽中。這可以用來隔離槽。如果所有的輸入操作都在同一個槽位共用組中,則從輸入操作繼承槽位共用組。預設槽位共用組的名稱為“default”,可以通過調用slotSharingGroup(“default”)顯式地將操作放入該組。 |
someStream.filter(...).slotSharingGroup("name") |
3)Data Sinks(數據輸出)
sink 連接器,請查看連接器文檔。
Data sinks 使用 DataStream 並將它們轉發到文件、套接字、外部系統或列印它們。Flink 自帶了多種內置的輸出格式,這些格式相關的實現封裝在 DataStreams 的運算元里:
-
writeAsText() / TextOutputFormat
: 將元素按行寫成字元串。通過調用每個元素的 toString() 方法獲得字元串。 -
writeAsCsv(...) / CsvOutputFormat
:將元組寫成逗號分隔值文件。行和欄位的分隔符是可配置的。每個欄位的值來自對象的 toString() 方法。 -
print() / printToErr()
:在標準輸出/標準錯誤流上列印每個元素的 toString() 值。 可選地,可以提供一個首碼(msg)附加到輸出。這有助於區分不同的 print 調用。如果並行度大於1,輸出結果將附帶輸出任務標識符的首碼。 -
writeUsingOutputFormat() / FileOutputFormat
:自定義文件輸出的方法和基類。支持自定義 object 到 byte 的轉換。 -
writeToSocket
:根據 SerializationSchema 將元素寫入套接字。 -
addSink
: 調用自定義 sink function。Flink 捆綁了連接到其他系統(例如 Apache Kafka)的連接器,這些連接器被實現為 sink functions。
【溫馨提示】DataStream 的 write*() 方法主要用於調試目的。它們不參與 Flink 的 checkpointing,這意味著這些函數通常具有至少有一次語義。刷新到目標系統的數據取決於 OutputFormat 的實現。這意味著並非所有發送到 OutputFormat 的元素都會立即顯示在目標系統中。此外,在失敗的情況下,這些記錄可能會丟失。
為了將流可靠地、精準一次地傳輸到文件系統中,請使用 StreamingFileSink。此外,通過 .addSink(...) 方法調用的自定義實現也可以參與 Flink 的 checkpointing,以實現精準一次的語義。
旁路輸出(分流)
旁路輸出在Flink中叫作SideOutput,用途類似於DataStream#split,本質上是一個數據流的切分行為,按照條件將DataStream切分為多個子數據流,子數據流叫作旁路輸出數據流,每個旁路輸出數據流可以有自己的下游處理邏輯。
使用旁路輸出時,首先需要定義用於標識旁路輸出流的 OutputTag:
val outputTag = OutputTag[String]("side-output")
可以通過以下方法將數據發送到旁路輸出:
- ProcessFunction
- KeyedProcessFunction
- CoProcessFunction
- KeyedCoProcessFunction
- ProcessWindowFunction
- ProcessAllWindowFunction
【示例】
package com
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector
object myOutputTag {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val input: DataStream[String] = env.readTextFile("flink/data/hello.txt")
val outputTag = OutputTag[String]("side-output")
val mainDataStream = input
.process(new ProcessFunction[String, String] {
override def processElement(
value: String,
ctx: ProcessFunction[String, String]#Context,
out: Collector[String]): Unit = {
// 發送數據到主要的輸出
out.collect(value)
// 發送數據到旁路輸出
ctx.output(outputTag, "sideout-" + value)
}
})
// 獲取outputTag並輸出
mainDataStream.getSideOutput(outputTag).print()
// 必須調用execute或者executeAsync(),下麵會講
env.execute("test OutputTag")
}
}
【問題】
Caused by: java.lang.ClassNotFoundException: org.apache.commons.compress.compressors.zstandard.ZstdCompressorInputStream
【解決】在pom.xml添加下麵依賴
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-compress</artifactId>
<version>1.21</version>
</dependency>
2)Flink 程式剖析(scala)
Flink 程式看起來像一個轉換 DataStream 的常規程式。每個程式由相同的基本部分組成:
- 獲取一個執行環境(execution environment);
- 載入/創建初始數據;
- 指定數據相關的轉換;
- 指定計算結果的存儲位置;
- 觸發程式執行。
1、 獲取一個執行環境(execution environment)
val env = StreamExecutionEnvironment.getExecutionEnvironment
2、載入/創建初始數據
為了指定 data sources,執行環境提供了一些方法,支持使用各種方法從文件中讀取數據:你可以直接逐行讀取數據,像讀 CSV 文件一樣,或使用任何第三方提供的 source。下麵是將一個文本文件作為一個行的序列來讀。
val env = StreamExecutionEnvironment.getExecutionEnvironment
// 載入數據源
val input: DataStream[String] = env.readTextFile("file:///path/to/file")
3、指定數據相關的轉換
val env = StreamExecutionEnvironment.getExecutionEnvironment
val input: DataStream[String] = env.readTextFile("file:///path/to/file")
// 例如一個 map 的轉換如下:
val mapped = input.map { x => x.toInt }
4、指定計算結果的存儲位置
一旦你有了包含最終結果的 DataStream,你就可以通過創建 sink 把它寫到外部系統。下麵是一些用於創建 sink 的示例方法:
val env = StreamExecutionEnvironment.getExecutionEnvironment
val input: DataStream[String] = env.readTextFile("flink/data/source")
// 例如一個 map 的轉換如下:
val mapped = input.map { x => x.toInt }
// 存儲到文件,當然還可以執行更多的sink
// writeAsText第二個參數來定義輸出模式,它有以下兩個可選值:
// WriteMode.NO_OVERWRITE:當指定路徑上不存在任何文件時,才執行寫出操作;
// WriteMode.OVERWRITE:不論指定路徑上是否存在文件,都執行寫出操作;如果原來已有文件,則進行覆蓋。
mapped.writeAsText("flink/data/sink", FileSystem.WriteMode.OVERWRITE)
5、觸發程式執行
-
一旦指定了完整的程式,需要調用
StreamExecutionEnvironment
的execute()
方法來觸發程式執行。根據 ExecutionEnvironment 的類型,執行會在你的本地機器上觸發,或將你的程式提交到某個集群上執行。execute() 方法將等待作業完成,然後返回一個 JobExecutionResult,其中包含執行時間和累加器結果。 -
如果不想等待作業完成,可以通過調用 StreamExecutionEnvironment 的
executeAsync()
方法來觸發作業非同步執行。它會返回一個 JobClient,你可以通過它與剛剛提交的作業進行通信。如下是使用 executeAsync() 實現 execute() 語義的示例。
final JobClient jobClient = env.executeAsync();
final JobExecutionResult jobExecutionResult = jobClient.getJobExecutionResult().get();
完整示常式序(官網示例)
【問題一】
【溫馨提示】如果出現這種報錯,一般就是IDEA 對scope為provided,這是IDEA的bug:
Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/flink/api/scala/typeutils/CaseClassTypeInfo
【解決】
- 【第一種方式】把依賴範圍調大或者直接去掉都行,不清楚的可以看我之前的Java-Maven詳解,但是記住在打包的時候得加上。
- 【第二種方式】Run->Edit Configurations,設置如下:
【問題二】
【問題】
Caused by: java.util.concurrent.CompletionException: java.lang.NoSuchMethodError: org.apache.commons.math3.stat.descriptive.rank.Percentile.withNaNStrategy(Lorg/apache/commons/math3/stat/ranking/NaNStrategy;)Lorg/apache/commons/math3/stat/descriptive/rank/Percentile;
hadoop-common中的commons-math3衝突導致。
【解決】排除hadoop-common中的commons-math3,設置如此:
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>3.3.1</version>
<scope>provided</scope>
<exclusions>
<exclusion>
<groupId>org.apache.commons</groupId>
<artifactId>commons-math3</artifactId>
</exclusion>
</exclusions>
</dependency>
先啟動服務
$ nc -lk 9999
WindowWordCount源碼如下:
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
object WindowWordCount {
def main(args: Array[String]) {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val text = env.socketTextStream("localhost", 9999)
val counts = text.flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } }
.map { (_, 1) }
.keyBy(_._1)
.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
.sum(1)
counts.print()
env.execute("Window Stream WordCount")
}
}
四、什麼是DataSet?
Flink用DataStream 表示無界數據集,用DataSet表示有界數據集,前者用於流處理應用程式,後者用於批處理應用程式。從操作形式上看,DataStream 和 DataSet 與集合 Collection 有些相似,但兩者有著本質的區別:
- DataStream 和 DataSet 是不可變的數據集合,因此不可以想操作集合那樣增加或者刪除 DataStream 和 DataSet 中的元素,也不可以通過諸如下標等方式訪問某個元素。
- Flink 應用程式通過 Source 創建 DataStream 對象和 DataSet 對象,通過轉換操作產生新的 DataStream 對象和 DataSet 對象。
- 運行時是應用程式被調度執行時的上下文環境,通過
StreamExecutionEnvironment
或ExecutionEnvironment
方法會根據當前環境自動選擇本地或者集群運行時環境。
五、DataSet 數據處理過程
1)Data Sources (數據源)
數據源創建初始數據集,比如從文件或Java集合創建數據集。創建數據集的一般機制抽象在InputFormat後面。Flink提供了幾種內置格式,可以從常見的文件格式創建數據集。它們中的許多在ExecutionEnvironment上都有快捷方法。
1、基於文件
readTextFile(path) / TextInputFormat
:讀取文本文件。readTextFileWithValue(path) / TextValueInputFormat
: 讀取文件,並將它們作為StringValues返回。StringValues是可變字元串。readCsvFile(path) / CsvInputFormat
:解析帶有逗號(或其他字元)分隔欄位的文件。返回由元組或pojo組成的數據集。支持基本java類型及其對應值作為欄位類型。readFileOfPrimitives(path, Class) / PrimitiveInputFormat
:解析以新行(或另一個字元序列)分隔的原始數據類型(如String或Integer)的文件。readFileOfPrimitives(path, delimiter, Class) / PrimitiveInputFormat
:使用給定的分隔符解析以新行(或另一個字元序列)分隔的原始數據類型(如String或Integer)的文件。
2、基於集合
fromCollection(Collection)
:從Java.util.Collection創建一個數據集。集合中的所有元素必須具有相同的類型。fromCollection(Iterator, Class)
:從迭代器創建數據集。該類指定迭代器返回的元素的數據類型。fromElements(T …)
:根據給定的對象序列創建一個數據集。所有對象必須是相同的類型。fromParallelCollection(SplittableIterator, Class)
:並行地從迭代器創建數據集。該類指定迭代器返回的元素的數據類型。generateSequence(from, to)
:並行生成給定區間內的數字序列。
3、通用型
readFile(inputFormat, path) / FileInputFormat
:接受文件輸入格式。createInput(inputFormat) / InputFormat
:接受通用輸入格式。
2)DataSet Transformations(數據集轉換//處理/運算元)
數據轉換將一個或多個數據集轉換為新的數據集。程式可以將多個轉換組合成複雜的程式集。
運算元 | 解釋 | 示例 |
---|---|---|
Map | 獲取一個元素並生成一個元素。將輸入流的值加倍的映射函數。 | data.map { x => x.toInt } |
FlatMap | 獲取一個元素並生成零個、一個或多個元素。將句子拆分為單詞的flatmap函數。 | data.flatMap { str => str.split(" ") } |
MapPartition | 在單個函數調用中轉換並行分區。該函數以Iterable流的形式獲取分區,並可以生成任意數量的結果值。每個分區中的元素數量取決於並行度和之前的操作。 | data.mapPartition { in => in map { (_, 1) } } |
Filter | 為每個元素計算布爾函數,並保留該函數返回true的元素。過濾掉零值的過濾器。 | data.filter { _ > 1000 } |
Reduce | 通過重覆地將兩個元素組合成一個元素,將一組元素組合成一個元素。Reduce可以應用於完整的數據集或分組的數據集。 | data.reduce { _ + _ } |
ReduceGroup | 將一組元素組合成一個或多個元素。ReduceGroup可以應用於完整的數據集,也可以應用於分組的數據集。 | data.reduceGroup { elements => elements.sum } |
Aggregate | 將一組值聚合為一個值。聚合函數可以看作是內置的reduce函數。聚合可以應用於完整的數據集,也可以應用於分組的數據集。 | val input: DataSet[(Int, String, Double)] = // [...] val output: DataSet[(Int, String, Double)] = input.aggregate(SUM, 0).aggregate(MIN, 2) |
Distinct | 返回數據集的不同元素。對於元素的所有欄位或欄位的子集,它將從輸入數據集中刪除重覆的條目。 | data.distinct() |
Join | 通過創建鍵值相等的所有元素對來連接兩個數據集。可選地使用JoinFunction將這對元素轉換為單個元素,或使用FlatJoinFunction將這對元素轉換為任意多個(包括沒有)元素。參見鍵部分瞭解如何定義連接鍵。 | val result = input1.join(input2).where(0).equalTo(1) |
OuterJoin | 對兩個數據集執行左、右或完全外部連接。外部連接類似於常規(內部)連接,它創建的所有元素對的鍵值相等。此外,如果在另一側沒有找到匹配的鍵,則保存外部的記錄(如果是完整的,則為左、右或兩者)。匹配的元素對(或一個元素和另一個輸入的空值)被賦給一個JoinFunction以將這對元素轉換為單個元素,或者賦給一個FlatJoinFunction以將這對元素轉換為任意多個(包括沒有)元素。參見鍵部分瞭解如何定義連接鍵。 | val joined = left.leftOuterJoin(right).where(0).equalTo(1) { (left, right) => val a = if (left == null) "none" else left._1 (a, right) } |
CoGroup | 簡化運算的二維變體。對一個或多個欄位上的每個輸入進行分組,然後合併組。每對組調用一個變換函數。請參閱鍵部分以瞭解如何定義coGroup鍵。 | data1.coGroup(data2).where(0).equalTo(1) |
Cross | 構建兩個輸入的笛卡爾積(叉積),創建所有的元素對。可選地使用CrossFunction將這對元素轉換為單個元素。 | val data1: DataSet[Int] = // [...] val data2: DataSet[String] = // [...] val result: DataSet[(Int, String)] = data1.cross(data2) |
Union | 生成兩個數據集的並集。 | data.union(data2) |
Rebalance | 均勻地重新平衡數據集的並行分區,以消除數據傾斜。只有類似map的轉換可以遵循rebalance轉換。 | val data1: DataSet[Int] = // [...] val result: DataSet[(Int, String)] = data1.rebalance().map(...) |
Hash-Partition | 哈希分區一個給定鍵的數據集。鍵可以指定為位置鍵、表達式鍵和鍵選擇器函數。 | val in: DataSet[(Int, String)] = // [...] val result = in.partitionByHash(0).mapPartition { ... } |
Range-Partition | 根據給定的鍵對數據集進行範圍分區。鍵可以指定為位置鍵、表達式鍵和鍵選擇器函數。 | val in: DataSet[(Int, String)] = // [...] val result = in.partitionByRange(0).mapPartition { ... } |
Custom Partitioning | 使用自定義Partitioner函數,根據鍵將記錄分配到特定的分區。該鍵可以指定為位置鍵、表達式鍵和選擇鍵函數。註意:此方法只適用於單個欄位鍵。 | val in: DataSet[(Int, String)] = // [...] val result = in .partitionCustom(partitioner, key).mapPartition { ... } |
Sort Partitioning | 按照指定的順序在本地對指定欄位上的數據集的所有分區進行排序。欄位可以指定為元組位置或欄位表達式。對多個欄位進行排序是通過鏈接sortPartition()調用來完成的。 | val in: DataSet[(Int, String)] = // [...] val result = in.sortPartition(1, Order.ASCENDING).mapPartition { ... } |
First-N | 返回數據集的前n個(任意的)元素。First-n可以應用於常規數據集、分組數據集或分組排序數據集。分組鍵可以指定為鍵選擇器函數或欄位位置鍵。 | val in: DataSet[(Int, String)] = // [...] // regular data set val result1 = in.first(3) // grouped data set val result2 = in.groupBy(0).first(3) // grouped-sorted data set val result3 = in.groupBy(0).sortGroup(1, Order.ASCENDING).first(3) |
MinBy / MaxBy | 從一個或多個欄位值為最小(最大值)的元組中選擇一個元組。用於比較的欄位必須是有效的關鍵欄位,即可比性。如果多個元組具有最小(最大)欄位值,則返回這些元組中的任意一個元組。MinBy (MaxBy)可以應用於完整的數據集或分組的數據集。 | val in: DataSet[(Int, Double, String)] = // [...] // a data set with a single tuple with minimum values for the Int and String fields. val out: DataSet[(Int, Double, String)] = in.minBy(0, 2) // a data set with one tuple for each group with the minimum value for the Double field. val out2: DataSet[(Int, Double, String)] = in.groupBy(2).minBy(1) |
Specifying Keys | 一些轉換(join、coGroup、groupBy)要求在元素集合上定義鍵。其他轉換(Reduce、groureduce、Aggregate)允許在應用數據之前對數據進行分組。 | DataSet<...> input = // [...] DataSet<...> reduced = input .groupBy(/define key here/) .reduceGroup(/do something/); |
Define keys for Tuples | 最簡單的情況是在元組的一個或多個欄位上分組元組。 | val input: DataSet[(Int, String, Long)] = // [...] val keyed = input.groupBy(0) //val input: DataSet[(Int, String, Long)] = // [...] val grouped = input.groupBy(0,1) |
3)Data Sinks(數據輸出)
數據接收器使用數據集,並用於存儲或返回它們。使用OutputFormat描述數據接收器操作。Flink提供了多種內置的輸出格式,這些格式封裝在DataSet上的操作後面:
writeAsText() / TextOutputFormat
:按行方式將元素寫入字元串。字元串是通過調用每個元素的toString()方法獲得的。writeAsFormattedText() / TextOutputFormat
:將元素按行編寫為字元串。字元串是通過為每個元素調用用戶定義的format()方法獲得的。writeAsCsv(…) / CsvOutputFormat
:將元組寫入逗號分隔的值文件。行和欄位分隔符是可配置的。每個欄位的值來自對象的toString()方法。print() / printToErr() / print(String msg) / printToErr(String msg) -列印出標準輸出/標準錯誤流中每個元素的toString()值。可選地,可以提供一個首碼(msg),作為輸出的首碼。這有助於區分不同的列印調用。如果並行度大於1,輸出也會被添加產生輸出的任務的標識符。write() / FileOutputFormat
:方法和基類用於自定義文件輸出。支持自定義對象到位元組的轉換。output()/ OutputFormat
:大多數通用輸出方法,用於非基於文件的數據接收器(例如將結果存儲在資料庫中)。
一個數據集可以被輸入到多個操作。程式可以寫或列印一個數據集,同時在它們上運行額外的轉換。
【示例】
package com
import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment}
import org.apache.flink.core.fs.FileSystem.WriteMode
object DataSetTest001 {
def main(args: Array[String]): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
// text data
val textData: DataSet[String] = env.readTextFile("flink/data/s1")
// write DataSet to a file on the local file system
// textData.writeAsText("flink/data/sink01")
// write DataSet to a file on an HDFS with a namenode running at nnHost:nnPort
// 先創建目錄:hadoop fs -mkdir -p hdfs://hadoop-node1:8082/flink/DataSet/
// 操作添加依賴
/*<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>3.3.1</version>
<scope>provided</scope>
</dependency>*/
textData.writeAsText("hdfs://hadoop-node1:8082/flink/DataSet/sink02")
//
// // write DataSet to a file and overwrite the file if it exists
// textData.writeAsText("flink/data/sink03", WriteMode.OVERWRITE)
//
// // tuples as lines with pipe as the separator "a|b|c"
// val values: DataSet[(String, Int, Double)] = // [...]
// values.writeAsCsv("file:///path/to/the/result/file", "\n", "|")
//
// // this writes tuples in the text formatting "(a, b, c)", rather than as CSV lines
// values.writeAsText("file:///path/to/the/result/file")
// this writes values as strings using a user-defined formatting
// values map { tuple => tuple._1 + " - " + tuple._2 }
// .writeAsText("file:///path/to/the/result/file")
env.execute("dataset test")
}
}
【示例】WordCount
package com
import org.apache.flink.api.scala._
object WordCount {
def main(args: Array[String]): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
val text = env.fromElements(
"Who's there?",
"I think I hear them. Stand, ho! Who's there?")
val counts = text
.flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } }
.map { (_, 1) }
.groupBy(0)
.sum(1)
counts.print()
}
}
未完待續,更多大數據知識,請耐心等待~