必看!S3File Sink Connector 使用文檔

来源:https://www.cnblogs.com/seatunnel/archive/2023/09/21/17720087.html
-Advertisement-
Play Games

S3File 是一個用於管理 Amazon S3(Simple Storage Service)的 Python 模塊。當前,Apache SeaTunnel 已經支持 S3File Sink Connector,為了更好地使用這個 Connector,有必要看一下這篇使用文檔指南。 描述 將數據輸 ...


file

S3File 是一個用於管理 Amazon S3(Simple Storage Service)的 Python 模塊。當前,Apache SeaTunnel 已經支持 S3File Sink Connector,為了更好地使用這個 Connector,有必要看一下這篇使用文檔指南。

描述

將數據輸出到 AWS S3 文件系統。

提示:

如果您使用的是 Spark/Flink,在使用此連接器之前,必須確保您的 Spark/Flink 集群已經集成了 Hadoop。Hadoop 2.x 版本已通過測試。

如果您使用的是 SeaTunnel Engine,它會在您下載和安裝 SeaTunnel Engine 時自動集成 Hadoop JAR 包。您可以在 ${SEATUNNEL_HOME}/lib 目錄下確認這個 JAR 包是否存在。

主要特性

預設情況下,我們使用 2PC 提交來確保 "僅一次語義"。

選項

名稱 類型 必需 預設值 備註
path string -
bucket string -
fs.s3a.endpoint string -
fs.s3a.aws.credentials.provider string com.amazonaws.auth.InstanceProfileCredentialsProvider
access_key string - 僅在 fs.s3a.aws.credentials.provider = org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider 時使用
access_secret string - 僅在 fs.s3a.aws.credentials.provider = org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider 時使用
custom_filename boolean false 是否需要自定義文件名
file_name_expression string "${transactionId}" 僅在 custom_filename 為 true 時使用
filename_time_format string "yyyy.MM.dd" 僅在 custom_filename 為 true 時使用
file_format_type string "csv"
field_delimiter string '\001' 僅在 file_format 為 text 時使用
row_delimiter string "\n" 僅在 file_format 為 text 時使用
have_partition boolean false 是否需要處理分區
partition_by array - 僅在 have_partition 為 true 時使用
partition_dir_expression string "${k0}=${v0}/${k1}=${v1}/.../${kn}=${vn}/" 僅在 have_partition 為 true 時使用
is_partition_field_write_in_file boolean false 僅在 have_partition 為 true 時使用
sink_columns array 當此參數為空時,將寫入所有從 "Transform" 或 "Source" 獲取的欄位
is_enable_transaction boolean true
batch_size int 1000000
compress_codec string none
common-options object -
max_rows_in_memory int - 僅在 file_format 為 Excel 時使用
sheet_name string Sheet$ 僅在 file_format 為 Excel 時使用

path [string]

目標目錄路徑是必需的。

bucket [string]

S3 文件系統的bucket地址,例如:s3n://seatunnel-test,如果您使用的是 s3a 協議,此參數應為 s3a://seatunnel-test

fs.s3a.endpoint [string]

fs s3a 端點

fs.s3a.aws.credentials.provider [string]

認證 s3a 的方式。目前我們僅支持 org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvidercom.amazonaws.auth.InstanceProfileCredentialsProvider

關於憑證提供程式的更多信息,您可以參考 Hadoop AWS 文檔

access_key [string]

S3 文件系統的訪問密鑰。如果未設置此參數,請確認憑證提供程式鏈可以正確驗證,可參考 hadoop-aws

access_secret [string]

S3 文件系統的訪問密鑰。如果未設置此參數,請確認憑證提供程式鏈可以正確驗證,可參考 hadoop-aws

hadoop_s3_properties [map]

如果需要添加其他選項,可以在這裡添加並參考此 鏈接

hadoop_s3_properties {
      "fs.s3a.buffer.dir" = "/data/st_test/s3a"
      "fs.s3a.fast.upload.buffer" = "disk"
   }

custom_filename [boolean]

是否自定義文件名。

file_name_expression [string]

僅在 custom_filenametrue 時使用

file_name_expression 描述了將創建到 path 中的文件表達式。我們可以在 file_name_expression 中添加變數 ${now} ${uuid},例如 test_${uuid}_${now}
${now} 代表當前時間,其格式可以通過指定選項 filename_time_format 來定義。

請註意,如果 is_enable_transactiontrue,我們將在文件名的開頭自動添加${transactionId}_

filename_time_format [string]

僅在 custom_filenametrue 時使用

file_name_expression 參數中的格式為 xxxx-${now} 時,filename_time_format 可以指定路徑的時間格式,預設值為 yyyy.MM.dd。常用的時間格式列於下表中:

符號 描述
y
M
d 月中的天數
H 一天中的小時 (0-23)
m 小時中的分鐘
s 分鐘中的秒數

file_format_type [string]

我們支持以下文件類型:

  • 文本 (text)
  • JSON
  • CSV
  • ORC
  • Parquet
  • Excel

請註意,最終文件名將以文件格式的尾碼結尾,文本文件的尾碼是 txt

field_delimiter [string]

數據行中列之間的分隔符。僅在 file_format 為 text 時需要。

row_delimiter [string]

文件中行之間的分隔符。僅在 file_format 為 text 時需要。

have_partition [boolean]

是否需要處理分區。

partition_by [array]

僅在 have_partitiontrue 時使用。

基於選定欄位對分區數據進行分區。

partition_dir_expression [string]

僅在 have_partitiontrue 時使用。

如果指定了 partition_by,我們將根據分區信息生成相應的分區目錄,並將最終文件放在分區目錄中。

預設的 partition_dir_expression${k0}=${v0}/${k1}=${v1}/.../${kn}=${vn}/k0 是第一個分區欄位,v0 是第一個分區欄位的值。

is_partition_field_write_in_file [boolean]

僅在 have_partitiontrue 時使用。

如果 is_partition_field_write_in_filetrue,分區欄位及其值將寫入數據文件中。

例如,如果您想要寫入 Hive 數據文件,其值應為 false

sink_columns [array]

需要寫入文件的哪些列,預設值為從 "Transform" 或 "Source" 獲取的所有列。
欄位的順序決定了實際寫入文件的順序。

is_enable_transaction [boolean]

如果 is_enable_transaction 為 true,我們將確保在寫入目標目錄時數據不會丟失或重覆。

請註意,如果 is_enable_transactiontrue,我們將在文件頭部自動添加 ${transactionId}_

目前僅支持 true

batch_size [int]

文件中的最大行數。對於 SeaTunnel Engine,文件中的行數由 batch_sizecheckpoint.interval 共同決定。如果 checkpoint.interval 的值足夠大,當文件中的行數大於 batch_size 時,寫入器將寫入文件。如果 checkpoint.interval 較小,則在新的檢查點觸發時,寫入器將創建一個新文件。

compress_codec [string]

文件的壓縮編解碼器及其支持的詳細信息如下:

  • txt: lzo none
  • JSON: lzo none
  • CSV: lzo none
  • ORC: lzo snappy lz4 zlib none
  • Parquet: lzo snappy lz4 gzip brotli zstd none

提示:Excel 類型不支持任何壓縮格式。

常見選項

請參考 Sink Common Options 獲取 Sink 插件的常見參數詳細信息。

max_rows_in_memory [int]

當文件格式為 Excel 時,可以緩存在記憶體中的數據項的最大數量。

sheet_name [string]

工作簿的工作表名稱。

示例

對於文本文件格式,具有 have_partitioncustom_filenamesink_columnscom.amazonaws.auth.InstanceProfileCredentialsProvider 的配置示例:

  S3File {
    bucket = "s3a://seatunnel-test"
    tmp_path = "/tmp/seatunnel"
    path="/seatunnel/text"
    fs.s3a.endpoint="s3.cn-north-1.amazonaws.com.cn"
    fs.s3a.aws.credentials.provider="com.amazonaws.auth.InstanceProfileCredentialsProvider"
    file_format_type = "text"
    field_delimiter = "\t"
    row_delimiter = "\n"
    have_partition = true
    partition_by = ["age"]
    partition_dir_expression = "${k0}=${v0}"
    is_partition_field_write_in_file = true
    custom_filename = true
    file_name_expression = "${transactionId}_${now}"
    filename_time_format = "yyyy.MM.dd"
    sink_columns = ["name","age"]
    is_enable_transaction=true
    hadoop_s3_properties {
      "fs.s3a.buffer.dir" = "/data/st_test/s3a"
      "fs.s3a.fast.upload.buffer" = "disk"
    }
  }

對於 Parquet 文件格式,僅需用 org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider進行配置:

  S3File {
    bucket = "s3a://seatunnel-test"
    tmp_path = "/tmp/seatunnel"
    path="/seatunnel/parquet"
    fs.s3a.endpoint="s3.cn-north-1.amazonaws.com.cn"
    fs.s3a.aws.credentials.provider="org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider"
    access_key = "xxxxxxxxxxxxxxxxx"
    secret_key = "xxxxxxxxxxxxxxxxx"
    file_format_type = "parquet"
    hadoop_s3_properties {
      "fs.s3a.buffer.dir" = "/data/st_test/s3a"
      "fs.s3a.fast.upload.buffer" = "disk"
    }
  }

對於 orc 文件僅需配置 org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider

  S3File {
    bucket = "s3a://seatunnel-test"
    tmp_path = "/tmp/seatunnel"
    path="/seatunnel/orc"
    fs.s3a.endpoint="s3.cn-north-1.amazonaws.com.cn"
    fs.s3a.aws.credentials.provider="org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider"
    access_key = "xxxxxxxxxxxxxxxxx"
    secret_key = "xxxxxxxxxxxxxxxxx"
    file_format_type = "orc"
  }

更新日誌

2.3.0-beta 2022-10-20

  • 添加 S3File Sink 連接器

2.3.0 2022-12-30

  • Bug修複
    • 修複了以下導致數據寫入文件失敗的錯誤:
      • 當上游欄位為空時會拋出 NullPointerException
      • Sink 列映射失敗
      • 從狀態中恢覆寫入器時直接獲取事務失敗 (3258)
  • 功能
    • 支持 S3A 協議 (3632)
      • 允許用戶添加額外的 Hadoop-S3 參數
      • 允許使用 S3A 協議
      • 解耦 Hadoop-AWS 依賴
    • 支持設置每個文件的批處理大小 (3625)
    • 設置 S3 AK 為可選項 (3688)

下一版本

本文由 白鯨開源 提供發佈支持!


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 在學習C#中的記錄類型時,對出現的Equals和ReferenceEquals得到的不同結果表示不理解,隨即進行相關資料查找。 值類型 == : 比較兩者的“內容”是否相同,即“值”是否一樣Equals:比較兩者的“內容”是否相同,即“值”是否一樣ReferenceEquals:返回false,因為 ...
  • shell批量執行命令與文件傳輸腳本 需求: 對未進行主機信任操作的伺服器進行批量操作 實現: 由於ssh只能在交互模式中輸入伺服器密碼進行登錄登操作,不便於進行大批量伺服器進行巡檢或日誌採集。sshpass恰好又解決了這個問題,使用ssh -p passwd可以實現命令行輸入密碼操作,便於進行規模 ...
  • 一、目錄介紹 /:表示的是根的意思 /bin:(binary)存放的是一些二進位文件,但是在Linux中二進位文件是可以被執行的。這個目錄中的命令文件是給普通用戶使用(非超級管理員用戶)。 /etc:Linux下所有的配置文件都會存放到etc目錄。 /home:是所有非root用戶家目錄的一個集合。 ...
  • 測試伺服器CPU單核及多核SuperPI圓周率測試real和user值,SuperPI是利用CPU的浮點運算能力來計算出π(圓周率),測試系統穩定性和測試CPU計算完後特定位數圓周率所需的時間;及Unixbench單核及多核測試Index得分,測試方法如下: 類型 預期結果 測試步驟 SuperPI ...
  • 可擴展性對於物聯網管理系統的設計和開發非常重要,它直接影響著系統的性能、可靠性和能耗等方面,是評估一個系統優劣的重要因素之一。可擴展性對物聯網管理系統的影響主要體現在以下幾個方面: ...
  • 1. 為什麼需要加鎖 在日常生活中,如果你心情不好想靜靜,不想被比別人打擾,你就可以把自己關進房間里,並且反鎖。這就是生活中的加鎖。 同理,對於MySQL資料庫來說的話,一般的對象都是一個事務一個事務來說的。所以,如果一個事務內,一個SQL正在更新某條記錄,我們肯定不想它被別的事務影響到嘛?因此,數 ...
  • GraphiteMergeTree該引擎用來對Graphite數據(圖數據)進行瘦身及彙總。對於想使用ClickHouse來存儲Graphite數據的開發者來說可能有用。 如果不需要對Graphite數據做彙總,那麼可以使用任意的ClickHouse表引擎;但若需要,那就採用GraphiteMerg ...
  • 作者 | 代立冬 編輯 | Debra Chen Apache DolphinScheduler 是現代數據工作流編排平臺,具有非常強大的可視化能力,DolphinScheduler 致力於使數據工程師、分析師、數據科學家等數據工作者都可以簡單輕鬆地搭建各種數據工作流,讓數據處理流程更簡單可靠。 D ...
一周排行
    -Advertisement-
    Play Games
  • 示例項目結構 在 Visual Studio 中創建一個 WinForms 應用程式後,項目結構如下所示: MyWinFormsApp/ │ ├───Properties/ │ └───Settings.settings │ ├───bin/ │ ├───Debug/ │ └───Release/ ...
  • [STAThread] 特性用於需要與 COM 組件交互的應用程式,尤其是依賴單線程模型(如 Windows Forms 應用程式)的組件。在 STA 模式下,線程擁有自己的消息迴圈,這對於處理用戶界面和某些 COM 組件是必要的。 [STAThread] static void Main(stri ...
  • 在WinForm中使用全局異常捕獲處理 在WinForm應用程式中,全局異常捕獲是確保程式穩定性的關鍵。通過在Program類的Main方法中設置全局異常處理,可以有效地捕獲並處理未預見的異常,從而避免程式崩潰。 註冊全局異常事件 [STAThread] static void Main() { / ...
  • 前言 給大家推薦一款開源的 Winform 控制項庫,可以幫助我們開發更加美觀、漂亮的 WinForm 界面。 項目介紹 SunnyUI.NET 是一個基於 .NET Framework 4.0+、.NET 6、.NET 7 和 .NET 8 的 WinForm 開源控制項庫,同時也提供了工具類庫、擴展 ...
  • 說明 該文章是屬於OverallAuth2.0系列文章,每周更新一篇該系列文章(從0到1完成系統開發)。 該系統文章,我會儘量說的非常詳細,做到不管新手、老手都能看懂。 說明:OverallAuth2.0 是一個簡單、易懂、功能強大的許可權+可視化流程管理系統。 有興趣的朋友,請關註我吧(*^▽^*) ...
  • 一、下載安裝 1.下載git 必須先下載並安裝git,再TortoiseGit下載安裝 git安裝參考教程:https://blog.csdn.net/mukes/article/details/115693833 2.TortoiseGit下載與安裝 TortoiseGit,Git客戶端,32/6 ...
  • 前言 在項目開發過程中,理解數據結構和演算法如同掌握蓋房子的秘訣。演算法不僅能幫助我們編寫高效、優質的代碼,還能解決項目中遇到的各種難題。 給大家推薦一個支持C#的開源免費、新手友好的數據結構與演算法入門教程:Hello演算法。 項目介紹 《Hello Algo》是一本開源免費、新手友好的數據結構與演算法入門 ...
  • 1.生成單個Proto.bat內容 @rem Copyright 2016, Google Inc. @rem All rights reserved. @rem @rem Redistribution and use in source and binary forms, with or with ...
  • 一:背景 1. 講故事 前段時間有位朋友找到我,說他的窗體程式在客戶這邊出現了卡死,讓我幫忙看下怎麼回事?dump也生成了,既然有dump了那就上 windbg 分析吧。 二:WinDbg 分析 1. 為什麼會卡死 窗體程式的卡死,入口門檻很低,後續往下分析就不一定了,不管怎麼說先用 !clrsta ...
  • 前言 人工智慧時代,人臉識別技術已成為安全驗證、身份識別和用戶交互的關鍵工具。 給大家推薦一款.NET 開源提供了強大的人臉識別 API,工具不僅易於集成,還具備高效處理能力。 本文將介紹一款如何利用這些API,為我們的項目添加智能識別的亮點。 項目介紹 GitHub 上擁有 1.2k 星標的 C# ...