Partial Update 數據打寬 通過不同的流寫不同的欄位,打寬了數據的維度,填充了數據內容;如下所示: --FlinkSQL參數設置 set `table.dynamic-table-options.enabled` = `true`; SET `env.state.backend` = ` ...
Partial Update
數據打寬
通過不同的流寫不同的欄位,打寬了數據的維度,填充了數據內容;如下所示:
--FlinkSQL參數設置
set
`table.dynamic-table-options.enabled` = `true`;
SET
`env.state.backend` = `rocksdb`;
SET
`execution.checkpointing.interval` = `60000`;
SET
`execution.checkpointing.tolerable-failed-checkpoints` = `3`;
SET
`execution.checkpointing.min-pause` = `60000`;
--創建Paimon catalog
CREATE CATALOG paimon WITH (
'type' = 'paimon',
'metastore' = 'hive',
'uri' = 'thrift://localhost:9083',
'warehouse' = 'hdfs://paimon',
'table.type' = 'EXTERNAL'
);
--創建Partial update結果表
CREATE TABLE if not EXISTS paimon.dw.order_detail (
`order_id` string,
`product_type` string,
`plat_name` string,
`ref_id` bigint,
`start_city_name` string,
`end_city_name` string,
`create_time` timestamp(3),
`update_time` timestamp(3),
`dispatch_time` timestamp(3),
`decision_time` timestamp(3),
`finish_time` timestamp(3),
`order_status` int,
`binlog_time` bigint,
PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
'bucket' = '20',
-- 指定20個bucket
'bucket-key' = 'order_id',
-- 記錄排序欄位
'sequence.field' = 'binlog_time',
-- 選擇 full-compaction ,在compaction後產生完整的changelog
'changelog-producer' = 'full-compaction',
-- compaction 間隔時間
'changelog-producer.compaction-interval' = '2 min',
'merge-engine' = 'partial-update',
-- 忽略DELETE數據,避免運行報錯
'partial-update.ignore-delete' = 'true'
);
INSERT INTO
paimon.dw.order_detail
-- order_info表提供主要欄位
SELECT
order_id,
product_type,
plat_name,
ref_id,
cast(null as string) as start_city_name,
cast(null as string) as end_city_name,
create_time,
update_time,
dispatch_time,
decision_time,
finish_time,
order_status,
binlog_time
FROM
paimon.ods.order_info
/*+ OPTIONS ('scan.mode'='latest') */
union
all
-- order_address表提供城市欄位
SELECT
order_id,
cast(null as string) as product_type,
cast(null as string) as plat_name,
cast(null as bigint) as ref_id,
start_city_name,
end_city_name,
cast(null as timestamp(3)) as create_time,
cast(null as timestamp(3)) as update_time,
cast(null as timestamp(3)) as dispatch_time,
cast(null as timestamp(3)) as decision_time,
cast(null as timestamp(3)) as finish_time,
cast(null as int) as order_status,
binlog_time
FROM
paimon.ods.order_address
/*+ OPTIONS ('scan.mode'='latest') */
;
完整的Changlog
Paimon中的表被多流填充數據且打寬維度後,支持流讀、批讀的方式提供完整的Changelog給下游。
Sequence-Group
配置:'fields.G.sequence-group'='A,B'
由欄位G
控制是否更新欄位A, B
;總得來說,G
的值如果為null或比更新值大將不更新A,B
;如下單測
public void testSequenceGroup() {
sql(
"CREATE TABLE SG ("
+ "k INT, a INT, b INT, g_1 INT, c INT, d INT, g_2 INT, PRIMARY KEY (k) NOT ENFORCED)"
+ " WITH ("
+ "'merge-engine'='partial-update', "
+ "'fields.g_1.sequence-group'='a,b', "
+ "'fields.g_2.sequence-group'='c,d');");
sql("INSERT INTO SG VALUES (1, 1, 1, 1, 1, 1, 1)");
// g_2 should not be updated
sql("INSERT INTO SG VALUES (1, 2, 2, 2, 2, 2, CAST(NULL AS INT))");
// select *
assertThat(sql("SELECT * FROM SG")).containsExactlyInAnyOrder(Row.of(1, 2, 2, 2, 1, 1, 1));
// projection
assertThat(sql("SELECT c, d FROM SG")).containsExactlyInAnyOrder(Row.of(1, 1));
// g_1 should not be updated
sql("INSERT INTO SG VALUES (1, 3, 3, 1, 3, 3, 3)");
assertThat(sql("SELECT * FROM SG")).containsExactlyInAnyOrder(Row.of(1, 2, 2, 2, 3, 3, 3));
// d should be updated by null
sql("INSERT INTO SG VALUES (1, 3, 3, 3, 2, 2, CAST(NULL AS INT))");
sql("INSERT INTO SG VALUES (1, 4, 4, 4, 2, 2, CAST(NULL AS INT))");
sql("INSERT INTO SG VALUES (1, 5, 5, 3, 5, CAST(NULL AS INT), 4)");
assertThat(sql("SELECT a, b FROM SG")).containsExactlyInAnyOrder(Row.of(4, 4));
assertThat(sql("SELECT c, d FROM SG")).containsExactlyInAnyOrder(Row.of(5, null));
}
其作用是:
- 在多個數據流更新期間的無序問題。每個數據流都定義自己的序列組。
- 真正的部分更新,而不僅僅是非空值的更新。
- 接受刪除記錄來撤銷部分列。
Changelog-Producer
Paimon通過Changelog-Producer支持生成changelog,並支持下游以流讀、批讀的形式讀取changelog。
Changelog的生成有多種方式,input、lookup、full-compaction;其生成代價是由低到高。
None
不查找舊值,不額外寫Changelog;但會下游任務中通過ChangelogNormalize運算元補足Changelog。
Input
不查找舊值,額外寫Changelog;適用與CDC的數據源。
Lookup
查找舊值,額外寫Changelog;如果不是CDC數據源,需要通過LookupCompaction查找舊值,即在 compaction 的過程中, 會去向高層查找本次新增 key 的舊值, 如果沒有查找到, 那麼本次的就是新增 key, 如果有查找到, 那麼就生成完整的 UB 和 UA 消息。
Full-Compaction
查找舊值,額外寫Changelog;在 full compact 的過程中, 其實數據都會被寫到最高層, 所以所有 value 的變化都是可以推演出來的.
數據一致性
數據版本
通過Flink的checkpoint機制,生成Snapshot並標記版本,即,一個Snapshot對應數據的一個版本。
比如 Job-A 基於 Table-A 的 Snapshot-20 產出了 Table-B 的 Snapshot-11。Job-B 基於 Table-A 的Snapshot-20產出了 Table-C 的 Snapshot-15。那麼 Job-C 的查詢就應該基於 Table-B 的 Snapshot-11 和 Table-C 的 Snapshot-15 進行計算,明確了數據版本,從而實現計算的一致性。
生成的snapshot-xx,就是數據的版本號。
數據對齊
將 Checkpoint 插入到兩個 Snapshot 的數據之間。如果當前的 Snapshot 還沒有完全被消費,這個 Checkpoint 的觸發會被推遲,從而實現按照 Snapshot 對數據進行劃分和對齊。
實現分為兩個部分。
- 在提交階段,需要去血緣關係表中查詢上下游表的一致性版本,並且基於查詢結果給對應的上游表設置起始的消費位置。
- 在運行階段,按照消費的 Snapshot 來協調 Checkpoint,在 Flink 的 Checkpoint Coordinator 向 Source 發出 Checkpoint 的請求時,會強制要求將 Checkpoint 插入到兩個 Snapshot 的數據之間。如果當前的 Snapshot 還沒有完全被消費,這個 Checkpoint 的觸發會被推遲,從而實現按照 Snapshot 對數據進行劃分和處理。
數據血緣
概念
數據從產生到消費的整個流轉過程中所經歷的各種轉換、處理和流動的軌跡。數據血緣提供了數據的來源、去向以及中間處理過程的透明度,幫助用戶理解數據如何在系統中被處理和移動,以及數據是如何從原始狀態轉化為最終的可消費形態。
實現
在checkpoint的提交時將數據的血緣關係寫入到System Table,記錄血緣關係。