必看!S3File Sink Connector 使用文檔

来源:https://www.cnblogs.com/seatunnel/archive/2023/09/21/17720087.html
-Advertisement-
Play Games

S3File 是一個用於管理 Amazon S3(Simple Storage Service)的 Python 模塊。當前,Apache SeaTunnel 已經支持 S3File Sink Connector,為了更好地使用這個 Connector,有必要看一下這篇使用文檔指南。 描述 將數據輸 ...


file

S3File 是一個用於管理 Amazon S3(Simple Storage Service)的 Python 模塊。當前,Apache SeaTunnel 已經支持 S3File Sink Connector,為了更好地使用這個 Connector,有必要看一下這篇使用文檔指南。

描述

將數據輸出到 AWS S3 文件系統。

提示:

如果您使用的是 Spark/Flink,在使用此連接器之前,必須確保您的 Spark/Flink 集群已經集成了 Hadoop。Hadoop 2.x 版本已通過測試。

如果您使用的是 SeaTunnel Engine,它會在您下載和安裝 SeaTunnel Engine 時自動集成 Hadoop JAR 包。您可以在 ${SEATUNNEL_HOME}/lib 目錄下確認這個 JAR 包是否存在。

主要特性

預設情況下,我們使用 2PC 提交來確保 "僅一次語義"。

選項

名稱 類型 必需 預設值 備註
path string -
bucket string -
fs.s3a.endpoint string -
fs.s3a.aws.credentials.provider string com.amazonaws.auth.InstanceProfileCredentialsProvider
access_key string - 僅在 fs.s3a.aws.credentials.provider = org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider 時使用
access_secret string - 僅在 fs.s3a.aws.credentials.provider = org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider 時使用
custom_filename boolean false 是否需要自定義文件名
file_name_expression string "${transactionId}" 僅在 custom_filename 為 true 時使用
filename_time_format string "yyyy.MM.dd" 僅在 custom_filename 為 true 時使用
file_format_type string "csv"
field_delimiter string '\001' 僅在 file_format 為 text 時使用
row_delimiter string "\n" 僅在 file_format 為 text 時使用
have_partition boolean false 是否需要處理分區
partition_by array - 僅在 have_partition 為 true 時使用
partition_dir_expression string "${k0}=${v0}/${k1}=${v1}/.../${kn}=${vn}/" 僅在 have_partition 為 true 時使用
is_partition_field_write_in_file boolean false 僅在 have_partition 為 true 時使用
sink_columns array 當此參數為空時,將寫入所有從 "Transform" 或 "Source" 獲取的欄位
is_enable_transaction boolean true
batch_size int 1000000
compress_codec string none
common-options object -
max_rows_in_memory int - 僅在 file_format 為 Excel 時使用
sheet_name string Sheet$ 僅在 file_format 為 Excel 時使用

path [string]

目標目錄路徑是必需的。

bucket [string]

S3 文件系統的bucket地址,例如:s3n://seatunnel-test,如果您使用的是 s3a 協議,此參數應為 s3a://seatunnel-test

fs.s3a.endpoint [string]

fs s3a 端點

fs.s3a.aws.credentials.provider [string]

認證 s3a 的方式。目前我們僅支持 org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvidercom.amazonaws.auth.InstanceProfileCredentialsProvider

關於憑證提供程式的更多信息,您可以參考 Hadoop AWS 文檔

access_key [string]

S3 文件系統的訪問密鑰。如果未設置此參數,請確認憑證提供程式鏈可以正確驗證,可參考 hadoop-aws

access_secret [string]

S3 文件系統的訪問密鑰。如果未設置此參數,請確認憑證提供程式鏈可以正確驗證,可參考 hadoop-aws

hadoop_s3_properties [map]

如果需要添加其他選項,可以在這裡添加並參考此 鏈接

hadoop_s3_properties {
      "fs.s3a.buffer.dir" = "/data/st_test/s3a"
      "fs.s3a.fast.upload.buffer" = "disk"
   }

custom_filename [boolean]

是否自定義文件名。

file_name_expression [string]

僅在 custom_filenametrue 時使用

file_name_expression 描述了將創建到 path 中的文件表達式。我們可以在 file_name_expression 中添加變數 ${now} ${uuid},例如 test_${uuid}_${now}
${now} 代表當前時間,其格式可以通過指定選項 filename_time_format 來定義。

請註意,如果 is_enable_transactiontrue,我們將在文件名的開頭自動添加${transactionId}_

filename_time_format [string]

僅在 custom_filenametrue 時使用

file_name_expression 參數中的格式為 xxxx-${now} 時,filename_time_format 可以指定路徑的時間格式,預設值為 yyyy.MM.dd。常用的時間格式列於下表中:

符號 描述
y
M
d 月中的天數
H 一天中的小時 (0-23)
m 小時中的分鐘
s 分鐘中的秒數

file_format_type [string]

我們支持以下文件類型:

  • 文本 (text)
  • JSON
  • CSV
  • ORC
  • Parquet
  • Excel

請註意,最終文件名將以文件格式的尾碼結尾,文本文件的尾碼是 txt

field_delimiter [string]

數據行中列之間的分隔符。僅在 file_format 為 text 時需要。

row_delimiter [string]

文件中行之間的分隔符。僅在 file_format 為 text 時需要。

have_partition [boolean]

是否需要處理分區。

partition_by [array]

僅在 have_partitiontrue 時使用。

基於選定欄位對分區數據進行分區。

partition_dir_expression [string]

僅在 have_partitiontrue 時使用。

如果指定了 partition_by,我們將根據分區信息生成相應的分區目錄,並將最終文件放在分區目錄中。

預設的 partition_dir_expression${k0}=${v0}/${k1}=${v1}/.../${kn}=${vn}/k0 是第一個分區欄位,v0 是第一個分區欄位的值。

is_partition_field_write_in_file [boolean]

僅在 have_partitiontrue 時使用。

如果 is_partition_field_write_in_filetrue,分區欄位及其值將寫入數據文件中。

例如,如果您想要寫入 Hive 數據文件,其值應為 false

sink_columns [array]

需要寫入文件的哪些列,預設值為從 "Transform" 或 "Source" 獲取的所有列。
欄位的順序決定了實際寫入文件的順序。

is_enable_transaction [boolean]

如果 is_enable_transaction 為 true,我們將確保在寫入目標目錄時數據不會丟失或重覆。

請註意,如果 is_enable_transactiontrue,我們將在文件頭部自動添加 ${transactionId}_

目前僅支持 true

batch_size [int]

文件中的最大行數。對於 SeaTunnel Engine,文件中的行數由 batch_sizecheckpoint.interval 共同決定。如果 checkpoint.interval 的值足夠大,當文件中的行數大於 batch_size 時,寫入器將寫入文件。如果 checkpoint.interval 較小,則在新的檢查點觸發時,寫入器將創建一個新文件。

compress_codec [string]

文件的壓縮編解碼器及其支持的詳細信息如下:

  • txt: lzo none
  • JSON: lzo none
  • CSV: lzo none
  • ORC: lzo snappy lz4 zlib none
  • Parquet: lzo snappy lz4 gzip brotli zstd none

提示:Excel 類型不支持任何壓縮格式。

常見選項

請參考 Sink Common Options 獲取 Sink 插件的常見參數詳細信息。

max_rows_in_memory [int]

當文件格式為 Excel 時,可以緩存在記憶體中的數據項的最大數量。

sheet_name [string]

工作簿的工作表名稱。

示例

對於文本文件格式,具有 have_partitioncustom_filenamesink_columnscom.amazonaws.auth.InstanceProfileCredentialsProvider 的配置示例:

  S3File {
    bucket = "s3a://seatunnel-test"
    tmp_path = "/tmp/seatunnel"
    path="/seatunnel/text"
    fs.s3a.endpoint="s3.cn-north-1.amazonaws.com.cn"
    fs.s3a.aws.credentials.provider="com.amazonaws.auth.InstanceProfileCredentialsProvider"
    file_format_type = "text"
    field_delimiter = "\t"
    row_delimiter = "\n"
    have_partition = true
    partition_by = ["age"]
    partition_dir_expression = "${k0}=${v0}"
    is_partition_field_write_in_file = true
    custom_filename = true
    file_name_expression = "${transactionId}_${now}"
    filename_time_format = "yyyy.MM.dd"
    sink_columns = ["name","age"]
    is_enable_transaction=true
    hadoop_s3_properties {
      "fs.s3a.buffer.dir" = "/data/st_test/s3a"
      "fs.s3a.fast.upload.buffer" = "disk"
    }
  }

對於 Parquet 文件格式,僅需用 org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider進行配置:

  S3File {
    bucket = "s3a://seatunnel-test"
    tmp_path = "/tmp/seatunnel"
    path="/seatunnel/parquet"
    fs.s3a.endpoint="s3.cn-north-1.amazonaws.com.cn"
    fs.s3a.aws.credentials.provider="org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider"
    access_key = "xxxxxxxxxxxxxxxxx"
    secret_key = "xxxxxxxxxxxxxxxxx"
    file_format_type = "parquet"
    hadoop_s3_properties {
      "fs.s3a.buffer.dir" = "/data/st_test/s3a"
      "fs.s3a.fast.upload.buffer" = "disk"
    }
  }

對於 orc 文件僅需配置 org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider

  S3File {
    bucket = "s3a://seatunnel-test"
    tmp_path = "/tmp/seatunnel"
    path="/seatunnel/orc"
    fs.s3a.endpoint="s3.cn-north-1.amazonaws.com.cn"
    fs.s3a.aws.credentials.provider="org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider"
    access_key = "xxxxxxxxxxxxxxxxx"
    secret_key = "xxxxxxxxxxxxxxxxx"
    file_format_type = "orc"
  }

更新日誌

2.3.0-beta 2022-10-20

  • 添加 S3File Sink 連接器

2.3.0 2022-12-30

  • Bug修複
    • 修複了以下導致數據寫入文件失敗的錯誤:
      • 當上游欄位為空時會拋出 NullPointerException
      • Sink 列映射失敗
      • 從狀態中恢覆寫入器時直接獲取事務失敗 (3258)
  • 功能
    • 支持 S3A 協議 (3632)
      • 允許用戶添加額外的 Hadoop-S3 參數
      • 允許使用 S3A 協議
      • 解耦 Hadoop-AWS 依賴
    • 支持設置每個文件的批處理大小 (3625)
    • 設置 S3 AK 為可選項 (3688)

下一版本

本文由 白鯨開源 提供發佈支持!


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 在學習C#中的記錄類型時,對出現的Equals和ReferenceEquals得到的不同結果表示不理解,隨即進行相關資料查找。 值類型 == : 比較兩者的“內容”是否相同,即“值”是否一樣Equals:比較兩者的“內容”是否相同,即“值”是否一樣ReferenceEquals:返回false,因為 ...
  • shell批量執行命令與文件傳輸腳本 需求: 對未進行主機信任操作的伺服器進行批量操作 實現: 由於ssh只能在交互模式中輸入伺服器密碼進行登錄登操作,不便於進行大批量伺服器進行巡檢或日誌採集。sshpass恰好又解決了這個問題,使用ssh -p passwd可以實現命令行輸入密碼操作,便於進行規模 ...
  • 一、目錄介紹 /:表示的是根的意思 /bin:(binary)存放的是一些二進位文件,但是在Linux中二進位文件是可以被執行的。這個目錄中的命令文件是給普通用戶使用(非超級管理員用戶)。 /etc:Linux下所有的配置文件都會存放到etc目錄。 /home:是所有非root用戶家目錄的一個集合。 ...
  • 測試伺服器CPU單核及多核SuperPI圓周率測試real和user值,SuperPI是利用CPU的浮點運算能力來計算出π(圓周率),測試系統穩定性和測試CPU計算完後特定位數圓周率所需的時間;及Unixbench單核及多核測試Index得分,測試方法如下: 類型 預期結果 測試步驟 SuperPI ...
  • 可擴展性對於物聯網管理系統的設計和開發非常重要,它直接影響著系統的性能、可靠性和能耗等方面,是評估一個系統優劣的重要因素之一。可擴展性對物聯網管理系統的影響主要體現在以下幾個方面: ...
  • 1. 為什麼需要加鎖 在日常生活中,如果你心情不好想靜靜,不想被比別人打擾,你就可以把自己關進房間里,並且反鎖。這就是生活中的加鎖。 同理,對於MySQL資料庫來說的話,一般的對象都是一個事務一個事務來說的。所以,如果一個事務內,一個SQL正在更新某條記錄,我們肯定不想它被別的事務影響到嘛?因此,數 ...
  • GraphiteMergeTree該引擎用來對Graphite數據(圖數據)進行瘦身及彙總。對於想使用ClickHouse來存儲Graphite數據的開發者來說可能有用。 如果不需要對Graphite數據做彙總,那麼可以使用任意的ClickHouse表引擎;但若需要,那就採用GraphiteMerg ...
  • 作者 | 代立冬 編輯 | Debra Chen Apache DolphinScheduler 是現代數據工作流編排平臺,具有非常強大的可視化能力,DolphinScheduler 致力於使數據工程師、分析師、數據科學家等數據工作者都可以簡單輕鬆地搭建各種數據工作流,讓數據處理流程更簡單可靠。 D ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...