教程 | 使用 Apache SeaTunnel 同步本地文件到阿裡雲 OSS

来源:https://www.cnblogs.com/seatunnel/archive/2023/09/26/17729665.html
-Advertisement-
Play Games

一直以來,大數據量一直是爆炸性增長,每天幾十 TB 的數據增量已經非常常見,但雲存儲相對來說還是不便宜的。眾多雲上的大數據用戶特別希望可以非常簡單快速的將文件移動到更實惠的 S3、OSS 上進行保存,這篇文章就來介紹如何使用 SeaTunnel 來進行到 OSS 的數據同步。 首先簡要介紹一下 Ap ...


file

一直以來,大數據量一直是爆炸性增長,每天幾十 TB 的數據增量已經非常常見,但雲存儲相對來說還是不便宜的。眾多雲上的大數據用戶特別希望可以非常簡單快速的將文件移動到更實惠的 S3、OSS 上進行保存,這篇文章就來介紹如何使用 SeaTunnel 來進行到 OSS 的數據同步。

首先簡要介紹一下 Apache SeaTunnel,SeaTunnel 專註於數據集成和數據同步,主要解決以下問題:

  • 數據源多樣:常用的數據源有數百種,版本不相容。隨著新技術的出現,出現了更多的數據源。用戶很難找到能夠全面快速支持這些數據源的工具。

  • 複雜同步場景:數據同步需要支持離線-全量同步、離線-增量同步、CDC、實時同步、全庫同步等多種同步場景。

  • 資源需求高:現有的數據集成和數據同步工具往往需要大量的計算資源或 JDBC 連接資源來完成海量小表的實時同步。這在一定程度上加重了企業的負擔。

  • 缺乏質量和監控:數據集成和同步過程經常會丟失或重覆數據。同步過程缺乏監控,無法直觀瞭解任務過程中數據的真實情況

SeaTunnel 支持海量數據的高效離線/實時同步, 每天可穩定高效同步數百億級數據,已經有 B 站,騰訊雲,微博,360,Shopee 等數百家公司生產使用。

下麵步入今天的正題,今天具體來說是講 Apache SeaTunnel 產品與阿裡雲 OSS 的集成。

在阿裡雲 OSS 產品界面,開通 Bucket:

file

下麵是 SeaTunnel 的部署, SeaTunnel 支持多種部署方式: 單機,集群,K8s 等方式。由於 SeaTunnel 不依賴 Zookeeper 等第三方組件,所以整體部署非常簡單,具體請參考其官網:https://seatunnel.apache.org/docs/2.3.0/start-v2/locally/deployment

接下來是 SeaTunnel 使用過程,使用命令:

./bin/seatunnel.sh -m local -c ./config/localfile-oss.config

在 SeaTunnel 中,用戶可以通過 config 文件定製自己的數據同步需求,最大限度地發揮 SeaTunnel 的潛力。那麼接下來就給大家介紹一下如何配置 Config 文件

可以看到,config 文件包含幾個部分:env、source、transform、sink。不同的模塊有不同的功能。瞭解這些模塊後,您將瞭解 SeaTunnel 的工作原理。

用於添加一些引擎可選參數,無論是哪個引擎(Spark或Flink),這裡都要填寫相應的可選參數。

source 用於定義 SeaTunnel 需要從哪裡獲取數據,並將獲取的數據用於下一步。可以同時定義多個源。現在支持的來源檢查 SeaTunnel 的來源。每個 Source 都有自己特定的參數來定義如何取數據,SeaTunnel 也提取了每個 source 會用到的參數,比如parameter,用來指定 result_table_name 當前 source 產生的數據的名稱,方便供其他模塊後續使用。

本例中的 localfile-oss.config 配置文件內容介紹:

env {                                                                                                                                                                          
 
  # You can set SeaTunnel environment configuration here                                                                                                                      
 
  execution.parallelism = 10                                                                                                                                                  
 
  job.mode = "BATCH"                                                                                                                                                           
 
  checkpoint.interval = 10000                                                                                                                                                  
 
  #execution.checkpoint.interval = 10000                                                                                                                                      
 
  #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"                                                                                                         
 
}                                                                                                                                                                              
 
                                                                                                                                                                               
 
source {                                                                                                                                                                       
 
LocalFile {                                                                                                                                                                   
  #本地待同步的數據文件夾, 本例子中只有一個 test0.csv 文件,具體內容參考下圖
  path = "/data/seatunnel-2.3.1/testfile/source"                                                                                                                              
 
  type = "csv"                                                                                                                                                                
                                                                                                                                                                   
  delimiter = "#"                                                                                                                                                               
 
  schema {                                                                                                                                                                     
 
    fields {                                                                                                                                                                   
 
        name = string                                                                                                                                                          
 
        age = int                                                                                                                                                             
 
        gender = string                                                                                                                                                        
 
    }                                                                                                                                                                          
 
  }                                                                                                                                                                            
}                                                                                                                                                                             
                                                                                                        
 
}                                                                                                                                                                              
 
                                                                                                                                                                               
 
sink {                                                                                                                                                                                                                                                                                                                                         
  OssJindoFile {                                                                                                                                                              
                                                                                                                                                                                                                                   path="/seatunnel/oss03"                                                        
    bucket = "oss://bucket123456654321234.cn-hangzhou.oss-dls.aliyuncs.com"                                                                                                      
 
    access_key = "I5t7VZyZSmMNwKsNv1LTADxW"                                                                                                                                   
 
    access_secret = "BinZ9J0zYxRbvG9wQUi6LiUjZElLTA"                                                                                                                                                                                                                                                           
 
    endpoint = "cn-hangzhou.oss-dls.aliyuncs.com"                                                                                                                             
 
  }
                                                                                                                                                                                                                                                                                  
}

註:下圖本地待同步的數據文件夾, 本例子中只有一個 test0.csv 文件,具體內容
file

特別註意:如果是開通了 HDFS 的 OSS,有 2 個地方是不一樣的:1 是 bucket,1 是 endpoint 。如下紅色部分是開通了 HDFS 後的,被 “#” 註釋掉的是未開通 HDFS 的情況。

file

SeaTunnel 對這 2 種情況都是支持的,只是大家要註意一下配置 bucket 和 endpoint 時的不同!

執行運行命令後,我們可以從 SeaTunnel 控制台看下以下 SeaTunnel 本次同步情況的數據:


       Job Statistic Information                                                                                                                                           

Start Time : 2023-02-22 17:12:19

End Time : 2023-02-22 17:12:37

Total Time(s) : 18

Total Read Count : 10000000

Total Write Count : 10000000

Total Failed Count : 0


從阿裡雲界面上可以看到 OSS 端的監控數據:

file
file
file

可以看出來 SeaTunnel 快速高效地同步了 1000萬數據量的本地文件!

最後,Apache SeaTunnel 目前已經支持了過百種數據源,併發布了 SeaTunnel Zeta 同步引擎,性能巨佳,還有群進行技術支持,歡迎對比,歡迎一試!感興趣的伙伴歡迎聯繫社區志願者微信: seatunnel1

參考:

1、https://seatunnel.apache.org/docs/2.3.0/start-v2/locally/deployment

2、https://seatunnel.apache.org/docs/2.3.0/start-v2/locally/quick-start-seatunnel-engine

3、https://seatunnel.apache.org

本文由 白鯨開源 提供發佈支持!


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 引言 在C#的併發編程中,Channel是一種非常強大的數據結構,用於在生產者和消費者之間進行通信。本文將首先通過一個實際的使用案例,介紹如何在C#中使用Channel,然後深入到Channel的源碼中,解析其內部的實現機制。 使用案例一:文件遍歷和過濾 在我們的使用案例中,我們需要遍歷一個文件夾及 ...
  • 一、vi編譯器介紹 Vi編輯器是所有Unix及Linux系統下標準的編輯器,類似於windows系統下的notepad(記事本)編輯器,由於在Unix及Linux系統的任何版本,Vi編輯器是完全相同的,因 此可以在其他任何介紹vi的地方都能進一步瞭解它,Vi也是Linux中最基本的文本編輯器,學會它 ...
  • 一、準備環節 rpm -qa | grep postgres 檢查PostgreSQL 是否已經安裝 rpm -qal | grep postgres 檢查PostgreSQL 安裝位置 postgresql-12.2.tar.gz 二、Pgsql資料庫安裝下載 下載地址: http://www.p ...
  • 1、Stream記憶體帶寬測試 Stream是業界主流的記憶體帶寬測試程式,測試行為相對簡單可控。該程式對CPU的計算能力要求很小,對CPU記憶體帶寬壓力很大。隨著處理器核心數量的增大,而記憶體帶寬並沒有隨之成線性增長,因此記憶體帶寬對提升多核心的處理能力就越發重要。Stream具有良好的空間局部性,是對TL ...
  • 前言 不想看可以跳過前言部分,教程在下幾章。 ​ 最新搬到新校園,寢室的校園網可使用網線連接。雖然撥號的寬頻賬號和密碼已經自動記錄,但啟動電腦並登入電腦時仍需要手動進入設置並點擊自動登錄,就像鞋子里的小石子,雖然腳不會出血,但就是難受。於是開始網上搜索教程win11自動撥號。結合了兩篇文章實現了開機 ...
  • MySQL 高級(進階) SQL 語句 use gy; create table location (Region char(20),Store_Name char(20)); insert into location values('East','Boston'); insert into loc ...
  • 一、背景 在預發環境中,由消息驅動最終觸發執行事務來寫庫存,但是導致MySQL發生死鎖,寫庫存失敗。 com.mysql.jdbc.exceptions.jdbc4.MySQLTransactionRollbackException: rpc error: code = Aborted desc = ...
  • 為了標識一段數據,通常我們會為其指定一個唯一id,比如利用MySQL資料庫中的自增主鍵。 但是當數據量非常大時,僅靠資料庫的自增主鍵是遠遠不夠的,並且對於分散式資料庫只依賴MySQL的自增id無法滿足全局唯一的需求。因此,產生了多種解決方案,如UUID,SnowFlake等。下文將介紹Vitess是... ...
一周排行
    -Advertisement-
    Play Games
  • 示例項目結構 在 Visual Studio 中創建一個 WinForms 應用程式後,項目結構如下所示: MyWinFormsApp/ │ ├───Properties/ │ └───Settings.settings │ ├───bin/ │ ├───Debug/ │ └───Release/ ...
  • [STAThread] 特性用於需要與 COM 組件交互的應用程式,尤其是依賴單線程模型(如 Windows Forms 應用程式)的組件。在 STA 模式下,線程擁有自己的消息迴圈,這對於處理用戶界面和某些 COM 組件是必要的。 [STAThread] static void Main(stri ...
  • 在WinForm中使用全局異常捕獲處理 在WinForm應用程式中,全局異常捕獲是確保程式穩定性的關鍵。通過在Program類的Main方法中設置全局異常處理,可以有效地捕獲並處理未預見的異常,從而避免程式崩潰。 註冊全局異常事件 [STAThread] static void Main() { / ...
  • 前言 給大家推薦一款開源的 Winform 控制項庫,可以幫助我們開發更加美觀、漂亮的 WinForm 界面。 項目介紹 SunnyUI.NET 是一個基於 .NET Framework 4.0+、.NET 6、.NET 7 和 .NET 8 的 WinForm 開源控制項庫,同時也提供了工具類庫、擴展 ...
  • 說明 該文章是屬於OverallAuth2.0系列文章,每周更新一篇該系列文章(從0到1完成系統開發)。 該系統文章,我會儘量說的非常詳細,做到不管新手、老手都能看懂。 說明:OverallAuth2.0 是一個簡單、易懂、功能強大的許可權+可視化流程管理系統。 有興趣的朋友,請關註我吧(*^▽^*) ...
  • 一、下載安裝 1.下載git 必須先下載並安裝git,再TortoiseGit下載安裝 git安裝參考教程:https://blog.csdn.net/mukes/article/details/115693833 2.TortoiseGit下載與安裝 TortoiseGit,Git客戶端,32/6 ...
  • 前言 在項目開發過程中,理解數據結構和演算法如同掌握蓋房子的秘訣。演算法不僅能幫助我們編寫高效、優質的代碼,還能解決項目中遇到的各種難題。 給大家推薦一個支持C#的開源免費、新手友好的數據結構與演算法入門教程:Hello演算法。 項目介紹 《Hello Algo》是一本開源免費、新手友好的數據結構與演算法入門 ...
  • 1.生成單個Proto.bat內容 @rem Copyright 2016, Google Inc. @rem All rights reserved. @rem @rem Redistribution and use in source and binary forms, with or with ...
  • 一:背景 1. 講故事 前段時間有位朋友找到我,說他的窗體程式在客戶這邊出現了卡死,讓我幫忙看下怎麼回事?dump也生成了,既然有dump了那就上 windbg 分析吧。 二:WinDbg 分析 1. 為什麼會卡死 窗體程式的卡死,入口門檻很低,後續往下分析就不一定了,不管怎麼說先用 !clrsta ...
  • 前言 人工智慧時代,人臉識別技術已成為安全驗證、身份識別和用戶交互的關鍵工具。 給大家推薦一款.NET 開源提供了強大的人臉識別 API,工具不僅易於集成,還具備高效處理能力。 本文將介紹一款如何利用這些API,為我們的項目添加智能識別的亮點。 項目介紹 GitHub 上擁有 1.2k 星標的 C# ...