Flink實戰(11)-Exactly-Once語義之兩階段提交

来源:https://www.cnblogs.com/JavaEdge/archive/2023/11/24/17854033.html
-Advertisement-
Play Games

0 大綱 [Apache Flink]2017年12月發佈的1.4.0版本開始,為流計算引入里程碑特性:TwoPhaseCommitSinkFunction。它提取了兩階段提交協議的通用邏輯,使得通過Flink來構建端到端的Exactly-Once程式成為可能。同時支持: 數據源(source) 和 ...


0 大綱

[Apache Flink]2017年12月發佈的1.4.0版本開始,為流計算引入里程碑特性:TwoPhaseCommitSinkFunction。它提取了兩階段提交協議的通用邏輯,使得通過Flink來構建端到端的Exactly-Once程式成為可能。同時支持:

  • 數據源(source)
  • 和輸出端(sink)

包括Apache Kafka 0.11及更高版本。它提供抽象層,用戶只需實現少數方法就能實現端到端Exactly-Once語義。

新功能及Flink實現邏輯:

  • 描述Flink checkpoint機制如何保證Flink程式結果的Exactly-Once的
  • 顯示Flink如何通過兩階段提交協議與數據源和數據輸出端交互,以提供端到端的Exactly-Once保證
  • 通過一個簡單的示例,瞭解如何使用TwoPhaseCommitSinkFunction實現Exactly-Once的文件輸出

1 Flink應用中的Exactly-Once語義

Exactly-Once,指每個輸入的事件隻影響最終結果一次。即使機器或軟體故障,既沒有重覆數據,也不會丟數據。

Flink很久就提供Exactly-Once,checkpoint機制是Flink有能力提供Exactly-Once語義的核心。

一次checkpoint是以下內容的一致性快照:

  • 應用程式的當前狀態
  • 輸入流的位置

Flink可配置一個固定時間點,定期產生checkpoint,將checkpoint的數據寫入持久存儲系統,如S3或HDFS。將checkpoint數據寫入持久存儲是非同步,即Flink應用程式在checkpoint過程中可以繼續處理數據。

如果發生機器或軟體故障,重新啟動後,Flink應用程式將從最新的checkpoint點恢復處理; Flink會恢復應用程式狀態,將輸入流回滾到上次checkpoint保存的位置,然後重新開始運行。這意味著Flink可以像從未發生過故障一樣計算結果。

Flink 1.4.0前,Exactly-Once語義僅限Flink應用程式內部,沒有擴展到Flink數據處理完後發送的大多數外部系統。Flink應用程式與各種數據輸出端進行交互,開發人員自己維護組件上下文保證Exactly-Once語義。

為提供端到端的Exactly-Once語義 – 即除了Flink應用程式內部,Flink寫入的外部系統也需要能滿足Exactly-Once語義 – 這些外部系統必須提供提交或回滾的方法,然後通過Flink的checkpoint機制協調。

分散式系統中,協調提交和回滾的常用方法是2pc協議。討論Flink的TwoPhaseCommitSinkFunction如何利用2pc提供端到端的Exactly-Once語義。

2 Flink應用程式端到端的Exactly-Once語義

Kafka經常與Flink使用。Kafka 0.11版本添加事務支持。這意味著現在通過Flink讀寫Kafaka,並提供端到端的Exactly-Once語義有了必要支持。

Flink對端到端的Exactly-Once語義的支持不僅局限Kafka,可將它與任何一個提供必要的協調機制的源/輸出端一起使用。如Pravega,來自DELL/EMC的開源流媒體存儲系統,通過Flink的TwoPhaseCommitSinkFunction也能支持端到端的Exactly-Once語義。

image-20231124142310942

示常式序有:

  • 從Kafka讀取的數據源(Flink內置的KafkaConsumer)
  • 視窗聚合
  • 將數據寫回Kafka的數據輸出端(Flink內置的KafkaProducer)

要使數據輸出端提供Exactly-Once保證,須將所有數據通過一個事務提交給Kafka。提交捆綁了兩個checkpoint之間的所有要寫數據。這確保在故障時,能回滾寫入的數據。但分散式系統中,通常有多個併發運行的寫入任務,所有組件須在提交或回滾時“一致”才能確保一致結果。Flink使用2PC及預提交階段解決這問題。

pre-commit

checkpoint開始時,即2PC的“預提交”階段。當checkpoint開始時,Flink的JobManager會將checkpoint barrier(將數據流中的記錄分為進入當前checkpoint與進入下一個checkpoint)註入數據流。

brarrier在operator之間傳遞。對每個operator,它觸發operator的狀態快照寫入state backend。

數據源保存了消費Kafka的偏移量(offset),之後將checkpoint barrier傳遞給下一operator。

這種方式僅適用於operator具有『內部』狀態。

內部狀態

指Flink state backend保存和管理的。如第二個operator中window聚合算出來的sum值。當一個進程有它的內部狀態時,除了在checkpoint前需將數據變更寫入state backend,無需在pre-commit階段執行其他操作。

Flink負責在checkpoint成功時正確提交這些寫入或故障時中止這些寫入。

image-20231124150402626

3 Flink應用啟動pre-commit階段

當進程具有『外部』狀態,需額外處理。外部狀態通常以寫入外部系統(如Kafka)的形式出現。此時,為提供Exactly-Once保證,外部系統須【支持事務】,才能和兩階段提交協議集成。

示例數據需寫入Kafka,因此數據輸出端(Data Sink)有外部狀態。此時,在預提交階段:

  • 除了將其狀態寫入state backend
  • 數據輸出端還必須預先提交其外部事務

當checkpoint barrier在所有operator都傳遞了一遍,並且觸發的checkpoint回調成功完成時,預提交階段結束。所有觸發的狀態快照都被視為該checkpoint的一部分。checkpoint是整個應用程式狀態的快照,包括預先提交的外部狀態。若故障,可回滾到上次成功完成快照的時間點。

下一步是通知所有operator,checkpoint已經成功了。這是2PC的提交階段,JobManager為應用程式中的每個operator發出checkpoint已完成的回調。

數據源和 widnow operator沒有外部狀態,因此在提交階段,這些operator不必執行任何操作。但是,數據輸出端(Data Sink)擁有外部狀態,此時應該提交外部事務。

總結

  • 一旦所有operator完成預提交,就提交一個commit。
  • 如果至少有一個預提交失敗,則所有其他提交都將中止,我們將回滾到上一個成功完成的checkpoint。
  • 在預提交成功之後,提交的commit需要保證最終成功 – operator和外部系統都需要保障這點。如果commit失敗(例如,由於間歇性網路問題),整個Flink應用程式將失敗,應用程式將根據用戶的重啟策略重新啟動,還會嘗試再提交。這個過程至關重要,因為如果commit最終沒有成功,將會導致數據丟失。

因此,我們可以確定所有operator都同意checkpoint的最終結果:所有operator都同意數據已提交,或提交被中止並回滾。

4 在Flink中實現兩階段提交Operator

完整的實現兩階段提交協議可能有點複雜,這就是為什麼Flink將它的通用邏輯提取到抽象類TwoPhaseCommitSinkFunction中的原因。

接下來基於輸出到文件的簡單示例,說明如何使用TwoPhaseCommitSinkFunction。用戶只需要實現四個函數,就能為數據輸出端實現Exactly-Once語義:

  • beginTransaction – 在事務開始前,我們在目標文件系統的臨時目錄中創建一個臨時文件。隨後,我們可以在處理數據時將數據寫入此文件。
  • preCommit – 在預提交階段,我們刷新文件到存儲,關閉文件,不再重新寫入。我們還將為屬於下一個checkpoint的任何後續文件寫入啟動一個新的事務。
  • commit – 在提交階段,我們將預提交階段的文件原子地移動到真正的目標目錄。需要註意的是,這會增加輸出數據可見性的延遲。
  • abort – 在中止階段,我們刪除臨時文件。

我們知道,如果發生任何故障,Flink會將應用程式的狀態恢復到最新的一次checkpoint點。一種極端的情況是,預提交成功了,但在這次commit的通知到達operator之前發生了故障。在這種情況下,Flink會將operator的狀態恢復到已經預提交,但尚未真正提交的狀態。

我們需要在預提交階段保存足夠多的信息到checkpoint狀態中,以便在重啟後能正確的中止或提交事務。在這個例子中,這些信息是臨時文件和目標目錄的路徑。

TwoPhaseCommitSinkFunction已經把這種情況考慮在內了,並且在從checkpoint點恢復狀態時,會優先發出一個commit。我們需要以冪等方式實現提交,一般來說,這並不難。在這個示例中,我們可以識別出這樣的情況:臨時文件不在臨時目錄中,但已經移動到目標目錄了。

在TwoPhaseCommitSinkFunction中,還有一些其他邊界情況也會考慮在內,請參考Flink文檔瞭解更多信息。

FAQ

flink sink在如果過來一個checkpoint barrier,會去存儲state,這個動作會和普通的write並行嗎?還是串列?

在Flink的checkpoint機制中,當一個Checkpoint Barrier過來時,sink會觸發對狀態的snapshot,這個snapshot動作預設是和普通的write操作並行進行的。

具體來說:

  • Flink的checkpoint機制是通過在datastream中註入Checkpoint Barrier來實現的。

  • 當source接收到Checkpoint Barrier時,會將其傳遞給下游的transformation和sink。

  • 當sink接收到Checkpoint Barrier時,會啟動一個新的線程來執行state snapshot(狀態保存)。

  • 這個狀態snapshot線程會從狀態後端Snapshot State,並存儲檢查點。

  • 而sink的主線程在接收到Checkpoint Barrier時,會繼續處理正常的write。

  • 這樣,狀態snapshot和正常的write操作就是並行進行的。

但是也可以通過Sink的配置來設置snapshot和write的執行策略,主要有兩種模式:

  1. 並行模式(預設):snapshot和write同時進行

  2. 串列模式:snapshot完成後再進行write

綜上,Flink sink在預設的並行checkpoint模式下,狀態snapshot和普通的write操作是並行執行的。可以通過配置來改變其行為。這樣可以根據實際需要進行平衡。

總結

  • Flink的checkpoint機制是支持兩階段提交協議並提供端到端的Exactly-Once語義的基礎。
  • 這個方案的優點是: Flink不像其他一些系統那樣,通過網路傳輸存儲數據 – 不需要像大多數批處理程式那樣將計算的每個階段寫入磁碟。
  • Flink的TwoPhaseCommitSinkFunction提取了兩階段提交協議的通用邏輯,基於此將Flink和支持事務的外部系統結合,構建端到端的Exactly-Once成為可能。
  • 從Flink 1.4.0開始,Pravega和Kafka 0.11 producer都提供了Exactly-Once語義;Kafka在0.11版本首次引入了事務,為在Flink程式中使用Kafka producer提供Exactly-Once語義提供了可能性。
  • Kafaka 0.11 producer的事務是在TwoPhaseCommitSinkFunction基礎上實現的,和at-least-once producer相比只增加了非常低的開銷。

本文由博客一文多發平臺 OpenWrite 發佈!


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 這裡給大家分享我在網上總結出來的一些知識,希望對大家有所幫助 最近喜歡研究起了手錶,對勞力士這款“百事圈”實在是心水的不行啊! 心癢難耐無奈錢包不支持,作為一個前端程式員,買不起的東西該怎麼辦? 當然是自己做一個啊! 說乾就乾,熬夜自己做了個“百事圈”出來!源碼在最後! 先看成品 還是有那麼六七成相 ...
  • 沒有什麼是不可能的,只是需要找到正確的方法。 1. 什麼是狀態? 狀態就是組件內部特有數據的載體(組件數據掛載方式),改變數據頁面就會刷新,由組件自己設置和更改,也就是說由組件自己產生、維護,使用狀態的目的就是為了在不同的狀態下使組件的顯示不同(自己管理),這在 React 中稱為:條件渲染。 為什 ...
  • Node.js是一個基於ChromeV8引擎的JavaScript運行環境,使用了一個事件驅動、非阻塞式I/O模型,讓JavaScript 運行在服務端的開發平臺,它讓JavaScript成為與PHP、Python、Perl、Ruby等服務端語言平起平坐的腳本語言。Node中增添了很多內置的模塊,提... ...
  • Promise對象用於清晰的處理非同步任務的完成,返回最終的結果值,本次分享主要介紹Promise的基本屬性以及Promise內部的基礎實現,能夠幫我們更明確使用場景、更快速定位問題。 ...
  • 公眾號「架構成長指南」,專註於生產實踐、雲原生、分散式系統、大數據技術分享。 資料庫和Redis如何保持強一致性,這篇文章告訴你 目的 Redis和Msql來保持數據同步,並且強一致,以此來提高對應介面的響應速度,剛開始考慮是用mybatis的二級緩存,發現坑不少,於是決定自己搞 要關註的問題點 操 ...
  • 十六、C++字元串(一) 1、原生字元串實現將兩個字元串拼接 //原生字元串實現將兩個字元串拼接 #include <iostream> #include <locale> int main() { char strA[0x10] = "123"; //定義字元串 char strB[0x10] = ...
  • ReentrantReadWriteLock讀寫鎖 樂觀鎖和悲觀鎖 樂觀鎖 樂觀鎖,就是給需要共用的數據,添加一個版本號version,例如1,每次有線程更新共用數據後,version+1,每次線程進行數據更新時,要比較當前線程持有的數據的版本號,相等則修改,不相等則不修改,支持併發訪問。 悲觀鎖 ...
  • 哈嘍大家好,我是鹹魚 接觸過 Python 的小伙伴應該對【字典】這一數據類型都瞭解吧 雖然 Python 沒有顯式名稱為“哈希表”的內置數據結構,但是字典是哈希表實現的數據結構 在 Python 中,字典的鍵(key)被哈希,哈希值決定了鍵對應的值(value)在字典底層數據存儲中的位置 那麼今天 ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...