Flink 流式聚合性能調優指南

来源:https://www.cnblogs.com/cjblogs/archive/2020/05/27/12973063.html
-Advertisement-
Play Games

原文:Flink 流式聚合性能調優指南 SQL 是數據分析中使用最廣泛的語言。Flink Table API 和 SQL 使用戶能夠以更少的時間和精力定義高效的流分析應用程式。此外,Flink Table API 和 SQL 是高效優化過的,它集成了許多查詢優化和運算元優化。但並不是所有的優化都是預設 ...


原文:Flink 流式聚合性能調優指南

SQL 是數據分析中使用最廣泛的語言。Flink Table API 和 SQL 使用戶能夠以更少的時間和精力定義高效的流分析應用程式。此外,Flink Table API 和 SQL 是高效優化過的,它集成了許多查詢優化和運算元優化。但並不是所有的優化都是預設開啟的,因此對於某些工作負載,可以通過打開某些選項來提高性能。

這裡將介紹一些實用的優化選項以及流式聚合的內部原理,它們在某些情況下能帶來很大的提升。

註意:(1)目前,這裡提到的優化選項僅支持 Blink planner。(2)目前,流聚合優化僅支持無界聚合,視窗聚合優化將在未來支持。

預設情況下,無界聚合運算元是逐條處理輸入的記錄,即:(1)從狀態中讀取累加器,(2)累加/撤回記錄至累加器,(3)將累加器寫回狀態,(4)下一條記錄將再次從(1)開始處理。這種處理模式可能會增加 StateBackend 開銷(尤其是對於 RocksDB StateBackend )。此外,生產中非常常見的數據傾斜會使這個問題惡化,並且容易導致 job 發生反壓。

MiniBatch 聚合
MiniBatch 聚合的核心思想是將一組輸入的數據緩存在聚合運算元內部的緩衝區中。當輸入的數據被觸發處理時,每個 key 只需一個操作即可訪問狀態。這樣可以大大減少狀態開銷並獲得更好的吞吐量。但是,這可能會增加一些延遲,因為它會緩衝一些記錄而不是立即處理它們。這是吞吐量和延遲之間的權衡。

下圖說明瞭 mini-batch 聚合如何減少狀態操作。

Flink 流式聚合性能調優指南
預設情況下 mini-batch 優化是被禁用的。開啟這項優化,需要設置選項
table.exec.mini-batch.enabled、
table.exec.mini-batch.allow-latency 和
table.exec.mini-batch.size。

下麵的例子顯示如何啟用這些選項。

// instantiate table environment
TableEnvironment tEnv = ...

// access flink configuration
Configuration configuration = tEnv.getConfig().getConfiguration();
// set low-level key-value options
configuration.setString("table.exec.mini-batch.enabled", "true"); // enable mini-batch optimization
configuration.setString("table.exec.mini-batch.allow-latency", "5 s"); // use 5 seconds to buffer input records
configuration.setString("table.exec.mini-batch.size", "5000"); // the maximum number of records can be buffered by each aggregate operator task
Local-Global 聚合
Local-Global 聚合是為解決數據傾斜問題提出的,通過將一組聚合分為兩個階段,首先在上游進行本地聚合,然後在下游進行全局聚合,類似於 MapReduce 中的 Combine + Reduce 模式。例如,就以下 SQL 而言:

SELECT color, sum(id)
FROM T
GROUP BY color
數據流中的記錄可能會傾斜,因此某些聚合運算元的實例必須比其他實例處理更多的記錄,這會產生熱點問題。本地聚合可以將一定數量具有相同 key 的輸入數據累加到單個累加器中。全局聚合將僅接收 reduce 後的累加器,而不是大量的原始輸入數據。這可以大大減少網路 shuffle 和狀態訪問的成本。每次本地聚合累積的輸入數據量基於 mini-batch 間隔。這意味著 local-global 聚合依賴於啟用了 mini-batch 優化。

下圖顯示了 local-global 聚合如何提高性能。

Flink 流式聚合性能調優指南
下麵的例子顯示如何啟用 local-global 聚合。

// instantiate table environment
TableEnvironment tEnv = ...

// access flink configuration
Configuration configuration = tEnv.getConfig().getConfiguration();
// set low-level key-value options
configuration.setString("table.exec.mini-batch.enabled", "true"); // local-global aggregation depends on mini-batch is enabled
configuration.setString("table.exec.mini-batch.allow-latency", "5 s");
configuration.setString("table.exec.mini-batch.size", "5000");
configuration.setString("table.optimizer.agg-phase-strategy", "TWO_PHASE"); // enable two-phase, i.e. local-global aggregation
拆分 distinct 聚合
Local-Global 優化可有效消除常規聚合的數據傾斜,例如 SUM、COUNT、MAX、MIN、AVG。但是在處理 distinct 聚合時,其性能並不令人滿意。

例如,如果我們要分析今天有多少唯一用戶登錄。我們可能有以下查詢:

SELECT day, COUNT(DISTINCT user_id)
FROM T
GROUP BY day
如果 distinct key (即 user_id)的值分佈稀疏,則 COUNT DISTINCT 不適合減少數據。即使啟用了 local-global 優化也沒有太大幫助。因為累加器仍然包含幾乎所有原始記錄,並且全局聚合將成為瓶頸(大多數繁重的累加器由一個任務處理,即同一天)。

這個優化的想法是將不同的聚合(例如 COUNT(DISTINCT col))分為兩個級別。第一次聚合由 group key 和額外的 bucket key 進行 shuffle。bucket key 是使用 HASH_CODE(distinct_key) % BUCKET_NUM 計算的。BUCKET_NUM 預設為1024,可以通過
table.optimizer.distinct-agg.split.bucket-num 選項進行配置。第二次聚合是由原始 group key 進行 shuffle,並使用 SUM 聚合來自不同 buckets 的 COUNT DISTINCT 值。由於相同的 distinct key 將僅在同一 bucket 中計算,因此轉換是等效的。bucket key 充當附加 group key 的角色,以分擔 group key 中熱點的負擔。bucket key 使 job 具有可伸縮性來解決不同聚合中的數據傾斜/熱點。

拆分 distinct 聚合後,以上查詢將被自動改寫為以下查詢:

SELECT day, SUM(cnt)
FROM (
SELECT day, COUNT(DISTINCT user_id) as cnt
FROM T
GROUP BY day, MOD(HASH_CODE(user_id), 1024)
)
GROUP BY day
下圖顯示了拆分 distinct 聚合如何提高性能(假設顏色表示 days,字母表示 user_id)。

Flink 流式聚合性能調優指南
註意:上面是可以從這個優化中受益的最簡單的示例。除此之外,Flink 還支持拆分更複雜的聚合查詢,例如,多個具有不同 distinct key (例如 COUNT(DISTINCT a), SUM(DISTINCT b) )的 distinct 聚合,可以與其他非 distinct 聚合(例如 SUM、MAX、MIN、COUNT )一起使用。

註意 當前,拆分優化不支持包含用戶定義的 AggregateFunction 聚合。

下麵的例子顯示瞭如何啟用拆分 distinct 聚合優化。

// instantiate table environment
TableEnvironment tEnv = ...

tEnv.getConfig() // access high-level configuration
.getConfiguration() // set low-level key-value options
.setString("table.optimizer.distinct-agg.split.enabled", "true"); // enable distinct agg split
在 distinct 聚合上使用 FILTER 修飾符
在某些情況下,用戶可能需要從不同維度計算 UV(獨立訪客)的數量,例如來自 Android 的 UV、iPhone 的 UV、Web 的 UV 和總 UV。很多人會選擇 CASE WHEN,例如:

SELECT
day,
COUNT(DISTINCT user_id) AS total_uv,
COUNT(DISTINCT CASE WHEN flag IN ('android', 'iphone') THEN user_id ELSE NULL END) AS app_uv,
COUNT(DISTINCT CASE WHEN flag IN ('wap', 'other') THEN user_id ELSE NULL END) AS web_uv
FROM T
GROUP BY day
但是,在這種情況下,建議使用 FILTER 語法而不是 CASE WHEN。因為 FILTER 更符合 SQL 標準,並且能獲得更多的性能提升。FILTER 是用於聚合函數的修飾符,用於限制聚合中使用的值。將上面的示例替換為 FILTER 修飾符,如下所示:

SELECT
day,
COUNT(DISTINCT user_id) AS total_uv,
COUNT(DISTINCT user_id) FILTER (WHERE flag IN ('android', 'iphone')) AS app_uv,
COUNT(DISTINCT user_id) FILTER (WHERE flag IN ('wap', 'other')) AS web_uv
FROM T
GROUP BY day
Flink SQL 優化器可以識別相同的 distinct key 上的不同過濾器參數。例如,在上面的示例中,三個 COUNT DISTINCT 都在 user_id 一列上。Flink 可以只使用一個共用狀態實例,而不是三個狀態實例,以減少狀態訪問和狀態大小。在某些工作負載下,可以獲得顯著的性能提升。


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • Data Guard環境中資料庫的角色轉換有兩種,分別為Switchover和Failover,通過名稱可知,前者是正常的主備庫之間的角色切換,該切換方式不會丟失數據;後者是故障切換,即主庫不能繼續提供服務的切換,可能發生數據丟失。從12.1版本開始,切換到物理備庫的操作得以簡化,本篇分別對這兩種切 ...
  • 由於主庫和備庫的db_name相同的,只是db_unique_name不同,可以使用備庫進行資料庫的備份,從而減輕主庫備份的負擔。本篇演示DataGuard環境下RMAN工具的使用。 1 查看主庫RMAN配置 [oracle@sz ~]$ rman target / catalog rman/rma ...
  • 1 軟體環境 Oracle Linux 6.9 Oracle 12.1.0.2 主庫:sz.oracle.com,IP:192.168.1.102 備庫:sh.oracle.com,IP:192.168.1.103 2 主資料庫配置 2.1 檢查並設置資料庫 1)首先,主資料庫必須處於強制日誌(Fo ...
  • Oracle Management Agent是Cloud Control 12c的核心組件,它被安裝在每個被CC管理的主機上,並監控主機及該主機上的所有目標,同時將這些目標信息提交給OMS。本篇將演示如何使用CC進行Management Agent的安裝。 1 點擊設置>添加目錄>手動添加目標 2 ...
  • 前面介紹瞭如何通過RMAN Duplicate方式構建物理備用資料庫,本篇將演示如何利用Cloud Control構建物理備用資料庫。 1 軟體環境 Oracle Linux 6.9 Oracle 12.1.0.2 Oracle Enterprise Management Cloud Control ...
  • 上篇演示瞭如何使用Cloud Control在目標機安裝和配置Oracle Management Agent,本篇將介紹如何使用Cloud Control管理資料庫。 1 手動添加資料庫 1)選擇設置>添加目標>手動添加目標 2)選擇指令和目標類型,點擊使用指導式流程添加按鈕 3)選擇目標名稱,點擊 ...
  • 1 軟體環境 Oracle Linux 6.9 Oracle Enterprise Manager Cloud Control 12.1.0.4 Oracle 11.2.0.3 2 軟體準備 1)下載EM資料庫模板 https://www.oracle.com/enterprise-manager/ ...
  • 寫在前面 Oracle 12c 可以通過熱圖和自動數據優化(ADO)實現信息生命周期的管理(ILM),上篇介紹了熱圖,本篇將對自動數據優化進行展開,通過熱圖和自動數據優化,最終實現信息生命周期管理。 1 自動數據優化工作流程 使用自動數據優化,必須先在系統級別啟用熱圖,通過修改初始化參數heat_m ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...