原文:Flink 流式聚合性能調優指南 SQL 是數據分析中使用最廣泛的語言。Flink Table API 和 SQL 使用戶能夠以更少的時間和精力定義高效的流分析應用程式。此外,Flink Table API 和 SQL 是高效優化過的,它集成了許多查詢優化和運算元優化。但並不是所有的優化都是預設 ...
SQL 是數據分析中使用最廣泛的語言。Flink Table API 和 SQL 使用戶能夠以更少的時間和精力定義高效的流分析應用程式。此外,Flink Table API 和 SQL 是高效優化過的,它集成了許多查詢優化和運算元優化。但並不是所有的優化都是預設開啟的,因此對於某些工作負載,可以通過打開某些選項來提高性能。
這裡將介紹一些實用的優化選項以及流式聚合的內部原理,它們在某些情況下能帶來很大的提升。
註意:(1)目前,這裡提到的優化選項僅支持 Blink planner。(2)目前,流聚合優化僅支持無界聚合,視窗聚合優化將在未來支持。
預設情況下,無界聚合運算元是逐條處理輸入的記錄,即:(1)從狀態中讀取累加器,(2)累加/撤回記錄至累加器,(3)將累加器寫回狀態,(4)下一條記錄將再次從(1)開始處理。這種處理模式可能會增加 StateBackend 開銷(尤其是對於 RocksDB StateBackend )。此外,生產中非常常見的數據傾斜會使這個問題惡化,並且容易導致 job 發生反壓。
MiniBatch 聚合
MiniBatch 聚合的核心思想是將一組輸入的數據緩存在聚合運算元內部的緩衝區中。當輸入的數據被觸發處理時,每個 key 只需一個操作即可訪問狀態。這樣可以大大減少狀態開銷並獲得更好的吞吐量。但是,這可能會增加一些延遲,因為它會緩衝一些記錄而不是立即處理它們。這是吞吐量和延遲之間的權衡。
下圖說明瞭 mini-batch 聚合如何減少狀態操作。
Flink 流式聚合性能調優指南
預設情況下 mini-batch 優化是被禁用的。開啟這項優化,需要設置選項
table.exec.mini-batch.enabled、
table.exec.mini-batch.allow-latency 和
table.exec.mini-batch.size。
下麵的例子顯示如何啟用這些選項。
// instantiate table environment
TableEnvironment tEnv = ...
// access flink configuration
Configuration configuration = tEnv.getConfig().getConfiguration();
// set low-level key-value options
configuration.setString("table.exec.mini-batch.enabled", "true"); // enable mini-batch optimization
configuration.setString("table.exec.mini-batch.allow-latency", "5 s"); // use 5 seconds to buffer input records
configuration.setString("table.exec.mini-batch.size", "5000"); // the maximum number of records can be buffered by each aggregate operator task
Local-Global 聚合
Local-Global 聚合是為解決數據傾斜問題提出的,通過將一組聚合分為兩個階段,首先在上游進行本地聚合,然後在下游進行全局聚合,類似於 MapReduce 中的 Combine + Reduce 模式。例如,就以下 SQL 而言:
SELECT color, sum(id)
FROM T
GROUP BY color
數據流中的記錄可能會傾斜,因此某些聚合運算元的實例必須比其他實例處理更多的記錄,這會產生熱點問題。本地聚合可以將一定數量具有相同 key 的輸入數據累加到單個累加器中。全局聚合將僅接收 reduce 後的累加器,而不是大量的原始輸入數據。這可以大大減少網路 shuffle 和狀態訪問的成本。每次本地聚合累積的輸入數據量基於 mini-batch 間隔。這意味著 local-global 聚合依賴於啟用了 mini-batch 優化。
下圖顯示了 local-global 聚合如何提高性能。
Flink 流式聚合性能調優指南
下麵的例子顯示如何啟用 local-global 聚合。
// instantiate table environment
TableEnvironment tEnv = ...
// access flink configuration
Configuration configuration = tEnv.getConfig().getConfiguration();
// set low-level key-value options
configuration.setString("table.exec.mini-batch.enabled", "true"); // local-global aggregation depends on mini-batch is enabled
configuration.setString("table.exec.mini-batch.allow-latency", "5 s");
configuration.setString("table.exec.mini-batch.size", "5000");
configuration.setString("table.optimizer.agg-phase-strategy", "TWO_PHASE"); // enable two-phase, i.e. local-global aggregation
拆分 distinct 聚合
Local-Global 優化可有效消除常規聚合的數據傾斜,例如 SUM、COUNT、MAX、MIN、AVG。但是在處理 distinct 聚合時,其性能並不令人滿意。
例如,如果我們要分析今天有多少唯一用戶登錄。我們可能有以下查詢:
SELECT day, COUNT(DISTINCT user_id)
FROM T
GROUP BY day
如果 distinct key (即 user_id)的值分佈稀疏,則 COUNT DISTINCT 不適合減少數據。即使啟用了 local-global 優化也沒有太大幫助。因為累加器仍然包含幾乎所有原始記錄,並且全局聚合將成為瓶頸(大多數繁重的累加器由一個任務處理,即同一天)。
這個優化的想法是將不同的聚合(例如 COUNT(DISTINCT col))分為兩個級別。第一次聚合由 group key 和額外的 bucket key 進行 shuffle。bucket key 是使用 HASH_CODE(distinct_key) % BUCKET_NUM 計算的。BUCKET_NUM 預設為1024,可以通過
table.optimizer.distinct-agg.split.bucket-num 選項進行配置。第二次聚合是由原始 group key 進行 shuffle,並使用 SUM 聚合來自不同 buckets 的 COUNT DISTINCT 值。由於相同的 distinct key 將僅在同一 bucket 中計算,因此轉換是等效的。bucket key 充當附加 group key 的角色,以分擔 group key 中熱點的負擔。bucket key 使 job 具有可伸縮性來解決不同聚合中的數據傾斜/熱點。
拆分 distinct 聚合後,以上查詢將被自動改寫為以下查詢:
SELECT day, SUM(cnt)
FROM (
SELECT day, COUNT(DISTINCT user_id) as cnt
FROM T
GROUP BY day, MOD(HASH_CODE(user_id), 1024)
)
GROUP BY day
下圖顯示了拆分 distinct 聚合如何提高性能(假設顏色表示 days,字母表示 user_id)。
Flink 流式聚合性能調優指南
註意:上面是可以從這個優化中受益的最簡單的示例。除此之外,Flink 還支持拆分更複雜的聚合查詢,例如,多個具有不同 distinct key (例如 COUNT(DISTINCT a), SUM(DISTINCT b) )的 distinct 聚合,可以與其他非 distinct 聚合(例如 SUM、MAX、MIN、COUNT )一起使用。
註意 當前,拆分優化不支持包含用戶定義的 AggregateFunction 聚合。
下麵的例子顯示瞭如何啟用拆分 distinct 聚合優化。
// instantiate table environment
TableEnvironment tEnv = ...
tEnv.getConfig() // access high-level configuration
.getConfiguration() // set low-level key-value options
.setString("table.optimizer.distinct-agg.split.enabled", "true"); // enable distinct agg split
在 distinct 聚合上使用 FILTER 修飾符
在某些情況下,用戶可能需要從不同維度計算 UV(獨立訪客)的數量,例如來自 Android 的 UV、iPhone 的 UV、Web 的 UV 和總 UV。很多人會選擇 CASE WHEN,例如:
SELECT
day,
COUNT(DISTINCT user_id) AS total_uv,
COUNT(DISTINCT CASE WHEN flag IN ('android', 'iphone') THEN user_id ELSE NULL END) AS app_uv,
COUNT(DISTINCT CASE WHEN flag IN ('wap', 'other') THEN user_id ELSE NULL END) AS web_uv
FROM T
GROUP BY day
但是,在這種情況下,建議使用 FILTER 語法而不是 CASE WHEN。因為 FILTER 更符合 SQL 標準,並且能獲得更多的性能提升。FILTER 是用於聚合函數的修飾符,用於限制聚合中使用的值。將上面的示例替換為 FILTER 修飾符,如下所示:
SELECT
day,
COUNT(DISTINCT user_id) AS total_uv,
COUNT(DISTINCT user_id) FILTER (WHERE flag IN ('android', 'iphone')) AS app_uv,
COUNT(DISTINCT user_id) FILTER (WHERE flag IN ('wap', 'other')) AS web_uv
FROM T
GROUP BY day
Flink SQL 優化器可以識別相同的 distinct key 上的不同過濾器參數。例如,在上面的示例中,三個 COUNT DISTINCT 都在 user_id 一列上。Flink 可以只使用一個共用狀態實例,而不是三個狀態實例,以減少狀態訪問和狀態大小。在某些工作負載下,可以獲得顯著的性能提升。