面對今日頭條、抖音等不同產品線的複雜數據質量場景,火山引擎 DataLeap 數據質量平臺如何滿足多樣的需求? ...
更多技術交流、求職機會,歡迎關註位元組跳動數據平臺微信公眾號,回覆【1】進入官方交流群
面對今日頭條、抖音等不同產品線的複雜數據質量場景,火山引擎 DataLeap 數據質量平臺如何滿足多樣的需求?本文將介紹我們在彌合大數據場景下數據質量校驗與計算消耗資源大、校驗計算時間長的衝突等方面的經驗,同時介紹火山引擎 DataLeap 數據質量平臺是如何用一套架構框架來滿足流批方面的數據質量監控。
什麼是數據質量管理
廣義上來說,數據質量的定義是數據滿足一組固有特性(質量維度)要求的程度。業界通常有 6 個維度:
- 完整性:指數據的記錄和信息是否完整,是否存在缺失的情況。數據缺失主要包括記錄的缺失和記錄中某個欄位信息的缺失,兩者都會造成統計結果不准確,所以說完整性是數據質量最基礎的保障。在做監控時,需要考慮兩個方面:數據條數是否少了;某些欄位的取值是否缺失。完整性的監控,多出現在日誌級別的監控上,一般會在數據接入的時候來做數據完整性校驗。
- 準確性:指數據中記錄的信息和數據是否準確,是否存在異常或者錯誤。一般準確性的監控多集中在對業務結果數據的監控,比如每日的活躍、收入等數據是否正常。
- 一致性:指同一指標在不同地方的結果是否一致。數據不一致的情況,多出現在數據系統達到一定的複雜度後,同一指標會在多處進行計算,由於計算口徑或者開發人員的不同,容易造成同一指標出現不同的結果。
- 及時性:在確保數據的完整性、準確性和一致性後,接下來就要保障數據能夠及時產出,這樣才能體現數據的價值。及時性很容易理解,主要就是數據計算出來的速度是否夠快,這點在數據質量監控中可以體現在監控結果數據是否在指定時間點前計算完成。
- 規範性:指數據是否按照要求的規則進行存儲,如郵箱校驗、IP 地址校驗、電話格式校驗等,具有一定的語義意義。
- 唯一性: 指數據是否有重覆,如欄位的唯一值、欄位的重覆值等。
我們對數據質量有一些流程和規範,並針對上述一些維度開發了一套數據質量平臺,主要關註數據質量及其生產鏈路。
上圖展示了在數據開發的流程中,火山引擎 DataLeap 數據質量平臺可以提供哪些功能:
- 數據探查:可以根據各種維度來查看數據明細和分佈情況。
- 數據對比:開發同學可能經常會發現線上表和測試表不一致,所以我們在任務上線的環節提供了數據對比的功能。
- 任務監控:監控線上數據,提供報警和熔斷功能。
火山引擎 DataLeap 數據質量平臺最有代表性的功能是:對數據開發平臺產出的 Hive 表數據進行主鍵重覆檢測,如果存在重覆則進行報警。
火山引擎 DataLeap 數據質量監控最有用的場景是防止數據問題蔓延到下游。舉個例子:數據任務產出一張 Hive 表,該表可能會同步一些信息到 Hive metastore(HMS)。HMS 的主從架構可能存在一定的延遲,假設 HMS 出現問題,下游任務可能會讀到臟數據,這時如果我們使用數據質量監控,就能及時發現問題,阻止下游任務運行。
數據質量挑戰
目前我們的數據質量挑戰有哪些?可以通過幾個用戶 case 瞭解一下。
User Story 1
某流量級產品商業化系統,M 級日誌條數/秒;希望秒級監控日誌延遲、關鍵欄位空值,T+1 檢測日誌波動率。
User Story 2
某內部業務系統,日誌存儲 ES;希望每 5 分鐘檢測上一周期日誌波動情況。
User Story 3
某內部指標平臺,業務數據由 Hive 定期同步到 ClickHouse;希望每次同步任務後檢查 Hive 與 ClickHouse 中的指標是否一致。
通過上面的介紹,大家應該也大致清楚了當前數據質量需要解決的問題。可能有人會說,數據質量平臺我也做過,問題歸總起來也不複雜,總而言之就是對數據進行各種計算,對比計算來的閾值即可,一般直接依賴於 Spark 引擎或者 Hive 引擎計算即可。確實,這也是我們數據質量最開始的樣子。那為什麼會演化到目前這樣,我們面臨了一些什麼問題?
首先是場景需求非常複雜:
- 離線監控,主要是不同存儲的數據質量監控,比如 Hive 或者 ClickHouse 。
- 位元組跳動內部的廣告系統對時效性和準確性要求很高,如果用微批系統 10 min 才做一次檢測,可能線上損失就上百萬了甚至千萬了。所以廣告系統對實時性要求相對較高。
- 另外一個是複雜拓撲情況下的流式延遲監控。
- 最後是微批,指一段時間內的定時調度,有些 Kafka 導入 ES 的流式場景,需要每隔幾分鐘對比下前一周期。
此外,位元組跳動各種產品會產出海量的日誌數據,我們需要用有限的資源來滿足大家對質量監控的需求。
面臨這些挑戰,我們的解決方案是什麼?
流批數據質量解決方案
產品功能架構
火山引擎 DataLeap 流批數據質量解決方案有 4 個大的功能:
- 離線數據質量監控:解決批和微批監控場景,支持 Hive、ClickHouse、ES 等多種數據源,並有欄位、唯一性等多種監控維度,允許通過 SQL 自定義維度聚合進行監控。
- 流式數據質量監控:解決流式監控場景,支持 Kafka/BMQ 等數據源。
- 數據探查:解決數據開發之前對數據內容存疑問題,支持 Hive 數據源。
- 數據對比:解決新舊表數據一致性問題,支持 Hive/Hive SQL 數據源。
系統架構
上圖是火山引擎 DataLeap 數據質量平臺的系統架構圖,主要分為 5 個部分:
-
Scheduler:外部調度器,觸發離線監控。主要分兩種類型:
對外提供 API 調用任務;
定時調度,通過 calljob 調用數據。
-
Backend:後端服務,偏服務層,處理業務邏輯。主要負責:
質量平臺和外部的交互,所有 API 響應都是通過這一層進行;
任務提交:用戶在質量平臺配置的規則會放到業務存儲,Scheduler 被調用後,Backend 會將任務相關的參數配置進行任務提交;
獲取質量監控的結果併進行判斷,然後和外部系統進行交互,在需要時發送警報通知用戶。
-
Executor:平臺核心的任務執行模塊,集成了一些引擎,例如數據探查使用 OLAP 引擎。質量監控部分使用 Griffin 的 Measure 進行數據統計。
-
Monitor:是一個相對獨立的模塊,主要進行狀態服務的流轉,提供重覆報警等功能。
-
Alert Center:質量平臺強依賴於該平臺。它是外部報警服務,接收各種報警事件
離線數據檢測流程
下麵看一下離線數據的檢測流程。
離線數據的監控、探查、對比的執行流程一致,主要分為 4 步:
- 監控觸發:調度系統調用質量模塊 Backend API;
- 作業提交:Backend 以 Cluster 模式提交 Spark 作業至 Yarn;
- 結果回傳:作業結束 (成功、失敗),Driver 將結果 sync 至 Backend;
- 消息觸發:Backend 根據結果觸發相應動作 (例如:報警、消息提示)。
我們總結了一下火山引擎 DataLeap 數據質量平臺的優勢:
- 調度系統低耦合:數據質量平臺沒有和調度系統強綁定,一般可以用業務系統的 API 實現互相調用。
- 事件觸發高效,Backend 水平擴展能力強:Backend 是無狀態的實例服務,如果質量監控的業務系統較多,Backend 可以採用水平擴展的方式部署,接收請求並提交作業。
- 沒有 Quota 限制:平臺本身沒有維護數據質量監控單獨需要的資源隊列,而是把這個許可權開放給用戶,用他們自身的資源做資源監控。這樣就把 Quota 問題轉換成了用戶資源問題。
當然任何一個工具都不可能是完美的,火山引擎 DataLeap 數據質量平臺暫時還有一些待提升的地方:
- 非 CPU 密集型查詢較重:整個平臺的設計是以任務提交的方式完成離線場景的需求。但是後來我們發現其實不需要啟動 Spark 的作業仍然會啟動一個 Spark 作業,如 ES SQL 查詢,這個查詢是很重的。
- 依賴 Yarn 做調度穩定性不高:平臺上的任務在資源不充足或被擠占的情況下,會出現任務運行或調用很慢。
流式監控執行
對於流式數據的監控,我們選擇了 Flink 引擎,因為流式數據不同於離線數據,不能用快照的方式低成本拿到過程。所以我們要依賴一些外部的時序資料庫再加規則引擎來展示對數據的監控。
平臺上流式數據監控的流程為:
- 根據規則定義,創建 Flink 作業;
- 根據報警條件,註冊 Bosun 報警事件;
- Flink 作業消費 Kafka 數據,計算監控指標寫 Metrics;
- Bosun 基於 Metrics 的時序數據,定時檢測,觸發報警;
- Backend 接收報警回調,處理報警發送邏輯。
下麵著重介紹兩個模塊的實現。
Executor 實現
Executor 是基於 Apache Griffin 的 Measure 模塊改造的一個 Spark Application。功能包括:
- 適配數據源
- 數據轉化為 DataFrame
- 規則轉化為 SQL 操作
- 計算結果
Executor 的選型有以下幾方面的考慮:
- 擴展性要足夠強,能夠適配不同的數據源,如 Hive,MySQL 等等
- 計算性能要較強
- 支持的監控類型種類需要足夠多
考慮到以上方面的信息,我們選用了 Apache Griffin 的 Measure 模塊作為 Executor。它基於 Spark 開發,能夠適配不同的數據源,並且對於 DSL 做了一系列拓展。基於平臺的設計,我們需要和 Backend 進行較多的互動,並把數據進行回傳。其實 Griffin Measure 本身就支持了一些基本的數據質量監控,比如重覆值檢測、自定義 SQL 等等,這裡重點說明一下我們對 Measure 模塊的改造:
- 改造數據源、Sink 使其能夠通過 HTTP 訪問遠程 API;
- 部分功能增強、修改,例如:支持正則表達式;
- 流式監控從 Spark Engine 切換為 Flink Engine,優化整體流式監控方案。Measure 本身是 Spark 生態的一部分,只能用 Spark Engine 做理線或者用微批模擬流式做監控。位元組跳動內部本身有一定的 Flink 的能力,並且 Flink 對流式數據的處理能力比微批要好很多,所以我們就進行了這樣的改造。
Monitor 實現
Monitor 模塊主要是為了實現失敗報警重試和重覆報警功能,根據事件類型觸發相應事件(重覆報警、失敗重試等)。因為業務數據全部存儲在 MySQL,平臺之前的 Monitor 重覆報警做的也比較簡單,即直接通過輪詢的方式從 MySQL 中輪詢拉起已報警實例,然後通過重覆提交的方式進行報警。
隨著監控的規則越來越多,庫的壓力會非常大,Monitor 的掃描也遇到了一些瓶頸,因此我們對 Monitor 進行了技術架構升級,具體改造內容包括:
- 有狀態服務,主節點對外提供服務;主備保證 HA
- 接收 Backend 事件:監控失敗、報警
- 記憶體定時隊列,事件性觸發機制。
最佳實踐
前面介紹了數據質量平臺的一些實現方式,下麵為大家介紹一些我們在數據量和資源這兩個方面的最佳實踐。
表行數信息-優先 HMS 獲取
內部的離線監控中,表行數的監控占比非常大,可能至少 50% 以上的離線規則都是表行數的監控。對於表行數,之前我們是通過 Spark,Select Count* 提交作業,對資源的消耗非常大。
後來我們對其做了一些優化。在任務提交的過程中,底層引擎在產出表的過程中將表行數記錄寫入相應分區信息中,我們就可以直接從 HMS 分區里直接獲取表行數信息,從而避免了 Spark 任務的提交。
優化後的效果非常明顯,目前對於表行數的監控,HMS 獲取行數占比約 90 %,HMS 行數監控平均運行時長在秒級別。
註:這個功能需要推動底層服務配合支持,比如 Spark 需要把保存在本地 metric 裡面的信息寫入到 HMS 中,其他數據傳輸系統也需要支持。
離線監控優化
這一塊是基於 Griffin 的 Measure 來進行,Measure 本身有豐富的功能,我們對其進行了裁剪以節約耗時。主要的裁剪和優化包括:
- 裁剪掉部分異常數據收集功能;
- 優化非必要的 join 流程。
另外,我們也對離線監控的執行參數進行了優化,主要包括:
- 根據不同的監控類型,添加不同的參數 (shuffle to hdfs 等);
- 根據監控特性,預設參數優化(上調 vcore 等)。
舉個例子:用戶寫了 SQL 進行數據的 join,執行引擎可以分析出執行計劃。對於 join 類的操作,shuffle 可能非常大,這種情況下我們預設會開一些 Spark 參數。根據表行數來預判數據表的大小,如果判斷數據表比較大,會預設微調 vcore 和 memory。以上這些優化都能在一定程度上提升性能,目前平臺上各類監控的平均運行時長縮短了 10% 以上。
引入 OLAP 引擎
平臺上很多數據表和業務表(除了日誌表以外),在數倉上層的表監控數據量不是很大,這種情況很適合進行 OLAP 的查詢。
這種情況下我們在數據探查場景引入了 presto。之前在該場景下通過 Spark 做探查,引入 presto 之後通過快速 fail 機制,大數據量、計算複雜的探查任務 fallback 到提交 Spark 作業,探查時間中位數從之前的 7min 縮短到目前的不到 40s,效果非常顯著。
流式監控支持抽樣 & 單 Topic 多 Rule 優化
Kafka 數據抽樣
一般流式數據的問題都是通用性問題,可以通過數據採樣發現問題。因此我們開發了數據採樣的功能,減少數據資源的占比消耗。Flink Kafka Connector 支持抽樣,可直接操作 kafka topic 的 offset 來達到抽樣的目的。比如,我們按照 1% 的比例進行抽樣,原來上 W 個 partition 的 Topic,我們只需要 ** 個機器就可以支撐。
單 Topic 多 Rule 優化
最早的時候我們是對一個 Topic 定義一個 Rule,然後開啟一個 Flink 任務進行消費,執行 Rule。後來我們發現一些關鍵的數據需要對多個維度進行監控,也就是要定義多個維度的 Rule,對每一條 Rule 都開任務去消費是非常耗資源的,所以我們利用監控不是 CPU 密集型作業的特性,復用讀取部分,單 slot 中執行多個 Rule,對 Topic 級別進行單一消費,在一個任務中把相關 Rule 都執行完。
未來演進方向
本文介紹了火山引擎 DataLeap 數據質量平臺的實現和最佳實踐,最後談談平臺未來的演進方向。
- 底層引擎統一,流批一體:目前平臺的離線任務大部分是基於 Spark 完成的,流式數據採用了 Flink 處理,OLAP 引擎又引進了 presto,導致這套系統架構的運維成本比較高。我們看到 Flink 目前的 presto 能力和 Flinkbatch 的能力也在不斷發展,因此我們後續會嘗試切一些任務,做到真正意義上的統一引擎。
- 智能:引入演算法進行數據驅動。考慮引入 ML 方法輔助閾值選取或者智能報警,根據數據等級自動推薦質量規則。舉幾個例子,比如我們可以基於時序演算法智能的波動率監控來解決節假日流量高峰和平常的硬規則閾值的提升。
- 便捷:OLAP 對性能提升比較顯著,但是目前我們只用在了數據探查功能上。後續可以將 OLAP 引擎應用於質量檢測、數據據探查、數據對比應用與數據開發流程。
- 優化:比如通過單一 Job,同時運行多個監控,將監控和數據探查結合。我們現在在嘗試將數據質量的規則生成和數據探查做結合,做到所見即所得的數據和規則的對應關係。
本文介紹的數據質量監控的能力目前大部分已通過火山引擎 DataLeap 對外提供服務。
點擊跳轉Dataleap瞭解更多