我們介紹一下SeaTunnel支持的第一個同步場景:離線批量同步。顧名思意,離線批量同步需要用戶定義好SeaTunnel JobConfig,選擇批處理模式,作業啟動後開始同步數據,當數據同步完成後作業完成退出。 ...
在上一篇文章中,我們介紹瞭如何下載安裝部署SeaTunnel Zeta服務(3分鐘部署SeaTunnel Zeta單節點Standalone模式環境),接下來我們介紹一下SeaTunnel支持的第一個同步場景:離線批量同步。顧名思意,離線批量同步需要用戶定義好SeaTunnel JobConfig,選擇批處理模式,作業啟動後開始同步數據,當數據同步完成後作業完成退出。
下麵以MySQL離線同步到StarRocks為例,介紹如何使用SeaTunnel進行離線同步作業的定義和運行。
1. 定義作業配置文件
SeaTunnel使用配置文件來定義作業,在這個示例中,作業的配置文件如下,文件保存路徑~/seatunnel/apache-seatunnel-incubating-2.3.1/config/mysql_to_sr.config
#定義一些作業的運行參數,具體可以參考 https://seatunnel.apache.org/docs/2.3.1/concept/JobEnvConfig
env {
job.mode="BATCH" #作業的運行模式,BATCH=離線批同步,STREAMING=實時同步
job.name="SeaTunnel_Job"
checkpoint.interval=10000 #每10000ms進行一次checkpoint,後面會詳細介紹checkpoint對JDBC Source和StarRocks Sink這兩個連接器的影響
}
source {
Jdbc {
parallelism=5 # 並行度,這裡是啟動5個Source Task來並行的讀取數據
partition_column="id" # 使用id欄位來進行split的拆分,目前只支持數字類型的主鍵列,而且該列的值最好是離線的,自增id最佳
partition_num="20" # 拆分成20個split,這20個split會被分配給5個Source Task來處理
result_table_name="Table9210050164000"
query="SELECT `id`, `f_binary`, `f_blob`, `f_long_varbinary`, `f_longblob`, `f_tinyblob`, `f_varbinary`, `f_smallint`, `f_smallint_unsigned`, `f_mediumint`, `f_mediumint_unsigned`, `f_int`, `f_int_unsigned`, `f_integer`, `f_integer_unsigned`, `f_bigint`, `f_bigint_unsigned`, `f_numeric`, `f_decimal`, `f_float`, `f_double`, `f_double_precision`, `f_longtext`, `f_mediumtext`, `f_text`, `f_tinytext`, `f_varchar`, `f_date`, `f_datetime`, `f_timestamp` FROM `sr_test`.`test1`"
password="root@123"
driver="com.mysql.cj.jdbc.Driver"
user=root
url="jdbc:mysql://st01:3306/sr_test?enabledTLSProtocols=TLSv1.2&rewriteBatchedStatements=true"
}
}
transform {
# 在本次示例中我們不需要做任務的Transform操作,所以這裡為空,也可以將transform整個元素刪除
}
sink {
StarRocks {
batch_max_rows=10240 #
source_table_name="Table9210050164000"
table="test2"
database="sr_test"
base-url="jdbc:mysql://datasource01:9030"
password="root"
username="root"
nodeUrls=[
"datasource01:8030" #寫入數據是通過StarRocks的Http介面
]
}
}
2. 作業配置說明
在這個作業定義文件中,我們通過env定義了作業的運行模式是BATCH離線批處理模式,同時定義了作業的名稱是"SeaTunnel_Job"。checkpoint.interval參數用來定義該作業過程中多久進行一次checkpoint,那什麼是checkpoint,以及checkpoint在Apache SeaTunnel中的作用是什麼呢?
2.1 checkpoint
查看官方文檔中對Apache SeaTunnel Zeta引擎checkpoint的介紹: https://seatunnel.apache.org/docs/2.3.1/seatunnel-engine/checkpoint-storage#introduction 發現checkpoint是用來使運行在Apache SeaTunnel Zeta中的作業能定期的將自己的狀態以快照的形式保存下來,當任務意外失敗時,可以從最近一次保存的快照中恢復作業,以實現任務的失敗恢復,斷點續傳等功能。其實checkpoint的核心是分散式快照演算法:Chandy-Lamport 演算法,是廣泛應用在分散式系統,更多是分散式計算系統中的一種容錯處理理論基礎。這裡不詳細介紹Chandy-Lamport 演算法,接下來我們重點說明在本示例中checkpoint對這個同步任務的影響。
Apache SeaTunnel Zeta引擎在作業啟動時會啟動一個叫CheckpointManager的線程,用來管理這個作業的checkpoint。SeaTunnel Connector API提供了一套checkpoint的API,用於在引擎觸發checkpoint時通知具體的Connector進行相應的處理。SeaTunnel的Source和Sink連接器都是基於SeaTunnel Connector API開發的,只是不同的連接器對checkpoint API的實現細節不同,所以能實現的功能也不同。
2.1.1 checkpoint對JDBC Source的影響
在本示例中我們通過JDBC Source連接器的官方文檔https://seatunnel.apache.org/docs/2.3.1/connector-v2/source/Jdbc 可以發現如下內容:
這說明JDBC Source連接器實現了checkpoint相關的介面,通過源碼我們可以得知,當checkpoint發生時,JDBC Source會將自己還未處理的split做為狀態的快照發送給CheckpointManager進行持久化保存。這樣當作業失敗並恢復時,JDBC Source會從最近一次保存的快照中讀取哪些split還未處理,然後接著處理這些split。
在該作業中通過partition_num=20,會將query參數中指定的sql語句的結果分成20個split進行處理,每個split會生成讀取它負責的數據的sql,這個sql是由query中指定的sql再加上一些where過濾條件組成的。這20個split會被分配給5個Source Task進行處理,理想情況下,每個Source Task會分配到4個split。假設在一次checkpoint時每個Source Task都只剩下一個split沒有處理,這個split的信息會被保存下來,如果這之後作業掛掉了,作業會自動進行恢復,恢復時每個Source Task都會獲取到那個還未處理的split,並接著進行處理。如果作業不再報錯,這些split都處理完成後,作業運行完成。如果作業還是報錯(比如目標端StarRocks掛了,無法寫入數據),最終作業會以失敗狀態結束。
斷點續傳:
如果在作業失敗後,我們修複了問題,並且希望該作業接著之前的進度運行,只處理那些之前沒有被處理過的split,可以使用 sh seatunnel.sh -r jobId來讓作業ID為jobId的作業從斷點中恢復。
回到主題,checkpoint.interval=10000對於從Mysql中讀取數據意味著每過10s,SeaTunnel Zeta引擎就會觸發一次checkpoint操作,然後JDBC Source Task會被要求將自己還未處理的split信息保存下來,這裡需求註意的是,JDBC Source Task讀取數據是以split為單位的,如果checkpoint觸發時一個split中的數據正在被讀取還未完全發送給下游的StarRocks,它會等到這個split的數據處理完成之後才會響應這次checkpoint操作。這裡一定要註意,如果MySQL中的數據量比較大,一個split的數據需要很長的時候才能處理完成,可能會導致checkpoint超時。關於checkpoint的超時時長可以參數https://seatunnel.apache.org/docs/2.3.1/seatunnel-engine/checkpoint-storage, 預設是1分鐘。
2.1.2 checkpoint對StarRocks Sink的影響
在Sink連接器的文檔上,我們也能看到如下圖中的標識:
這個標識代表該Sink連接器是否實現了精確處理一次的語義,如果該標識被選中,說明這個Sink連接器能保證發給它的數據它只會往目標端寫入一次,不會漏掉導致目標端數據丟失 ,也不會重覆往目標端寫入。這一功能常見的實現方式是兩階段提交,支持事務的連接器一般會先開啟事務進行數據的寫入。當checkpoint發生時,將事務ID返回給CheckManager進行持久化,當作業中的所有Task都響應了CheckManager的checkpoint請求後,第一階段完成。然後Apache SeaTunnel Zeta引擎會調用AggregateCommit的方法讓Sink對其事務進行提交,這個過程被稱為第二階段,第二階段完成後該次checkpoint完成。如果第二階段提交失敗,作業會失敗,然後自動恢復,恢復後會再次從第二階段開始,要求對事務進行提交,直到該事務提交完成,如果事務一直失敗,作業也將失敗。
並不是只有實現了exactly-once特性的Sink連接器才能保證目標端的數據不丟失不重覆,如果目標端的資料庫支持以主鍵去重,那隻要Sink連接器保證發送給它的數據至少往目標端寫入一次,無論重覆寫入多少次,最終都不會導致目標端數據丟失或重覆。在該示例中StarRocks Sink連接器即是使用了這種方式,StarRocks Sink連接器會將收到的數據先緩存在記憶體中,當緩存的行數達到batch_max_rows設置的10240行,就會發起一次寫入請求,將數據寫入到StarRocks中。如果MySQL中的數據量很小,達不到10240行,那就會在checkpoint觸發時進行StarRocks的寫入。
3. 運行作業
我們使用Apache SeaTunnel Zeta引擎來運行該作業
cd ~/seatunnel/apache-seatunnel-incubating-2.3.1
sh bin/seatunnel.sh --config config/mysql_to_sr.config
作業運行完成後可以看到如下信息,說明作業狀態為FINISHED,讀取20w行數據,寫入StarRocks也是20w行數據,用時6s。