Apache SeaTunnel是一個非常易於使用的、超高性能的分散式數據集成平臺,支持海量數據的實時同步。每天可穩定高效同步數百億數據,已被近百家企業投入生產使用。 現在的版本不支持通過jtds的方式鏈接sqlserver,我們來自己寫代碼來實現它,並把代碼提交給apache seatunnel。 ...
Apache SeaTunnel是一個非常易於使用的、超高性能的分散式數據集成平臺,支持海量數據的實時同步。每天可穩定高效同步數百億數據,已被近百家企業投入生產使用。
現在的版本不支持通過jtds的方式鏈接sqlserver,我們來自己寫代碼來實現它,並把代碼提交給apache seatunnel。
1. 下載源代碼
1.首先從遠端倉庫 https://github.com/apache/seatunnel fork一份代碼到自己的倉庫中
2.遠端倉庫中目前有超過30個分支:
-
dev :日常開發分支
-
其他分支 :發佈版本分支
3.把自己倉庫clone到本地
git clone [email protected]:yougithubID/seatunnel.git
- 添加遠端倉庫地址,命名為upstream
這一步是為了讓本地代碼知道他的上游是apache/seatunnel
git remote add upstream [email protected]:apache/seatunnel.git
5.查看倉庫:
git remote -v
此時會有兩個倉庫:origin(自己的倉庫)和upstream(遠端倉庫)
6.獲取/更新遠端倉庫代碼(已經是最新代碼,就跳過)
git fetch upstream
2. 編寫代碼
1.載入拉取到本地的代碼到IDEA中
這裡我們需要註意兩個module:seatunnel-connectors-v2和seatunnel-examples,其中seatunnel-connectors-v2是我們來寫代碼的module,seatunnel-examples是我們用來測試代碼的module。
2.編寫代碼
目前代碼中已經實現了基於JDBC的方式取鏈接SqlServer。我們只需要在它的基礎之上去做一定的修改即可,經過debug來閱讀源碼,並瞭解了需要改的地方如下圖:
代碼實現如下:
SqlServerDialectFactory.java
return (url.startsWith("jdbc:jtds:sqlserver:") || url.startsWith("jdbc:sqlserver:"));
SqlserverTypeMapper.java
private static final String SQLSERVER_SYSNAME = "SYSNAME";
case SQLSERVER_SYSNAME:
return BasicType.STRING_TYPE;
SimpleJdbcConnectionProvider.java
public boolean isConnectionValid() throws SQLException {
if (connection != null && connection.toString().startsWith("net.sourceforge.jtds")){
return connection != null
&& !connection.isClosed();
}else {
return connection != null
&& connection.isValid(jdbcConfig.getConnectionCheckTimeoutSeconds());
}
}
pom.xml
<jtds.version>1.3.1</jtds.version>
<dependency>
<groupId>net.sourceforge.jtds</groupId>
<artifactId>jtds</artifactId>
<version>${jtds.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>net.sourceforge.jtds</groupId>
<artifactId>jtds</artifactId>
<version>${jtds.version}</version>
</dependency>
3. 測試代碼
1.編寫config文件,我們測試通過net.sourceforge.jtds.jdbc.Driver 從sqlserver中讀出數據再寫入sqlserver中
env {
# You can set flink configuration here
execution.parallelism = 1
job.mode = "BATCH"
#execution.checkpoint.interval = 10000
#execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
}
source {
# This is a example source plugin **only for test and demonstrate the feature source plugin**
Jdbc {
driver = net.sourceforge.jtds.jdbc.Driver
url = "jdbc:jtds:sqlserver://localhost:1433/dbname"
user = SA
password = "A_Str0ng_Required_Password"
query = "select age, name from source"
}
# If you would like to get more information about how to configure seatunnel and see full list of source plugins,
# please go to https://seatunnel.apache.org/docs/connector-v2/source/Jdbc
}
transform {
# If you would like to get more information about how to configure seatunnel and see full list of transform plugins,
# please go to https://seatunnel.apache.org/docs/transform-v2/sql
}
sink {
Jdbc {
driver = net.sourceforge.jtds.jdbc.Driver
url = "jdbc:jtds:sqlserver://localhost:1433/dbname"
user = SA
password = "A_Str0ng_Required_Password"
query = "insert into sink(age, name) values(?,?)"
}
# If you would like to get more information about how to configure seatunnel and see full list of sink plugins,
# please go to https://seatunnel.apache.org/docs/connector-v2/sink/Jdbc
}
2.修改seatunnel-flink-connector-v2-example中的SeaTunnelApiExample,寫入我們寫好的config文件
添加seatunnel-flink-connector-v2-example pom文件中添加jdbc依賴
3.運行代碼
運行SeaTunnelApiExample,右鍵->run
4. 提交issue
issue的作用就是告訴社區我們打算做什麼事情,後續的PR就是來提交代碼解決這個issue。除此以外issue也是我們來提出bug或者其他想法的地方。不一定自己來實現它。你提出來,別人能解決,他們就會提交PR來解決這個問題。
我這裡提交了一個叫[Feature][Connector-V2][SqlServer] Support driver jtds for SqlServer #5307 的issue,其中Feature可以按具體的內容換成BUG/DOCS等等 Connector-V2可以換成其他的具體模塊,這裡大家可以參照別人已經提的issue來命名。
註:帶"*"的都是必填項
5. 提交代碼
git commit -m 'commit content'
git push
提交後在github上查看提交代碼詳情
6. 提交PR(Pull Request)
提交完成後,因為我們的倉庫的代碼的上游是apache/seatunnel,在conribute中我們就可以去提交一個PR。
點擊Open pull request,就會跳轉到上游apache/seatunnel下麵的Pull Request列表,並自動開發創建PR頁面。填寫相關內容,PR的名稱就跟需要解決的這個issue一樣就可以了,需要註意的時候,最後要帶上issue的ID ,我這裡是#5307 ,之後點擊提交就可以了。
之後就等社區的管理員審核就可以了,這個過程中可能會在PR中進行留言交流,必要的話,再修改代碼,重新提交代碼,openPR...
本文由 白鯨開源 提供發佈支持!