翻譯自 Apache Paimon官方文檔 概覽 概述 Apache Paimon (incubating) 是一項流式數據湖存儲技術,可以為用戶提供高吞吐、低延遲的數據攝入、流式訂閱以及實時查詢能力。 簡單來說,Paimon的上游是各個CDC,即changlog數據流;而其自身支持實時sink與s ...
概覽
概述
Apache Paimon (incubating) 是一項流式數據湖存儲技術,可以為用戶提供高吞吐、低延遲的數據攝入、流式訂閱以及實時查詢能力。
簡單來說,Paimon的上游是各個CDC,即changlog數據流;而其自身支持實時sink與search(下沉與查詢)changlog數據流。一般會與Flink等流式計算引擎集成使用。
流式數據湖是一種先進的數據存儲架構,專門為處理大規模實時數據流而設計。在流式數據湖中,數據以流的形式持續不斷地進入系統,而不是批量存儲後處理。
數據湖是一個存儲企業的各種各樣原始數據的大型倉庫,其中的數據可供存取、處理、分析及傳輸。
數據倉庫中的數據是經過優化後(也可以看作是結構化的數據),且與該數據倉庫支持的數據模型吻合的數據。
Paimon提供以下核心功能:
- 統一批處理和流式處理:Paimon支持批量寫入和批量讀取,以及流式寫入更改和流式讀取表change log。
- 數據湖:Paimon作為數據湖存儲,具有成本低、可靠性高、元數據可擴展等優點。
- Merge Engines:Paimon支持豐富的合併引擎(Merge Engines)。預設情況下,保留主鍵的最後一個條目。您還可以使用“部分更新”或“聚合”引擎。
- Changelog Producer:用於在數據湖中生成和跟蹤數據的變更日誌(changelog);Paimon 支持豐富的 Changelog Producer,例如“lookup”和“full-compaction”;正確的changelog可以簡化流式處理管道的構造。
- Append Only Tables:Paimon支持只追加(append only)表,自動壓縮小文件,提供有序的流式讀取。您可以使用它來替換消息隊列。
架構
架構如下所示
讀/寫:Paimon 支持多種讀/寫數據和執行 OLAP 查詢的方式。
- 對於讀取,支持如下三種方式消費數據
- 歷史快照(批處理模式)
- 最新的偏移量(流模式)
- 混合模式下讀取增量快照
- 對於寫入,它支持來自資料庫變更日誌(CDC)的流式同步或來自離線數據的批量插入/覆蓋。
生態系統:除了Apache Flink之外,Paimon還支持Apache Hive、Apache Spark、Trino等其他計算引擎的讀取。
底層存儲:Paimon 將列式文件存儲在文件系統/對象存儲上,並使用 LSM 樹結構來支持大量數據更新和高性能查詢。
統一存儲
對於 Apache Flink 這樣的流引擎,通常有三種類型的connector:
- 消息隊列,例如 Apache Kafka,在該消息管道(pipeline)的源階段和中間階段使用,以保證延遲保持在秒級。
- OLAP系統,例如ClickHouse,它以流方式接收處理後的數據並服務用戶的即席查詢。
- 批量存儲,例如Apache Hive,它支持傳統批處理的各種操作,包括INSERT OVERWRITE。
Paimon 提供抽象概念的表。 它的使用方式與傳統資料庫沒有什麼區別:
- 在批處理執行模式下,它就像一個Hive表,支持Batch SQL的各種操作。 查詢它以查看最新的快照。
- 在流執行模式下,它的作用就像一個消息隊列。 查詢它的行為就像從歷史數據永不過期的消息隊列中查詢stream changelog。
基本概念
Snapshot
snapshot捕獲table在某個時間點的狀態。 用戶可以通過最新的snapshot來訪問表的最新數據。通過time travel,用戶還可以通過較早的快照訪問表的先前狀態。
Partition
Paimon 採用與 Apache Hive 相同的分區概念來分離數據。
分區是一種可選方法,可根據date, city, and department等特定列的值將表劃分為相關部分。每個表可以有一個或多個分區鍵來標識特定分區。
通過分區,用戶可以高效地操作表中的一片記錄。
Bucket
未分區表或分區表中的分區被細分為Bucket(桶),以便為可用於更有效查詢的數據提供額外的結構。
Bucket的範圍由record中的一列或多列的哈希值確定。用戶可以通過提供bucket-key選項來指定分桶列。如果未指定bucket-key選項,則主鍵(如果已定義)或完整記錄將用作存儲桶鍵。
Bucket是讀寫的最小存儲單元,因此Bucket的數量限制了最大處理並行度。 不過這個數字不應該太大,因為它會導致大量 小文件和低讀取性能。 一般來說,每個桶中建議的數據大小約為200MB - 1GB。
Consistency Guarantees
Paimon Writer 使用兩階段提交協議以原子方式將一批record提交到Table中。每次提交時最多生成兩個snapshot。
對於任意兩個同時修改table的寫入者,只要他們不修改同一個Bucket,他們的提交就可以並行發生。如果他們修改同一個Bucket,則僅保證快照隔離。也就是說,最終表狀態可能是兩次提交的混合,但不會丟失任何更改。
文件
概述
一張表的所有文件都存儲在一個基本目錄下。 Paimon 文件以分層方式組織。 下圖說明瞭文件佈局。 從snapshot文件開始,Paimon reader可以遞歸地訪問表中的所有記錄。
Snapshot Files
所有snapshot文件都存儲在snapshot目錄中。
snapshot文件是一個 JSON 文件,包含有關此snapshot的信息,包括
- 正在使用的Schema文件
- 包含此snapshot的所有更改的清單列表(manifest list)
Manifest Files
所有清單(manifest)列表和清單文件都存儲在清單目錄中。
清單列表(manifest list)是清單文件名的列表。
清單文件是包含有關 LSM 數據文件和changelog文件的更改的文件。 例如對應快照中創建了哪個LSM數據文件、刪除了哪個文件。
Data Files
數據文件按分區和桶(Bucket)分組。每個Bucket目錄都包含一個 LSM 樹及其changelog文件。
目前,Paimon 支持使用 orc(預設)、parquet 和 avro 作為數據文件格式。
LSM-Trees
Paimon 採用 LSM 樹(日誌結構合併樹)作為文件存儲的數據結構。 如下簡要介紹
Sorted Runs
LSM 樹將文件組織成多個 sorted runs。 sorted runs由一個或多個數據文件組成,並且每個數據文件恰好屬於一個 sorted runs。
數據文件中的記錄按其主鍵排序。 在 sorted runs中,數據文件的主鍵範圍永遠不會重疊。
如圖所示的,不同的 sorted runs可能具有重疊的主鍵範圍,甚至可能包含相同的主鍵。查詢LSM樹時,必須合併所有 sorted runs,並且必鬚根據用戶指定的合併引擎和每條記錄的時間戳來合併具有相同主鍵的所有記錄。
寫入LSM樹的新記錄將首先緩存在記憶體中。當記憶體緩衝區滿時,記憶體中的所有記錄將被順序並刷新到磁碟,並創建一個新的 sorted runs。
Compaction
當越來越多的記錄寫入LSM樹時,sorted runs的數量將會增加。由於查詢LSM樹需要將所有 sorted runs合併起來,太多 sorted runs將導致查詢性能較差,甚至記憶體不足。
為了限制 sorted runs的數量,我們必須偶爾將多個 sorted runs合併為一個大的 sorted runs。 這個過程稱為壓縮。
然而,壓縮是一個資源密集型過程,會消耗一定的CPU時間和磁碟IO,因此過於頻繁的壓縮可能會導致寫入速度變慢。 這是查詢和寫入性能之間的權衡。 Paimon 目前採用了類似於 Rocksdb 通用壓縮的壓縮策略。
預設情況下,當Paimon將記錄追加到LSM樹時,它也會根據需要執行壓縮。 用戶還可以選擇在專用壓縮作業中執行所有壓縮。
可以將 sorted runs 理解為多個有序的Data File組成的一個有序文件。
主鍵表
Changelog表是創建表時的預設表類型。用戶可以在表中插入、更新或刪除記錄。
主鍵由一組列組成,這些列包含每個記錄的唯一值。Paimon通過對每個bucket中的主鍵進行排序來實現數據排序,允許用戶通過對主鍵應用過濾條件來實現高性能。
通過在變更日誌表上定義主鍵,用戶可以訪問以下特性。
Bucket
桶(Bucket)是進行讀寫操作的最小存儲單元,每個桶目錄包含一個LSM樹。
Fixed Bucket
配置一個大於0的桶,使用Fixed bucket模式,根據Math.abs(key_hashcode % numBuckets)
來計算記錄的桶。
重新縮放桶只能通過離線進程進行。桶的數量過多會導致小文件過多,桶的數量過少會導致寫性能不佳。
Dynamic Bucket
配置'Bucket'='-1'
。 先到達的key會落入舊的bucket,新的key會落入新的bucket,bucket和key的分佈取決於數據到達的順序。 Paimon 維護一個索引來確定哪個鍵對應哪個桶。
Paimon會自動擴大桶的數量。
- Option1: 'dynamic-bucket.target-row-num':控制一個桶的目標行數。
- Option2:'dynamic-bucket.initial-buckets':控制初始化bucket的數量。
Normal Dynamic Bucket Mode
當更新不跨分區(沒有分區,或者主鍵包含所有分區欄位)時,動態桶模式使用 HASH 索引來維護從鍵到桶的映射,它比固定桶模式需要更多的記憶體。
如下:
- 一般來說,沒有性能損失,但會有一些額外的記憶體消耗,一個分區中的 1 億個條目多占用 1 GB 記憶體,不再活動的分區不占用記憶體。
- 對於更新率較低的表,建議使用此模式,以顯著提高性能。
Cross Partitions Upsert Dynamic Bucket Mode
當需要跨分區upsert(主鍵不包含所有分區欄位)時,Dynamic Bucket模式直接維護鍵到分區和桶的映射,使用本地磁碟,併在啟動流寫作業時通過讀取表中所有現有鍵來初始化索引 。 不同的合併引擎有不同的行為:
- Deduplicate:刪除舊分區中的數據,並將新數據插入到新分區中。
- PartialUpdate & Aggregation:將新數據插入舊分區。
- FirstRow:如果有舊值,則忽略新數據。
性能:對於數據量較大的表,性能會有明顯的損失。而且,初始化需要很長時間。
如果你的upsert不依賴太舊的數據,可以考慮配置索引TTL來減少索引和初始化時間:
'cross-partition-upsert.index-ttl'
:rocksdb索引和初始化中的TTL,這樣可以避免維護太多索引而導致性能越來越差。
但請註意,這也可能會導致數據重覆。
Merge Engines
當Paimon sink收到兩條或更多具有相同主鍵的記錄時,它會將它們合併為一條記錄以保持主鍵唯一。 通過指定merge-engine
屬性,用戶可以選擇如何將記錄合併在一起。
Deduplicate
deduplicate合併引擎是預設的合併引擎。 Paimon 只會保留最新的記錄,並丟棄其他具有相同主鍵的記錄。
具體來說,如果最新的記錄是DELETE記錄,則所有具有相同主鍵的記錄都將被刪除。
Partial Update
通過指定 'merge-engine' = 'partial-update'
,用戶可以通過多次更新來更新記錄的列,直到記錄完成。 這是通過使用同一主鍵下的最新數據逐一更新值欄位來實現的。 但是,在此過程中不會覆蓋空值。
如下所示:
- <1, 23.0, 10, NULL>-
- <1, NULL, NULL, 'This is a book'>
- <1, 25.2, NULL, NULL>
假設第一列是主鍵key,那麼最後的結果是 <1, 25.2, 10, 'This is a book'>
Sequence Group
Sequence
欄位並不能解決多流更新的部分更新表的亂序問題,因為多流更新時 Sequence(序列) 欄位可能會被另一個流的最新數據覆蓋。
因此我們引入了部分更新表的序列組(Sequence Group)機制。 它可以解決:
- 多流更新時出現混亂。 每個流定義其自己的序列組。
- 真正的部分更新,而不僅僅是非空更新。
如下所示:
CREATE TABLE T (
k INT,
a INT,
b INT,
g_1 INT,
c INT,
d INT,
g_2 INT,
PRIMARY KEY (k) NOT ENFORCED
) WITH (
'merge-engine'='partial-update',
'fields.g_1.sequence-group'='a,b',
'fields.g_2.sequence-group'='c,d'
);
INSERT INTO T VALUES (1, 1, 1, 1, 1, 1, 1);
-- g_2 is null, c, d should not be updated
INSERT INTO T VALUES (1, 2, 2, 2, 2, 2, CAST(NULL AS INT));
SELECT * FROM T; -- output 1, 2, 2, 2, 1, 1, 1
-- g_1 is smaller, a, b should not be updated
INSERT INTO T VALUES (1, 3, 3, 1, 3, 3, 3);
SELECT * FROM T; -- output 1, 2, 2, 2, 3, 3, 3
對於 sequence-group,有效的比較數據類型包括:DECIMAL、TINYINT、SMALLINT、INTEGER、BIGINT、FLOAT、DOUBLE、DATE、TIME、TIMESTAMP 和 TIMESTAMP_LTZ。
Aggregation
可以為輸入欄位指定聚合函數,支持聚合中的所有函數。
如下所示:
CREATE TABLE T (
k INT,
a INT,
b INT,
c INT,
d INT,
PRIMARY KEY (k) NOT ENFORCED
) WITH (
'merge-engine'='partial-update',
'fields.a.sequence-group' = 'b',
'fields.b.aggregate-function' = 'first_value',
'fields.c.sequence-group' = 'd',
'fields.d.aggregate-function' = 'sum'
);
INSERT INTO T VALUES (1, 1, 1, CAST(NULL AS INT), CAST(NULL AS INT));
INSERT INTO T VALUES (1, CAST(NULL AS INT), CAST(NULL AS INT), 1, 1);
INSERT INTO T VALUES (1, 2, 2, CAST(NULL AS INT), CAST(NULL AS INT));
INSERT INTO T VALUES (1, CAST(NULL AS INT), CAST(NULL AS INT), 2, 2);
SELECT * FROM T; -- output 1, 2, 1, 2, 3
Default Value
如果無法保證數據的順序,僅通過覆蓋空值的方式寫入欄位,則讀表時未覆蓋的欄位將顯示為空。
CREATE TABLE T (
k INT,
a INT,
b INT,
c INT,
PRIMARY KEY (k) NOT ENFORCED
) WITH (
'merge-engine'='partial-update'
);
INSERT INTO T VALUES (1, 1, CAST(NULL AS INT), CAST(NULL AS INT));
INSERT INTO T VALUES (1, CAST(NULL AS INT), CAST(NULL AS INT), 1);
SELECT * FROM T; -- output 1, 1, null, 1
如果希望讀表時未被覆蓋的欄位有預設值而不是null,則需要fields.name.default-value
。
CREATE TABLE T (
k INT,
a INT,
b INT,
c INT,
PRIMARY KEY (k) NOT ENFORCED
) WITH (
'merge-engine'='partial-update',
'fields.b.default-value'='0'
);
INSERT INTO T VALUES (1, 1, CAST(NULL AS INT), CAST(NULL AS INT));
INSERT INTO T VALUES (1, CAST(NULL AS INT), CAST(NULL AS INT), 1);
SELECT * FROM T; -- output 1, 1, 0, 1
Aggregation
有時用戶只關心聚合結果。 聚合 合併引擎根據聚合函數將同一主鍵下的各個值欄位與最新數據一一聚合。
每個不屬於主鍵的欄位都可以被賦予一個聚合函數,由 fields.<field-name>.aggregate-function
表屬性指定,否則它將使用 last_non_null_value
聚合作為預設值。 例如,請考慮下表定義。
CREATE TABLE MyTable (
product_id BIGINT,
price DOUBLE,
sales BIGINT,
PRIMARY KEY (product_id) NOT ENFORCED
) WITH (
'merge-engine' = 'aggregation',
'fields.price.aggregate-function' = 'max',
'fields.sales.aggregate-function' = 'sum'
);
price欄位將通過 max 函數聚合,sales欄位將通過 sum 函數聚合。 給定兩個輸入記錄 <1, 23.0, 15> 和 <1, 30.2, 20>,最終結果將是 <1, 30.2, 35>。
當前支持的聚合函數和數據類型有:
- sum:支持 DECIMAL、TINYINT、SMALLINT、INTEGER、BIGINT、FLOAT 和 DOUBLE。
- min/max:支持 CHAR、VARCHAR、DECIMAL、TINYINT、SMALLINT、INTEGER、BIGINT、FLOAT、DOUBLE、DATE、TIME、TIMESTAMP 和 TIMESTAMP_LTZ。
- last_value / last_non_null_value:支持所有數據類型。
- listagg:支持STRING數據類型。
- bool_and / bool_or:支持BOOLEAN數據類型。
- first_value/first_not_null_value:支持所有數據類型。
只有 sum 支持撤回(UPDATE_BEFORE 和 DELETE),其他聚合函數不支持撤回。 如果允許某些函數忽略撤回消息,可以配置:'fields.${field_name}.ignore-retract'='true'
First Row
通過指定 'merge-engine' = 'first-row'
,用戶可以保留同一主鍵的第一行。 它與Deduplicate合併引擎不同,在First Row合併引擎中,它將生成僅insert changelog。
- First Row合併引擎必須與 lookup changlog producer 一起使用。
- 不能指定sequence.field。
- 不接受 DELETE 和 UPDATE_BEFORE 消息。 可以配置 first-row.ignore-delete 來忽略這兩種記錄。
這對於替代流計算中的log deduplication有很大的幫助。
Changelog Producers
流式查詢會不斷產生最新的變化。
通過在創建表時指定更改changelog-producer表屬性,用戶可以選擇從表文件生成的更改模式。
Changelog:通俗全面的理解就是操作過程中(比如ETL/CRUD),數據變化的日誌;這樣的日誌可以幫助跟蹤數據的歷史變化,確保數據的質量與一致性,並允許回溯到之前的某個數據狀態,幫助進行數據審計、數據分析、數據恢復等。
None
預設情況下,不會將額外的changelog producer應用於表的writer。 Paimon source只能看到跨snapshot的合併更改,例如刪除了哪些鍵以及某些鍵的新值是什麼。
但是,這些合併的更改無法形成完整的changelog,因為我們無法直接從中讀取鍵的舊值。 合併的更改要求消費者“記住”每個鍵的值並重寫這些值而不看到舊的值。 然而,一些消費者需要舊的值來確保正確性或效率。
考慮一個消費者計算某些分組鍵的總和(可能不等於主鍵)。 如果消費者只看到一個新值5,它無法確定應該將哪些值添加到求和結果中。 例如,如果舊值為 4,則應在結果中加 1。 但如果舊值是 6,則應依次從結果中減去 1。 舊的value對於這些類型的消費者來說很重要。
總而言之,沒有一個changelog producer最適合資料庫系統等使用者。 Flink 還有一個內置的"normalize"運算符,可以將每個鍵的值保留在狀態中。 很容易看出,這種操作符的成本非常高,應該避免使用。 (可以通過“scan.remove-normalize”強制刪除“normalize”運算符。)
Input
通過指定 'changelog- Producer' = 'input'
,Paimon Writer依賴他們的輸入作為完整changelog的來源。 所有輸入記錄將保存在單獨的changelog file中,並由 Paimon source提供給消費者。
當 Paimon 編寫者的輸入是完整的changelog(例如來自資料庫 CDC)或由 Flink 狀態計算生成時,可以使用input changelog producer.
Lookup
如果您的輸入無法生成完整的changelog,但想擺脫昂貴的"normalize"運算符,則可以考慮使用'lookup' changelog producer.
通過指定'changelog- Producer' = 'lookup',Paimon將在提交數據寫入之前通過'lookup'生成changelog。
Lookup 會將數據緩存在記憶體和本地磁碟上,您可以使用以下選項來調整性能:
Lookup changelog- Producer 支持changelog- Producer.row-deduplicate
以避免為同一記錄生成-U、+U changelog。
Full Compaction
如果你覺得“lookup”的資源消耗太大,可以考慮使用“full-compaction”changelog Producer,它可以解耦數據寫入和changelog生成,更適合高延遲的場景(例如10分鐘) )。
通過指定 'changelog- Producer' = 'full-compaction',Paimon 將比較完全壓縮之間的結果並生成差異作為changelog。changelog的延遲受到完全壓縮頻率的影響。
通過指定 full-compaction.delta-commits 表屬性,在增量提交(檢查點 checkpoint)後將不斷觸發 full compaction。 預設情況下設置為 1,因此每個檢查點都會進行完全壓縮並生成change log。
Full-compaction changelog- Producer 支持changelog- Producer.row-deduplicate 以避免為同一記錄生成-U、+U 變更日誌。
Sequence Field
預設情況下,主鍵表根據輸入順序確定合併順序(最後輸入的記錄將是最後合併的)。 然而在分散式計算中,會存在一些導致數據混亂的情況。 這時,可以使用時間欄位作為sequence.field
,例如:
CREATE TABLE MyTable (
pk BIGINT PRIMARY KEY NOT ENFORCED,
v1 DOUBLE,
v2 BIGINT,
dt TIMESTAMP
) WITH (
'sequence.field' = 'dt'
);
無論輸入順序如何,具有最大sequence.field 值的記錄將是最後合併的記錄。
Sequence Auto Padding:
當記錄更新或刪除時,sequence.field必須變大,不能保持不變。 對於-U和+U,它們的序列欄位必須不同。 如果您無法滿足此要求,Paimon 提供了自動填充序列欄位的選項。
- 'sequence.auto-padding' = 'row-kind-flag':如果對-U和+U使用相同的值,就像Mysql Binlog中的“op_ts”(資料庫中進行更改的時間)一樣。 建議使用自動填充行類型標誌,它會自動區分-U(-D)和+U(+I)。
- 精度不夠:如果提供的sequence.field不滿足精度,比如大約秒或毫秒,可以將sequence.auto-padding設置為秒到微或毫秒到微,這樣序列號的精度 將由系統彌補到微秒。
- 複合模式:例如“second-to-micro,row-kind-flag“,首先將micro添加到第二個,然後填充row-kind標誌。
Row Kind Field
預設情況下,主鍵表根據輸入行確定行類型。 您還可以定義“rowkind.field”以使用欄位來提取行類型。
有效的行類型字元串應為“+I”、“-U”、“+U”或“-D”。