S3File 是一個用於管理 Amazon S3(Simple Storage Service)的 Python 模塊。當前,Apache SeaTunnel 已經支持 S3File Sink Connector,為了更好地使用這個 Connector,有必要看一下這篇使用文檔指南。 描述 將數據輸 ...
S3File 是一個用於管理 Amazon S3(Simple Storage Service)的 Python 模塊。當前,Apache SeaTunnel 已經支持 S3File Sink Connector,為了更好地使用這個 Connector,有必要看一下這篇使用文檔指南。
描述
將數據輸出到 AWS S3 文件系統。
提示:
如果您使用的是 Spark/Flink,在使用此連接器之前,必須確保您的 Spark/Flink 集群已經集成了 Hadoop。Hadoop 2.x 版本已通過測試。
如果您使用的是 SeaTunnel Engine,它會在您下載和安裝 SeaTunnel Engine 時自動集成 Hadoop JAR 包。您可以在 ${SEATUNNEL_HOME}/lib
目錄下確認這個 JAR 包是否存在。
主要特性
預設情況下,我們使用 2PC 提交來確保 "僅一次語義"。
選項
名稱 | 類型 | 必需 | 預設值 | 備註 |
---|---|---|---|---|
path | string | 是 | - | |
bucket | string | 是 | - | |
fs.s3a.endpoint | string | 是 | - | |
fs.s3a.aws.credentials.provider | string | 是 | com.amazonaws.auth.InstanceProfileCredentialsProvider | |
access_key | string | 否 | - | 僅在 fs.s3a.aws.credentials.provider = org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider 時使用 |
access_secret | string | 否 | - | 僅在 fs.s3a.aws.credentials.provider = org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider 時使用 |
custom_filename | boolean | 否 | false | 是否需要自定義文件名 |
file_name_expression | string | 否 | "${transactionId}" | 僅在 custom_filename 為 true 時使用 |
filename_time_format | string | 否 | "yyyy.MM.dd" | 僅在 custom_filename 為 true 時使用 |
file_format_type | string | 否 | "csv" | |
field_delimiter | string | 否 | '\001' | 僅在 file_format 為 text 時使用 |
row_delimiter | string | 否 | "\n" | 僅在 file_format 為 text 時使用 |
have_partition | boolean | 否 | false | 是否需要處理分區 |
partition_by | array | 否 | - | 僅在 have_partition 為 true 時使用 |
partition_dir_expression | string | 否 | "${k0}=${v0}/${k1}=${v1}/.../${kn}=${vn}/" | 僅在 have_partition 為 true 時使用 |
is_partition_field_write_in_file | boolean | 否 | false | 僅在 have_partition 為 true 時使用 |
sink_columns | array | 否 | 當此參數為空時,將寫入所有從 "Transform" 或 "Source" 獲取的欄位 | |
is_enable_transaction | boolean | 否 | true | |
batch_size | int | 否 | 1000000 | |
compress_codec | string | 否 | none | |
common-options | object | 否 | - | |
max_rows_in_memory | int | 否 | - | 僅在 file_format 為 Excel 時使用 |
sheet_name | string | 否 | Sheet$ | 僅在 file_format 為 Excel 時使用 |
path [string]
目標目錄路徑是必需的。
bucket [string]
S3 文件系統的bucket地址,例如:s3n://seatunnel-test
,如果您使用的是 s3a
協議,此參數應為 s3a://seatunnel-test
。
fs.s3a.endpoint [string]
fs s3a 端點
fs.s3a.aws.credentials.provider [string]
認證 s3a 的方式。目前我們僅支持 org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider
和 com.amazonaws.auth.InstanceProfileCredentialsProvider
。
關於憑證提供程式的更多信息,您可以參考 Hadoop AWS 文檔
access_key [string]
S3 文件系統的訪問密鑰。如果未設置此參數,請確認憑證提供程式鏈可以正確驗證,可參考 hadoop-aws。
access_secret [string]
S3 文件系統的訪問密鑰。如果未設置此參數,請確認憑證提供程式鏈可以正確驗證,可參考 hadoop-aws。
hadoop_s3_properties [map]
如果需要添加其他選項,可以在這裡添加並參考此 鏈接
hadoop_s3_properties {
"fs.s3a.buffer.dir" = "/data/st_test/s3a"
"fs.s3a.fast.upload.buffer" = "disk"
}
custom_filename [boolean]
是否自定義文件名。
file_name_expression [string]
僅在 custom_filename
為 true
時使用
file_name_expression
描述了將創建到 path
中的文件表達式。我們可以在 file_name_expression
中添加變數 ${now}
或 ${uuid}
,例如 test_${uuid}_${now}
,
${now}
代表當前時間,其格式可以通過指定選項 filename_time_format
來定義。
請註意,如果 is_enable_transaction
為 true
,我們將在文件名的開頭自動添加${transactionId}_
。
filename_time_format [string]
僅在 custom_filename
為 true
時使用
當 file_name_expression
參數中的格式為 xxxx-${now}
時,filename_time_format
可以指定路徑的時間格式,預設值為 yyyy.MM.dd
。常用的時間格式列於下表中:
符號 | 描述 |
---|---|
y | 年 |
M | 月 |
d | 月中的天數 |
H | 一天中的小時 (0-23) |
m | 小時中的分鐘 |
s | 分鐘中的秒數 |
file_format_type [string]
我們支持以下文件類型:
- 文本 (text)
- JSON
- CSV
- ORC
- Parquet
- Excel
請註意,最終文件名將以文件格式的尾碼結尾,文本文件的尾碼是 txt
。
field_delimiter [string]
數據行中列之間的分隔符。僅在 file_format
為 text 時需要。
row_delimiter [string]
文件中行之間的分隔符。僅在 file_format
為 text 時需要。
have_partition [boolean]
是否需要處理分區。
partition_by [array]
僅在 have_partition
為 true
時使用。
基於選定欄位對分區數據進行分區。
partition_dir_expression [string]
僅在 have_partition
為 true
時使用。
如果指定了 partition_by
,我們將根據分區信息生成相應的分區目錄,並將最終文件放在分區目錄中。
預設的 partition_dir_expression
是 ${k0}=${v0}/${k1}=${v1}/.../${kn}=${vn}/
。k0
是第一個分區欄位,v0
是第一個分區欄位的值。
is_partition_field_write_in_file [boolean]
僅在 have_partition
為 true
時使用。
如果 is_partition_field_write_in_file
為 true
,分區欄位及其值將寫入數據文件中。
例如,如果您想要寫入 Hive 數據文件,其值應為 false
。
sink_columns [array]
需要寫入文件的哪些列,預設值為從 "Transform" 或 "Source" 獲取的所有列。
欄位的順序決定了實際寫入文件的順序。
is_enable_transaction [boolean]
如果 is_enable_transaction
為 true,我們將確保在寫入目標目錄時數據不會丟失或重覆。
請註意,如果 is_enable_transaction
為 true
,我們將在文件頭部自動添加 ${transactionId}_
。
目前僅支持 true
。
batch_size [int]
文件中的最大行數。對於 SeaTunnel Engine,文件中的行數由 batch_size
和 checkpoint.interval
共同決定。如果 checkpoint.interval
的值足夠大,當文件中的行數大於 batch_size
時,寫入器將寫入文件。如果 checkpoint.interval
較小,則在新的檢查點觸發時,寫入器將創建一個新文件。
compress_codec [string]
文件的壓縮編解碼器及其支持的詳細信息如下:
- txt:
lzo
none
- JSON:
lzo
none
- CSV:
lzo
none
- ORC:
lzo
snappy
lz4
zlib
none
- Parquet:
lzo
snappy
lz4
gzip
brotli
zstd
none
提示:Excel 類型不支持任何壓縮格式。
常見選項
請參考 Sink Common Options 獲取 Sink 插件的常見參數詳細信息。
max_rows_in_memory [int]
當文件格式為 Excel 時,可以緩存在記憶體中的數據項的最大數量。
sheet_name [string]
工作簿的工作表名稱。
示例
對於文本文件格式,具有 have_partition
、custom_filename
、sink_columns
和 com.amazonaws.auth.InstanceProfileCredentialsProvider
的配置示例:
S3File {
bucket = "s3a://seatunnel-test"
tmp_path = "/tmp/seatunnel"
path="/seatunnel/text"
fs.s3a.endpoint="s3.cn-north-1.amazonaws.com.cn"
fs.s3a.aws.credentials.provider="com.amazonaws.auth.InstanceProfileCredentialsProvider"
file_format_type = "text"
field_delimiter = "\t"
row_delimiter = "\n"
have_partition = true
partition_by = ["age"]
partition_dir_expression = "${k0}=${v0}"
is_partition_field_write_in_file = true
custom_filename = true
file_name_expression = "${transactionId}_${now}"
filename_time_format = "yyyy.MM.dd"
sink_columns = ["name","age"]
is_enable_transaction=true
hadoop_s3_properties {
"fs.s3a.buffer.dir" = "/data/st_test/s3a"
"fs.s3a.fast.upload.buffer" = "disk"
}
}
對於 Parquet 文件格式,僅需用 org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider
進行配置:
S3File {
bucket = "s3a://seatunnel-test"
tmp_path = "/tmp/seatunnel"
path="/seatunnel/parquet"
fs.s3a.endpoint="s3.cn-north-1.amazonaws.com.cn"
fs.s3a.aws.credentials.provider="org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider"
access_key = "xxxxxxxxxxxxxxxxx"
secret_key = "xxxxxxxxxxxxxxxxx"
file_format_type = "parquet"
hadoop_s3_properties {
"fs.s3a.buffer.dir" = "/data/st_test/s3a"
"fs.s3a.fast.upload.buffer" = "disk"
}
}
對於 orc 文件僅需配置 org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider
:
S3File {
bucket = "s3a://seatunnel-test"
tmp_path = "/tmp/seatunnel"
path="/seatunnel/orc"
fs.s3a.endpoint="s3.cn-north-1.amazonaws.com.cn"
fs.s3a.aws.credentials.provider="org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider"
access_key = "xxxxxxxxxxxxxxxxx"
secret_key = "xxxxxxxxxxxxxxxxx"
file_format_type = "orc"
}
更新日誌
2.3.0-beta 2022-10-20
- 添加 S3File Sink 連接器
2.3.0 2022-12-30
- Bug修複
- 修複了以下導致數據寫入文件失敗的錯誤:
- 當上游欄位為空時會拋出 NullPointerException
- Sink 列映射失敗
- 從狀態中恢覆寫入器時直接獲取事務失敗 (3258)
- 修複了以下導致數據寫入文件失敗的錯誤:
- 功能
下一版本
- [優化]支持文件壓縮(3699)
本文由 白鯨開源 提供發佈支持!