DB2是IBM的一款關係型資料庫管理系統,JDBC DB2 Source Connector是一個用於通過JDBC讀取外部數據源數據的連接器。Apache SeaTunnel如何支持JDBC DB2 Sink Connector?請參考本文檔。 支持引擎 Spark Flink SeaTunnel ...
DB2是IBM的一款關係型資料庫管理系統,JDBC DB2 Source Connector是一個用於通過JDBC讀取外部數據源數據的連接器。Apache SeaTunnel如何支持JDBC DB2 Sink Connector?請參考本文檔。
支持引擎
Spark
Flink
SeaTunnel Zeta
主要功能
使用
Xa 事務
來確保精確一次性
。因此,只支持對支持Xa 事務
的資料庫進行精確一次性
操作。您可以設置is_exactly_once=true
來啟用它。
描述
通過 JDBC 寫入數據。支持批處理模式和流式模式,支持併發寫入,支持精確一次性語義(使用 XA 事務保證)。
支持的數據源信息
數據源 | 支持的版本 | 驅動程式 | URL | Maven |
---|---|---|---|---|
DB2 | 不同的依賴版本有不同的驅動程式 | com.ibm.db2.jdbc.app.DB2Driver | jdbc:db2://127.0.0.1:50000/dbname | 下載 |
資料庫依賴
請下載與 'Maven' 相對應的支持列表,並將其複製到
'$SEATNUNNEL_HOME/plugins/jdbc/lib/'
工作目錄中
例如,對於 DB2 數據源:cp db2-connector-java-xxx.jar $SEATNUNNEL_HOME/plugins/jdbc/lib/
數據類型映射
Sink 選項
名稱 | 類型 | 必填 | 預設值 | 描述 |
---|---|---|---|---|
url | 字元串 | 是 | - | JDBC 連接的 URL。例如:jdbc:db2://127.0.0.1:50000/dbname |
driver | 字元串 | 是 | - | 用於連接到遠程數據源的 JDBC 類名,如果使用 DB2,則值為 com.ibm.db2.jdbc.app.DB2Driver 。 |
user | 字元串 | 否 | - | 連接實例的用戶名 |
password | 字元串 | 否 | - | 連接實例的密碼 |
query | 字元串 | 否 | - | 使用此 SQL 將上游輸入數據寫入資料庫。例如 INSERT ... ,query 具有更高的優先順序。 |
database | 字元串 | 否 | - | 使用此 database 和 table-name 自動生成 SQL,並接收上游輸入數據寫入資料庫。此選項與 query 互斥,並具有更高的優先順序。 |
table | 字元串 | 否 | - | 使用資料庫和此表名自動生成 SQL,接收上游輸入數據寫入資料庫。此選項與 query 互斥,並具有更高的優先順序。 |
primary_keys | 數組 | 否 | - | 此選項用於支持自動生成 SQL 時的 insert 、delete 和 update 操作。 |
support_upsert_by_query_primary_key_exist | 布爾 | 否 | false | 根據查詢主鍵是否存在選擇使用 INSERT SQL、UPDATE SQL 處理更新事件(INSERT、UPDATE_AFTER)。此配置僅在資料庫不支持 upsert 語法時使用。請註意,此方法性能較低。 |
connection_check_timeout_sec | 整數 | 否 | 30 | 用於等待驗證連接的資料庫操作完成的時間(以秒為單位)。 |
max_retries | 整數 | 否 | 0 | 提交失敗(executeBatch)的重試次數。 |
batch_size | 整數 | 否 | 1000 | 用於批處理寫入,當緩衝記錄數量達到 batch_size 或時間達到 batch_interval_ms 時,數據將刷新到資料庫。 |
batch_interval_ms | 整數 | 否 | 1000 | 用於批處理寫入,當緩衝記錄數量達到 batch_size 或時間達到 batch_interval_ms 時,數據將刷新到資料庫。 |
is_exactly_once | 布爾 | 否 | false | 是否啟用精確一次性語義,將使用 XA 事務。如果啟用,需要設置 xa_data_source_class_name 。 |
generate_sink_sql | 布爾 | 否 | false | 基於要寫入的資料庫表自動生成 SQL 語句。 |
xa_data_source_class_name | 字元串 | 否 | - | 資料庫驅動程式的 XA 數據源類名,例如,DB2 為 com.db2.cj.jdbc.Db2XADataSource 。其他數據源請參考附錄。 |
max_commit_attempts | 整數 | 否 | 3 | 事務提交失敗的重試次數。 |
transaction_timeout_sec | 整數 | 否 | -1 | 事務打開後的超時時間,預設為 -1(永不超時)。請註意,設置超時可能會影響精確一次性語義。 |
auto_commit | 布爾 | 否 | true | 預設啟用自動事務提交。 |
common-options | 否 | - | Sink 插件的通用參數,請參考 Sink Common Options 獲取詳細信息。 |
提示
如果未設置
partition_column
,則將以單一併發方式運行;如果設置了partition_column
,則根據任務的併發度並行執行。
任務示例
簡單示例:
該示例定義了一個 SeaTunnel 同步任務,通過 FakeSource 自動生成數據併發送到 JDBC Sink。FakeSource 生成總共 16 行數據(row.num=16),每行有兩個欄位,name(字元串類型)和 age(整數類型)。最終的目標表是 test_table,在表中也將有 16 行數據。在運行此作業之前,您需要在您的 DB2 中創建資料庫 test 和表 test_table。如果您尚未安裝和部署 SeaTunnel,請按照 安裝 SeaTunnel 中的說明安裝和部署 SeaTunnel。然後按照 使用 SeaTunnel 引擎快速入門 中的說明運行此作業。
# 定義運行時環境
env {
# 您可以在這裡設置 Flink 配置
execution.parallelism = 1
job.mode = "BATCH"
}
source {
# 這是一個示例源插件,僅用於測試和演示源插件功能
FakeSource {
parallelism = 1
result_table_name = "fake"
row.num = 16
schema = {
fields {
name = "string"
age = "int"
}
}
}
# 如果您想要獲取更多關於如何配置 SeaTunnel 並查看完整的源插件列表的信息,
# 請訪問 https://seatunnel.apache.org/docs/category/source-v2
}
transform {
# 如果您想要獲取更多關於如何配置 SeaTunnel 並查看完整的轉換插件列表的信息,
# 請訪問 https://seatunnel.apache.org/docs/category/transform-v2
}
生成 Sink SQL
該示例不需要編寫複雜的 SQL 語句,您可以配置資料庫名稱和表名稱,以自動生成要插入的語句。
sink {
jdbc {
url = "jdbc:db2://127.0.0.1:50000/dbname"
driver = "com.ibm.db2.jdbc.app.DB2Driver"
user = "root"
password = "123456"
query = "insert into test_table(name,age) values(?,?)"
}
# 如果您想要獲取更多關於如何配置 SeaTunnel 並查看完整的接收插件列表的信息,
# 請訪問 https://seatunnel.apache.org/docs/category/sink-v2
}
sink {
jdbc {
url = "jdbc:db2://127.0.0.1:50000/dbname"
driver = "com.ibm.db2.jdbc.app.DB2Driver"
user = "root"
password = "123456"
# 根據資料庫表名自動生成 SQL 語句
generate_sink_sql = true
database = test
table = test_table
}
}
精確一次性:
為了確保精確寫入場景,我們保證精確一次性。
sink {
jdbc {
url = "jdbc:db2://127.0.0.1:50000/dbname"
driver = "com.ibm.db2.jdbc.app.DB2Driver"
max_retries = 0
user = "root"
password = "123456"
query = "insert into test_table(name,age) values(?,?)"
is_exactly_once = "true"
xa_data_source_class_name = "com.db2.cj.jdbc.Db2XADataSource"
}
}
本文由 白鯨開源 提供發佈支持!