位元組跳動數據湖團隊在實時數倉構建寬表的業務場景中,探索實踐出的一種基於 Hudi Payload 的合併機制提出的全新解決方案。 位元組跳動數據湖團隊在實時數倉構建寬表的業務場景中,探索實踐出的一種基於 Hudi Payload 的合併機制提出的全新解決方案。 該方案在存儲層提供對多流數據的關聯能力, ...
位元組跳動數據湖團隊在實時數倉構建寬表的業務場景中,探索實踐出的一種基於 Hudi Payload 的合併機制提出的全新解決方案。
位元組跳動數據湖團隊在實時數倉構建寬表的業務場景中,探索實踐出的一種基於 Hudi Payload 的合併機制提出的全新解決方案。
該方案在存儲層提供對多流數據的關聯能力,旨在解決實時場景下多流 JOIN 遇到的一系列問題。接下來,本文會詳細介紹多流拼接方案的背景以及實踐經驗。
業務面臨的挑戰
位元組跳動存在較多業務場景需要基於具有相同主鍵的多個數據源實時構建一個大寬表,數據源一般包括 Kafka 中的指標數據,以及 KV 資料庫中的維度數據。
業務側通常會基於實時計算引擎在流上做多個數據源的 JOIN 產出這個寬表,但這種解決方案在實踐中面臨較多挑戰,主要可分為以下兩種情況:
- 維表 JOIN
-
場景挑戰:指標數據與維度數據進行關聯,其中維度數據量比較大,指標數據 QPS 比較高,導致數據可能會產出延遲。
-
當前方案:將部分維度數據緩存起起來,緩解高 QPS 下訪問維度數據存儲引擎產生的任務背壓問題。
-
存在問題:由於業務方的維度數據和指標數據時間差比較大,所以指標數據流無法設置合理的 TTL;而且存在 Cache 中維度數據沒有及時更新,導致下游數據不准確的問題。
- 多流 JOIN
- 場景挑戰:多個指標數據進行關聯,不同指標數據可能會出現時間差比較大的異常情況。
- 當前方案:使用基於視窗的 JOIN,並且維持一個比較大的狀態。
- 存在問題:維持大的狀態不僅會給記憶體帶來的一定的壓力,同時 Checkpoint 和 Restore 的時間會變 得更長,可能會導致任務背壓.
分析與對策
總結上述場景遇到的挑戰,主要可歸結為以下兩點:
- 由於多流之間時間差比較大,需要維持大狀態,同時 TTL 不好設置。
- 由於對維度數據做了 Cache,維度數據數據更新不及時,導致下游數據不准確。
針對這些問題,並結合業務場景對數據延遲有一定容忍,但對數據準確性要求比較高的背景,我們在不斷的實踐中探索出了基於 Hudi Payload 機制的多流拼接方案:
- 多流數據完全在存儲層進行拼接,與計算引擎無關,因此不需要保留狀態及其 TTL 的設置。
- 維度數據和指標數據作為不同的流獨立更新,更新過程中不需要做多流數據合併,下游讀取時再 Merge 多流數據,因此不需要緩存維度數據,同時可以在執行 Compact 時進行 Merge,加速下游查詢。
此外,多流拼接方案還支持:
- 內置通用模板,支持數據去重等通用介面,同時可滿足用戶定製化數據處理需求。
- 支持離線場景和流批混合場景。
方案介紹
基本概念
首先簡單介紹下本方案依賴 Hudi 的一些核心概念:
- Hudi MetaStore
這是一個中心化的數據湖元數據管理系統。它基於 Timeline 樂觀鎖實現併發寫控制,可以支持列級別的衝突檢查。這在 Hudi 多流拼接方案中能夠實現併發寫入至關重要,更多細節可參考位元組跳動數據湖團隊向社區貢獻的 RFC-36。
- MergeOnRead 表讀寫邏輯
MergeOnRead 表裡面的文件包含兩種, LogFile (行存) 和 BaseFile (列存),適用於實時高頻更新場景,更新數據會直接寫入 LogFile 中,讀時再進行合併。為了減少讀放大的問題,會定期合併 LogFile 到 BaseFile 中,此過程叫 Compact。
原理概述
針對上述業務場景,我們設計了一種完全基於存儲層的多流拼接方案,支持多個數據流併發寫入,讀時按照主鍵合併多流數據,此外還支持非同步 Compact 來加速下游讀取數據。
圖 1 Hudi 多流拼接概念圖(本文所有圖中示例數據均與圖 1 一致)
現以一個簡單的示例流程對方案原理進行闡述,圖 1 為多流拼接示意圖。圖中的寬表包含 BCDE 五列,是由兩個實時流和一個離線流拼接而成,其中 A 是主鍵列,實時流 1 負責寫入 ABC 三列,實時 流 2 負責寫入 AD 兩列,離線流負責寫入 AE 兩列,此處僅對兩個實時流的拼接過程進行介紹。
圖 1 中顯示兩個流寫入數據以 LogFile 形式存儲,Merge 過程是合併 LogFile 和 BaseFile 中的數據。合併過程中,LogFile 中每一列的值被更新到 BaseFile 中對應的列上,BaseFile 中未被更新的列保持原來的值不變,如圖 1 中 BCD 三列被更新成新值,E 列保持舊值不變。
寫入過程
多流數據拼接方案支持多流併發寫入,相互獨立。對於單個流的寫入,邏輯與 Hudi 原有寫入流程一致,即數據以 Upsert 的方式寫入 Hudi 表,以 LogFile 的形式存儲,併在數據寫入的過程中對數據去重。在多流寫入的場景,核心點在於如何處理併發問題。
圖 2 顯示了數據併發寫入的流程。流 1 和 流 2 是兩個併發的任務,檢查這兩個任務寫入的列除了主鍵以外是不是存在其它交集。例如:
流 1 的 Schema 包含三列 (A,B,C),流 2 的 Schema 包含兩列 (A,D)。
在併發寫入的時候,先在 Hudi MetaStore 對兩個任務發起的 DeltaCommit 做列衝突檢查,即除了主鍵列外的其它列是否存在交集,如圖中的 (B,C) 和 (D):
- 如果有交集,則後發起的 DeltaCommit 失敗。
- 如果沒有交集,則兩個任務繼續後續的寫入。
圖 2 數據寫入過程示意圖
讀取過程
接下來,介紹多流拼接場景下 Snapshot Query 的核心過程,即先對 LogFile 進行去重合併,然後再合併 BaseFile 和 去重後的 LogFile 中的數據。圖 3 顯示了整個數據合併的過程,具體可以拆分成以下 兩個過程:
- Merge LogFile
Hudi 現有邏輯是將 LogFile 中的數據讀出來存放在 Map 中,對於 LogFile 中每條 Record,如果 Key 不存在 Map 中,則直接放入 Map,如果 Key 已經存在於 Map 中,則需要更新操作。
在多流拼接中,因為 LogFile 中存在不同數據流寫入的數據,即每條數據的列可能不相同,所以在更新的時候需要判斷相同 Key 的兩個 Record 是否來自同一個流,是則做更新,不是則做拼接。
如圖 3 所示,讀到 LogFile2 中的主鍵是 key1 的 Record 時,key1 對應的 Record 在 Map 中已經存在,但這兩個 Record 來自不同流,則需要拼接形成一條新的 Record (key1,b0_new,c0_new,d0_new) 放入 Map 中。
- Merge BaseFile and LogFile
Hudi 現有預設邏輯是對於每一條存在於 BaseFile 中的 Record,查看 Map 中是否存在 key 相同的 Record,如果存在,則用 Map 中的 Record 覆蓋 BaseFile 中的 Record。在多流拼接中,Map 中的 Record 不會完整覆蓋 BaseFile 中對應的 Record,可能只會更新部分列的值,即 Map 中的 Record 對應的列。
如圖 3 所示,以最簡單的覆蓋邏輯為例,當讀到 BaseFile 中的主鍵是 key1 的 Record 時,發現 key1 在 Map 中已經存在並且對應的 Record 有 BCD 三列的值,則更新 BaseFile 中的 BCD 列,得到新的 Record(key1,b0_new,c0_new,d0_new,e0),註意 E 列沒有被更新,所以保持原來的值 e0。
對於新增的 Key 如 Key3 對應的 Record,則需要將 BCE 三列補上預設值形成一條完整的 Record。
圖3 SnapShot Query 中數據合併過程
非同步 Compaction
為了提升讀取性能,某些數據源的寫入任務會同步執行 Compaction,但實踐過程中發現同步執行 Compaction 會阻塞寫入任務,而且 Compaction 任務需要資源比較多,可能會搶占流式導入任務的資源。
針對這類場景,通過獨立的 Compaction Service 來隔離 Compaction 任務和流式數據導入任務。與 Hudi 本身自帶的非同步 Compaction 不同的是,用戶無需指定要執行的 Compaction Instant,且有一個獨立的 Compaction Service 負責所有的表的 Compaction 操作。關於 Compaction Service 的細節就不在本文展開,詳情可參考 RFC-43。
具體過程是流式導入任務同步生成 Schedule Compaction Plan,並將 Plan 存入 Hudi MetaStore。有一個獨立於流式導入任務的 Async Compactor,它從 Hudi MetaStore 迴圈拉取 Compaction Plan 並執行。
場景實踐與未來規劃
最終,基於 Hudi 多流拼接的方案,在實時數倉的 DWS 層落地,單表支持了 3+ 數據流的併發導入,覆蓋了數百 TB 的數據。
此外,在使用 Spark 對寬表數據進行查詢時,在單次掃描量幾十 TB 的查詢中,性能相比於直接使用多表關聯性能提升在 200% 以上,在一些更加複雜的查詢下,也有 40-140% 的性能提升。
目前,基於 Hudi 多流拼接方案易用性不足,單個任務至少需要配置超過 10 個參數,為了進一步降低用戶使用成本,後續會做部分列插入和更新的 SQL 的語法支持以及參數的收斂。
除此之外,為了進一步提升寬表數據查詢性能,還計劃在多流拼接場景下支持基於列存格式的 LogFile,提供列裁剪和過濾條件下推等功能。
數據湖團隊正在招人,
歡迎關註位元組跳動數據平臺同名公眾號
相關產品
- 火山引擎湖倉一體分析服務 LAS
面向湖倉一體架構的Serverless數據處理分析服務,提供一站式的海量數據存儲計算和交互分析能力,完全相容 Spark、Presto、Flink 生態,幫助企業輕鬆完成數據價值洞察。點擊瞭解
- 火山引擎E-MapReduce
支持構建開源 Hadoop 生態的企業級大數據分析系統,完全相容開源,提供 Hadoop、Spark、Hive、Flink 集成和管理,幫助用戶輕鬆完成企業大數據平臺的構建,降低運維門檻,快速形成大數據分析能力。點擊瞭解