一、FlinkSql的概念 核心概念 Flink 的 Table API 和 SQL 是流批統一的 API。 這意味著 Table API & SQL 在無論有限的批式輸入還是無限的流式輸入下,都具有相同的語義。 因為傳統的關係代數以及 SQL 最開始都是為了批式處理而設計的, 關係型查詢在流式場景 ...
核心概念
Flink 的 Table API 和 SQL 是流批統一的 API。 這意味著 Table API & SQL 在無論有限的批式輸入還是無限的流式輸入下,都具有相同的語義。 因為傳統的關係代數以及 SQL 最開始都是為了批式處理而設計的, 關係型查詢在流式場景下不如在批式場景下容易理解.
動態表和連續查詢
動態表(Dynamic Tables) 是 Flink 的支持流數據的 Table API 和 SQL 的核心概念。
與表示批處理數據的靜態表不同,動態表是隨時間變化的。可以像查詢靜態批處理表一樣查詢它們。查詢動態表將生成一個連續查詢(Continuous Query)。一個連續查詢永遠不會終止,結果會生成一個動態表。查詢不斷更新其(動態)結果表,以反映其(動態)輸入表上的更改。
TableAPI
首先需要導入依賴
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-csv</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-json</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.commons/commons-compress -->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-compress</artifactId>
<version>1.21</version>
</dependency>
/**
* 使用TableAPI的基本套路:
* 1.創建表的執行環境
* 2.創建表,將流轉換為動態表,表的欄位名從bean的屬性名自動抽取
* 3.對動態表進行查詢
* 4.把動態表轉換為流
*/
1.TableAPI 中將動態表轉換為流時有兩種方法
DataStream<Row> rowDataStream = tableEnvironment.toAppendStream(result, Row.class);
toAppendStream方法只能在查詢時使用,不能使用包含聚合函數等更新語句
DataStream<Tuple2<Boolean, Row>> tuple2DataStream = tableEnvironment.toRetractStream(select, Row.class);
toRetractStream則可以使用
2.上述兩種方法內傳入的參數Row.class,表示將表中查詢出的數據封裝為行類型,也就是對每行進行封裝,解決查詢出的數據列少於或者多於原表。如何能夠確保所查詢的數據與之前封裝的Bean有完全一致的結構則也可以封裝為原Bean.class
代碼實現:
package net.cyan.FlinkSql;
import net.cyan.POJO.WaterSensor;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import static org.apache.flink.table.api.Expressions.$;
/**
* 使用TableAPI的基本套路:
* 1.創建表的執行環境
* 2.創建表,將流轉換為動態表,表的欄位名從bean的屬性名自動抽取
* 3.對動態表進行查詢
* 4.把動態表轉換為流
*/
public class Demo1 {
public static void main(String[] args) {
Configuration configuration=new Configuration();
configuration.setInteger("rest.port",3333);
//創建執行環境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(configuration);
env.setParallelism(1);
//模擬數據
DataStreamSource<WaterSensor> WaterSensorSource = env.fromElements(
new WaterSensor("S1", 1000L, 10),
new WaterSensor("S1", 1000L, 10),
new WaterSensor("S2", 2000L, 20),
new WaterSensor("S3", 3000L, 30),
new WaterSensor("S4", 4000L, 40),
new WaterSensor("S5", 5000L, 50)
);
//創建表的執行環境
StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(env);
//創建表,將流轉換為動態表,表的欄位名從bean的屬性名自動抽取
Table table = tableEnvironment.fromDataStream(WaterSensorSource);
//對錶進行查詢
//1、過時的查詢書寫
Table result = table
.where("id='S1'")
.select("*");
//2、不過時的書寫
Table result1