教程 | 使用 Apache SeaTunnel 同步本地文件到阿裡雲 OSS

来源:https://www.cnblogs.com/seatunnel/archive/2023/09/26/17729665.html
-Advertisement-
Play Games

一直以來,大數據量一直是爆炸性增長,每天幾十 TB 的數據增量已經非常常見,但雲存儲相對來說還是不便宜的。眾多雲上的大數據用戶特別希望可以非常簡單快速的將文件移動到更實惠的 S3、OSS 上進行保存,這篇文章就來介紹如何使用 SeaTunnel 來進行到 OSS 的數據同步。 首先簡要介紹一下 Ap ...


file

一直以來,大數據量一直是爆炸性增長,每天幾十 TB 的數據增量已經非常常見,但雲存儲相對來說還是不便宜的。眾多雲上的大數據用戶特別希望可以非常簡單快速的將文件移動到更實惠的 S3、OSS 上進行保存,這篇文章就來介紹如何使用 SeaTunnel 來進行到 OSS 的數據同步。

首先簡要介紹一下 Apache SeaTunnel,SeaTunnel 專註於數據集成和數據同步,主要解決以下問題:

  • 數據源多樣:常用的數據源有數百種,版本不相容。隨著新技術的出現,出現了更多的數據源。用戶很難找到能夠全面快速支持這些數據源的工具。

  • 複雜同步場景:數據同步需要支持離線-全量同步、離線-增量同步、CDC、實時同步、全庫同步等多種同步場景。

  • 資源需求高:現有的數據集成和數據同步工具往往需要大量的計算資源或 JDBC 連接資源來完成海量小表的實時同步。這在一定程度上加重了企業的負擔。

  • 缺乏質量和監控:數據集成和同步過程經常會丟失或重覆數據。同步過程缺乏監控,無法直觀瞭解任務過程中數據的真實情況

SeaTunnel 支持海量數據的高效離線/實時同步, 每天可穩定高效同步數百億級數據,已經有 B 站,騰訊雲,微博,360,Shopee 等數百家公司生產使用。

下麵步入今天的正題,今天具體來說是講 Apache SeaTunnel 產品與阿裡雲 OSS 的集成。

在阿裡雲 OSS 產品界面,開通 Bucket:

file

下麵是 SeaTunnel 的部署, SeaTunnel 支持多種部署方式: 單機,集群,K8s 等方式。由於 SeaTunnel 不依賴 Zookeeper 等第三方組件,所以整體部署非常簡單,具體請參考其官網:https://seatunnel.apache.org/docs/2.3.0/start-v2/locally/deployment

接下來是 SeaTunnel 使用過程,使用命令:

./bin/seatunnel.sh -m local -c ./config/localfile-oss.config

在 SeaTunnel 中,用戶可以通過 config 文件定製自己的數據同步需求,最大限度地發揮 SeaTunnel 的潛力。那麼接下來就給大家介紹一下如何配置 Config 文件

可以看到,config 文件包含幾個部分:env、source、transform、sink。不同的模塊有不同的功能。瞭解這些模塊後,您將瞭解 SeaTunnel 的工作原理。

用於添加一些引擎可選參數,無論是哪個引擎(Spark或Flink),這裡都要填寫相應的可選參數。

source 用於定義 SeaTunnel 需要從哪裡獲取數據,並將獲取的數據用於下一步。可以同時定義多個源。現在支持的來源檢查 SeaTunnel 的來源。每個 Source 都有自己特定的參數來定義如何取數據,SeaTunnel 也提取了每個 source 會用到的參數,比如parameter,用來指定 result_table_name 當前 source 產生的數據的名稱,方便供其他模塊後續使用。

本例中的 localfile-oss.config 配置文件內容介紹:

env {                                                                                                                                                                          
 
  # You can set SeaTunnel environment configuration here                                                                                                                      
 
  execution.parallelism = 10                                                                                                                                                  
 
  job.mode = "BATCH"                                                                                                                                                           
 
  checkpoint.interval = 10000                                                                                                                                                  
 
  #execution.checkpoint.interval = 10000                                                                                                                                      
 
  #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"                                                                                                         
 
}                                                                                                                                                                              
 
                                                                                                                                                                               
 
source {                                                                                                                                                                       
 
LocalFile {                                                                                                                                                                   
  #本地待同步的數據文件夾, 本例子中只有一個 test0.csv 文件,具體內容參考下圖
  path = "/data/seatunnel-2.3.1/testfile/source"                                                                                                                              
 
  type = "csv"                                                                                                                                                                
                                                                                                                                                                   
  delimiter = "#"                                                                                                                                                               
 
  schema {                                                                                                                                                                     
 
    fields {                                                                                                                                                                   
 
        name = string                                                                                                                                                          
 
        age = int                                                                                                                                                             
 
        gender = string                                                                                                                                                        
 
    }                                                                                                                                                                          
 
  }                                                                                                                                                                            
}                                                                                                                                                                             
                                                                                                        
 
}                                                                                                                                                                              
 
                                                                                                                                                                               
 
sink {                                                                                                                                                                                                                                                                                                                                         
  OssJindoFile {                                                                                                                                                              
                                                                                                                                                                                                                                   path="/seatunnel/oss03"                                                        
    bucket = "oss://bucket123456654321234.cn-hangzhou.oss-dls.aliyuncs.com"                                                                                                      
 
    access_key = "I5t7VZyZSmMNwKsNv1LTADxW"                                                                                                                                   
 
    access_secret = "BinZ9J0zYxRbvG9wQUi6LiUjZElLTA"                                                                                                                                                                                                                                                           
 
    endpoint = "cn-hangzhou.oss-dls.aliyuncs.com"                                                                                                                             
 
  }
                                                                                                                                                                                                                                                                                  
}

註:下圖本地待同步的數據文件夾, 本例子中只有一個 test0.csv 文件,具體內容
file

特別註意:如果是開通了 HDFS 的 OSS,有 2 個地方是不一樣的:1 是 bucket,1 是 endpoint 。如下紅色部分是開通了 HDFS 後的,被 “#” 註釋掉的是未開通 HDFS 的情況。

file

SeaTunnel 對這 2 種情況都是支持的,只是大家要註意一下配置 bucket 和 endpoint 時的不同!

執行運行命令後,我們可以從 SeaTunnel 控制台看下以下 SeaTunnel 本次同步情況的數據:


       Job Statistic Information                                                                                                                                           

Start Time : 2023-02-22 17:12:19

End Time : 2023-02-22 17:12:37

Total Time(s) : 18

Total Read Count : 10000000

Total Write Count : 10000000

Total Failed Count : 0


從阿裡雲界面上可以看到 OSS 端的監控數據:

file
file
file

可以看出來 SeaTunnel 快速高效地同步了 1000萬數據量的本地文件!

最後,Apache SeaTunnel 目前已經支持了過百種數據源,併發布了 SeaTunnel Zeta 同步引擎,性能巨佳,還有群進行技術支持,歡迎對比,歡迎一試!感興趣的伙伴歡迎聯繫社區志願者微信: seatunnel1

參考:

1、https://seatunnel.apache.org/docs/2.3.0/start-v2/locally/deployment

2、https://seatunnel.apache.org/docs/2.3.0/start-v2/locally/quick-start-seatunnel-engine

3、https://seatunnel.apache.org

本文由 白鯨開源 提供發佈支持!


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 引言 在C#的併發編程中,Channel是一種非常強大的數據結構,用於在生產者和消費者之間進行通信。本文將首先通過一個實際的使用案例,介紹如何在C#中使用Channel,然後深入到Channel的源碼中,解析其內部的實現機制。 使用案例一:文件遍歷和過濾 在我們的使用案例中,我們需要遍歷一個文件夾及 ...
  • 一、vi編譯器介紹 Vi編輯器是所有Unix及Linux系統下標準的編輯器,類似於windows系統下的notepad(記事本)編輯器,由於在Unix及Linux系統的任何版本,Vi編輯器是完全相同的,因 此可以在其他任何介紹vi的地方都能進一步瞭解它,Vi也是Linux中最基本的文本編輯器,學會它 ...
  • 一、準備環節 rpm -qa | grep postgres 檢查PostgreSQL 是否已經安裝 rpm -qal | grep postgres 檢查PostgreSQL 安裝位置 postgresql-12.2.tar.gz 二、Pgsql資料庫安裝下載 下載地址: http://www.p ...
  • 1、Stream記憶體帶寬測試 Stream是業界主流的記憶體帶寬測試程式,測試行為相對簡單可控。該程式對CPU的計算能力要求很小,對CPU記憶體帶寬壓力很大。隨著處理器核心數量的增大,而記憶體帶寬並沒有隨之成線性增長,因此記憶體帶寬對提升多核心的處理能力就越發重要。Stream具有良好的空間局部性,是對TL ...
  • 前言 不想看可以跳過前言部分,教程在下幾章。 ​ 最新搬到新校園,寢室的校園網可使用網線連接。雖然撥號的寬頻賬號和密碼已經自動記錄,但啟動電腦並登入電腦時仍需要手動進入設置並點擊自動登錄,就像鞋子里的小石子,雖然腳不會出血,但就是難受。於是開始網上搜索教程win11自動撥號。結合了兩篇文章實現了開機 ...
  • MySQL 高級(進階) SQL 語句 use gy; create table location (Region char(20),Store_Name char(20)); insert into location values('East','Boston'); insert into loc ...
  • 一、背景 在預發環境中,由消息驅動最終觸發執行事務來寫庫存,但是導致MySQL發生死鎖,寫庫存失敗。 com.mysql.jdbc.exceptions.jdbc4.MySQLTransactionRollbackException: rpc error: code = Aborted desc = ...
  • 為了標識一段數據,通常我們會為其指定一個唯一id,比如利用MySQL資料庫中的自增主鍵。 但是當數據量非常大時,僅靠資料庫的自增主鍵是遠遠不夠的,並且對於分散式資料庫只依賴MySQL的自增id無法滿足全局唯一的需求。因此,產生了多種解決方案,如UUID,SnowFlake等。下文將介紹Vitess是... ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...