問題描述 近期業務反饋, 開啟了 mini-batch 之後, 出現了數據不准的情況, 關掉了 mini-batch 之後, 就正常了, 因此業務方懷疑,是不是 Flink 的 mini-batch 存在 bug ? 問題排查 初步分析 mini-batch 已經在內部大規模使用, 目前沒有發現一例 ...
問題描述
近期業務反饋, 開啟了 mini-batch 之後, 出現了數據不准的情況, 關掉了 mini-batch 之後, 就正常了, 因此業務方懷疑,是不是 Flink 的 mini-batch 存在 bug ?
問題排查
初步分析
- mini-batch 已經在內部大規模使用, 目前沒有發現一例和開啟 mini-batch 有關, 同時 mini-batch 本質只是將數據進行攢批然後計算, 並沒有修改核心的運算邏輯.
- 開關 mini-batch 的關鍵時數據的批量計算, 是否在批量計算使得原本存在 bug 的代碼暴露問題
- 業務在 Flink SQL 使用了多個雙流 join 和 group window,如果不註意使用,很可能導致亂序,最終的錯誤結果是某條數據沒有被正常更新, 和亂序的情況比較類似.
綜上考慮, 整體排查的方向還是排查 SQL 的業務邏輯是否存在亂序的 case, 開啟了 mini-batch 後是否加劇
了這種亂序的產生
代碼邏輯梳理
flowchart LR join1(join1 \n item_day, item_key) --> join2 join2(join2 \n item_day, item_key) --> join3 join3(join3 \n item_day, item_key) --> group1 group1(group1 \n item_day, item_key) --> group2 group2(group2 \n item_day, item_key, key1, key2, key3) --> sink sink(sink \n pk: item_day, item_key)抽象之後的 DAG 如圖所示:
- join1, join2, join3, group1 都是基於 item_day 和 item_key 進行 hash 數據經過這些運算元均按照 [item_day, item_key] 進行 hash
- group2 運算元的 group key 為 [item_day, item_key, key1, key2, key3],Flink 會基於這些欄位整體進行 hash
- Sink 運算元的主鍵為 [item_day, item_key] ,數據流向 Sink 運算元時會按照 [item_day, item_key] 進行 hash.
分析:
key1, key2, key3 時由前面的 join1 運算元補充的維度欄位, 前面的 join 採用的是 left join, 因此可能會存在 item_day 和 item_key 相同的數據, 對應的 key1, key2, key3 並不相同, 經過 group2 會觸發具有相同 [item_day, item_key] 的數據,被 hash 到不同的併發,這種就出現了亂序問題
修複手段
最後的 group by [item_day, item_key, key1, key2, key3], 核心還是為了聚合相同的 item_day和 item_key, key1, key2, key3 不屬於 value 類型數據, 也不參與聚合, 因此將修改 SQL 避免基於 key1, key2, key3 進行聚合即可, 這裡採用 last_value 聚合函數取最後一條數據
-- 原始 SQL
SELECT item_day, item_key, key1, key2, key3, sum(value)
FROM XXX
GROUP BY item_day, item_key, key1, key2, key3
-- 修改為
SELECT item_day, item_key, last_value(key1), last_value(key2), last_value(key3), sum(value)
FROM XXX
GROUP BY item_day, item_key
經過修改之後,保證整個 Flink 處理鏈路中, 相同的主鍵對應的數據,無論經過多少次 hash, 都是在同一個並行處理,這種才能保證最終結果的正確性
結論
修改後, 業務的結果恢復正常, 因此 Mini-batch 並不是導致作業出現問題的核心原因, 核心原因還是亂序, 而開啟 mini-batch 會加劇這種亂序問題的觸發。
開啟 mini-batch 之後, 具有相同 key 的數據, 如果落到了同一個 batch, 這樣物理上的時間差就更短,因而更容易暴露問題。