本文為Apache SeaTunnel已經支持的SftpFile Source Connector使用文檔,旨在幫助讀者理解如何高效地使用SFTP文件源連接器,以便輕鬆地使用Apache SeaTunnel集成和管理您的SftpFil數據源。 SftpFile 是指通過 SFTP(Secure Fi ...
本文為Apache SeaTunnel已經支持的SftpFile Source Connector使用文檔,旨在幫助讀者理解如何高效地使用SFTP文件源連接器,以便輕鬆地使用Apache SeaTunnel集成和管理您的SftpFil數據源。
SftpFile 是指通過 SFTP(Secure File Transfer Protocol)協議進行文件操作的對象或組件。在網路編程和數據集成中,SFTPFile 通常用來表示和操作存儲在遠程 SFTP 伺服器上的文件。SFTP 是一種安全的文件傳輸協議,基於 SSH(Secure Shell)協議,提供了加密的文件傳輸和遠程文件操作功能。
支持的引擎
Spark
Flink
SeaTunnel Zeta
主要特性
描述
從 SFTP 文件伺服器讀取數據。
支持的數據源信息
使用 SftpFile 連接器,需要以下依賴項。可以通過 install-plugin.sh
下載,也可以從 Maven 中央倉庫獲取。
數據源 | 支持的版本 | 依賴項 |
---|---|---|
SftpFile | 通用 | 下載 |
- 提示
如果你使用的是 Spark/Flink,請確保 Spark/Flink 集群已經集成了 Hadoop。Hadoop 2.x 版本已通過測試。
如果使用 SeaTunnel 引擎,安裝 SeaTunnel 引擎時會自動集成 Hadoop JAR 包。可以在 ${SEATUNNEL_HOME}/lib
目錄下檢查這個 JAR 包是否存在。
為了支持更多的文件類型,我們做了一些妥協,所以在內部訪問 Sftp 時我們使用了 HDFS 協議,這個連接器需要一些 Hadoop 依賴項,且僅支持 Hadoop 版2.9.X+ 版本。
數據類型映射
文件沒有特定的類型列表,我們可以通過在配置中指定模式來指示要將哪個 SeaTunnel 數據類型轉換為相應的數據。
SeaTunnel 數據類型 |
---|
STRING |
SHORT |
INT |
BIGINT |
BOOLEAN |
DOUBLE |
DECIMAL |
FLOAT |
DATE |
TIME |
TIMESTAMP |
BYTES |
ARRAY |
MAP |
Source選項
名稱 | 類型 | 必填 | 預設值 | 描述 |
---|---|---|---|---|
host | 字元串 | 是 | - | 目標 SFTP 主機地址 |
port | 整數 | 是 | - | 目標 SFTP 埠號 |
user | 字元串 | 是 | - | 目標 SFTP 用戶名 |
password | 字元串 | 是 | - | 目標 SFTP 密碼 |
path | 字元串 | 是 | - | 源文件路徑 |
file_format_type | 字元串 | 是 | - | 請查看下文的 #file_format_type |
file_filter_pattern | 字元串 | 否 | - | 用於文件過濾的過濾器模式。 |
delimiter | 字元串 | 否 | \001 | 欄位分隔符,用於告訴連接器如何在讀取文本文件時切割欄位。預設 \001 ,與 Hive 的預設分隔符相同。 |
parse_partition_from_path | 布爾型 | 否 | true | 控制是否從文件路徑中解析分區鍵和值。 例如,如果從路徑中讀取文件 oss://hadoop-cluster/tmp/seatunnel/parquet/name=tyrantlucifer/age=26 那麼文件中的每條記錄將添加這兩個欄位: name age tyrantlucifer 26 提示:不要在模式選項中定義分區欄位。 |
date_format | 字元串 | 否 | yyyy-MM-dd | 日期類型的格式,用於告訴連接器如何將字元串轉換為日期,支持以下格式:yyyy-MM-dd yyyy.MM.dd yyyy/MM/dd ,預設 yyyy-MM-dd 。 |
datetime_format | 字元串 | 否 | yyyy-MM-dd HH:mm:ss | 日期時間類型的格式,用於告訴連接器如何將字元串轉換為日期時間,支持以下格式:yyyy-MM-dd HH:mm:ss yyyy.MM.dd HH:mm:ss yyyy/MM/dd HH:mm:ss yyyyMMddHHmmss ,預設 yyyy-MM-dd HH:mm:ss 。 |
time_format | 字元串 | 否 | HH:mm:ss | 時間類型的格式,用於告訴連接器如何將字元串轉換為時間,支持以下格式:HH:mm:ss HH:mm:ss.SSS ,預設 HH:mm:ss 。 |
skip_header_row_number | 長整型 | 否 | 0 | 跳過前幾行,僅對 txt 和 csv 文件有效。 例如,設置如下: skip_header_row_number = 2 那麼 SeaTunnel 將跳過源文件的前兩行。 |
sheet_name | 字元串 | 否 | - | 讀取工作簿的工作表名稱,僅在文件格式為 Excel 時使用。 |
schema | 配置項 | 否 | - | 請查看下文的 #schema |
通用選項 | 否 | - | Source 插件通用參數,請參考 Source通用選項 獲取詳細信息。 |
file_format_type [字元串]
支持以下文件類型:
text
csv
parquet
orc
json
excel
如果將文件類型指定為 json
,需要配置 Schema 模式選項,向連接器說明如何解析數據為你需要所需的 Row。
例如:上游數據如下:
{"code": 200, "data": "get success", "success": true}
也可以將多個數據保存在一個文件中,並通過換行符進行分隔:
{"code": 200, "data": "get success", "success": true}
{"code": 300, "data": "get failed", "success": false}
需要按照以下方式配置 Schema:
schema {
fields {
code = int
data = string
success = boolean
}
}
連接器將生成以下數據:
code | data | success |
---|---|---|
200 | 獲取成功 | true |
如果將文件類型指定為 parquet 或 orc ,則無需指定模式選項,連接器可以自動查找上游數據的模式。 |
||
如果將文件類型指定為 text 或 csv ,則可以選擇是否指定模式信息或不指定。 |
例如,上游數據如下:
tyrantlucifer#26#male
如果不配置 Schema,Connector 將這樣處理上游數據:
如果分配數據模式,則除了 CSV 文件類型外,還應分配選項 delimiter。
內容 |
---|
tyrantlucifer#26#male |
如果配置了數據 Schema,除了CSV文件類型,還需要配置選項分隔符。 |
需要配置 Schema 和分隔符如下:
delimiter = "#"
schema {
fields {
name = string
age = int
gender = string
}
}
連接器將生成以下數據:
姓名 | 年齡 | 性別 |
---|---|---|
tyrantlucifer | 26 | 男 |
Schema [配置項]
fields [配置項]
上游數據的 Schema。
如何創建 Sftp 數據同步任務
以下示例演示瞭如何創建一個數據同步任務,從 Sftp 讀取數據併在本地客戶端上列印出來:
# 設置要執行的任務的基本配置
env {
execution.parallelism = 1
job.mode = "BATCH"
}
# 創建連接到 Sftp 的源
source {
SftpFile {
host = "sftp"
port = 22
user = seatunnel
password = pass
path = "tmp/seatunnel/read/json"
file_format_type = "json"
result_table_name = "sftp"
schema = {
fields {
c_map = "map<string, string>"
c_array = "array<int>"
c_string = string
c_boolean = boolean
c_tinyint = tinyint
c_smallint = smallint
c_int = int
c_bigint = bigint
c_float = float
c_double = double
c_bytes = bytes
c_date = date
c_decimal = "decimal(38, 18)"
c_timestamp = timestamp
c_row = {
C_MAP = "map<string, string>"
C_ARRAY = "array<int>"
C_STRING = string
C_BOOLEAN = boolean
C_TINYINT = tinyint
C_SMALLINT = smallint
C_INT = int
C_BIGINT = bigint
C_FLOAT = float
C_DOUBLE = double
C_BYTES = bytes
C_DATE = date
C_DECIMAL = "decimal(38, 18)"
C_TIMESTAMP = timestamp
}
}
}
}
}
# 控制台列印讀取的 Sftp 數據
sink {
Console {
parallelism = 1
}
}
本文由 白鯨開源 提供發佈支持!