一文教會你用Apache SeaTunnel Zeta離線把數據從MySQL同步到StarRocks

来源:https://www.cnblogs.com/seatunnel/archive/2023/05/26/17435413.html
-Advertisement-
Play Games

我們介紹一下SeaTunnel支持的第一個同步場景:離線批量同步。顧名思意,離線批量同步需要用戶定義好SeaTunnel JobConfig,選擇批處理模式,作業啟動後開始同步數據,當數據同步完成後作業完成退出。 ...


在上一篇文章中,我們介紹瞭如何下載安裝部署SeaTunnel Zeta服務(3分鐘部署SeaTunnel Zeta單節點Standalone模式環境),接下來我們介紹一下SeaTunnel支持的第一個同步場景:離線批量同步。顧名思意,離線批量同步需要用戶定義好SeaTunnel JobConfig,選擇批處理模式,作業啟動後開始同步數據,當數據同步完成後作業完成退出。

下麵以MySQL離線同步到StarRocks為例,介紹如何使用SeaTunnel進行離線同步作業的定義和運行。

1. 定義作業配置文件

SeaTunnel使用配置文件來定義作業,在這個示例中,作業的配置文件如下,文件保存路徑~/seatunnel/apache-seatunnel-incubating-2.3.1/config/mysql_to_sr.config

#定義一些作業的運行參數,具體可以參考 https://seatunnel.apache.org/docs/2.3.1/concept/JobEnvConfig
env {
	job.mode="BATCH"  #作業的運行模式,BATCH=離線批同步,STREAMING=實時同步
	job.name="SeaTunnel_Job"
	checkpoint.interval=10000 #每10000ms進行一次checkpoint,後面會詳細介紹checkpoint對JDBC Source和StarRocks Sink這兩個連接器的影響
}
source {
	Jdbc {
    	parallelism=5 # 並行度,這裡是啟動5個Source Task來並行的讀取數據
    	partition_column="id" # 使用id欄位來進行split的拆分,目前只支持數字類型的主鍵列,而且該列的值最好是離線的,自增id最佳
    	partition_num="20" # 拆分成20個split,這20個split會被分配給5個Source Task來處理
    	result_table_name="Table9210050164000"
    	query="SELECT `id`, `f_binary`, `f_blob`, `f_long_varbinary`, `f_longblob`, `f_tinyblob`, `f_varbinary`, `f_smallint`, `f_smallint_unsigned`, `f_mediumint`, `f_mediumint_unsigned`, `f_int`, `f_int_unsigned`, `f_integer`, `f_integer_unsigned`, `f_bigint`, `f_bigint_unsigned`, `f_numeric`, `f_decimal`, `f_float`, `f_double`, `f_double_precision`, `f_longtext`, `f_mediumtext`, `f_text`, `f_tinytext`, `f_varchar`, `f_date`, `f_datetime`, `f_timestamp` FROM `sr_test`.`test1`"
    	password="root@123"
    	driver="com.mysql.cj.jdbc.Driver"
    	user=root
    	url="jdbc:mysql://st01:3306/sr_test?enabledTLSProtocols=TLSv1.2&rewriteBatchedStatements=true"
	}
}
transform {
# 在本次示例中我們不需要做任務的Transform操作,所以這裡為空,也可以將transform整個元素刪除
}
sink {
	StarRocks {
    	batch_max_rows=10240 # 
    	source_table_name="Table9210050164000"
    	table="test2"
    	database="sr_test"
    	base-url="jdbc:mysql://datasource01:9030"
    	password="root"
    	username="root"
    	nodeUrls=[
        	"datasource01:8030" #寫入數據是通過StarRocks的Http介面
    	]
	}
}

2. 作業配置說明

在這個作業定義文件中,我們通過env定義了作業的運行模式是BATCH離線批處理模式,同時定義了作業的名稱是"SeaTunnel_Job"。checkpoint.interval參數用來定義該作業過程中多久進行一次checkpoint,那什麼是checkpoint,以及checkpoint在Apache SeaTunnel中的作用是什麼呢?

2.1 checkpoint

查看官方文檔中對Apache SeaTunnel Zeta引擎checkpoint的介紹: https://seatunnel.apache.org/docs/2.3.1/seatunnel-engine/checkpoint-storage#introduction 發現checkpoint是用來使運行在Apache SeaTunnel Zeta中的作業能定期的將自己的狀態以快照的形式保存下來,當任務意外失敗時,可以從最近一次保存的快照中恢復作業,以實現任務的失敗恢復,斷點續傳等功能。其實checkpoint的核心是分散式快照演算法:Chandy-Lamport 演算法,是廣泛應用在分散式系統,更多是分散式計算系統中的一種容錯處理理論基礎。這裡不詳細介紹Chandy-Lamport 演算法,接下來我們重點說明在本示例中checkpoint對這個同步任務的影響。

Apache SeaTunnel Zeta引擎在作業啟動時會啟動一個叫CheckpointManager的線程,用來管理這個作業的checkpoint。SeaTunnel Connector API提供了一套checkpoint的API,用於在引擎觸發checkpoint時通知具體的Connector進行相應的處理。SeaTunnel的Source和Sink連接器都是基於SeaTunnel Connector API開發的,只是不同的連接器對checkpoint API的實現細節不同,所以能實現的功能也不同。

2.1.1 checkpoint對JDBC Source的影響

在本示例中我們通過JDBC Source連接器的官方文檔https://seatunnel.apache.org/docs/2.3.1/connector-v2/source/Jdbc 可以發現如下內容:

這說明JDBC Source連接器實現了checkpoint相關的介面,通過源碼我們可以得知,當checkpoint發生時,JDBC Source會將自己還未處理的split做為狀態的快照發送給CheckpointManager進行持久化保存。這樣當作業失敗並恢復時,JDBC Source會從最近一次保存的快照中讀取哪些split還未處理,然後接著處理這些split。

在該作業中通過partition_num=20,會將query參數中指定的sql語句的結果分成20個split進行處理,每個split會生成讀取它負責的數據的sql,這個sql是由query中指定的sql再加上一些where過濾條件組成的。這20個split會被分配給5個Source Task進行處理,理想情況下,每個Source Task會分配到4個split。假設在一次checkpoint時每個Source Task都只剩下一個split沒有處理,這個split的信息會被保存下來,如果這之後作業掛掉了,作業會自動進行恢復,恢復時每個Source Task都會獲取到那個還未處理的split,並接著進行處理。如果作業不再報錯,這些split都處理完成後,作業運行完成。如果作業還是報錯(比如目標端StarRocks掛了,無法寫入數據),最終作業會以失敗狀態結束。

斷點續傳:

如果在作業失敗後,我們修複了問題,並且希望該作業接著之前的進度運行,只處理那些之前沒有被處理過的split,可以使用 sh seatunnel.sh -r jobId來讓作業ID為jobId的作業從斷點中恢復。

回到主題,checkpoint.interval=10000對於從Mysql中讀取數據意味著每過10s,SeaTunnel Zeta引擎就會觸發一次checkpoint操作,然後JDBC Source Task會被要求將自己還未處理的split信息保存下來,這裡需求註意的是,JDBC Source Task讀取數據是以split為單位的,如果checkpoint觸發時一個split中的數據正在被讀取還未完全發送給下游的StarRocks,它會等到這個split的數據處理完成之後才會響應這次checkpoint操作。這裡一定要註意,如果MySQL中的數據量比較大,一個split的數據需要很長的時候才能處理完成,可能會導致checkpoint超時。關於checkpoint的超時時長可以參數https://seatunnel.apache.org/docs/2.3.1/seatunnel-engine/checkpoint-storage, 預設是1分鐘。

2.1.2 checkpoint對StarRocks Sink的影響

在Sink連接器的文檔上,我們也能看到如下圖中的標識:

這個標識代表該Sink連接器是否實現了精確處理一次的語義,如果該標識被選中,說明這個Sink連接器能保證發給它的數據它只會往目標端寫入一次,不會漏掉導致目標端數據丟失 ,也不會重覆往目標端寫入。這一功能常見的實現方式是兩階段提交,支持事務的連接器一般會先開啟事務進行數據的寫入。當checkpoint發生時,將事務ID返回給CheckManager進行持久化,當作業中的所有Task都響應了CheckManager的checkpoint請求後,第一階段完成。然後Apache SeaTunnel Zeta引擎會調用AggregateCommit的方法讓Sink對其事務進行提交,這個過程被稱為第二階段,第二階段完成後該次checkpoint完成。如果第二階段提交失敗,作業會失敗,然後自動恢復,恢復後會再次從第二階段開始,要求對事務進行提交,直到該事務提交完成,如果事務一直失敗,作業也將失敗。

並不是只有實現了exactly-once特性的Sink連接器才能保證目標端的數據不丟失不重覆,如果目標端的資料庫支持以主鍵去重,那隻要Sink連接器保證發送給它的數據至少往目標端寫入一次,無論重覆寫入多少次,最終都不會導致目標端數據丟失或重覆。在該示例中StarRocks Sink連接器即是使用了這種方式,StarRocks Sink連接器會將收到的數據先緩存在記憶體中,當緩存的行數達到batch_max_rows設置的10240行,就會發起一次寫入請求,將數據寫入到StarRocks中。如果MySQL中的數據量很小,達不到10240行,那就會在checkpoint觸發時進行StarRocks的寫入。

3. 運行作業

我們使用Apache SeaTunnel Zeta引擎來運行該作業

cd ~/seatunnel/apache-seatunnel-incubating-2.3.1
sh bin/seatunnel.sh --config config/mysql_to_sr.config

作業運行完成後可以看到如下信息,說明作業狀態為FINISHED,讀取20w行數據,寫入StarRocks也是20w行數據,用時6s。


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • > ML.Net - 開源的跨平臺機器學習框架 > - 支持CPU/GPU訓練 > - 輕鬆簡潔的預測代碼 > - 可擴展其他的機器學習平臺 > - 跨平臺 ![img](https://img2023.cnblogs.com/blog/1339560/202305/1339560-20230524 ...
  • 要使用 `systemctl` 啟動單個服務,其中包含多個進程,你可以使用 Systemd 的 `template` 機制。以下是使用 Systemd 'template' 以創建一個可同時啟動多個進程的服務單元文件的過程: 1. 為你的服務創建一個 template 服務單元文件。服務單元文件通常 ...
  • 目錄 一、條件判斷 二、邏輯判斷 三、if和case 四、七個實驗 一、條件判斷 1.test測試 test [ 條件表達式 ] -e:測試目錄是否存在 -d:測試是否為目錄 -f:是否為文件 -r:當前用戶是否有讀寫許可權 -w:當前用戶是否有寫許可權 -x:當前用戶是否有執行許可權 2.整數值判斷 格 ...
  • INFINI Labs 產品更新啦~,本次產品版本更新包括 Gateway v1.14.0、Console v1.2.0、Easysearch v1.1.1 等,其中 Console 在上一版基礎上做了很多優化改進以及新增了一些特性,如新增數據比對校驗功能、數據看板模塊新增了表格組件、圖表組件支持下 ...
  • 我們都知道,預設情況下,nginx的項目log是一直被累計寫入的,隨著時間越久,那麼這個文件就會越大,這個時候如果我們要去做一些查找和排查就會比較困難,因為日誌文件太大,操作起來比較費勁。 因此我們為了規避這個問題,提出日誌切割的方案。 那日誌切割的原理是怎麼樣的,我們來分析一下,我們先統計下連續1 ...
  • 在上篇文章 [《深入理解 slab cache 記憶體分配全鏈路實現》](https://mp.weixin.qq.com/s?__biz=Mzg2MzU3Mjc3Ng==&mid=2247488152&idx=1&sn=7c65f8ee28e9cc14a86e9df92b6d2b93&chksm=c ...
  • ![](https://img2023.cnblogs.com/blog/3076680/202305/3076680-20230516151258933-1445766867.png) # 1. 基本信息 SQL進階教程 [日]MICK 人民郵電出版社,2017年11月出版,**1版** ## 1 ...
  • ## Doris 簡介 ### Doris 概述 Apache Doris 由百度大數據部研發 (之前叫百度 Palo,2018 年貢獻到 Apache 社區後,更名為 Doris), 在百度內部,有超過 200 個產品線在使用,部署機器超過 1000 台,單一業務最大可達到上百 TB。 Apach ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...