Flink 從 0 到 1 學習 —— Flink Data transformation(轉換)

来源:https://www.cnblogs.com/zhisheng/archive/2019/11/04/11795072.html
-Advertisement-
Play Games

toc: true title: Flink 從 0 到 1 學習 —— Flink Data transformation(轉換) date: 2018 11 04 tags: Flink 大數據 流式計算 前言 在第一篇介紹 Flink 的文章 "《《從0到1學習Flink》—— Apache ...



toc: true
title: Flink 從 0 到 1 學習 —— Flink Data transformation(轉換)
date: 2018-11-04
tags:

  • Flink
  • 大數據
  • 流式計算

前言

在第一篇介紹 Flink 的文章 《《從0到1學習Flink》—— Apache Flink 介紹》 中就說過 Flink 程式的結構

Flink 應用程式結構就是如上圖所示:

1、Source: 數據源,Flink 在流處理和批處理上的 source 大概有 4 類:基於本地集合的 source、基於文件的 source、基於網路套接字的 source、自定義的 source。自定義的 source 常見的有 Apache kafka、Amazon Kinesis Streams、RabbitMQ、Twitter Streaming API、Apache NiFi 等,當然你也可以定義自己的 source。

2、Transformation:數據轉換的各種操作,有 Map / FlatMap / Filter / KeyBy / Reduce / Fold / Aggregations / Window / WindowAll / Union / Window join / Split / Select / Project 等,操作很多,可以將數據轉換計算成你想要的數據。

3、Sink:接收器,Flink 將轉換計算後的數據發送的地點 ,你可能需要存儲下來,Flink 常見的 Sink 大概有如下幾類:寫入文件、列印出來、寫入 socket 、自定義的 sink 。自定義的 sink 常見的有 Apache kafka、RabbitMQ、MySQL、ElasticSearch、Apache Cassandra、Hadoop FileSystem 等,同理你也可以定義自己的 Sink。

在上四篇文章介紹了 Source 和 Sink:

1、《從0到1學習Flink》—— Data Source 介紹

2、《從0到1學習Flink》—— 如何自定義 Data Source ?

3、《從0到1學習Flink》—— Data Sink 介紹

4、《從0到1學習Flink》—— 如何自定義 Data Sink ?

那麼這篇文章我們就來看下 Flink Data Transformation 吧,數據轉換操作還是蠻多的,需要好好講講!

Transformation

Map

這是最簡單的轉換之一,其中輸入是一個數據流,輸出的也是一個數據流:

還是拿上一篇文章的案例來將數據進行 map 轉換操作:

SingleOutputStreamOperator<Student> map = student.map(new MapFunction<Student, Student>() {
    @Override
    public Student map(Student value) throws Exception {
        Student s1 = new Student();
        s1.id = value.id;
        s1.name = value.name;
        s1.password = value.password;
        s1.age = value.age + 5;
        return s1;
    }
});
map.print();

將每個人的年齡都增加 5 歲,其他不變。

FlatMap

FlatMap 採用一條記錄並輸出零個,一個或多個記錄。

SingleOutputStreamOperator<Student> flatMap = student.flatMap(new FlatMapFunction<Student, Student>() {
    @Override
    public void flatMap(Student value, Collector<Student> out) throws Exception {
        if (value.id % 2 == 0) {
            out.collect(value);
        }
    }
});
flatMap.print();

這裡將 id 為偶數的聚集出來。

Filter

Filter 函數根據條件判斷出結果。

SingleOutputStreamOperator<Student> filter = student.filter(new FilterFunction<Student>() {
    @Override
    public boolean filter(Student value) throws Exception {
        if (value.id > 95) {
            return true;
        }
        return false;
    }
});
filter.print();

這裡將 id 大於 95 的過濾出來,然後列印出來。

KeyBy

KeyBy 在邏輯上是基於 key 對流進行分區。在內部,它使用 hash 函數對流進行分區。它返回 KeyedDataStream 數據流。

KeyedStream<Student, Integer> keyBy = student.keyBy(new KeySelector<Student, Integer>() {
    @Override
    public Integer getKey(Student value) throws Exception {
        return value.age;
    }
});
keyBy.print();

上面對 student 的 age 做 KeyBy 操作分區

Reduce

Reduce 返回單個的結果值,並且 reduce 操作每處理一個元素總是創建一個新值。常用的方法有 average, sum, min, max, count,使用 reduce 方法都可實現。

SingleOutputStreamOperator<Student> reduce = student.keyBy(new KeySelector<Student, Integer>() {
    @Override
    public Integer getKey(Student value) throws Exception {
        return value.age;
    }
}).reduce(new ReduceFunction<Student>() {
    @Override
    public Student reduce(Student value1, Student value2) throws Exception {
        Student student1 = new Student();
        student1.name = value1.name + value2.name;
        student1.id = (value1.id + value2.id) / 2;
        student1.password = value1.password + value2.password;
        student1.age = (value1.age + value2.age) / 2;
        return student1;
    }
});
reduce.print();

上面先將數據流進行 keyby 操作,因為執行 reduce 操作只能是 KeyedStream,然後將 student 對象的 age 做了一個求平均值的操作。

Fold

Fold 通過將最後一個文件夾流與當前記錄組合來推出 KeyedStream。 它會發回數據流。

KeyedStream.fold("1", new FoldFunction<Integer, String>() {
    @Override
    public String fold(String accumulator, Integer value) throws Exception {
        return accumulator + "=" + value;
    }
})

Aggregations

DataStream API 支持各種聚合,例如 min,max,sum 等。 這些函數可以應用於 KeyedStream 以獲得 Aggregations 聚合。

KeyedStream.sum(0) 
KeyedStream.sum("key") 
KeyedStream.min(0) 
KeyedStream.min("key") 
KeyedStream.max(0) 
KeyedStream.max("key") 
KeyedStream.minBy(0) 
KeyedStream.minBy("key") 
KeyedStream.maxBy(0) 
KeyedStream.maxBy("key")

max 和 maxBy 之間的區別在於 max 返迴流中的最大值,但 maxBy 返回具有最大值的鍵, min 和 minBy 同理。

Window

Window 函數允許按時間或其他條件對現有 KeyedStream 進行分組。 以下是以 10 秒的時間視窗聚合:

inputStream.keyBy(0).window(Time.seconds(10));

Flink 定義數據片段以便(可能)處理無限數據流。 這些切片稱為視窗。 此切片有助於通過應用轉換處理數據塊。 要對流進行視窗化,我們需要分配一個可以進行分發的鍵和一個描述要對視窗化流執行哪些轉換的函數

要將流切片到視窗,我們可以使用 Flink 自帶的視窗分配器。 我們有選項,如 tumbling windows, sliding windows, global 和 session windows。 Flink 還允許您通過擴展 WindowAssginer 類來編寫自定義視窗分配器。 這裡先預留下篇文章來講解這些不同的 windows 是如何工作的。

WindowAll

windowAll 函數允許對常規數據流進行分組。 通常,這是非並行數據轉換,因為它在非分區數據流上運行。

與常規數據流功能類似,我們也有視窗數據流功能。 唯一的區別是它們處理視窗數據流。 所以視窗縮小就像 Reduce 函數一樣,Window fold 就像 Fold 函數一樣,並且還有聚合。

inputStream.keyBy(0).windowAll(Time.seconds(10));

Union

Union 函數將兩個或多個數據流結合在一起。 這樣就可以並行地組合數據流。 如果我們將一個流與自身組合,那麼它會輸出每個記錄兩次。

inputStream.union(inputStream1, inputStream2, ...);

Window join

我們可以通過一些 key 將同一個 window 的兩個數據流 join 起來。

inputStream.join(inputStream1)
           .where(0).equalTo(1)
           .window(Time.seconds(5))     
           .apply (new JoinFunction () {...});

以上示例是在 5 秒的視窗中連接兩個流,其中第一個流的第一個屬性的連接條件等於另一個流的第二個屬性。

Split

此功能根據條件將流拆分為兩個或多個流。 當您獲得混合流並且您可能希望單獨處理每個數據流時,可以使用此方法。

SplitStream<Integer> split = inputStream.split(new OutputSelector<Integer>() {
    @Override
    public Iterable<String> select(Integer value) {
        List<String> output = new ArrayList<String>(); 
        if (value % 2 == 0) {
            output.add("even");
        }
        else {
            output.add("odd");
        }
        return output;
    }
});

Select

此功能允許您從拆分流中選擇特定流。

SplitStream<Integer> split;
DataStream<Integer> even = split.select("even"); 
DataStream<Integer> odd = split.select("odd"); 
DataStream<Integer> all = split.select("even","odd");

Project

Project 函數允許您從事件流中選擇屬性子集,並僅將所選元素髮送到下一個處理流。

DataStream<Tuple4<Integer, Double, String, String>> in = // [...] 
DataStream<Tuple2<String, String>> out = in.project(3,2);

上述函數從給定記錄中選擇屬性號 2 和 3。 以下是示例輸入和輸出記錄:

(1,10.0,A,B)=> (B,A)
(2,20.0,C,D)=> (D,C)

最後

本文主要介紹了 Flink Data 的常用轉換方式:Map、FlatMap、Filter、KeyBy、Reduce、Fold、Aggregations、Window、WindowAll、Union、Window Join、Split、Select、Project 等。並用了點簡單的 demo 介紹瞭如何使用,具體在項目中該如何將數據流轉換成我們想要的格式,還需要根據實際情況對待。

關註我

轉載請務必註明原創地址為:http://www.54tianzhisheng.cn/2018/11/04/Flink-Data-transformation/

微信公眾號:zhisheng

另外我自己整理了些 Flink 的學習資料,目前已經全部放到微信公眾號(zhisheng)了,你可以回覆關鍵字:Flink 即可無條件獲取到。另外也可以加我微信 你可以加我的微信:yuanblog_tzs,探討技術!

更多私密資料請加入知識星球!

Github 代碼倉庫

https://github.com/zhisheng17/flink-learning/

以後這個項目的所有代碼都將放在這個倉庫里,包含了自己學習 flink 的一些 demo 和博客

博客

1、Flink 從0到1學習 —— Apache Flink 介紹

2、Flink 從0到1學習 —— Mac 上搭建 Flink 1.6.0 環境並構建運行簡單程式入門

3、Flink 從0到1學習 —— Flink 配置文件詳解

4、Flink 從0到1學習 —— Data Source 介紹

5、Flink 從0到1學習 —— 如何自定義 Data Source ?

6、Flink 從0到1學習 —— Data Sink 介紹

7、Flink 從0到1學習 —— 如何自定義 Data Sink ?

8、Flink 從0到1學習 —— Flink Data transformation(轉換)

9、Flink 從0到1學習 —— 介紹 Flink 中的 Stream Windows

10、Flink 從0到1學習 —— Flink 中的幾種 Time 詳解

11、Flink 從0到1學習 —— Flink 讀取 Kafka 數據寫入到 ElasticSearch

12、Flink 從0到1學習 —— Flink 項目如何運行?

13、Flink 從0到1學習 —— Flink 讀取 Kafka 數據寫入到 Kafka

14、Flink 從0到1學習 —— Flink JobManager 高可用性配置

15、Flink 從0到1學習 —— Flink parallelism 和 Slot 介紹

16、Flink 從0到1學習 —— Flink 讀取 Kafka 數據批量寫入到 MySQL

17、Flink 從0到1學習 —— Flink 讀取 Kafka 數據寫入到 RabbitMQ

18、Flink 從0到1學習 —— Flink 讀取 Kafka 數據寫入到 HBase

19、Flink 從0到1學習 —— Flink 讀取 Kafka 數據寫入到 HDFS

20、Flink 從0到1學習 —— Flink 讀取 Kafka 數據寫入到 Redis

21、Flink 從0到1學習 —— Flink 讀取 Kafka 數據寫入到 Cassandra

22、Flink 從0到1學習 —— Flink 讀取 Kafka 數據寫入到 Flume

23、Flink 從0到1學習 —— Flink 讀取 Kafka 數據寫入到 InfluxDB

24、Flink 從0到1學習 —— Flink 讀取 Kafka 數據寫入到 RocketMQ

25、Flink 從0到1學習 —— 你上傳的 jar 包藏到哪裡去了

26、Flink 從0到1學習 —— 你的 Flink job 日誌跑到哪裡去了

27、阿裡巴巴開源的 Blink 實時計算框架真香

28、Flink 從0到1學習 —— Flink 中如何管理配置?

29、Flink 從0到1學習—— Flink 不可以連續 Split(分流)?

30、Flink 從0到1學習—— 分享四本 Flink 國外的書和二十多篇 Paper 論文

31、Flink 架構、原理與部署測試

32、為什麼說流處理即未來?

33、OPPO 數據中台之基石:基於 Flink SQL 構建實時數據倉庫

34、流計算框架 Flink 與 Storm 的性能對比

35、Flink狀態管理和容錯機制介紹

36、Apache Flink 結合 Kafka 構建端到端的 Exactly-Once 處理

37、360深度實踐:Flink與Storm協議級對比

38、如何基於Flink+TensorFlow打造實時智能異常檢測平臺?只看這一篇就夠了

39、Apache Flink 1.9 重大特性提前解讀

40、Flink 全網最全資源(視頻、博客、PPT、入門、實戰、源碼解析、問答等持續更新)

41、Flink 靈魂兩百問,這誰頂得住?

42、Flink 從0到1學習 —— 如何使用 Side Output 來分流?

43、你公司到底需不需要引入實時計算引擎?

44、一文讓你徹底瞭解大數據實時計算引擎 Flink

源碼解析

1、Flink 源碼解析 —— 源碼編譯運行

2、Flink 源碼解析 —— 項目結構一覽

3、Flink 源碼解析—— local 模式啟動流程

4、Flink 源碼解析 —— standalone session 模式啟動流程

5、Flink 源碼解析 —— Standalone Session Cluster 啟動流程深度分析之 Job Manager 啟動

6、Flink 源碼解析 —— Standalone Session Cluster 啟動流程深度分析之 Task Manager 啟動

7、Flink 源碼解析 —— 分析 Batch WordCount 程式的執行過程

8、Flink 源碼解析 —— 分析 Streaming WordCount 程式的執行過程

9、Flink 源碼解析 —— 如何獲取 JobGraph?

10、Flink 源碼解析 —— 如何獲取 StreamGraph?

11、Flink 源碼解析 —— Flink JobManager 有什麼作用?

12、Flink 源碼解析 —— Flink TaskManager 有什麼作用?

13、Flink 源碼解析 —— JobManager 處理 SubmitJob 的過程

14、Flink 源碼解析 —— TaskManager 處理 SubmitJob 的過程

15、Flink 源碼解析 —— 深度解析 Flink Checkpoint 機制

16、Flink 源碼解析 —— 深度解析 Flink 序列化機制

17、Flink 源碼解析 —— 深度解析 Flink 是如何管理好記憶體的?

18、Flink Metrics 源碼解析 —— Flink-metrics-core

19、Flink Metrics 源碼解析 —— Flink-metrics-datadog

20、Flink Metrics 源碼解析 —— Flink-metrics-dropwizard

21、Flink Metrics 源碼解析 —— Flink-metrics-graphite

22、Flink Metrics 源碼解析 —— Flink-metrics-influxdb

23、Flink Metrics 源碼解析 —— Flink-metrics-jmx

24、Flink Metrics 源碼解析 —— Flink-metrics-slf4j

25、Flink Metrics 源碼解析 —— Flink-metrics-statsd

26、Flink Metrics 源碼解析 —— Flink-metrics-prometheus

26、Flink Annotations 源碼解析

27、Flink 源碼解析 —— 如何獲取 ExecutionGraph ?

28、大數據重磅炸彈——實時計算框架 Flink

29、Flink Checkpoint-輕量級分散式快照

30、Flink Clients 源碼解析
原文出處:zhisheng的博客,歡迎關註我的公眾號:zhisheng


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 本篇文章給大家帶來的內容是關於Laravel服務容器的綁定與解析,有一定的參考價值,有需要的朋友可以參考一下,希望對你有所幫助。 前言 老實說,第一次老大讓我看laravel框架手冊的那天早上,我是很絕望的,因為真的沒接觸過,對我這種渣渣來說,laravel的入門門檻確實有點高了,但還是得硬著頭皮看 ...
  • 開發環境: Windows操作系統 開發工具:MyEclipse/Eclipse + JDK+ Tomcat + MySQL 資料庫 項目簡介: 戶籍管理系統主體將圍繞戶籍信息,身份證服務管理等方面進行展開設計,系統分為前臺信息展示,後臺的數據處理兩大模塊。必須選擇非功能性需求與功能需求共同實施,提 ...
  • 本文源碼: "GitHub·點這裡" || "GitEE·點這裡" "01:項目技術選型簡介,架構圖解說明" "02:業務架構設計,系統分層管理" "03:資料庫選型,業務數據設計規劃" 04:中間件集成,公共服務管理 一、中間件簡介 中間件是基礎軟體的一類, 屬於復用性極高的軟體。處於操作系統軟體 ...
  • 重構是 一種對軟體進行修改的行為,但它並不改變軟體的功能特征,而是通過讓軟體程式更清晰,更簡潔和更條理來改進軟體的質量。代碼重構之於軟體,相當於結構修改之於散文。每次人們對如何對代碼進行重構的討論就像是討論如果對一篇文學作品進行修訂一樣無休無止。所有人都知道應該根據項目的自身情況來對代碼進行重構,而 ...
  • 1. springboot是對spring的缺點進行改善和優化,它的約定大於配置,開箱即用,沒有代碼生成,也不需要xml文件配置,可以修改屬性值來滿足需求 2. springboot的入門程式 在idea中創建springboot的項目 (1) 預設有個DemoApplication類,是sprin ...
  • (手機橫屏看源碼更方便) 註:java源碼分析部分如無特殊說明均基於 java8 版本。 註:線程池源碼部分如無特殊說明均指ThreadPoolExecutor類。 簡介 前面我們一起學習了線程池中普通任務的執行流程,但其實線程池中還有一種任務,叫作未來任務(future task),使用它您可以獲 ...
  • python 命令行參數解析常用到 argparse 包,但是 argparse 包對 bool 值的傳遞可能和你想不太一樣,在傳遞 bool 的時候,命令行中只要出現了就取 True,不管你設置了該參數取 True 還是 False,這個有點反人類,正確的做法是使用 action='store_t... ...
  • toc: true title: 滴滴實時計算發展之路及平臺架構實踐 date: 2019 08 25 tags: Flink 大數據 滴滴的核心業務是一個實時線上服務,因此具有豐富的實時數據和實時計算場景。本文將介紹滴滴實時計算發展之路以及平臺架構實踐。 <! more 實時計算演進 隨著滴滴業務 ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...