![file](https://img2023.cnblogs.com/other/3195851/202306/3195851-20230616175541667-1729991882.jpg) Catalog(目錄)提供了關於資料庫、表格和訪問數據所需的信息的元數據,以及統一的 API 來管理元 ...
Catalog(目錄)提供了關於資料庫、表格和訪問數據所需的信息的元數據,以及統一的 API 來管理元數據,驗證連接,讓元數據對 Sources(數據源)、Sinks(數據匯)和 Web 可訪問。
Catalog 讓用戶能夠引用其數據系統中的現有元數據,並自動映射到 SeaTunnel 的對應元數據。總之,Catalog 大大簡化了使用用戶現有系統開始使用 SeaTunnel 的步驟,並顯著增強了用戶體驗。
Catalog 功能的重要性
目前,許多現有功能都是基於 Catalog 實現的,例如 CDC(變更數據捕獲)多表同步功能,我們使用 Catalog 獲取表格和欄位列表。
Apache SeaTunnel 目前正在設計一個叫做 SaveMode 的功能,它是由連接器實現的,用於支持目標表中現有表格結構和數據的處理。這些功能也是基於 Catalog 實現的。
Catalog 是如何設計的?如何實現一個新的 Catalog?以下是詳細介紹。
Catalog API
初始化操作
註意:目錄名稱目前沒有被使用,預計會提供給 Web 後端進行保存和查詢。
Java
public interface CatalogFactory extends Factory { String factoryIdentifier(); OptionRule optionRule(); Catalog createCatalog(String catalogName, ReadonlyConfig options); } public interface Catalog extends AutoCloseable { void open() throws CatalogException; void close() throws CatalogException; }
資料庫操作
java
public interface Catalog extends AutoCloseable { // -------------------------------------------------------------------------------------------- // 資料庫 // -------------------------------------------------------------------------------------------- String getDefaultDatabase() throws CatalogException; boolean databaseExists(String databaseName) throws CatalogException; List<String> listDatabases() throws CatalogException; void createDatabase(String databaseName, boolean ignoreIfExists) throws DatabaseAlreadyExistException, CatalogException; void dropDatabase(String databaseName, boolean ignoreIfNotExists) throws DatabaseNotExistException, CatalogException; }
表格操作
java
public interface Catalog extends AutoCloseable { // -------------------------------------------------------------------------------------------- // 表格 // -------------------------------------------------------------------------------------------- List<String> listTables(String databaseName) throws CatalogException, DatabaseNotExistException; boolean tableExists(TablePath tablePath) throws CatalogException; CatalogTable getTable(TablePath tablePath) throws CatalogException, TableNotExistException; void createTable(TablePath tablePath, CatalogTable table, boolean ignoreIfExists) throws TableAlreadyExistException, DatabaseNotExistException, CatalogException; void dropTable(TablePath tablePath, boolean ignoreIfNotExists) throws TableNotExistException, CatalogException; }
這裡是一個已經實現的示例。
MySQL Catalog
MySQL Catalog 的使用方式:
- username [String] 連接到資料庫伺服器時要使用的資料庫名稱。
- password [String] 連接到資料庫伺服器時要使用的密碼。
- base-url [String] URL 必須包含資料庫,例如 "jdbc:mysql://localhost:5432/db" 或 "jdbc:mysql://localhost:5432/db?useSSL=true"。
- table-names [List] 要捕獲的資料庫表格名稱列表。表格名稱需要包括資料庫名稱,例如:database_name.table_name。
- database-pattern [String] 要捕獲的資料庫名稱的正則表達式。
- table-pattern [String] 要捕獲的資料庫表格名稱的正則表達式。表格名稱需要包括資料庫名稱,例如:database_.\.table_.。
配置文件配置
conf
[source/sink] { [connector-factory-id] { catalog { factory = "MySQL" username = "test" password = "123456" base-url = "jdbc:mysql://localhost:5432/db" table-names = [ "db.table" ] } } }
如何使用 Catalog
對於支持 Catalog 的連接器,我們將打開一個 Catalog 參數來配置所使用的 Catalog:
示例
sql
env { "job.mode"=STREAMING "job.name"="cdc_mysql_to_mysql" "checkpoint.interval"="2000" "custom_parameters"="" } source { MySQL-CDC { parallelism = 1 catalog { factory = "MySQL" # 預設情況下,Catalog 將使用與連接器同名的選項 } username = "mysqluser" password = "mysqlpw" database-names = ["seatunnel-test"] table-pattern = "seatunnel-test\\.orders_\\d+" base-url = "jdbc:mysql://localhost:54508/seatunnel-test" } } sink { jdbc { url = "jdbc:mysql://localhost:4000/test" driver = "com.mysql.cj.jdbc.Driver" catalog { factory = "MySQL" username = "root" password = "" base-url = "jdbc:mysql://localhost:4000/test" table-pattern = "seatunnel-test2\\.orders_\\d+" } user = "root" password = "" query = "insert into sink(age, name) values(?,?)" } }
未來規劃
目前,我們只實現了部分 Catalog。未來,我們計劃擴大 Catalog 的實現範圍,包括更多支持 Catalog 的連接器,這將使更多的連接器支持 SaveMode 和自動表格創建等功能。
Apache SeaTunnel 是一個分散式、高性能、易擴展、用於海量數據(離線&實時)同步和轉化的數據集成平臺
-
Proposal:https://cwiki.apache.org/confluence/display/INCUBATOR/SeaTunnelProposal
-
Apache SeaTunnel 下載地址:https://seatunnel.apache.org/download
衷心歡迎更多人加入!
我們相信,在「Community Over Code」(社區大於代碼)、「Open and Cooperation」(開放協作)、「Meritocracy」(精英管理)、以及「多樣性與共識決策」等 The Apache Way 的指引下,我們將迎來更加多元化和包容的社區生態,共建開源精神帶來的技術進步!
我們誠邀各位有志於讓本土開源立足全球的伙伴加入 SeaTunnel 貢獻者大家庭,一起共建開源!
- 提交問題和建議:https://github.com/apache/seatunnel/issues
- 貢獻代碼: https://github.com/apache/seatunnel/pulls
- 訂閱社區開發郵件列表 : [email protected]
- 開發郵件列表:[email protected]
- 加入 Slack: https://join.slack.com/t/apacheseatunnel/shared_invite/zt-1kcxzyrxz-lKcF3BAyzHEmpcc4OSaCjQ
- 關註 Twitter: https://twitter.com/ASFSeaTunnel
本文由 白鯨開源 提供發佈支持!