Flink實戰(六) - Table API & SQL編程

来源:https://www.cnblogs.com/washabi/archive/2019/07/21/11223168.html
-Advertisement-
Play Games

1 意義 1.1 分層的 APIs & 抽象層次 Flink提供三層API。 每個API在簡潔性和表達性之間提供不同的權衡,並針對不同的用例。 而且Flink提供不同級別的抽象來開發流/批處理應用程式 最低級抽象只提供有狀態流。它通過Process Function嵌入到DataStream API ...


1 意義

1.1 分層的 APIs & 抽象層次

Flink提供三層API。 每個API在簡潔性和表達性之間提供不同的權衡,並針對不同的用例。

而且Flink提供不同級別的抽象來開發流/批處理應用程式

  • 最低級抽象只提供有狀態流。它通過Process Function嵌入到DataStream API中。它允許用戶自由處理來自一個或多個流的事件,並使用一致的容錯狀態。此外,用戶可以註冊事件時間和處理時間回調,允許程式實現複雜的計算。
  • 實際上,大多數應用程式不需要上述低級抽象,而是針對Core API編程, 如DataStream API(有界/無界流)和DataSet API (有界數據集)。這些流暢的API提供了用於數據處理的通用構建塊,例如各種形式的用戶指定的轉換,連接,聚合,視窗,狀態等。在這些API中處理的數據類型在相應的編程語言中表示為類。
    低級Process Function與DataStream API集成,因此只能對某些 運算元操作進行低級抽象。該數據集API提供的有限數據集的其他原語,如迴圈/迭代。
  • Table API 是為中心的聲明性DSL 表,其可被動態地改變的表(表示流時)。該 Table API遵循(擴展)關係模型:表有一個模式連接(類似於在關係資料庫中的表)和API提供可比的 運算元操作,如選擇,項目,連接,分組依據,聚合等 Table API程式以聲明方式定義應該執行的邏輯 運算元操作,而不是準確指定 運算元操作代碼的外觀。雖然 Table API可以通過各種類型的用戶定義函數進行擴展,但它的表現力不如Core API,但使用更簡潔(編寫的代碼更少)。此外, Table API程式還會通過優化程式,在執行之前應用優化規則。
    可以在表和DataStream / DataSet之間無縫轉換,允許程式混合 Table API以及DataStream 和DataSet API。
  • Flink提供的最高級抽象是SQL。這種抽象在語義和表達方面類似於 Table API,但是將程式表示為SQL查詢表達式。在SQL抽象與 Table API緊密地相互作用,和SQL查詢可以通過定義表來執行 Table API。1.2 模型類比MapReduce ==> Hive SQL
    Spark ==> Spark SQL
    Flink ==> SQL

2 總覽

2.1 簡介

Apache Flink具有兩個關係型API

  • Table API
  • SQL

用於統一流和批處理

Table API是Scala和Java語言集成查詢API,可以非常直觀的方式組合來自關係運算元的查詢(e.g. 選擇,過濾和連接).

Flink的SQL支持基於實現SQL標準的Apache Calcite。無論輸入是批輸入(DataSet)還是流輸入(DataStream),任一介面中指定的查詢都具有相同的語義並指定相同的結果。

Table API和SQL介面彼此緊密集成,就如Flink的DataStream和DataSet API。我們可以輕鬆地在基於API構建的所有API和庫之間切換。例如,可以使用CEP庫從DataStream中提取模式,然後使用 Table API分析模式,或者可以在預處理上運行Gelly圖演算法之前使用SQL查詢掃描,過濾和聚合批處理表數據。

Table API和SQL尚未完成並且正在積極開發中。並非 Table API,SQL和stream,batch輸入的每種組合都支持所有運算元操作

2.2 依賴結構

所有Table API和SQL組件都捆綁在flink-table Maven工件中。

以下依賴項與大多數項目相關:

  • flink-table-common
    通過自定義函數,格式等擴展表生態系統的通用模塊。
  • flink-table-api-java
    使用Java編程語言的純表程式的表和SQL API(在早期開發階段,不推薦!)。
  • flink-table-api-scala
    使用Scala編程語言的純表程式的表和SQL API(在早期開發階段,不推薦!)。
  • flink-table-api-java-bridge
    使用Java編程語言支持DataStream / DataSet API的Table&SQL API。
  • flink-table-api-scala-bridge
    使用Scala編程語言支持DataStream / DataSet API的Table&SQL API。
  • flink-table-planner
    表程式規劃器和運行時。
  • flink-table-uber
    將上述模塊打包成大多數Table&SQL API用例的發行版。 uber JAR文件flink-table * .jar位於Flink版本的/ opt目錄中,如果需要可以移動到/ lib。

2.3 項目依賴

必須將以下依賴項添加到項目中才能使用Table API和SQL來定義管道:

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-table-planner_2.11</artifactId>
  <version>1.8.0</version>
</dependency>

此外,根據目標編程語言,您需要添加Java或Scala API。

<!-- Either... -->
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-table-api-java-bridge_2.11</artifactId>
  <version>1.8.0</version>
</dependency>
<!-- or... -->
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-table-api-scala-bridge_2.11</artifactId>
  <version>1.8.0</version>
</dependency>

在內部,表生態系統的一部分是在Scala中實現的。 因此,請確保為批處理和流應用程式添加以下依賴項:

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-streaming-scala_2.11</artifactId>
  <version>1.8.0</version>
</dependency>

2.4 擴展依賴

如果要實現與Kafka或一組用戶定義函數交互的自定義格式,以下依賴關係就足夠了,可用於SQL客戶端的JAR文件:

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-table-common</artifactId>
  <version>1.8.0</version>
</dependency>

目前,該模塊包括以下擴展點:

  • SerializationSchemaFactory
  • DeserializationSchemaFactory
  • ScalarFunction
  • TableFunction
  • AggregateFunction

3 概念和通用API

Table API和SQL集成在一個聯合API中。此API的核心概念是Table用作查詢的輸入和輸出。本文檔顯示了具有 Table API和SQL查詢的程式的常見結構,如何註冊Table,如何查詢Table以及如何發出Table。

3.1 Table API和SQL程式的結構

批處理和流式傳輸的所有 Table API和SQL程式都遵循相同的模式。以下代碼示例顯示了 Table API和SQL程式的常見結構。

// 對於批處理程式,使用ExecutionEnvironment而不是StreamExecutionEnvironment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// 創建一個TableEnvironment
// 對於批處理程式使用BatchTableEnvironment而不是StreamTableEnvironment
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

// 註冊一個 Table
tableEnv.registerTable("table1", ...)            // 或者
tableEnv.registerTableSource("table2", ...);     // 或者
tableEnv.registerExternalCatalog("extCat", ...);
// 註冊一個輸出 Table
tableEnv.registerTableSink("outputTable", ...);

/ 從 Table API query 創建一個Table
Table tapiResult = tableEnv.scan("table1").select(...);
// 從 SQL query 創建一個Table
Table sqlResult  = tableEnv.sqlQuery("SELECT ... FROM table2 ... ");

// 將表API結果表發送到TableSink,對於SQL結果也是如此
tapiResult.insertInto("outputTable");

// 執行
env.execute();

3.2 將DataStream或DataSet轉換為表

它也可以直接轉換為a 而不是註冊a DataStream或DataSetin 。如果要在 Table API查詢中使用Table,這很方便。TableEnvironmentTable

// 獲取StreamTableEnvironment
//在BatchTableEnvironment中註冊DataSet是等效的
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

DataStream<Tuple2<Long, String>> stream = ...

// 將DataStream轉換為預設欄位為“f0”,“f1”的表
Table table1 = tableEnv.fromDataStream(stream);

// 將DataStream轉換為包含欄位“myLong”,“myString”的表
Table table2 = tableEnv.fromDataStream(stream, "myLong, myString");
  • sale.csv文件
  • Scala
  • Java

還不完善,等日後Flink該模塊開發完畢再深入研究!

參考

Table API & SQL


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 剛纔對數據進行批量更新時,收到一條錯誤信息:The JSON request was too large to be deserialized。 查找資料,原來json對象數量有限制,得需要在web.config時行配置參數: <appSettings> <add key="aspnet:MaxJs ...
  • 一、簡要介紹 ABP vNext 框架本身就是圍繞著 DDD 理念進行設計的,所以在 DDD 裡面我們能夠見到的實體、倉儲、值對象、領域服務,ABP vNext 框架都為我們進行了實現,這些基礎設施都存放在 Volo.Abp.Ddd.Domain 項目當中。 本篇文章將會側重於理論講解,但也只是一個 ...
  • Windows無法上網,提示[Windows無法連接到 System Event Notification Service服務] ...
  • 解決Deepin無法在root用戶啟動Google Chrome瀏覽器的問題,步驟如下。 前提:如何用root用戶登錄系統?編輯 vim /etc/lightdm/lightdm.conf , 找到並賦值 autologin-user=root,保存退出即可。 1.找到Chrome的路徑 2.編輯g ...
  • iptables -A INPUT -s 127.0.0.1 -d 127.0.0.1 -j ACCEPT #允許本地迴環介面(即運行本機訪問本機) iptables -A INPUT -m state --state ESTABLISHED,RELATED -j ACCEPT #允許已建立的或相關 ...
  • 如何高效獲取命令幫助信息 一、 內核版本號:主.次.修訂 系統中:用戶名UID(只認識,不認識名字) root:密碼 student: 3A認證: 認證機制authentication 密碼認證:符合複雜性 字元長度(至少7位) 不要使用易記 定期更改 重覆密碼的時間要長 授權機制authoriza ...
  • Linux系統說明 Linux( 誕生於1991.10.5) 繼承了Unix以網路為核心的設計思想, 是一個性能穩定的多用戶網路操作系統. Linux這個詞嚴格意義上只表示Linux內核, 但日常中, 習慣用Linux來形容整個基於Linux內核, 並使用GNU( 一個自由的操作系統) 工程各種工具 ...
  • Oracle表級約束和列級約束 1. 表級定義約束 指的是在定義完一個表所有列之後,再去定義所有相關的約束。 註意:not null 約束只能在列級上定義。 2. 列級定義約束 指的是在定義一個表的每一列的同時定義每一個列的約束條件,其約束條件 位於每一列之後。 約束:FOREIGN KEY,PRI ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...