本次需求場景主要為實現將flinksql中collect()函數輸出的Mutiset(VARCHAR<100>)多行結果轉換為字元串。 一、FlinkSQL自定義函數分類 Flink SQL 的自定義函數是用戶可以自行編寫的一種函數,用於擴展 Flink SQL 的功能。自定義函數可以在 SQL 查 ...
本次需求場景主要為實現將flinksql中collect()函數輸出的Mutiset(VARCHAR<100>)多行結果轉換為字元串。
一、FlinkSQL自定義函數分類
Flink SQL 的自定義函數是用戶可以自行編寫的一種函數,用於擴展 Flink SQL 的功能。自定義函數可以在 SQL 查詢中被調用,以完成用戶自定義的數據處理邏輯。 在 Flink SQL 中,自定義函數分為標量函數、表函數和聚合函數三種類型。
1、標量函數(Scalar Function)
標量函數接受一行輸入,返回一行輸出。常見的標量函數有字元串函數、數學函數等。用戶可以通過繼承 ScalarFunction 類或實現 ScalarFunction 介面的方式來實現自定義的標量函數。
2、表函數(Table Function)
表函數接受一行輸入,返回多行輸出。在 Flink SQL 中,表函數可以使用 LATERAL TABLE 語法進行調用。用戶可以通過繼承 TableFunction 類或實現 TableFunction 介面的方式來實現自定義的表函數。
3、聚合函數(Aggregate Function)
聚合函數接受多行輸入,返回一行輸出。在 Flink SQL 中,聚合函數可以使用 GROUP BY 語法進行調用。用戶可以通過繼承 AggregateFunction 類或實現 AggregateFunction 介面的方式來實現自定義的聚合函數。 在使用自定義函數時,需要將對應的 Jar 包提交到 Flink 集群中,併在執行任務時將其加入到 Classpath 中。Flink SQL 還提供了 CREATE FUNCTION 語句來註冊用戶自定義的函數,以便在 SQL 查詢中進行調用。 總的來說,自定義函數是 Flink SQL 中非常重要的一個功能,可以幫助用戶擴展 Flink SQL 的功能,提高數據處理的靈活性和效率。
上面的圖片展示了一個聚合的例子。假設你有一個關於飲料的表。表裡面有三個欄位,分別是 id、name、price,表裡有 5 行數據。假設你需要找到所有飲料里最貴的飲料的價格,即執行一個 max() 聚合。你需要遍歷所有 5 行數據,而結果就只有一個數值。
自定義聚合函數是通過擴展 AggregateFunction 來實現的。AggregateFunction 的工作過程如下。首先,它需要一個 accumulator,它是一個數據結構,存儲了聚合的中間結果。通過調用 AggregateFunction 的 createAccumulator() 方法創建一個空的 accumulator。接下來,對於每一行數據,會調用 accumulate() 方法來更新 accumulator。當所有的數據都處理完了之後,通過調用 getValue 方法來計算和返回最終的結果。
下麵幾個方法是每個 AggregateFunction 必須要實現的:
- createAccumulator()
- accumulate()
- getValue()
4、表值聚合函數
自定義表值聚合函數(UDTAGG)可以把一個表(一行或者多行,每行有一列或者多列)聚合成另一張表,結果中可以有多行多列。
上圖展示了一個表值聚合函數的例子。假設你有一個飲料的表,這個表有 3 列,分別是 id、name 和 price,一共有 5 行。假設你需要找到價格最高的兩個飲料,類似於 top2() 表值聚合函數。你需要遍歷所有 5 行數據,結果是有 2 行數據的一個表。
用戶自定義表值聚合函數是通過擴展 TableAggregateFunction 類來實現的。一個 TableAggregateFunction 的工作過程如下。首先,它需要一個 accumulator,這個 accumulator 負責存儲聚合的中間結果。 通過調用 TableAggregateFunction 的 createAccumulator 方法來構造一個空的 accumulator。接下來,對於每一行數據,會調用 accumulate 方法來更新 accumulator。當所有數據都處理完之後,調用 emitValue 方法來計算和返回最終的結果。
下麵幾個 TableAggregateFunction 的方法是必須要實現的:
- createAccumulator()
- accumulate()
5、非同步表值函數
非同步表值函數是非同步查詢外部數據系統的特殊函數。
二、需求場景
1、需求描述
基於Flink1.14.4集群,有一批基於某個主鍵生成的collect函數結果數據,需要轉換為字元串傳到下游Kafka。由於collect()函數生成的結果是一個多行的集合MutiSet<varchar(100)>,FlinkSQL中暫未支持concat_ws或者concat函數,因此無法將collect生成的多行結果直接通過現有SQL函數轉換為一行字元串。基於以上原因,需要開發一個自定義函數實現。
2、數據樣例
CREATE TABLE "air_data_source_result" (
"id" int NOT NULL DEFAULT '0' COMMENT '主鍵',
"airlineLogo" varchar(100) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci DEFAULT NULL,
"airlineShortCompany" varchar(100) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci DEFAULT NULL,
"arrActCross" varchar(100) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci DEFAULT NULL,
"arrActTime" varchar(100) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci DEFAULT NULL,
"arrAirport" varchar(100) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci DEFAULT NULL,
"arrCode" varchar(100) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci DEFAULT NULL,
"arrOntimeRate" varchar(100) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci DEFAULT NULL,
"arrPlanCross" varchar(100) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci DEFAULT NULL,
"arrPlanTime" varchar(100) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci DEFAULT NULL,
"arrTerminal" varchar(100) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci DEFAULT NULL,
"checkInTable" varchar(100) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci DEFAULT NULL,
"checkInTableWidth" varchar(100) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci DEFAULT NULL,
"depActCross" varchar(100) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci DEFAULT NULL,
"depActTime" varchar(100) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci DEFAULT NULL,
"depAirport" varchar(100) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci DEFAULT NULL,
"depCode" varchar(100) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci DEFAULT NULL,
"depPlanCross" varchar(100) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci DEFAULT NULL,
"depPlanTime" varchar(100) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci DEFAULT NULL,
"depTerminal" varchar(100) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci DEFAULT NULL,
"flightNo" varchar(100) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci DEFAULT NULL,
"flightState" varchar(100) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci DEFAULT NULL,
"localDate" varchar(100) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci DEFAULT NULL,
"mainFlightNo" varchar(100) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci DEFAULT NULL,
"shareFlag" varchar(100) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci DEFAULT NULL,
"stateColor" varchar(100) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
INSERT INTO `air_data`.`air_data_source_result` (`id`, `airlineLogo`, `airlineShortCompany`, `arrActCross`, `arrActTime`, `arrAirport`, `arrCode`, `arrOntimeRate`, `arrPlanCross`, `arrPlanTime`, `arrTerminal`, `checkInTable`, `checkInTableWidth`, `depActCross`, `depActTime`, `depAirport`, `depCode`, `depPlanCross`, `depPlanTime`, `depTerminal`, `flightNo`, `flightState`, `localDate`, `mainFlightNo`, `shareFlag`, `stateColor`) VALUES (1, 'https://cdn1.133.cn/ticket/airline/image_ca_cca.png', '中國國航', '', '11:11\n', '廣州白雲', 'CAN', '89.65%', '', '11:25', 'T1B', 'https://api.133.cn/third/textImg?code=FvOGTb%2Bgbgxpgw9zPNG2Qw==', '30', '', '08:15\n', '北京首都', 'PEK', '', '08:00', 'T3', 'CA1351', '到達', '2023-02-27', '', '0', '#4273FE');
INSERT INTO `air_data`.`air_data_source_result` (`id`, `airlineLogo`, `airlineShortCompany`, `arrActCross`, `arrActTime`, `arrAirport`, `arrCode`, `arrOntimeRate`, `arrPlanCross`, `arrPlanTime`, `arrTerminal`, `checkInTable`, `checkInTableWidth`, `depActCross`, `depActTime`, `depAirport`, `depCode`, `depPlanCross`, `depPlanTime`, `depTerminal`, `flightNo`, `flightState`, `localDate`, `mainFlightNo`, `shareFlag`, `stateColor`) VALUES (2, 'https://cdn1.133.cn/ticket/airline/image_zh_csz.png', '深圳航空', '', '11:11\n', '廣州白雲', 'CAN', '89.65%', '', '11:25', 'T1B', 'https://api.133.cn/third/textImg?code=FvOGTb%2Bgbgxpgw9zPNG2Qw==', '30', '', '08:15\n', '北京首都', 'PEK', '', '08:00', 'T3', 'ZH1351', '到達', '2023-02-27', 'CA1351', '1', '#4273FE');
INSERT INTO `air_data`.`air_data_source_result` (`id`, `airlineLogo`, `airlineShortCompany`, `arrActCross`, `arrActTime`, `arrAirport`, `arrCode`, `arrOntimeRate`, `arrPlanCross`, `arrPlanTime`, `arrTerminal`, `checkInTable`, `checkInTableWidth`, `depActCross`, `depActTime`, `depAirport`, `depCode`, `depPlanCross`, `depPlanTime`, `depTerminal`, `flightNo`, `flightState`, `localDate`, `mainFlightNo`, `shareFlag`, `stateColor`) VALUES (3, 'https://cdn1.133.cn/ticket/airline/image_hu_chh.png', '海南航空', '', '11:57\n', '廣州白雲', 'CAN', '75.86%', '', '11:50', 'T1B', 'https://api.133.cn/third/textImg?code=IfLOkkFeJagwbNuqYtoqNg==', '140', '', '08:51\n', '北京首都', 'PEK', '', '08:30', 'T2', 'HU7805', '到達', '2023-02-27', '', '0', '#4273FE');
INSERT INTO `air_data`.`air_data_source_result` (`id`, `airlineLogo`, `airlineShortCompany`, `arrActCross`, `arrActTime`, `arrAirport`, `arrCode`, `arrOntimeRate`, `arrPlanCross`, `arrPlanTime`, `arrTerminal`, `checkInTable`, `checkInTableWidth`, `depActCross`, `depActTime`, `depAirport`, `depCode`, `depPlanCross`, `depPlanTime`, `depTerminal`, `flightNo`, `flightState`, `localDate`, `mainFlightNo`, `shareFlag`, `stateColor`) VALUES (4, 'https://cdn1.133.cn/ticket/airline/image_ca_cca.png', '中國國航', '', '12:14\n', '廣州白雲', 'CAN', '79.31%', '', '12:20', 'T1B', 'https://api.133.cn/third/textImg?code=FvOGTb%2Bgbgxpgw9zPNG2Qw==', '30', '', '09:19\n', '北京首都', 'PEK', '', '09:00', 'T3', 'CA1321', '到達', '2023-02-27', '', '0', '#4273FE');
INSERT INTO `air_data`.`air_data_source_result` (`id`, `airlineLogo`, `airlineShortCompany`, `arrActCross`, `arrActTime`, `arrAirport`, `arrCode`, `arrOntimeRate`, `arrPlanCross`, `arrPlanTime`, `arrTerminal`, `checkInTable`, `checkInTableWidth`, `depActCross`, `depActTime`, `depAirport`, `depCode`, `depPlanCross`, `depPlanTime`, `depTerminal`, `flightNo`, `flightState`, `localDate`, `mainFlightNo`, `shareFlag`, `stateColor`) VALUES (5, 'https://cdn1.133.cn/ticket/airline/image_zh_csz.png', '深圳航空', '', '12:14\n', '廣州白雲', 'CAN', '79.31%', '', '12:20', 'T1B', 'https://api.133.cn/third/textImg?code=FvOGTb%2Bgbgxpgw9zPNG2Qw==', '30', '', '09:19\n', '北京首都', 'PEK', '', '09:00', 'T3', 'ZH1321', '到達', '2023-02-27', 'CA1321', '1', '#4273FE');
INSERT INTO `air_data`.`air_data_source_result` (`id`, `airlineLogo`, `airlineShortCompany`, `arrActCross`, `arrActTime`, `arrAirport`, `arrCode`, `arrOntimeRate`, `arrPlanCross`, `arrPlanTime`, `arrTerminal`, `checkInTable`, `checkInTableWidth`, `depActCross`, `depActTime`, `depAirport`, `depCode`, `depPlanCross`, `depPlanTime`, `depTerminal`, `flightNo`, `flightState`, `localDate`, `mainFlightNo`, `shareFlag`, `stateColor`) VALUES (6, 'https://cdn1.133.cn/ticket/airline/image_hu_chh.png', '海南航空', '', '13:12\n', '廣州白雲', 'CAN', '96.55%', '', '13:40', 'T1B', 'https://api.133.cn/third/textImg?code=IfLOkkFeJagwbNuqYtoqNg==', '140', '', '10:07\n', '北京首都', 'PEK', '', '10:00', 'T2', 'HU7813', '到達', '2023-02-27', '', '0', '#4273FE');
INSERT INTO `air_data`.`air_data_source_result` (`id`, `airlineLogo`, `airlineShortCompany`, `arrActCross`, `arrActTime`, `arrAirport`, `arrCode`, `arrOntimeRate`, `arrPlanCross`, `arrPlanTime`, `arrTerminal`, `checkInTable`, `checkInTableWidth`, `depActCross`, `depActTime`, `depAirport`, `depCode`, `depPlanCross`, `depPlanTime`, `depTerminal`, `flightNo`, `flightState`, `localDate`, `mainFlightNo`, `shareFlag`, `stateColor`) VALUES (7, 'https://cdn1.133.cn/ticket/airline/image_zh_csz.png', '深圳航空', '', '14:22\n', '廣州白雲', 'CAN', '82.75%', '', '14:25', 'T1B', 'https://api.133.cn/third/textImg?code=FvOGTb%2Bgbgxpgw9zPNG2Qw==', '30', '', '11:22\n', '北京首都', 'PEK', '', '11:00', 'T3', 'ZH1315', '到達', '2023-02-27', 'CA1315', '1', '#4273FE');
INSERT INTO `air_data`.`air_data_source_result` (`id`, `airlineLogo`, `airlineShortCompany`, `arrActCross`, `arrActTime`, `arrAirport`, `arrCode`, `arrOntimeRate`, `arrPlanCross`, `arrPlanTime`, `arrTerminal`, `checkInTable`, `checkInTableWidth`, `depActCross`, `depActTime`, `depAirport`, `depCode`, `depPlanCross`, `depPlanTime`, `depTerminal`, `flightNo`, `flightState`, `localDate`, `mainFlightNo`, `shareFlag`, `stateColor`) VALUES (8, 'https://cdn1.133.cn/ticket/airline/image_ca_cca.png', '中國國航', '', '14:22\n', '廣州白雲', 'CAN', '82.75%', '', '14:25', 'T1B', 'https://api.133.cn/third/textImg?code=FvOGTb%2Bgbgxpgw9zPNG2Qw==', '30', '', '11:22\n', '北京首都', 'PEK', '', '11:00', 'T3', 'CA1315', '到達', '2023-02-27', '', '0', '#4273FE');
INSERT INTO `air_data`.`air_data_source_result` (`id`, `airlineLogo`, `airlineShortCompany`, `arrActCross`, `arrActTime`, `arrAirport`, `arrCode`, `arrOntimeRate`, `arrPlanCross`, `arrPlanTime`, `arrTerminal`, `checkInTable`, `checkInTableWidth`, `depActCross`, `depActTime`, `depAirport`, `depCode`, `depPlanCross`, `depPlanTime`, `depTerminal`, `flightNo`, `flightState`, `localDate`, `mainFlightNo`, `shareFlag`, `stateColor`) VALUES (9, 'https://cdn1.133.cn/ticket/airline/image_zh_csz.png', '深圳航空', '', '15:13\n', '廣州白雲', 'CAN', '78.57%', '', '15:25', 'T1B', 'https://api.133.cn/third/textImg?code=FvOGTb%2Bgbgxpgw9zPNG2Qw==', '30', '', '12:19\n', '北京首都', 'PEK', '', '12:00', 'T3', 'ZH1339', '到達', '2023-02-27', 'CA1339', '1', '#4273FE');
INSERT INTO `air_data`.`air_data_source_result` (`id`, `airlineLogo`, `airlineShortCompany`, `arrActCross`, `arrActTime`, `arrAirport`, `arrCode`, `arrOntimeRate`, `arrPlanCross`, `arrPlanTime`, `arrTerminal`, `checkInTable`, `checkInTableWidth`, `depActCross`, `depActTime`, `depAirport`, `depCode`, `depPlanCross`, `depPlanTime`, `depTerminal`, `flightNo`, `flightState`, `localDate`, `mainFlightNo`, `shareFlag`, `stateColor`) VALUES (10, 'https://cdn1.133.cn/ticket/airline/image_ca_cca.png', '中國國航', '', '15:13\n', '廣州白雲', 'CAN', '78.57%', '', '15:25', 'T1B', 'https://api.133.cn/third/textImg?code=FvOGTb%2Bgbgxpgw9zPNG2Qw==', '30', '', '12:19\n', '北京首都', 'PEK', '', '12:00', 'T3', 'CA1339', '到達', '2023-02-27', '', '0', '#4273FE');
3、FlinkSQL表連接
create table air_data_source(
id int COMMENT '主鍵',
airlineLogo varchar(100) ,
airlineShortCompany varchar(100) ,
arrActCross varchar(100) ,
arrActTime varchar(100) ,
arrAirport varchar(100) ,
arrCode varchar(100) ,
arrOntimeRate varchar(100) ,
arrPlanCross varchar(100) ,
arrPlanTime varchar(100) ,
arrTerminal varchar(100) ,
checkInTable varchar(100) ,
checkInTableWidth varchar(100) ,
depActCross varchar(100) ,
depActTime varchar(100) ,
depAirport varchar(100) ,
depCode varchar(100) ,
depPlanCross varchar(100) ,
depPlanTime varchar(100) ,
depTerminal varchar(100) ,
flightNo varchar(100) ,
flightState varchar(100) ,
localDate varchar(100) ,
mainFlightNo varchar(100) ,
shareFlag varchar(100) ,
stateColor varchar(100)
) with (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://master:3306/air_data?serverTimezone=GMT%2B8',
'username' = 'root',
'password' = 'root',
'table-name' = 'air_data_source'
)
;
4、collect()函數結果
SELECT arrAirport,cast(count(airlineShortCompany) as int) as counts, collect(airlineShortCompany) as collects FROM air_data_source group by arrAirport having count(airlineShortCompany) = 2
三、FlinkSQL UDF 代碼開發
1、pom文件
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.xxxxx.tech</groupId>
<artifactId>alarmCollectPlatform</artifactId>
<version>1.0-SNAPSHOT</version>
<packaging>jar</packaging>
<properties>
<flink.version>1.14.4</flink.version>
</properties>
<dependencies>
<!-- flink依賴引入-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
</dependencies>
<build>
<pluginManagement><!-- lock down plugins versions to avoid using Maven defaults (may be moved to parent pom) -->
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
<encoding>UTF-8</encoding>
</configuration>
</plugin>
</plugins>
</pluginManagement>
</build>
</project>
2、java代碼實現
package com.xxxxx.tech.udf;
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.functions.ScalarFunction;
import java.util.Map;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
public class MultisetToString extends ScalarFunction implements ResultTypeQueryable<String> {
public String eval(@DataTypeHint("MULTISET<STRING>") Map<String, Integer> multiset) {
return multiset.toString();
}
@Override
public TypeInformation<String> getProducedType() {
return TypeInformation.of(String.class);
}
}
3、打包
mvn clean install
4、上傳
將打好的jar包上傳到Flink_HOME的lib目錄下,並重啟集群
5、註冊函數
進入bin目錄啟動sql-client,註冊函數
6、使用函數進行轉換
select arrAirport,counts,multiset_to_string(collects) as collects from (
SELECT arrAirport,cast(count(airlineShortCompany) as int) as counts, collect(airlineShortCompany) as collects FROM air_data_source group by arrAirport having count(airlineShortCompany) = 2
) t