Flink 1.8 Basic API Concepts 基本API概念

来源:https://www.cnblogs.com/sxpujs/archive/2019/08/17/11370919.html
-Advertisement-
Play Games

參考:https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/api_concepts.html DataSet and DataStream Flink具有特殊類DataSet和DataStream來表示程式中的數據。 你可以 ...


參考:https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/api_concepts.html

DataSet and DataStream

Flink具有特殊類DataSet和DataStream來表示程式中的數據。 你可以將它們視為可以包含重覆項的不可變數據集合。

Anatomy of a Flink Program Flink程式的剖析

Flink程式看起來像是轉換數據集合的常規程式。 每個程式包含相同的基本部分:
  • 獲得執行環境, Obtain an execution environment,
  • 載入/創建初始數據, Load/create the initial data,
  • 指定此數據的轉換, Specify transformations on this data,
  • 指定放置計算結果的位置,Specify where to put the results of your computations,
  • 觸發程式執行 Trigger the program execution
StreamExecutionEnvironment是所有Flink程式的基礎。你可以在StreamExecutionEnvironment上使用這些靜態方法獲取一個:
getExecutionEnvironment()
createLocalEnvironment()
createRemoteEnvironment(String host,
int port, String... jarFiles)

通常,你只需要使用getExecutionEnvironment(),因為這將根據上下文執行正確的操作:如果你在IDE中執行程式或作為常規Java程式,它將創建一個本地環境,將執行你的程式 你的本地機器。 如果你從程式中創建了一個JAR文件,並通過命令行調用它,則Flink集群管理器將執行你的main方法,getExecutionEnvironment()將返回一個執行環境,用於在集群上執行你的程式。

  對於指定數據源,執行環境有幾種方法可以使用各種方法從文件中讀取:你可以逐行讀取它們,CSV文件或使用完全自定義數據輸入格式。 要將文本文件作為一系列行讀取,你可以使用:
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<String> text = env.readTextFile("file:///path/to/file");
這將為你提供一個DataStream,然後你可以在其上應用轉換來創建新的派生DataStream。 你可以通過使用轉換函數調用DataStream上的方法來應用轉換。 例如,map轉換如下所示:
DataStream<String> input = ...;

DataStream<Integer> parsed = input.map(new MapFunction<String, Integer>() {
    @Override
    public Integer map(String value) {
        return Integer.parseInt(value);
    }
});

這將通過將原始集合中的每個String轉換為Integer來創建新的DataStream。

一旦有了包含最終結果的DataStream,就可以通過創建接收器(sink)將其寫入外部系統。 這些只是創建接收器的一些示例方法:
writeAsText(String path)

print()

一旦指定了完整的程式,就需要通過調用StreamExecutionEnvironment上的execute()來觸發程式執行。 根據ExecutionEnvironment的類型,將在本地電腦上觸發執行或提交程式以在群集上執行。

execute()方法返回一個JobExecutionResult,它包含執行時間和累加器結果。

Lazy Evaluation 惰性求值

所有Flink程式都是惰性執行:當執行程式的main方法時,數據載入和轉換不會直接發生。 而是創建每個操作並將其添加到程式的計劃中。 當執行環境上的execute()調用顯式觸發執行時,實際執行操作。 程式是在本地執行還是在集群上執行取決於執行環境的類型。 惰性求值使你可以構建Flink作為一個整體計劃單元執行的複雜程式。

Specifying Keys 指定鍵

一些轉換(join,coGroup,keyBy,groupBy)要求在元素集合上定義鍵。 其他轉換(Reduce,GroupReduce,Aggregate,Windows)允許數據在應用之前在鍵上分組。 DataSet分組:
DataSet<...> input = // [...]
DataSet<...> reduced = input
  .groupBy(/*define key here*/)
  .reduceGroup(/*do something*/);

DataStream設置鍵:

DataStream<...> input = // [...]
DataStream<...> windowed = input
  .keyBy(/*define key here*/)
  .window(/*window specification*/);

Flink的數據模型不基於鍵值對。 因此,你無需將數據集類型物理打包到鍵和值中。 鍵是“虛擬的”:它們被定義為實際數據上的函數,以指導分組運算符。

Define keys for Tuples 定義元組的鍵

最簡單的情況是在元組的一個或多個欄位上對元組進行分組:
DataStream<Tuple3<Integer,String,Long>> input = // [...]
KeyedStream<Tuple3<Integer,String,Long>,Tuple> keyed = input.keyBy(0)

元組在第一個欄位(整數類型)上分組。 

DataStream<Tuple3<Integer,String,Long>> input = // [...]
KeyedStream<Tuple3<Integer,String,Long>,Tuple> keyed = input.keyBy(0,1)

在這裡,我們將元組分組在由第一個和第二個欄位組成的複合鍵上。

  註意嵌套元組:如果你有一個帶有嵌套元組的DataStream,例如:
DataStream<Tuple3<Tuple2<Integer, Float>,String,Long>> ds;
指定keyBy(0)將使系統使用完整的Tuple2作為鍵(以Integer和Float為鍵)。 如果要“導航”到嵌套的Tuple2中,則必須使用下麵解釋的欄位表達式鍵。

Define keys using Field Expressions 使用欄位表達式定義鍵

你可以使用基於字元串的欄位表達式來引用嵌套欄位,並定義用於grouping, sorting, joining或coGrouping的鍵。 欄位表達式可以非常輕鬆地選擇(嵌套)複合類型中的欄位,例如Tuple和POJO類型。   在下麵的示例中,我們有一個WC POJO,其中包含兩個欄位“word”和“count”。 要按欄位分組,我們只需將其名稱傳遞給keyBy()函數。
// some ordinary POJO (Plain old Java Object)
public class WC {
  public String word;
  public int count;
}
DataStream<WC> words = // [...]
DataStream<WC> wordCounts = words.keyBy("word").window(/*window specification*/);

欄位表達式語法:

  • 按欄位名稱選擇POJO欄位。 例如,“user”指的是POJO類型的“user”欄位。
  • 按欄位名稱或0偏移欄位索引選擇元組欄位。 例如,“f0”和“5”分別表示Java元組類型的第一和第六欄位。
  • 你可以在POJO和Tuples中選擇嵌套欄位。 例如,“user.zip”指的是POJO的“zip”欄位,其存儲在POJO類型的“user”欄位中。 支持任意嵌套和混合POJO和元組,例如“f1.user.zip”或“user.f3.1.zip”。
  • 你可以使用“*”通配符表達式選擇完整類型。 這也適用於非Tuple或POJO類型的類型。
  欄位表達式樣例:
public static class WC {
  public ComplexNestedClass complex; //nested POJO
  private int count;
  // getter / setter for private field (count)
  public int getCount() {
    return count;
  }
  public void setCount(int c) {
    this.count = c;
  }
}
public static class ComplexNestedClass {
  public Integer someNumber;
  public float someFloat;
  public Tuple3<Long, Long, String> word;
  public IntWritable hadoopCitizen;
}

這些是上面示例代碼的有效欄位表達式:

  • “count”:WC類中的count欄位。
  • “complex”:遞歸選擇POJO類型ComplexNestedClass的欄位複合體的所有欄位。
  • “complex.word.f2”:選擇嵌套Tuple3的最後一個欄位。
  • “complex.hadoopCitizen”:選擇Hadoop IntWritable類型。

Define keys using Key Selector Functions 使用鍵選擇器函數定義鍵

定義鍵的另一種方法是“鍵選擇器”函數。 鍵選擇器函數將單個元素作為輸入並返回元素的鍵。 鍵可以是任何類型,並且可以從確定性計算中輸出。 以下示例顯示了一個鍵選擇器函數,它只返回一個對象的欄位:
// some ordinary POJO
public class WC {public String word; public int count;}
DataStream<WC> words = // [...]
KeyedStream<WC> keyed = words
  .keyBy(new KeySelector<WC, String>() {
     public String getKey(WC wc) { return wc.word; }
   });

Specifying Transformation Functions 指定轉換函數

大多數轉換都需要用戶定義的函數。 本節列出瞭如何指定它們的不同方法。

Implementing an interface 實現介面

最基本的方法是實現一個提供的介面:
class MyMapFunction implements MapFunction<String, Integer> {
  public Integer map(String value) { return Integer.parseInt(value); }
};
data.map(new MyMapFunction());
Anonymous classes 匿名類
你可以將函數作為匿名類傳遞:
data.map(new MapFunction<String, Integer> () {
  public Integer map(String value) { return Integer.parseInt(value); }
}); 

Java 8 Lambdas 表達式

Flink還支持Java API中的Java 8 Lambdas。
data.filter(s -> s.startsWith("http://"));

data.reduce((i1,i2) -> i1 + i2);

Rich functions 富函數

所有需要用戶定義函數的轉換都可以將富函數作為參數。 例如,替換
class MyMapFunction implements MapFunction<String, Integer> {
  public Integer map(String value) { return Integer.parseInt(value); }
};
  你可以寫
class MyMapFunction extends RichMapFunction<String, Integer> {
  public Integer map(String value) { return Integer.parseInt(value); }
};
並像往常一樣將函數傳遞給map轉換:
data.map(new MyMapFunction());
富函數也可以定義為匿名類:
data.map (new RichMapFunction<String, Integer>() {
  public Integer map(String value) { return Integer.parseInt(value); }
});
除了用戶定義的函數(map,reduce等)之外,Rich函數還提供了四種方法:open,close,getRuntimeContext和setRuntimeContext。 這些用於參數化函數,創建和完成本地狀態,訪問廣播變數以及訪問運行時信息(如累加器和計數器)以及迭代信息。  

Supported Data Types 支持的數據類型

Flink對DataSet或DataStream中可以包含的元素類型設置了一些限制。原因是系統分析類型以確定有效的執行策略。 有六種不同類別的數據類型:
  1. 元組(Java Tuples and Scala Case Classes)
  2. Java普通對象(Java POJOs)
  3. 基本類型(Primitive Types)
  4. 常規類(Regular Classes)
  5. 值類型(Values)
  6. Hadoop可寫介面的實現(Hadoop Writables)
  7. 特殊類型(Special Types) 

Tuples and Case Classes 元組

元組是包含固定數量的具有各種類型的欄位的複合類型。 Java API提供從Tuple1到Tuple25的類。 元組的每個欄位都可以是包含更多元組的任意Flink類型,從而產生嵌套元組。 可以使用欄位名稱tuple.f4直接訪問元組的欄位,也可以使用通用getter方法tuple.getField(int position)。 欄位索引從0開始。請註意,這與Scala元組形成對比,但它與Java的一般索引更為一致。
DataStream<Tuple2<String, Integer>> wordCounts = env.fromElements(
    new Tuple2<String, Integer>("hello", 1),
    new Tuple2<String, Integer>("world", 2));

wordCounts.map(new MapFunction<Tuple2<String, Integer>, Integer>() {
    @Override
    public Integer map(Tuple2<String, Integer> value) throws Exception {
        return value.f1;
    }
});

wordCounts.keyBy(0); // also valid .keyBy("f0")

POJO Java普通對象

如果滿足以下要求,則Flink將Java和Scala類視為特殊的POJO數據類型:
  • 類必須是公共的。
  • 它必須有一個沒有參數的公共構造函數(預設構造函數)。
  • 所有欄位都是公共的,或者必須通過getter和setter函數訪問。 對於名為foo的欄位,getter和setter方法必須命名為getFoo()和setFoo()。
  • Flink必須支持欄位的類型。 目前,Flink使用Avro序列化任意對象(例如Date)。
Flink分析POJO類型的結構,即它瞭解POJO的欄位。 因此,POJO類型比一般類型更容易使用。 此外,Flink可以比一般類型更有效地處理POJO。 以下示例顯示了一個包含兩個公共欄位的簡單POJO。
public class WordWithCount {

    public String word;
    public int count;

    public WordWithCount() {}

    public WordWithCount(String word, int count) {
        this.word = word;
        this.count = count;
    }
}

DataStream<WordWithCount> wordCounts = env.fromElements(
    new WordWithCount("hello", 1),
    new WordWithCount("world", 2));

wordCounts.keyBy("word"); // key by field expression "word"

Primitive Types 基本類型

Flink支持所有Java和Scala基本類型,如Integer,String和Double。  

General Class Types 常規類類型

Flink支持大多數Java和Scala類(API和自定義)。 限制適用於包含無法序列化的欄位的類,如文件指針,I/O流或其他本機資源。 遵循Java Beans約定的類通常可以很好地工作。   所有未標識為POJO類型的類(請參閱上面的POJO要求)都由Flink作為常規類類型處理。 Flink將這些數據類型視為黑盒子,並且無法訪問其內容(即,用於高效排序)。 使用序列化框架Kryo對常規類型進行序列化和反序列化。  

Values 值類型

值類型需手動描述其序列化和反序列化。它們不是通過通用序列化框架,而是通過使用讀取和寫入方法實現org.apache.flinktypes.Value介面來為這些操作提供自定義代碼。當通用序列化效率非常低時,使用值類型是合理的。一個示例是將元素的稀疏向量實現為數組的數據類型。知道數組大部分為零,可以對非零元素使用特殊編碼,而通用序列化只需編寫所有數組元素。   org.apache.flinktypes.CopyableValue介面以類似的方式支持手動內部克隆邏輯。   Flink帶有與基本數據類型對應的預定義值類型。 (ByteValue,ShortValue,IntValue,LongValue,FloatValue,DoubleValue,StringValue,CharValue,BooleanValue)。這些值類型充當基本數據類型的可變變體:它們的值可以被更改,允許程式員重用對象並降低垃圾收集器壓力。  

Hadoop Writables Hadoop可寫介面的實現

你可以使用實現org.apache.hadoop.Writable介面的類型。 write()和readFields()方法中定義的序列化邏輯將用於序列化。  

Special Types 特殊類型

你可以使用特殊類型,包括Scala的Either,Option和Try。 Java API有自己的自定義Either實現。 與Scala的Either類似,它代表兩種可能類型的值,左或右。 兩者都可用於錯誤處理或需要輸出兩種不同類型記錄的運算符。  

Type Erasure & Type Inference 類型擦除和類型推斷

註意:本節僅適用於Java。   Java編譯器在編譯後拋棄了大部分泛型類型信息。這在Java中稱為類型擦除。這意味著在運行時,對象的實例不再知道其泛型類型。例如,DataStream <String>和DataStream <Long>的實例在JVM看來是一樣的。   Flink在準備執行程式時(當調用程式的main方法時)需要類型信息。 Flink Java API嘗試重建以各種方式丟棄的類型信息,並將其顯式存儲在數據集和運算符中。你可以通過DataStream.getType()檢索類型。該方法返回TypeInformation的一個實例,這是Flink表示類型的內部方式。   類型推斷有其局限性,在某些情況下需要程式員的“合作”。這方面的示例是從集合創建數據集的方法,例如ExecutionEnvironment.fromCollection(),你可以在其中傳遞描述類型的參數。但是像MapFunction<I,O>這樣的通用函數也可能需要額外的類型信息。   ResultTypeQueryable介面可以通過輸入格式和函數實現,以明確告知API其返回類型。調用函數的輸入類型通常可以通過先前操作的結果類型來推斷。  

Accumulators & Counters 累加器和計數器

累加器是具有增加操作(add operation)和最終累積結果(final accumulated result)的簡單構造,可在作業結束後使用。   最直接的累加器是一個計數器(counter):你可以使用Accumulator.add(V value)方法遞增它。 在工作結束時,Flink將彙總(合併)所有部分結果並將結果發送給客戶。 在調試過程中,或者如果你想快速瞭解有關數據的更多信息,累加器非常有用。   Flink目前有以下內置累加器。 它們中的每一個都實現了Accumulator介面。
  • IntCounter,LongCounter和DoubleCounter:請參閱下麵的使用計數器的示例。
  • 直方圖(Histogram):離散數量的區間的直方圖實現。 在內部,它只是一個從Integer到Integer的映射。 你可以使用它來計算值的分佈,例如 字數統計程式的每行字數分佈。

How to use accumulators: 如何使用累加器:

首先,你必須在要使用它的用戶定義轉換函數中創建累加器對象(此處為計數器)。
private IntCounter numLines = new IntCounter();
其次,你必須註冊累加器對象,通常在富函數的open()方法中。 在這裡你還可以定義名稱。
getRuntimeContext().addAccumulator("num-lines", this.numLines);
你現在可以在運算符函數中的任何位置使用累加器,包括open()和close()方法。
this.numLines.add(1);
整個結果將存儲在JobExecutionResult對象中,該對象是從執行環境的execute()方法返回的(當前這僅在執行等待作業完成時才有效)。
myJobExecutionResult.getAccumulatorResult("num-lines")
每個作業的所有累加器共用一個命名空間。 因此,你可以在作業的不同運算符函數中使用相同的累加器。 Flink將在內部合併所有具有相同名稱的累加器。   關於累加器和迭代的說明:目前累加器的結果僅在整個作業結束後才可用。 我們還計劃在下一次迭代中使前一次迭代的結果可用。 你可以使用聚合器來計算每次迭代統計信息,並根據此類統計信息確定迭代的終止。  

Custom accumulators: 定製累加器:

要實現自己的累加器,只需編寫Accumulator介面的實現即可。 如果你認為你的自定義累加器應該合入Flink主幹,請隨意創建拉取請求(pull request)。   你可以選擇實現Accumulator或SimpleAccumulator。   Accumulator<V, R>最靈活:它為要添加的值定義類型V,為最終結果定義結果類型R. 例如。 對於直方圖,V是數字,R是直方圖。 SimpleAccumulator適用於兩種類型相同的情況,例如: 對於計數器。
您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 一.存儲基礎知識 從工作原理區分: 機械 HDD 固態 SSD SSD的優勢: 從磁碟尺寸區分: 3.5 2.5 1.8 從插拔方式區分: 熱插拔 非熱插拔 從硬碟主要介面區分: IDE —— SATA I/II/II 個人電腦 SCSI —— SAS 伺服器 FC PCIE 從存儲連接方式區分: ...
  • 1. 查詢k8s集群部署pod的基本情況 如下圖,我們可知容器coredns和dnsutils都部署成功,但是由於功能變數名稱解析的問題,導致coredns和dnsutils的容器不斷重啟(原因heath檢查,無法請求成功,被kubelet重啟了pod) 命令如下: root >> kubectl get ...
  • vim多視窗操作 垂直方式打開多個視窗 vim filename1 filename2 -O :vsp filename 或者 :vs filename 視窗切換 ctrl + ww vim文件切換 在多個文件之間切換 ctrl + i #切換到下一個文件或同一個文件中的下一個索引 ctrl + o ...
  • 一、廢話兩句 在雲數據中心,一次幾十臺甚至幾百台伺服器上線,系統安裝將變得非常繁瑣,系統安裝好了後還會涉及很多配置,如果一臺台來安裝的話工作量非常大。(雖然有加班費,開個玩笑)為瞭解決這個問題,我們需要實現無人值守批量部署系統。 簡單看一下拓撲圖: 1. 什麼是PXE? 簡單來說:PXE主要是引導作 ...
  • 第一章Linux命令行簡介 1.1 Linux命令行概述 1.1.1 Linux 命令行的開啟和退出 開啟:登陸賬號密碼進入系統 退出:exit/logout 快捷鍵:Ctrl+d 1.1.2 Linux命令行提示符介紹 (1)提示符由PS1環境變數控制。實例代碼如下: [root@centos10 ...
  • 今天我們學習關於NTFS管理數據 以下是學習的內容NTFS分區和FAT32分區的區別,如何將FAT32分區轉化成NTFS分區,FAT 32 不支持大於4G ,NTFS許可權設置 ,EFS加密 ,文件夾的NTFS許可權 許可權累加, 查看對象的所有者,獲得對象的所有權,重置文件夾中所有對象的許可權,利用EFS ...
  • 把對應的不同文件內的代碼段,合併到一起,成為最後的可執行文件 鏈接的方式,讓我們在寫代碼的時候做到了“復用”。 同樣的功能代碼只要寫一次,然後提供給很多不同的程式進行鏈接就行了。 “鏈接”其實有點兒像我們日常生活中的 標準化、模塊化 生產。 有一個可以生產標準螺帽的生產線,就可生產很多不同的螺帽。 ...
  • 一、索引創建 1. 非結構化創建 2. 結構化創建 二、插入 1. 指定文檔ID插入 2. 自動產生文檔ID插入 三、修改 1. 直接修改文檔 2. 腳本修改文檔 四、刪除 1. 刪除文檔 2. 刪除索引 五、查詢 1. 簡單查詢 2. 條件查詢 3. 聚合查詢 ...
一周排行
    -Advertisement-
    Play Games
  • GoF之工廠模式 @目錄GoF之工廠模式每博一文案1. 簡單說明“23種設計模式”1.2 介紹工廠模式的三種形態1.3 簡單工廠模式(靜態工廠模式)1.3.1 簡單工廠模式的優缺點:1.4 工廠方法模式1.4.1 工廠方法模式的優缺點:1.5 抽象工廠模式1.6 抽象工廠模式的優缺點:2. 總結:3 ...
  • 新改進提供的Taurus Rpc 功能,可以簡化微服務間的調用,同時可以不用再手動輸出模塊名稱,或調用路徑,包括負載均衡,這一切,由框架實現並提供了。新的Taurus Rpc 功能,將使得服務間的調用,更加輕鬆、簡約、高效。 ...
  • 本章將和大家分享ES的數據同步方案和ES集群相關知識。廢話不多說,下麵我們直接進入主題。 一、ES數據同步 1、數據同步問題 Elasticsearch中的酒店數據來自於mysql資料庫,因此mysql數據發生改變時,Elasticsearch也必須跟著改變,這個就是Elasticsearch與my ...
  • 引言 在我們之前的文章中介紹過使用Bogus生成模擬測試數據,今天來講解一下功能更加強大自動生成測試數據的工具的庫"AutoFixture"。 什麼是AutoFixture? AutoFixture 是一個針對 .NET 的開源庫,旨在最大程度地減少單元測試中的“安排(Arrange)”階段,以提高 ...
  • 經過前面幾個部分學習,相信學過的同學已經能夠掌握 .NET Emit 這種中間語言,並能使得它來編寫一些應用,以提高程式的性能。隨著 IL 指令篇的結束,本系列也已經接近尾聲,在這接近結束的最後,會提供幾個可供直接使用的示例,以供大伙分析或使用在項目中。 ...
  • 當從不同來源導入Excel數據時,可能存在重覆的記錄。為了確保數據的準確性,通常需要刪除這些重覆的行。手動查找並刪除可能會非常耗費時間,而通過編程腳本則可以實現在短時間內處理大量數據。本文將提供一個使用C# 快速查找並刪除Excel重覆項的免費解決方案。 以下是實現步驟: 1. 首先安裝免費.NET ...
  • C++ 異常處理 C++ 異常處理機制允許程式在運行時處理錯誤或意外情況。它提供了捕獲和處理錯誤的一種結構化方式,使程式更加健壯和可靠。 異常處理的基本概念: 異常: 程式在運行時發生的錯誤或意外情況。 拋出異常: 使用 throw 關鍵字將異常傳遞給調用堆棧。 捕獲異常: 使用 try-catch ...
  • 優秀且經驗豐富的Java開發人員的特征之一是對API的廣泛瞭解,包括JDK和第三方庫。 我花了很多時間來學習API,尤其是在閱讀了Effective Java 3rd Edition之後 ,Joshua Bloch建議在Java 3rd Edition中使用現有的API進行開發,而不是為常見的東西編 ...
  • 框架 · 使用laravel框架,原因:tp的框架路由和orm沒有laravel好用 · 使用強制路由,方便介面多時,分多版本,分文件夾等操作 介面 · 介面開發註意欄位類型,欄位是int,查詢成功失敗都要返回int(對接java等強類型語言方便) · 查詢介面用GET、其他用POST 代碼 · 所 ...
  • 正文 下午找企業的人去鎮上做貸後。 車上聽同事跟那個司機對罵,火星子都快出來了。司機跟那同事更熟一些,連我在內一共就三個人,同事那一手指桑罵槐給我都聽愣了。司機也是老社會人了,馬上聽出來了,為那個無辜的企業經辦人辯護,實際上是為自己辯護。 “這個事情你不能怪企業。”“但他們總不能讓銀行的人全權負責, ...