FlinkSQL 之亂序問題

来源:https://www.cnblogs.com/0x12345678/archive/2022/06/09/16344727.html
-Advertisement-
Play Games

亂序問題 在業務編寫 FlinkSQL 時, 非常常見的就是亂序相關問題, 在出現問題時,非常難以排查,且無法穩定復現,這樣無論是業務方,還是平臺方,都處於一種非常尷尬的地步。 亂序問題 在業務編寫 FlinkSQL 時, 非常常見的就是亂序相關問題, 在出現問題時,非常難以排查,且無法穩定復現,這 ...


亂序問題

在業務編寫 FlinkSQL 時, 非常常見的就是亂序相關問題, 在出現問題時,非常難以排查,且無法穩定復現,這樣無論是業務方,還是平臺方,都處於一種非常尷尬的地步。

亂序問題

在業務編寫 FlinkSQL 時, 非常常見的就是亂序相關問題, 在出現問題時,非常難以排查,且無法穩定復現,這樣無論是業務方,還是平臺方,都處於一種非常尷尬的地步。

在實時 join 中, 如果是 Regular Join, 則使用的是 Hash Join 方式, 左表和右表根據 Join Key 進行hash,保證具有相同 Join Key 的數據能夠 Hash 到同一個併發,進行 join 的計算 。
在實時聚合中, 主要普通的 group window, over window, time window 這幾中開窗方式,都涉及到 task 和 task 之間 hash 方式進行數據傳輸。
因此, 在比較複雜的邏輯中, 一條數據在整個數據流中需要進行不同的 hash 方式, 特別時當我們處理 CDC 數據時, 一定要要求數據嚴格有序, 否則可能會導致產生錯誤的結果。

以下麵的例子進行說明, 以下有三張表, 分別是訂單表, 訂單明細表, 和商品類目 。

  • 這三張表的實時數據都從 MySQL 採集得到並實時寫入 Kafka, 均會實時發生變化, 無法使用視窗計算
  • 除了訂單表有訂單時間, 其他兩張表都沒有時間屬性, 因此無法使用watermark

CREATE TABLE orders (
	order_id VARCHAR,
	order_time TIMESTAMP
) WITH (
	'connector' = 'kafka',
	'format' = 'changelog-json'
	...
);

CREATE TABLE order_item (
	order_id VARCHAR,
	item_id VARCHAR
) WITH (
	'connector' = 'kafka',)
	'format' = 'changelog-json'
	...
);

CREATE TABLE item_detail (
	item_id VARCHAR,
	item_name VARCHAR,
	item_price BIGINT
) WITH (
	'connector' = 'kafka',
	'format' = 'changelog-json'
	...
);

使用 Regular Join 進行多路 Join,數據表打寬操作如下所示


SELECT o.order_id, i.item_id, d.item_name, d.item_price, o.order_time
FROM orders o
LEFT JOIN order_item i ON o.order_id = i.order_id
LEFT JOIN item_detail d ON i.item_id = d.item_id

最終生成的 DAG 圖如下所示:

可以發現:
第一個 join (後面統一簡稱為ijoin1)的條件是 order_id,該 join 的兩個輸入會以 order_id 進行hash,具有相同 order_id 的數據能夠被髮送到同一個 subtask

第二個 join (後面統一簡稱為 join2)的條件則是 item_id, 該 join 的兩個輸入會以 item_id 進行hash,具有相同 item_id 的數據則會被髮送到同一個 subtask.

正常情況下, 具有相同 order_id 的數據, 一定具有相同的 item_id,但由於上面的示例代碼中,我們使用的是 left join 的寫法, 即使沒有 join 上, 也會輸出為 null 的數據,這樣可能導致了最終結果的不確定性。

以下麵的數據為示例,再詳細說明一下:

TABLE orders

order_id order_time
id_001 2022-06-03 00:00:00

TABLE order_item

order_id item_id
id_001 item_001

TABLE item_detail

item_id item_name item_price
item_001 類目1 10

輸出數據如下:
1) 表示輸出數據的併發
+I 表示數據的屬性 (+I, , -D, -U, +U)
第一個 JOIN 輸出

1) +I(id_001, null, 2022-06-03 00:00:00)
1) -D(id_001, null, 2022-06-03 00:00:00)
1) +I(id_001, item_001, 2022-06-03 00:00:00)

第二個 JOIN 輸出

1) +I(id_001, null, null, null, 2022-06-03 00:00:00)
1) -D(id_001, null, null, null, 2022-06-03 00:00:00)
2) +I(id_001, item_001, null, null, 2022-06-03 00:00:00)
2) -D(id_001, item_001, null, null, 2022-06-03 00:00:00)
2) +I(id_001, item_001, 類目1, 10, 2022-06-03 00:00:00)

以上結果只是上述作業可能出現的情況之一,實際運行時並不一定會出現。 我們可以發現 join1 結果發送到 join2 之後, 相同的 order_id 並不一定會發送到同一個 subtask,因此當數據經過了 join2, 相同的 order_id 的數據會落到不同的併發,這樣在後續的數據處理中, 有非常大的概率會導致最終結果的不確定性。

我們再細分以下場景考慮, 假設經過 join2 之後的結果為 join_view:

  1. 假設 join2 之後,我們基於 item_id 進行聚合, 統計相同類目的訂單數
SELECT item_id, sum(order_id)
FROM join_view
GROUP BY item_id

很顯然, 上述的亂序問題並不會影響這段邏輯的結果, item_id 為 null 的數據會進行計算, 但並不會影響 item_id 為 item_001 的結果.

  1. 假設 join2 之後, 我們將結果直接寫入 MySQL, MySQL 主鍵為 order_id
CREATE TABLE MySQL_Sink (
	order_id VARCHAR,
	item_id VARCHAR,
	item_name VARCHAR,
	item_price INT,
	order_time TIMESTAMP,
	PRIMARY KEY (order_id) NOT ENFORCED
) with (
	'connector' = 'jdbc'
);

INSERT INTO MySQL_Sink SELECT * FROM JOIN_VIEW;

由於我們在 Sink connector 中未單獨設置併發, 因此 sink 的併發度是和 join2 的併發是一樣的, 因此 join2 的輸出會直接發送給 sink 運算元, 並寫入到 MySQL 中。
由於是不同併發同時在寫 MySQL ,所以實際寫 MySQL的順序可能如下所示:

2) +I(id_001, item_001, null, null, 2022-06-03 00:00:00)
2) -D(id_001, item_001, null, null, 2022-06-03 00:00:00)
2) +I(id_001, item_001, 類目1, 10, 2022-06-03 00:00:00)
1) +I(id_001, null, null, 2022-06-03 00:00:00)
1) -D(id_001, null, null, 2022-06-03 00:00:00)

很顯然, 最終結果會是 為 空, 最終寫入的是一條 delete 數據

  1. 假設 join2 之後, 我們將結果直接寫入 MySQL, 主鍵為 order_id, item_id, item_name
CREATE TABLE MySQL_Sink (
	order_id VARCHAR,
	item_id VARCHAR,
	item_name VARCHAR,
	item_price INT,
	order_time TIMESTAMP,
	PRIMARY KEY (order_id) NOT ENFORCED
) with (
	'connector' = 'jdbc'
);

INSERT INTO MySQL_Sink SELECT * FROM JOIN_VIEW;

和示例2一樣, 我們未單獨設置 sink 的併發, 因此數據會之間發送到 sink 運算元, 假設寫入 MySQL 的順序和示例2一樣:

2) +I(id_001, item_001, null, null, 2022-06-03 00:00:00)
2) -D(id_001, item_001, null, null, 2022-06-03 00:00:00)
2) +I(id_001, item_001, 類目1, 10, 2022-06-03 00:00:00)
1) +I(id_001, null, null, null, 2022-06-03 00:00:00)
1) -D(id_001, null, null, null, 2022-06-03 00:00:00)

最終結果會是

2) +I(id_001, item_001, 類目1, 2022-06-03 00:00:00)

由於 MySQL 的主鍵是 order_id, item_id, item_name 所以最後的 -D 記錄並不會刪除 subtask 2 寫入的數據, 這樣最終的結果是正確的。

  1. 假設 join2 之後, 我們將結果寫入 kafka,寫入格式為 changelog-json , 下游作業消費 kafka 併進行處理
CREATE TABLE kafka_sink (
	order_id VARCHAR,
	item_id VARCHAR,
	item_name VARCHAR,
	item_price INT,
	order_time TIMESTAMP,
	PRIMARY KEY (order_id, item_id, item_name) NOT ENFORCED
) with (
	'connector' = 'kafka',
	'format' = 'changelog-json',
	'topic' = 'join_result'
);

INSERT INTO kafka_sink select * from JOIN_VIEW;

預設,如果不設置 partitioner, kafka sink 會以我們在 DDL 中配置的主鍵生成對應的 hash key, 用於通過 hash 值生成 partition id。
有一點我們需要註意, 由於 join2 的輸出已經在不同的併發了, 所以無論 kafka_sink 選擇以 order_id 作為唯一的主鍵, 還是以 order_id, item_id, item_name 作為主鍵, 我們都無法控制不同併發寫入 kafka 的順序, 我們只能確保相同的併發的數據能夠有序的被寫入 kafka 的同一 partition 。

  • 如果設置 order_id 為主鍵, 我們可以保證上述的所有數據能夠被寫入同一個 partition
  • 如果設置 order_id, item_id, item_name 則上面不同併發的輸出可能會被寫入到不同的 partition

所以,我們需要關註的是, 當數據寫入 kafka 之後, 下游怎麼去處理這一份數據:

  1. 基於 order_id 進行去重,並按天聚合,計算當天的累加值。
    以下麵的 SQL 為例, 下游在消費 kafka 時, 為了避免數據重覆, 先基於 order_id 做了一次去重, 用 order_id 作為分區條件, 基於proctime() 進行去重 (增加table.exec.source.cdc-events-duplicate 該參數, 框架會自動生成去重運算元).
-- https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/config/
set 'table.exec.source.cdc-events-duplicate'='true';

CREATE TABLE kafka_source (
	order_id VARCHAR,
	item_id VARCHAR,
	item_name VARCHAR,
	item_price INT,
	order_time TIMESTAMP,
	PRIMARY KEY (order_id) NOT ENFORCED
) with (
	'connector' = 'kafka',
	'format' = 'changelog-json',
	'topic' = 'join_result'
);

-- 按order_time 聚合, 計算每天的營收

SELECT DATE_FORMAT(order_time, 'yyyy-MM-dd'), sum(item_price)
FROM kafka_source
GROUP BY DATE_FORMAT(order_time, 'yyyy-MM-dd')

結果上述的計算,我們預計結果會如何輸出:
去重之後可能的輸出為:

1) +I(id_001, item_001, null, null, 2022-06-03 00:00:00)
1) -D(id_001, item_001, null, null, 2022-06-03 00:00:00)
1) +I(id_001, item_001, 類目1, 10, 2022-06-03 00:00:00)
1) -D(id_001, item_001, 類目1, 10, 2022-06-03 00:00:00)
1) +I(id_001, item_001, null, null, 2022-06-03 00:00:00)
1) -D(id_001, item_001, null, null, 2022-06-03 00:00:00)

經過聚合運算元運算元:

1) +I(2022-06-03, null)
1) -D(2022-06-03, null)
1) +I(2022-06-03, 10)
1) -D(2022-06-03, 10)
1) +I(2022-06-03, 0)
1) -D(2022-06-03, 0)
1) +I(2022-06-03, null)
1) -D(2022-06-03, null)

可以發現,最終輸出結果為2022-06-03, null,本文列舉的示例不夠完善, 正常情況下, 當天肯定會有其他的記錄, 結果當天的結果可能不會為 null, 但我們可以知道的是,由於數據的亂序, 數據和實際結果已經不准確了。
2) 基於order_id, item_id, item_name 去重,之後按天聚合,計算當天的累加值。

-- https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/config/
set 'table.exec.source.cdc-events-duplicate'='true';

CREATE TABLE kafka_source (
	order_id VARCHAR,
	item_id VARCHAR,
	item_name VARCHAR,
	item_price INT,
	order_time TIMESTAMP,
	PRIMARY KEY (order_id, item_id, item_name) NOT ENFORCED
) with (
	'connector' = 'kafka',
	'format' = 'changelog-json',
	'topic' = 'join_result'
);

-- 按order_time 聚合, 計算每天的營收

SELECT DATE_FORMAT(order_time, 'yyyy-MM-dd'), sum(item_price)
FROM kafka_source
GROUP BY DATE_FORMAT(order_time, 'yyyy-MM-dd')

去重之後的輸出:

1) +I(id_001, item_001, null, null, 2022-06-03 00:00:00)
1) -D(id_001, item_001, null, null, 2022-06-03 00:00:00)
1) +I(id_001, item_001, 類目1, 10, 2022-06-03 00:00:00)
2) +I(id_001, item_001, null, null, 2022-06-03 00:00:00)
2) -D(id_001, item_001, null, null, 2022-06-03 00:00:00)

由於我們主鍵設置的是 order_id, item_id, item_name 所以
(id_001, item_001, null, null, 2022-06-03 00:00:00)
1) +I(id_001, item_001, 類目1, 10, 2022-06-03 00:00:00) 是不同的主鍵, 所以並不會互相影響。
經過聚合之後的結果:

1) +I(2022-06-03, null)
1) -D(2022-06-03, null)
1) +I(2022-06-03, 10)
1) -D(2022-06-03, 10)
1) +I(2022-06-03, 10)
1) -D(2022-06-03, 10)
1) +I(2022-06-03, 10)

以上就是最終結果的輸出, 可以發現我們最終的結果是沒有問題的。

原因分析

下圖是原始作業的數據流轉變化情況

graph LR orders(orders) --> |hash:order_id| join1(join1) order_item(order_item) -->|hash:order_id| join1 join1 --> |hash:item_id| join2(join2) item_detail(item_detail) --> |hash:item_id| join2
  • A 基於 item_id 聚合, 計算相同類目的訂單數 (結果正確)
graph LR join2(join2) --> |hash:item_id| group(group count:order_id)
  • B 將join的數據 sink 至 MySQL (主鍵為 order_id) (結果錯誤)
graph LR join2(join2) --> |forward| sink(sink) sink --> |jdbc send| MySQL(MySQL primarykey:order_id)
  • C 將join的數據 sink 至 MySQL (主鍵為 order_id, item_id, item_name) (結果正確)
graph LR join2(join2) --> |forward| sink(sink) sink --> |jdbc send| MySQL(MySQL primarykey:order_id)
  • D 將 join 的數據 sink 至 kafka, 下游消費 kafka 數據併進行去重處理, 下游處理時,又可以分為兩種情況。
    • D-1 按 order_id 分區並去重 (結果錯誤)
    • D-2 按 order_id, item_id, item_name 分區並去重 (結果正確)
graph TD join2(join2) --> |forward| sink(sink) sink --> |kafka client| kafka(Kafka fixed partitioner) kafka --> |hash:order_id| rank(rank orderby:proctime) rank --> |"hash:date_format(order_time, 'yyyy-MM-dd')"| group("group agg:sum(item_price)")
graph TD join2(join2) --> |forward| sink(sink key) sink --> |kafka client| kafka(Kafka fixed partitioner) kafka --> |hash:order_id+item_id+item_name| rank(rank orderby:proctime) rank --> |"hash:date_format(order_time, 'yyyy-MM-dd')"| group("group agg:sum(item_price)")

從上面 A, B, C, D-1, D-2 這四種 case, 我們不難發現, 什麼情況下會導致錯誤的結果, 什麼情況下不會導致錯誤的結果, 關鍵還是要看每個 task 之間的 hash 規則。

case B 產生亂序主要原因時在 sink operator, hash的條件由原來的 order_id+item_id_item_name 變成了 order_id
case D-1 產生亂序主要發生在去重的 operator, hash 的規則由原來的 order_id+item_id+item_name 變為了 order_id

我們大概能總結以下幾點經驗

  • Flink 框架在可以保證 operator 和 operator hash 時, 一定是可以保證具有相同 hash 值的數據的在兩個 operator 之間傳輸順序性
  • Flink 框架無法保證數據連續多個 operator hash 的順序, 當 operator 和 operator 之間的 hash 條件發生變化, 則有可能出現數據的順序性問題。
  • 當 hash 條件由少變多時, 不會產生順序問題, 當 hash 條件由多變少時, 則可能會產生順序問題。

總結

大多數業務都是拿著原來的實時任務, 核心邏輯不變,只是把原來的 Hive 替換成 消息隊列的 Source 表, 這樣跑出來的結果,一般情況下就很難和離線對上,雖然流批一體是 Flink 的優勢, 但對於某些 case , 實時的結果和離線的結果還是會產生差異, 因此我們在編寫 FlinkSQL 代碼時, 一定要確保數據的準備性, 在編寫代碼時,一定要知道我們的數據大概會產生怎樣的流動, 產生怎樣的結果, 這樣寫出來的邏輯才是符合預期的。


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • Kubectl 自動補全 BASH source <(kubectl completion bash) # 在 bash 中設置當前 shell 的自動補全,要先安裝 bash-completion 包。 echo "source <(kubectl completion bash)" >> ~/. ...
  • 鏡像下載、功能變數名稱解析、時間同步請點擊 阿裡雲開源鏡像站 一、安裝vmware虛擬機 個人使用可獲得免費許可證,註冊用戶激活即可。 激活成功獲得免費許可證後登陸自己的用戶進官網下載鏈接: 下載完成後,一直下一步即可安裝成功。 二、Centos8 鏡像支持M1晶元 三、安裝Centos8 系統 1、打開虛 ...
  • LNMP是Linux+Nginx+Mysql+PHP所構建的一個動態開發環鏡 我這裡使用的系統是華為的OpenEnler系統,使用了Nginx1.12版本、Mysql8和PHP7.4 如果有出錯的地方可能是作者沒做到位,見諒 安裝依賴包並安裝nginx: # mount /dev/cdrom /mn ...
  • 本文講解Linux伺服器 Ubuntu20.04 設置靜態IP方法。 ...
  • 英文原文: https://www.kernel.org/doc/html/latest/admin-guide/cgroup-v1/cpuacct.html CPU Accounting Controller CPU統計控制器(CPU Accounting Controller)用來分組使用cgr ...
  • 鏡像下載、功能變數名稱解析、時間同步請點擊 阿裡巴巴開源鏡像站 前言 最近,學習了胡老師的《ROS入門21講》,在Ubuntu18.04上安裝ROS過程中遇到了一些問題,解決這些問題耗費了大半天,故通過本文進行詳細安裝介紹,以便其他學者在安裝這塊少花時間,把更多的精力放在研究上。 一、環境配置 我的環境:虛 ...
  • 作用:命令行多視窗顯示;命令行程式與本機脫離 1 安裝tmux (1)redhat、centos系統 yum install tmux (2)ubuntu系統 apt-get install tmux 2 使用tmux (1)啟動 首先,我們使用遠程登錄工具,登錄到遠程伺服器上,然後執行下麵的命令: ...
  • 虛擬機關鍵配置名詞解釋 虛擬⽹絡編輯器 橋接模式 可以訪問互聯⽹,配置的地址信息和物理主機⽹段地址信息相同,容易造成地址衝突 NAT模式 可以訪問互聯⽹,配置的地址信息和物理主機⽹段地址信息不同,造成不了地址衝突 僅主機模式 不可以訪問互聯⽹,獲取地址主要⽤於虛擬主機之間溝通,但不能訪問外部⽹絡 網 ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...