DataStream API(一)

来源:https://www.cnblogs.com/pinewaves/archive/2020/07/16/13323860.html
-Advertisement-
Play Games

DataStream API(一) 在瞭解DataStream API之前我們先來瞭解一下Flink API的構成。Flink API是分層的。由最底層的Stateful Stream Process到最頂層的SQL分為四層。如下圖: DataStream API 顧名思義,就是DataStream ...


DataStream API(一)

在瞭解DataStream API之前我們先來瞭解一下Flink API的構成。Flink API是分層的。由最底層的Stateful Stream Process到最頂層的SQL分為四層。如下圖:

API-Level

DataStream API 顧名思義,就是DataStream類的API,DataStream表示Flink程式中的流式數據集合。它是一個包含重覆項的不可變數據集合,這些數據可以是有界的也可以是無界的,處理他們的API是相同的。

DataStream是不可變的,這意味著一旦它們被創建,就不能添加或刪除元素。也不能簡單地檢查內部的元素,而只能使用DataStream API(Transform)來處理它們。

Flink程式基本部分組成:

  1. 獲得執行環境(Environment),
  2. 載入/創建初始數據(Source),
  3. 指定此數據的轉換(Transform),
  4. 指定計算結果的存放位置(Sink),
  5. 觸發程式執行(Execut)

下麵我們一起來瞭解一下Flink DataStream的執行環境。

Environment

Flink的執行環境包括兩種,分別是StreamExecutionEnvironment和ExecutionEnvironment,他們分別對應StreamData和DataSet。StreamData是流式數據集,DataSet是批量數據集。

StreamExecutionEnvironment是所有Flink流式處理程式的基礎。它為我們提供了三種實例化的方法,分別是:

getExecutionEnvironment()
createLocalEnvironment()
createRemoteEnvironment(String host, int port, String... jarFiles)

createLocalEnvironment()

這個方法是獲取本地的執行環境。它有兩個重載,分別是:

//parallelism表示並行度
createLocalEnvironment(int parallelism)
createLocalEnvironment(int parallelism, Configuration configuration)

createRemoteEnvironment(String host, int port, String... jarFiles)

這個方法是獲取集群的執行環境。與createLocalEnvironment()類似,它也有兩個重載,分別是:

createRemoteEnvironment(String host, int port, int parallelism, String... jarFiles)
createRemoteEnvironment(String host, int port, Configuration clientConfig, String... jarFiles)

getExecutionEnvironment()

getExecutionEnvironment()可以自行判斷我們當前程式的執行環境併為我們返回與之相對應的實例。換句話說,通常情況下我們不需要自己判斷到底是使用createLocalEnvironment還是使用createRemoteEnvironment,一律用getExecutionEnvironment就OK了。

Environment實例

通過Environment實例我們可以做很多事情,比如:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//設置並行度
env.setParallelism(1);
//獲取數據源
//當然獲取數據源的方式有很多種,下麵會一一介紹
env.readTextFile("FilePath");
//執行程式
env.Execute();
//...

Data Sources

Data Source的核心組件包括三個,分別是Split、SourceReader、SplitEnumerator。

  • Split:Split是數據源的一部分,例如:文件的一部分或者一條日誌。Splits是Source並行讀取數據和分發作業的基礎。
  • SourceReader:SourceReader向SplitEnumerator請求Splits並處理請求到的Splits。SourceReader位於TaskManager中,這意味著它是並行運行的,同時,它可以產生並行的事件流/記錄流。
  • SplitEnumerator:SplitEnumerator負責管理Splits並且將他們發送給SourceReader。它是運行在JobManager上的單個實例。負責維護正在進行的分片的備份日誌並將這些分片均衡的分發到SourceReader中。

具體的過程可以參考以下圖片:

Source

下麵我們來介紹一下幾個常用的獲取數據源的方式。

從集合中讀取

ArrayList<String> strList = new ArrayList<String>();
strList.add("are");
strList.add("you");
strList.add("ok");
env.fromCollection(strList);

從文件中讀取

DataStreamSource<String> inputData = env.readTextFile("FilePath");

消費kafka中的數據

引入kafka依賴

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.11</artifactId>
    <version>1.11.0</version>
</dependency>

代碼示例

//kafka配置
Properties pro = new Properties();
pro.setProperty("bootstrap.servers", "localhost:9092");
pro.setProperty("group.id", "consumer-group");
pro.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
pro.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
pro.setProperty("auto.offset.reset", "latest");
//消費kafka數據
DataStreamSource inputStream = env.addSource(new FlinkKafkaConsumer("topic", new SimpleStringSchema(), pro));

當然,出了kafka之外Flink還為我們提供了ElasticSearch、HDFS、RabbitMQ、JDBC等作為數據源的介面。此處就不一一介紹了。

自定義數據源

其實深入研究之後我們會發現,FlinkKafkaConsumer其實是實現了一個SourceFunction介面。so,我們可以通過實現SourceFunction的方式來自定義我們自己的數據源。

有了這個功能我們可以很輕鬆的模擬真實的業務場景。畢竟,絕大多數的項目在開發階段並不會有真實的業務場景來提供數據源。

DataStreamSource inputStream = env.addSource(new MyDataSource());

private static class MyDataSource implements SourceFunction<Sensor> {
    private boolean running = true;

    public void run(SourceContext sourceContext) throws Exception {
        while(running){
            //讀取數據,可以從cvs文件...自定義數據源讀取數據
            Thread.sleep(100);
        }
    }
    public void cancel() {
        running = false;
    }
}

您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 基本概念 傳統IO的種類 InputStream、OutputStream 基於位元組流操作的 IO Write、Reader基於字元流的IO File基於磁碟操作的IO Socket基於網路操作的IO 內核空間與用戶空間 內核負責網路與文件數據的讀寫 用戶程式通過系統調用獲得網路和文件的數據 內核態 ...
  • 一 瀏覽器緩存 1.1 緩存概述 緩存對於Web至關重要,尤其對於大型高負載Web站點。Nginx緩存可作為性能優化的一個重要手段,可以極大減輕後端伺服器的負載。通常對於靜態資源,即較少經常更新的資源,如圖片,css或js等進行緩存,從而在每次刷新瀏覽器的時候,不用重新請求,而是從緩存裡面讀取,這樣 ...
  • ###Windows驅動名列印 #include <stdio.h> #include <Psapi.h> #include <Shlwapi.h> #include <Windows.h> #pragma comment(lib, "psapi.lib") #pragma comment(lib, ...
  • 所謂部署就是把webapp的源文件放置於目標目錄(網頁程式文件存放目錄,類似httpd中的documentroot指定的目錄),然後配置tomcat伺服器能夠基於web.xml和context.xml文件中定義的路徑來訪問webapp;然後將其特有的類和依賴的類通過類載入器(class loade... ...
  • win中查看MD5值:certutil -hashfile 文件名 MD5查看 SHA1certutil -hashfile 文件名 SHA1查看SHA256certutil -hashfile 文件名 SHA256linux中 md5sum 文件名 ...
  • 大數據工具 kafka 學習 之前需要先瞭解隊列的相關知識 瞭解萬隊列就知道kafka的用處 之後再詳細瞭解kafka的具體知識和操作 ...
  • 本文更新於2019-08-18,使用MySQL 5.7,操作系統為Deepin 15.4。 一旦資料庫對象設計完畢並投入使用,再進行修改就比較麻煩。 優化表的數據類型 select_statement PROCEDURE ANALYSE([max_elements[, max_memory]]) 對 ...
  • 框架 Apache Hadoop:分散式處理架構,結合了 MapReduce(並行處理)、YARN(作業調度)和HDFS(分散式文件系統); Tigon:高吞吐量實時流處理框架。 分散式編程 AddThis Hydra :最初在AddThis上開發的分散式數據處理和存儲系統; AMPLab SIMR ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...