詳解 Flink Catalog 在 ChunJun 中的實踐之路

来源:https://www.cnblogs.com/DTinsight/archive/2023/04/07/17295885.html
-Advertisement-
Play Games

我們知道 Flink 有Table(表)、View(視圖)、Function(函數/運算元)、Database(資料庫)的概念,相對於這些耳熟能詳的概念,Flink 里還有一個 Catalog(目錄) 的概念。 本文將為大家帶來 Flink Catalog 的介紹以及 Flink Catalog 在 ...


我們知道 Flink 有Table(表)、View(視圖)、Function(函數/運算元)、Database(資料庫)的概念,相對於這些耳熟能詳的概念,Flink 里還有一個 Catalog(目錄) 的概念。

本文將為大家帶來 Flink Catalog 的介紹以及 Flink Catalog 在 ChunJun 中的實踐之路。

Flink Catalog 簡介

Catalog 提供元數據,如資料庫、表、分區、視圖,以及訪問存儲在資料庫或其他外部系統中的數據所需的函數和信息。

數據處理中最關鍵的一個方面是管理元數據:

· 可能是暫時性的元數據,如臨時表,或針對錶環境註冊的 UDFs;

· 或者是永久性的元數據,比如 Hive 元存儲中的元數據。

Catalog 提供了一個統一的 API 來管理元數據,並使其可以從表 API 和 SQL 查詢語句中來訪問。

Catalog 使用戶能夠引用他們數據系統中的現有元數據,並自動將它們映射到 Flink 的相應元數據。例如,Flink 可以將 JDBC 表自動映射到 Flink 表,用戶不必在 Flink 中手動重寫 DDL。Catalog 大大簡化了用戶現有系統開始使用 Flink 所需的步驟,並增強了用戶體驗。

● Flink Catalog 原生結構

• GenericInMemoryCatalog:基於記憶體實現的 Catalog

• Jdbc Catalog:可以將 Flink 通過 JDBC 協議連接到關係資料庫,目前 Flink 在1.12和1.13中有不同的實現,包括 MySql Catalog 和 Postgres Catalog

• Hive Catalog:作為原生 Flink 元數據的持久化存儲,以及作為讀寫現有 Hive 元數據的介面

● Flink Iceberg Catalog

● Flink Hudi Catalog

HoodieCatalog、HoodieHiveCatalog

file
file

Flink Catalog 詳解

GenericInMemoryCatalog

final CatalogManager catalogManager =
        CatalogManager.newBuilder()
                .classLoader(userClassLoader)
                .config(tableConfig)
                .defaultCatalog(
                        settings.getBuiltInCatalogName(),
                        new GenericInMemoryCatalog(
                                settings.getBuiltInCatalogName(),
                                settings.getBuiltInDatabaseName()))
                .build();

        
defaultCatalog =
                    new GenericInMemoryCatalog(
                            defaultCatalogName, settings.getBuiltInDatabaseName());

CatalogManager catalogManager =
                builder.defaultCatalog(defaultCatalogName, defaultCatalog).build();

GenericInMemoryCatalog 所有的數據都保存在 HashMap 裡面,無法持久化。

JDBC Catalog

CREATE CATALOG my_catalog WITH(
    'type' = 'jdbc',
    'default-database' = '...',
    'username' = '...',
    'password' = '...',
    'base-url' = '...'
);

USE CATALOG my_catalog;

如果創建並使用 Postgres Catalog 或 MySQL Catalog,請配置 JDBC 連接器和相應的驅動。

JDBC Catalog 支持以下參數:

• name:必填,Catalog 的名稱

• default-database:必填,預設要連接的資料庫

• username:必填,Postgres/MySQL 賬戶的用戶名

• password:必填,賬戶的密碼

• base-url: 必填,(不應該包含資料庫名)

對於 Postgres Catalog base-url 應為 "jdbc:postgresql://:" 的格式

對於 MySQL Catalog base-url 應為 "jdbc:mysql://:" 的格式

Hive Catalog

CREATE CATALOG myhive WITH (
    'type' = 'hive',
    'default-database' = 'mydatabase',
    'hive-conf-dir' = '/opt/hive-conf'
);
-- set the HiveCatalog as the current catalog of the session
USE CATALOG myhive;

file

Iceberg Catalog

● Hive Catalog 管理 Iceberg 表

(Flink) default_database.flink_table -> 
(Iceberg) default_database.flink_table
CREATE TABLE flink_table (
    id   BIGINT,
    data STRING
) WITH (
    'connector'='iceberg',
    'catalog-name'='hive_prod',
    'uri'='thrift://localhost:9083',
    'warehouse'='hdfs://nn:8020/path/to/warehouse'
);
(Flink)default_database.flink_table -> 
(Iceberg) hive_db.hive_iceberg_table
CREATE TABLE flink_table (
    id   BIGINT,
    data STRING
) WITH (
    'connector'='iceberg',
    'catalog-name'='hive_prod',
    'catalog-database'='hive_db',
    'catalog-table'='hive_iceberg_table',
    'uri'='thrift://localhost:9083',
    'warehouse'='hdfs://nn:8020/path/to/warehouse'
);

● Hadoop Catalog 管理 Iceberg 表

CREATE TABLE flink_table (
    id   BIGINT,
    data STRING
) WITH (
    'connector'='iceberg',
    'catalog-name'='hadoop_prod',
    'catalog-type'='hadoop',
    'warehouse'='hdfs://nn:8020/path/to/warehouse'
);

● 自定義 Catalog 管理 Iceberg 表

CREATE TABLE flink_table (
    id   BIGINT,
    data STRING
) WITH (
    'connector'='iceberg',
    'catalog-name'='custom_prod',
    'catalog-impl'='com.my.custom.CatalogImpl',
     -- More table properties for the customized catalog
    'my-additional-catalog-config'='my-value',
     ...
);

• connector:iceberg

• catalog-name:用戶指定的目錄名稱,這是必須的,因為連接器沒有任何預設值

• catalog-type:內置目錄的 hive 或 hadoop(預設為hive),或者對於使用 catalog-impl 的自定義目錄實現,不做設置

• catalog-impl:自定義目錄實現的全限定類名,如果 catalog-type 沒有被設置,則必須被設置,更多細節請參見自定義目錄

• catalog-database: 後臺目錄中的 iceberg 資料庫名稱,預設使用當前的 Flink 資料庫名稱

• catalog-table: 後臺目錄中的冰山表名,預設使用 Flink CREATE TABLE 句子中的表名

Hudi Catalog

create catalog hudi with(
 'type' = 'hudi',
 'mode' = 'hms',
  'hive.conf.dir'='/etc/hive/conf'
);

--- 創建資料庫供hudi使用
create database hudi.hudidb;

--- order表
CREATE TABLE hudi.hudidb.orders_hudi(
  uuid INT,
  ts INT,
  num INT,
  PRIMARY KEY(uuid) NOT ENFORCED
) WITH (
  'connector' = 'hudi',
  'table.type' = 'MERGE_ON_READ'
);

select * from hudi.hudidb.orders_hudi;

file
file

Flink Catalog 在 ChunJun 中的實踐

下麵將為大家介紹本文的重頭戲,Flink Catalog 在 ChunJun 中的實踐之路。

直接引入開源 Catalog

ChunJun 目前的所有 Catalog 為以下四種:

file

● Hive Catalog 需要的依賴

file

● Iceberg Catalog 需要的依賴

file

● JDBC Catalog

JDBC 因為 Flink 1.12 和 1.13 API 有變化,因此需要涉及源碼的改動,改動一些 API 後,從源碼引入。

● DT Catalog

結合內部業務,自定義的一種 Catalog ,下文將會進行詳細介紹。

DT Catalog -存儲元數據表設計

● 創建 mysql 元數據表 database_info

-- 創建表的 sql
create table database_info
(
    `id`            bigint PRIMARY KEY NOT NULL AUTO_INCREMENT COMMENT '項目ID',-- database id
    `catalog_name`  varchar(255) COMMENT 'catalog 名字',
    `database_name` varchar(255) COMMENT 'database 名字',
    `catalog_type`  varchar(30) COMMENT 'catalog 類型, eg: mysql,oracle...',
    `project_id`    int(11)            NOT NULL COMMENT '項目ID',
    `tenant_id`     int(11)            NOT NULL COMMENT '租戶ID'
) ENGINE = InnoDB
  DEFAULT CHARSET = utf8;

-- 創建索引
CREATE INDEX idx_catalog_name_database_name_project_id_tenant_id ON database_info (`catalog_name`, `database_name`, `project_id`, `tenant_id`);

● 創建 mysql 元數據表 table_info

-- 創建表的 sql
create table table_info
(
    `id`            bigint PRIMARY KEY NOT NULL AUTO_INCREMENT,
    `database_id`    bigint COMMENT 'database_info 表的 id',
    `table_name`  varchar(255) COMMENT '表名',
    `project_id`    int(11)            NOT NULL COMMENT '項目ID',
    `tenant_id`     int(11)            NOT NULL COMMENT '租戶ID'
) ENGINE = InnoDB
  DEFAULT CHARSET = utf8;

-- 創建索引
CREATE INDEX idx_catalog_id_project_id_tenant_id ON table_info (`database_id`, `project_id`, `tenant_id`);
CREATE INDEX idx_database_id_table_name_project_id_tenant_id ON table_info (`database_id`, `table_name`, `project_id`, `tenant_id`);

● 創建 mysql 元數據表 properties_info

create table properties_info
(
    `id`       bigint PRIMARY KEY NOT NULL AUTO_INCREMENT ,
    `table_id` bigint(20) COMMENT 'table_info 表的 id',
    `key`      varchar(255) COMMENT '表的屬性 key',
    `value`    varchar(255) COMMENT '表的屬性 value'
) ENGINE = InnoDB
  DEFAULT CHARSET = utf8;

CREATE INDEX idx_table_id ON properties_info (table_id);

● properties_info 裡面存了什麼?

schema.0.name=id,
  schema.0.data-type=INT NOT NULL,
  schema.1.name=name,
  schema.1.data-type=VARCHAR(2147483647)
  schema.2.name=age,
  schema.2.data-type=BIGINT,
  schema.primary-key.name=PK_3386,
  schema.primary-key.columns=id,

  connector=jdbc,
  url=jdbc:mysql: //172.16.83.218:3306/wujuan?useSSL=false,
  username=drpeco,
  password=DT@Stack#123,

  comment=,
  scan.auto-commit=true,
  lookup.cache.max-rows=20000,
  scan.fetch-size=10,
  lookup.cache.ttl=700000
  table-name=t2,

使用 DT Catalog

● 創建 DT Catalog

CREATE CATALOG catalog1
WITH (
    'type' = 'dt',
    'default-database' = 'default_database',
    'driver' = 'com.mysql.cj.jdbc.Driver',
    'url' = 'jdbc:mysql://xxx:3306/catalog_default',
    'username' = 'drpeco',
    'password' = 'DT@Stack#123',
    'project-id' = '1',
    'tenant-id' = '1'
  );

file

● 創建 Database

DROP DATABASE [IF EXISTS] [catalog_name.]db_name [ (RESTRICT | CASCADE) ]
Drop a database with the given database name. If the database to drop does not exist, an exception is thrown.
IF EXISTS
If the database does not exist, nothing happens.
RESTRICT
Dropping a non-empty database triggers an exception. Enabled by default.
CASCADE
Dropping a non-empty database also drops all associated tables and functions.


create database if not exists catalog1.database1

drop database if exists catalog1.database1 
-- 刪除非空資料庫,連通資料庫中的所有表也一起刪除
drop database if exists catalog1.database1 CASCADE

● 創建 Table

1)Rename Table

ALTER TABLE [catalog_name.][db_name.]table_name RENAME TO new_table_name
Rename the given table name to another new table name

2)Set or Alter Table Properties

ALTER TABLE [catalog_name.][db_name.]table_name SET (key1=val1, key2=val2, ...) 
Set one or more properties in the specified table. If a particular property is already set in the table, override the old value with the new one.
-- 創建表
CREATE TABLE if not exists catalog1.default_database.table1
(
    id      int,
    name    string,
    age     bigint,
    primary key ( id) not enforced
) with (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://172.16.83.218:3306/wujuan?useSSL=false',
    'table-name' = 't2',
    'username' = 'drpeco',
    'password' = 'DT@Stack#123'
);
-- 刪除表
drop table if exists mysql_catalog2.wujuan_database2.wujuan_table

-- 重命名錶名
ALTER TABLE catalog1.default_database.table1 RENAME TO table2;

-- 設置表屬性
ALTER TABLE catalog1.default_database.table1 
SET (
'tablename'='t2',
'url'='dbc:mysql://172.16.83.218:3306/wujuan?useSSL=false'
)

使用 DTCatalog 的具體場景和實現原理

● 全部是 DDL,只有 Catalog 的創建

CREATE CATALOG catalog1
WITH (
    'type' = 'DT',
    'default-database' = 'default_database',
    'driver' = 'com.mysql.cj.jdbc.Driver',
    'url' = 'jdbc:mysql://172.16.100.186:3306/catalog_default?autoReconnect=true&failOverReadOnly=false',
    'username' = 'drpeco',
    'password' = 'DT@Stack#123',
    'project-id' = '1',
    'tenant-id' = '1'
  );
	```
	
· 可以執行,但是沒有意義,ChunJun 不會存儲 Catalog 信息,只有平臺存儲;

· 不支持語法校驗。



● 全部是 DDL,包含 Catalog、Database、Table 的創建

-- 初始化 Catalog
CREATE CATALOG catalog1
WITH (
'type' = 'dt',
'default-database' = 'default_database',
'driver' = 'com.mysql.cj.jdbc.Driver',
'url' = 'jdbc:mysql://172.16.100.186:3306/catalog_default',
'username' = 'drpeco',
'password' = 'DT@Stack#123',
'project-id' = '1',
'tenant-id' = '1'
);
-- 創建資料庫
create database if not exists database1
-- 創建表
CREATE TABLE if not exists catalog1.default_database.table1
(
id int,
name string,
age bigint,
primary key ( id) not enforced
) with (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://172.16.83.218:3306/wujuan?useSSL=false',
'table-name' = 't2',
'username' = 'drpeco',
'password' = 'DT@Stack#123'
);


· 無論創建資料庫、表,刪除資料庫、表,必須包含 create catalog 語句;

· 可以執行,可以創建資料庫和表;

· 不支持語法校驗。


// 拋出異常的邏輯
StatementSet statementSet = SqlParser.parseSql(job, jarUrlList, tEnv);
TableResult execute = statementSet.execute(); -->
tableEnvironment.executeInternal(operations); -->
Pipeline pipeline = execEnv.createPipeline(transformations, tableConfig, jobName); -->
StreamGraph streamGraph = ExecutorUtils.generateStreamGraph(getExecutionEnvironment(), transformations); -->

// 拋出異常的方法
public static StreamGraph generateStreamGraph(StreamExecutionEnvironment execEnv, List<Transformation<?>> transformations){
if (transformations.size() <= 0) {
throw new IllegalStateException(
"No operators defined in streaming topology. Cannot generate StreamGraph.");
}
...
return generator.generate();
}

// 如果沒有 insert 語句的時候,無法生成 JobGraph,但是 DDL 是執行成功的。
// 因此捕獲 FlinkX 拋出的特殊異常,此語句的異常 Message 是 FlinkX 裡面處理的。
try {
PackagedProgramUtils.createJobGraph(program, flinkConfig, 1, false);
} catch (ProgramInvocationException e) {
// 僅執行 DDL FlinkX 拋出的異常
if (!e.getMessage().contains("OnlyExecuteDDL")) {
throw e;
}
}

![file](https://img2023.cnblogs.com/other/2317299/202304/2317299-20230407133935923-1285809401.png)


● DDL + DML,包含 create + insert 語句

1)初始化 Catalog

CREATE CATALOG catalog1
WITH (
'type' = 'dt',
'default-database' = 'default_database',
'driver' = 'com.mysql.cj.jdbc.Driver',
'url' = 'jdbc:mysql://172.16.100.186:3306/catalog_default',
'username' = 'drpeco',
'password' = 'DT@Stack#123',
'project-id' = '1',
'tenant-id' = '1'
);


2.1)創建資料庫

create database if not exists database1


2.2)創建源表

CREATE TABLE if not exists catalog1.default_database.table1
(
id int,
name string,
age bigint,
primary key ( id) not enforced
) with (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://172.16.83.218:3306/wujuan?useSSL=false',
'table-name' = 't2',
'username' = 'drpeco',
'password' = 'DT@Stack#123'
);


3.1)創建資料庫

create database if not exists catalog1.database2;


3.2)創建結果表

CREATE TABLE if not exists catalog1.database2.table2
(
id int,
name string,
age bigint,
primary key ( id) not enforced
) with (
'connector' = 'print'
);


4)執行任務

insert into catalog1.database2.table2 select * from catalog1.database1.table1


· 不可以執行,可以提交;

· 支持語法校驗。



● DML,只有 Insert 語句

-- 初始化 Catalog
CREATE CATALOG catalog1
WITH (
'type' = 'dt',
'default-database' = 'default_database',
'driver' = 'com.mysql.cj.jdbc.Driver',
'url' = 'jdbc:mysql://172.16.100.186:3306/catalog_default',
'username' = 'drpeco',
'password' = 'DT@Stack#123',
'project-id' = '1',
'tenant-id' = '1'
);

-- 執行任務
insert into catalog1.database2.table2 select * from catalog1.database1.table1


· 如果 Catalog 的 資料庫和表都已經創建好了,那麼直接寫 insert 就可以提交任務;

· 不可以執行,可以提交;

· 支持語法校驗。



 
《數據治理行業實踐白皮書》下載地址:https://fs80.cn/380a4b

想瞭解或咨詢更多有關袋鼠雲大數據產品、行業解決方案、客戶案例的朋友,瀏覽袋鼠雲官網:https://www.dtstack.com/?src=szbky

同時,歡迎對大數據開源項目有興趣的同學加入「袋鼠雲開源框架釘釘技術qun」,交流最新開源技術信息,qun號碼:30537511,項目地址:https://github.com/DTStack

您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 最近小編同事面試遇到了一道面試題,題目是有個int數組,把輸入包含的指定元素刪除。這道題主要考察C#基礎知識和編碼動手能力。小編將以如下幾種方法實現,供大家參考。(註:文末還有擴展問題。) 1、使用臨時數組copy後替換 這種方法涉及創建一個比原始數組小一個元素的新數組。然後,將原始數組複製到新數組 ...
  • #1、熟悉Linux的文件系統結構 Linux的文件系統結構其實是一個樹形的分層組織結構,如下圖: Linux系統目錄結構及目錄路徑: ##1.1、文件系統層次結構標準 Linux是開源的操作系統,各個Linux發行機構都可以按照自己的需求對Linux系統的文件系統進行相應的裁剪,所以各個Linux ...
  • 如文章標題,我們安裝的Linux開發環境是**:Windows-VMware-Ubuntu環境** 配置,即在windows系統下安裝VMware虛擬機,之後在VMware中配置安裝Linux系統的常用發行版——Ubuntu系統。 沒有在一臺伺服器或者一臺電腦上直接安裝linux系統(比如直接主機安 ...
  • ​ 嵌入式軟體(如航空電子和汽車系統)的設計、開發、測試和驗證正變得越來越複雜。傳統的文檔驅動式環境中,一旦開發人員之間缺乏協調,軟體程式生命周期的質量和成本就會受到嚴重影響,顯然已經無法應對日益複雜的嵌入式軟體生產。 正確使用基於模型的設計(Model-Based Design,以下簡稱MBD)方 ...
  • 無論是Window,MacOs,Linux均採用多用戶的管理模式進行許可權管理。 在Linux系統中,擁有最大許可權的賬戶名為:root(超級管理員)。 root用戶擁有最大的系統操作許可權,而普通用戶在許多地方的許可權是受限的。 普通用戶一般在HOME目錄是不受限制的,一旦出了HOME目錄,大多地方,僅有 ...
  • 主要內容 晶元介紹 開發環境 編程舉例 晶元介紹 什麼是NodeMCU? NodeMCU,是一個開源的物聯網平臺。 它使用Lua腳本語言編程。該平臺基於eLua 開源項目,底層使用ESP8266 sdk 0.9.5版本。該平臺使用了很多開源項目, 例如 lua-cjson, spiffs. Node ...
  • 鎖屏面試題百日百刷,每個工作日堅持更新面試題。鎖屏面試題app、小程式現已上線,官網地址:https://www.demosoftware.cn。已收錄了每日更新的面試題的所有內容,還包含特色的解鎖屏幕複習面試題、每日編程題目郵件推送等功能。讓你在面試中先人一步!接下來的是今日的面試題: 1.HBa ...
  • 一、MySQLl和Navicat的關係 Mysql一個關係型資料庫管理系統,由瑞典MysqlLAB公司開發,目前屬於Oracle旗下產品,是目前最流行的關心型資料庫管理系統之一。 Navicat一個資料庫管理工具,用可視化界面提供給用戶操作Mysql資料庫管理系統。 記得我第一次安裝Navicat之 ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...