Flink CDC 於 2021 年 11 月 15 日發佈了最新版本 2.1,該版本通過引入內置 Debezium 組件,增加了對 Oracle 的支持。 Flink下載地址 https://flink.apache.org/downloads/ 其他必需的jar包(cdc、jdbc、mysq和o ...
Flink CDC 於 2021 年 11 月 15 日發佈了最新版本 2.1,該版本通過引入內置 Debezium 組件,增加了對 Oracle 的支持。
Flink下載地址
https://flink.apache.org/downloads/
其他必需的jar包(cdc、jdbc、mysq和oracle等驅動包)
下載Flink後,直接解壓到指定目錄下即可;
tar zxvf flink-1.20.0-bin-scala_2.12.tgz
將所有必須的jar包放在lib目錄下,我這邊的目錄為/u01/flink-1.20.0/lib;
啟動flink:
[root@gcv-b-test-gmes-oracle bin]# /u01/flink-1.20.0/bin/start-cluster.sh Starting cluster. Starting standalonesession daemon on host gcv-b-test-gmes-oracle. Starting taskexecutor daemon on host gcv-b-test-gmes-oracle.
如果需要web登錄查看flink,需要修改配置文件(/u01/flink-1.20.0/conf/config.yaml)
address: 10.240.12.219 bind-address: 0.0.0.0
登錄web界面:
配置Oracle:
必須開啟歸檔(步驟查資料);
測試用戶及表 create user flink identified by "123456"; grant connect ,resource to flink; create table flink.user_info(id number primary key,name varchar2(100),age number); ##開啟資料庫級別補充日誌 ALTER DATABASE ADD SUPPLEMENTAL LOG DATA; ##開啟該表的列附加日誌 ALTER TABLE flink.user_info ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS; 創建用於cdc解析的表空間 CREATE TABLESPACE logminer_tbs DATAFILE '/u01/oradata/sharedb/logminer_tbs.dbf' SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED; 創建flinkuser複製用戶 CREATE USER flinkuser IDENTIFIED BY flinkpw DEFAULT TABLESPACE LOGMINER_TBS QUOTA UNLIMITED ON LOGMINER_TBS; GRANT CREATE SESSION TO flinkuser; GRANT SET CONTAINER TO flinkuser; GRANT SELECT ON V_$DATABASE to flinkuser; GRANT FLASHBACK ANY TABLE TO flinkuser; GRANT SELECT ANY TABLE TO flinkuser; GRANT SELECT_CATALOG_ROLE TO flinkuser; GRANT EXECUTE_CATALOG_ROLE TO flinkuser; GRANT SELECT ANY TRANSACTION TO flinkuser; GRANT LOGMINING TO flinkuser; GRANT ANALYZE ANY TO flinkuser; GRANT CREATE TABLE TO flinkuser; -- need not to execute if set scan.incremental.snapshot.enabled=true(default) GRANT LOCK ANY TABLE TO flinkuser; GRANT ALTER ANY TABLE TO flinkuser; GRANT CREATE SEQUENCE TO flinkuser; GRANT EXECUTE ON DBMS_LOGMNR TO flinkuser; GRANT EXECUTE ON DBMS_LOGMNR_D TO flinkuser; GRANT SELECT ON V_$LOG TO flinkuser; GRANT SELECT ON V_$LOG_HISTORY TO flinkuser; GRANT SELECT ON V_$LOGMNR_LOGS TO flinkuser; GRANT SELECT ON V_$LOGMNR_CONTENTS TO flinkuser; GRANT SELECT ON V_$LOGMNR_PARAMETERS TO flinkuser; GRANT SELECT ON V_$LOGFILE TO flinkuser; GRANT SELECT ON V_$ARCHIVED_LOG TO flinkuser; GRANT SELECT ON V_$ARCHIVE_DEST_STATUS TO flinkuser;
啟動sql-client(SQL 客戶端 的目的是提供一種簡單的方式來編寫、調試和提交表程式到 Flink 集群上,而無需寫一行 Java 或 Scala 代碼。SQL 客戶端命令行界面(CLI) 能夠在命令行中檢索和可視化分散式應用中實時產生的結果)
我的理解就是幫你智能生成java代碼,不需要自己寫代碼。
[root@gcv-b-test-gmes-oracle conf]# /u01/flink-1.20.0/bin/sql-client.sh ▒▓██▓██▒ ▓████▒▒█▓▒▓███▓▒ ▓███▓░░ ▒▒▒▓██▒ ▒ ░██▒ ▒▒▓▓█▓▓▒░ ▒████ ██▒ ░▒▓███▒ ▒█▒█▒ ░▓█ ███ ▓░▒██ ▓█ ▒▒▒▒▒▓██▓░▒░▓▓█ █░ █ ▒▒░ ███▓▓█ ▒█▒▒▒ ████░ ▒▓█▓ ██▒▒▒ ▓███▒ ░▒█▓▓██ ▓█▒ ▓█▒▓██▓ ░█░ ▓░▒▓████▒ ██ ▒█ █▓░▒█▒░▒█▒ ███▓░██▓ ▓█ █ █▓ ▒▓█▓▓█▒ ░██▓ ░█░ █ █▒ ▒█████▓▒ ██▓░▒ ███░ ░ █░ ▓ ░█ █████▒░░ ░█░▓ ▓░ ██▓█ ▒▒▓▒ ▓███████▓░ ▒█▒ ▒▓ ▓██▓ ▒██▓ ▓█ █▓█ ░▒█████▓▓▒░ ██▒▒ █ ▒ ▓█▒ ▓█▓ ▓█ ██▓ ░▓▓▓▓▓▓▓▒ ▒██▓ ░█▒ ▓█ █ ▓███▓▒░ ░▓▓▓███▓ ░▒░ ▓█ ██▓ ██▒ ░▒▓▓███▓▓▓▓▓██████▓▒ ▓███ █ ▓███▒ ███ ░▓▓▒░░ ░▓████▓░ ░▒▓▒ █▓ █▓▒▒▓▓██ ░▒▒░░░▒▒▒▒▓██▓░ █▓ ██ ▓░▒█ ▓▓▓▓▒░░ ▒█▓ ▒▓▓██▓ ▓▒ ▒▒▓ ▓█▓ ▓▒█ █▓░ ░▒▓▓██▒ ░▓█▒ ▒▒▒░▒▒▓█████▒ ██░ ▓█▒█▒ ▒▓▓▒ ▓█ █░ ░░░░ ░█▒ ▓█ ▒█▓ ░ █░ ▒█ █▓ █▓ ██ █░ ▓▓ ▒█▓▓▓▒█░ █▓ ░▓██░ ▓▒ ▓█▓▒░░░▒▓█░ ▒█ ██ ▓█▓░ ▒ ░▒█▒██▒ ▓▓ ▓█▒ ▒█▓▒░ ▒▒ █▒█▓▒▒░░▒██ ░██▒ ▒▓▓▒ ▓██▓▒█▒ ░▓▓▓▓▒█▓ ░▓██▒ ▓░ ▒█▓█ ░░▒▒▒ ▒▓▓▓▓▓▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒░░▓▓ ▓░▒█░ ______ _ _ _ _____ ____ _ _____ _ _ _ BETA | ____| (_) | | / ____|/ __ \| | / ____| (_) | | | |__ | |_ _ __ | | __ | (___ | | | | | | | | |_ ___ _ __ | |_ | __| | | | '_ \| |/ / \___ \| | | | | | | | | |/ _ \ '_ \| __| | | | | | | | | < ____) | |__| | |____ | |____| | | __/ | | | |_ |_| |_|_|_| |_|_|\_\ |_____/ \___\_\______| \_____|_|_|\___|_| |_|\__| Welcome! Enter 'HELP;' to list all available commands. 'QUIT;' to exit. Command history file path: /root/.flink-sql-history Flink SQL>
配置Oracle連接器:
CREATE TABLE user_info ( ID INT NOT NULL, NAME STRING, AGE int, PRIMARY KEY(ID) NOT ENFORCED ) WITH ( 'connector' = 'oracle-cdc', 'hostname' = '10.240.12.219', 'port' = '1521', 'username' = 'flinkuser', 'password' = 'flinkpw', 'database-name' = 'sharedb', 'schema-name' = 'FLINK', 'table-name' = 'USER_INFO', 'debezium.log.mining.strategy' = 'online_catalog', 'debezium.log.mining.continuous.mine' = 'true' );
##這裡有個坑,欄位必須大寫啊(因為oracle預設都是大寫,這裡是嚴格區分大小寫的)
##如果大小寫不一致,會識別不到欄位。查詢的時候報錯如下:
##org.apache.flink.table.api.TableException: Column 'id' is NOT NULL,
##however, a null value is being written into it.)
查看數據:
Flink SQL> select * from user_info; [INFO] Result retrieval cancelled.
mysql資料庫同步的表:
create database flink; CREATE TABLE `user_info` ( `id` bigint(20) NOT NULL AUTO_INCREMENT, `name` varchar(100) NOT NULL, `age` bigint(20) NOT NULL, PRIMARY KEY (`id`) ) ENGINE=InnoDB AUTO_INCREMENT=5 DEFAULT CHARSET=utf8mb4;
sql-client配置mysql:
CREATE TABLE user_info_mysql ( id INT NOT NULL, name STRING, age int, PRIMARY KEY(id) NOT ENFORCED ) WITH ( 'connector' = 'jdbc', 'url' = 'jdbc:mysql://10.251.93.3:3306/flink', 'driver' = 'com.mysql.cj.jdbc.Driver', 'username' = 'xxxx', 'password' = 'xxxx', 'table-name' = 'user_info'); Insert into user_info_mysql select * from user_info;
驗證:
oracle:插入數據
mysql驗證:
感覺上手難度不大,有些jar包容易漏,導致異常。
參考文檔:https://nightlies.apache.org/flink/flink-cdc-docs-release-3.0/zh/docs/connectors/flink-sources/tutorials/oracle-tutorial/
https://nightlies.apache.org/flink/flink-cdc-docs-release-3.0/zh/docs/connectors/flink-sources/oracle-cdc/