Flink 1.8 Basic API Concepts 基本API概念

来源:https://www.cnblogs.com/sxpujs/archive/2019/08/17/11370919.html
-Advertisement-
Play Games

參考:https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/api_concepts.html DataSet and DataStream Flink具有特殊類DataSet和DataStream來表示程式中的數據。 你可以 ...


參考:https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/api_concepts.html

DataSet and DataStream

Flink具有特殊類DataSet和DataStream來表示程式中的數據。 你可以將它們視為可以包含重覆項的不可變數據集合。

Anatomy of a Flink Program Flink程式的剖析

Flink程式看起來像是轉換數據集合的常規程式。 每個程式包含相同的基本部分:
  • 獲得執行環境, Obtain an execution environment,
  • 載入/創建初始數據, Load/create the initial data,
  • 指定此數據的轉換, Specify transformations on this data,
  • 指定放置計算結果的位置,Specify where to put the results of your computations,
  • 觸發程式執行 Trigger the program execution
StreamExecutionEnvironment是所有Flink程式的基礎。你可以在StreamExecutionEnvironment上使用這些靜態方法獲取一個:
getExecutionEnvironment()
createLocalEnvironment()
createRemoteEnvironment(String host,
int port, String... jarFiles)

通常,你只需要使用getExecutionEnvironment(),因為這將根據上下文執行正確的操作:如果你在IDE中執行程式或作為常規Java程式,它將創建一個本地環境,將執行你的程式 你的本地機器。 如果你從程式中創建了一個JAR文件,並通過命令行調用它,則Flink集群管理器將執行你的main方法,getExecutionEnvironment()將返回一個執行環境,用於在集群上執行你的程式。

  對於指定數據源,執行環境有幾種方法可以使用各種方法從文件中讀取:你可以逐行讀取它們,CSV文件或使用完全自定義數據輸入格式。 要將文本文件作為一系列行讀取,你可以使用:
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<String> text = env.readTextFile("file:///path/to/file");
這將為你提供一個DataStream,然後你可以在其上應用轉換來創建新的派生DataStream。 你可以通過使用轉換函數調用DataStream上的方法來應用轉換。 例如,map轉換如下所示:
DataStream<String> input = ...;

DataStream<Integer> parsed = input.map(new MapFunction<String, Integer>() {
    @Override
    public Integer map(String value) {
        return Integer.parseInt(value);
    }
});

這將通過將原始集合中的每個String轉換為Integer來創建新的DataStream。

一旦有了包含最終結果的DataStream,就可以通過創建接收器(sink)將其寫入外部系統。 這些只是創建接收器的一些示例方法:
writeAsText(String path)

print()

一旦指定了完整的程式,就需要通過調用StreamExecutionEnvironment上的execute()來觸發程式執行。 根據ExecutionEnvironment的類型,將在本地電腦上觸發執行或提交程式以在群集上執行。

execute()方法返回一個JobExecutionResult,它包含執行時間和累加器結果。

Lazy Evaluation 惰性求值

所有Flink程式都是惰性執行:當執行程式的main方法時,數據載入和轉換不會直接發生。 而是創建每個操作並將其添加到程式的計劃中。 當執行環境上的execute()調用顯式觸發執行時,實際執行操作。 程式是在本地執行還是在集群上執行取決於執行環境的類型。 惰性求值使你可以構建Flink作為一個整體計劃單元執行的複雜程式。

Specifying Keys 指定鍵

一些轉換(join,coGroup,keyBy,groupBy)要求在元素集合上定義鍵。 其他轉換(Reduce,GroupReduce,Aggregate,Windows)允許數據在應用之前在鍵上分組。 DataSet分組:
DataSet<...> input = // [...]
DataSet<...> reduced = input
  .groupBy(/*define key here*/)
  .reduceGroup(/*do something*/);

DataStream設置鍵:

DataStream<...> input = // [...]
DataStream<...> windowed = input
  .keyBy(/*define key here*/)
  .window(/*window specification*/);

Flink的數據模型不基於鍵值對。 因此,你無需將數據集類型物理打包到鍵和值中。 鍵是“虛擬的”:它們被定義為實際數據上的函數,以指導分組運算符。

Define keys for Tuples 定義元組的鍵

最簡單的情況是在元組的一個或多個欄位上對元組進行分組:
DataStream<Tuple3<Integer,String,Long>> input = // [...]
KeyedStream<Tuple3<Integer,String,Long>,Tuple> keyed = input.keyBy(0)

元組在第一個欄位(整數類型)上分組。 

DataStream<Tuple3<Integer,String,Long>> input = // [...]
KeyedStream<Tuple3<Integer,String,Long>,Tuple> keyed = input.keyBy(0,1)

在這裡,我們將元組分組在由第一個和第二個欄位組成的複合鍵上。

  註意嵌套元組:如果你有一個帶有嵌套元組的DataStream,例如:
DataStream<Tuple3<Tuple2<Integer, Float>,String,Long>> ds;
指定keyBy(0)將使系統使用完整的Tuple2作為鍵(以Integer和Float為鍵)。 如果要“導航”到嵌套的Tuple2中,則必須使用下麵解釋的欄位表達式鍵。

Define keys using Field Expressions 使用欄位表達式定義鍵

你可以使用基於字元串的欄位表達式來引用嵌套欄位,並定義用於grouping, sorting, joining或coGrouping的鍵。 欄位表達式可以非常輕鬆地選擇(嵌套)複合類型中的欄位,例如Tuple和POJO類型。   在下麵的示例中,我們有一個WC POJO,其中包含兩個欄位“word”和“count”。 要按欄位分組,我們只需將其名稱傳遞給keyBy()函數。
// some ordinary POJO (Plain old Java Object)
public class WC {
  public String word;
  public int count;
}
DataStream<WC> words = // [...]
DataStream<WC> wordCounts = words.keyBy("word").window(/*window specification*/);

欄位表達式語法:

  • 按欄位名稱選擇POJO欄位。 例如,“user”指的是POJO類型的“user”欄位。
  • 按欄位名稱或0偏移欄位索引選擇元組欄位。 例如,“f0”和“5”分別表示Java元組類型的第一和第六欄位。
  • 你可以在POJO和Tuples中選擇嵌套欄位。 例如,“user.zip”指的是POJO的“zip”欄位,其存儲在POJO類型的“user”欄位中。 支持任意嵌套和混合POJO和元組,例如“f1.user.zip”或“user.f3.1.zip”。
  • 你可以使用“*”通配符表達式選擇完整類型。 這也適用於非Tuple或POJO類型的類型。
  欄位表達式樣例:
public static class WC {
  public ComplexNestedClass complex; //nested POJO
  private int count;
  // getter / setter for private field (count)
  public int getCount() {
    return count;
  }
  public void setCount(int c) {
    this.count = c;
  }
}
public static class ComplexNestedClass {
  public Integer someNumber;
  public float someFloat;
  public Tuple3<Long, Long, String> word;
  public IntWritable hadoopCitizen;
}

這些是上面示例代碼的有效欄位表達式:

  • “count”:WC類中的count欄位。
  • “complex”:遞歸選擇POJO類型ComplexNestedClass的欄位複合體的所有欄位。
  • “complex.word.f2”:選擇嵌套Tuple3的最後一個欄位。
  • “complex.hadoopCitizen”:選擇Hadoop IntWritable類型。

Define keys using Key Selector Functions 使用鍵選擇器函數定義鍵

定義鍵的另一種方法是“鍵選擇器”函數。 鍵選擇器函數將單個元素作為輸入並返回元素的鍵。 鍵可以是任何類型,並且可以從確定性計算中輸出。 以下示例顯示了一個鍵選擇器函數,它只返回一個對象的欄位:
// some ordinary POJO
public class WC {public String word; public int count;}
DataStream<WC> words = // [...]
KeyedStream<WC> keyed = words
  .keyBy(new KeySelector<WC, String>() {
     public String getKey(WC wc) { return wc.word; }
   });

Specifying Transformation Functions 指定轉換函數

大多數轉換都需要用戶定義的函數。 本節列出瞭如何指定它們的不同方法。

Implementing an interface 實現介面

最基本的方法是實現一個提供的介面:
class MyMapFunction implements MapFunction<String, Integer> {
  public Integer map(String value) { return Integer.parseInt(value); }
};
data.map(new MyMapFunction());
Anonymous classes 匿名類
你可以將函數作為匿名類傳遞:
data.map(new MapFunction<String, Integer> () {
  public Integer map(String value) { return Integer.parseInt(value); }
}); 

Java 8 Lambdas 表達式

Flink還支持Java API中的Java 8 Lambdas。
data.filter(s -> s.startsWith("http://"));

data.reduce((i1,i2) -> i1 + i2);

Rich functions 富函數

所有需要用戶定義函數的轉換都可以將富函數作為參數。 例如,替換
class MyMapFunction implements MapFunction<String, Integer> {
  public Integer map(String value) { return Integer.parseInt(value); }
};
  你可以寫
class MyMapFunction extends RichMapFunction<String, Integer> {
  public Integer map(String value) { return Integer.parseInt(value); }
};
並像往常一樣將函數傳遞給map轉換:
data.map(new MyMapFunction());
富函數也可以定義為匿名類:
data.map (new RichMapFunction<String, Integer>() {
  public Integer map(String value) { return Integer.parseInt(value); }
});
除了用戶定義的函數(map,reduce等)之外,Rich函數還提供了四種方法:open,close,getRuntimeContext和setRuntimeContext。 這些用於參數化函數,創建和完成本地狀態,訪問廣播變數以及訪問運行時信息(如累加器和計數器)以及迭代信息。  

Supported Data Types 支持的數據類型

Flink對DataSet或DataStream中可以包含的元素類型設置了一些限制。原因是系統分析類型以確定有效的執行策略。 有六種不同類別的數據類型:
  1. 元組(Java Tuples and Scala Case Classes)
  2. Java普通對象(Java POJOs)
  3. 基本類型(Primitive Types)
  4. 常規類(Regular Classes)
  5. 值類型(Values)
  6. Hadoop可寫介面的實現(Hadoop Writables)
  7. 特殊類型(Special Types) 

Tuples and Case Classes 元組

元組是包含固定數量的具有各種類型的欄位的複合類型。 Java API提供從Tuple1到Tuple25的類。 元組的每個欄位都可以是包含更多元組的任意Flink類型,從而產生嵌套元組。 可以使用欄位名稱tuple.f4直接訪問元組的欄位,也可以使用通用getter方法tuple.getField(int position)。 欄位索引從0開始。請註意,這與Scala元組形成對比,但它與Java的一般索引更為一致。
DataStream<Tuple2<String, Integer>> wordCounts = env.fromElements(
    new Tuple2<String, Integer>("hello", 1),
    new Tuple2<String, Integer>("world", 2));

wordCounts.map(new MapFunction<Tuple2<String, Integer>, Integer>() {
    @Override
    public Integer map(Tuple2<String, Integer> value) throws Exception {
        return value.f1;
    }
});

wordCounts.keyBy(0); // also valid .keyBy("f0")

POJO Java普通對象

如果滿足以下要求,則Flink將Java和Scala類視為特殊的POJO數據類型:
  • 類必須是公共的。
  • 它必須有一個沒有參數的公共構造函數(預設構造函數)。
  • 所有欄位都是公共的,或者必須通過getter和setter函數訪問。 對於名為foo的欄位,getter和setter方法必須命名為getFoo()和setFoo()。
  • Flink必須支持欄位的類型。 目前,Flink使用Avro序列化任意對象(例如Date)。
Flink分析POJO類型的結構,即它瞭解POJO的欄位。 因此,POJO類型比一般類型更容易使用。 此外,Flink可以比一般類型更有效地處理POJO。 以下示例顯示了一個包含兩個公共欄位的簡單POJO。
public class WordWithCount {

    public String word;
    public int count;

    public WordWithCount() {}

    public WordWithCount(String word, int count) {
        this.word = word;
        this.count = count;
    }
}

DataStream<WordWithCount> wordCounts = env.fromElements(
    new WordWithCount("hello", 1),
    new WordWithCount("world", 2));

wordCounts.keyBy("word"); // key by field expression "word"

Primitive Types 基本類型

Flink支持所有Java和Scala基本類型,如Integer,String和Double。  

General Class Types 常規類類型

Flink支持大多數Java和Scala類(API和自定義)。 限制適用於包含無法序列化的欄位的類,如文件指針,I/O流或其他本機資源。 遵循Java Beans約定的類通常可以很好地工作。   所有未標識為POJO類型的類(請參閱上面的POJO要求)都由Flink作為常規類類型處理。 Flink將這些數據類型視為黑盒子,並且無法訪問其內容(即,用於高效排序)。 使用序列化框架Kryo對常規類型進行序列化和反序列化。  

Values 值類型

值類型需手動描述其序列化和反序列化。它們不是通過通用序列化框架,而是通過使用讀取和寫入方法實現org.apache.flinktypes.Value介面來為這些操作提供自定義代碼。當通用序列化效率非常低時,使用值類型是合理的。一個示例是將元素的稀疏向量實現為數組的數據類型。知道數組大部分為零,可以對非零元素使用特殊編碼,而通用序列化只需編寫所有數組元素。   org.apache.flinktypes.CopyableValue介面以類似的方式支持手動內部克隆邏輯。   Flink帶有與基本數據類型對應的預定義值類型。 (ByteValue,ShortValue,IntValue,LongValue,FloatValue,DoubleValue,StringValue,CharValue,BooleanValue)。這些值類型充當基本數據類型的可變變體:它們的值可以被更改,允許程式員重用對象並降低垃圾收集器壓力。  

Hadoop Writables Hadoop可寫介面的實現

你可以使用實現org.apache.hadoop.Writable介面的類型。 write()和readFields()方法中定義的序列化邏輯將用於序列化。  

Special Types 特殊類型

你可以使用特殊類型,包括Scala的Either,Option和Try。 Java API有自己的自定義Either實現。 與Scala的Either類似,它代表兩種可能類型的值,左或右。 兩者都可用於錯誤處理或需要輸出兩種不同類型記錄的運算符。  

Type Erasure & Type Inference 類型擦除和類型推斷

註意:本節僅適用於Java。   Java編譯器在編譯後拋棄了大部分泛型類型信息。這在Java中稱為類型擦除。這意味著在運行時,對象的實例不再知道其泛型類型。例如,DataStream <String>和DataStream <Long>的實例在JVM看來是一樣的。   Flink在準備執行程式時(當調用程式的main方法時)需要類型信息。 Flink Java API嘗試重建以各種方式丟棄的類型信息,並將其顯式存儲在數據集和運算符中。你可以通過DataStream.getType()檢索類型。該方法返回TypeInformation的一個實例,這是Flink表示類型的內部方式。   類型推斷有其局限性,在某些情況下需要程式員的“合作”。這方面的示例是從集合創建數據集的方法,例如ExecutionEnvironment.fromCollection(),你可以在其中傳遞描述類型的參數。但是像MapFunction<I,O>這樣的通用函數也可能需要額外的類型信息。   ResultTypeQueryable介面可以通過輸入格式和函數實現,以明確告知API其返回類型。調用函數的輸入類型通常可以通過先前操作的結果類型來推斷。  

Accumulators & Counters 累加器和計數器

累加器是具有增加操作(add operation)和最終累積結果(final accumulated result)的簡單構造,可在作業結束後使用。   最直接的累加器是一個計數器(counter):你可以使用Accumulator.add(V value)方法遞增它。 在工作結束時,Flink將彙總(合併)所有部分結果並將結果發送給客戶。 在調試過程中,或者如果你想快速瞭解有關數據的更多信息,累加器非常有用。   Flink目前有以下內置累加器。 它們中的每一個都實現了Accumulator介面。
  • IntCounter,LongCounter和DoubleCounter:請參閱下麵的使用計數器的示例。
  • 直方圖(Histogram):離散數量的區間的直方圖實現。 在內部,它只是一個從Integer到Integer的映射。 你可以使用它來計算值的分佈,例如 字數統計程式的每行字數分佈。

How to use accumulators: 如何使用累加器:

首先,你必須在要使用它的用戶定義轉換函數中創建累加器對象(此處為計數器)。
private IntCounter numLines = new IntCounter();
其次,你必須註冊累加器對象,通常在富函數的open()方法中。 在這裡你還可以定義名稱。
getRuntimeContext().addAccumulator("num-lines", this.numLines);
你現在可以在運算符函數中的任何位置使用累加器,包括open()和close()方法。
this.numLines.add(1);
整個結果將存儲在JobExecutionResult對象中,該對象是從執行環境的execute()方法返回的(當前這僅在執行等待作業完成時才有效)。
myJobExecutionResult.getAccumulatorResult("num-lines")
每個作業的所有累加器共用一個命名空間。 因此,你可以在作業的不同運算符函數中使用相同的累加器。 Flink將在內部合併所有具有相同名稱的累加器。   關於累加器和迭代的說明:目前累加器的結果僅在整個作業結束後才可用。 我們還計劃在下一次迭代中使前一次迭代的結果可用。 你可以使用聚合器來計算每次迭代統計信息,並根據此類統計信息確定迭代的終止。  

Custom accumulators: 定製累加器:

要實現自己的累加器,只需編寫Accumulator介面的實現即可。 如果你認為你的自定義累加器應該合入Flink主幹,請隨意創建拉取請求(pull request)。   你可以選擇實現Accumulator或SimpleAccumulator。   Accumulator<V, R>最靈活:它為要添加的值定義類型V,為最終結果定義結果類型R. 例如。 對於直方圖,V是數字,R是直方圖。 SimpleAccumulator適用於兩種類型相同的情況,例如: 對於計數器。
您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 一.存儲基礎知識 從工作原理區分: 機械 HDD 固態 SSD SSD的優勢: 從磁碟尺寸區分: 3.5 2.5 1.8 從插拔方式區分: 熱插拔 非熱插拔 從硬碟主要介面區分: IDE —— SATA I/II/II 個人電腦 SCSI —— SAS 伺服器 FC PCIE 從存儲連接方式區分: ...
  • 1. 查詢k8s集群部署pod的基本情況 如下圖,我們可知容器coredns和dnsutils都部署成功,但是由於功能變數名稱解析的問題,導致coredns和dnsutils的容器不斷重啟(原因heath檢查,無法請求成功,被kubelet重啟了pod) 命令如下: root >> kubectl get ...
  • vim多視窗操作 垂直方式打開多個視窗 vim filename1 filename2 -O :vsp filename 或者 :vs filename 視窗切換 ctrl + ww vim文件切換 在多個文件之間切換 ctrl + i #切換到下一個文件或同一個文件中的下一個索引 ctrl + o ...
  • 一、廢話兩句 在雲數據中心,一次幾十臺甚至幾百台伺服器上線,系統安裝將變得非常繁瑣,系統安裝好了後還會涉及很多配置,如果一臺台來安裝的話工作量非常大。(雖然有加班費,開個玩笑)為瞭解決這個問題,我們需要實現無人值守批量部署系統。 簡單看一下拓撲圖: 1. 什麼是PXE? 簡單來說:PXE主要是引導作 ...
  • 第一章Linux命令行簡介 1.1 Linux命令行概述 1.1.1 Linux 命令行的開啟和退出 開啟:登陸賬號密碼進入系統 退出:exit/logout 快捷鍵:Ctrl+d 1.1.2 Linux命令行提示符介紹 (1)提示符由PS1環境變數控制。實例代碼如下: [root@centos10 ...
  • 今天我們學習關於NTFS管理數據 以下是學習的內容NTFS分區和FAT32分區的區別,如何將FAT32分區轉化成NTFS分區,FAT 32 不支持大於4G ,NTFS許可權設置 ,EFS加密 ,文件夾的NTFS許可權 許可權累加, 查看對象的所有者,獲得對象的所有權,重置文件夾中所有對象的許可權,利用EFS ...
  • 把對應的不同文件內的代碼段,合併到一起,成為最後的可執行文件 鏈接的方式,讓我們在寫代碼的時候做到了“復用”。 同樣的功能代碼只要寫一次,然後提供給很多不同的程式進行鏈接就行了。 “鏈接”其實有點兒像我們日常生活中的 標準化、模塊化 生產。 有一個可以生產標準螺帽的生產線,就可生產很多不同的螺帽。 ...
  • 一、索引創建 1. 非結構化創建 2. 結構化創建 二、插入 1. 指定文檔ID插入 2. 自動產生文檔ID插入 三、修改 1. 直接修改文檔 2. 腳本修改文檔 四、刪除 1. 刪除文檔 2. 刪除索引 五、查詢 1. 簡單查詢 2. 條件查詢 3. 聚合查詢 ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...