在Flink中的每個函數和運算符都是有狀態的。在處理過程中可以用狀態來存儲數據,這樣可以利用狀態來構建複雜操作。為了讓狀態容錯,Flink需要設置checkpoint狀態。Flink程式是通過checkpoint來保證容錯,通過checkpoint機制,Flink可恢復作業的狀態和計算位置。chec ...
在Flink中的每個函數和運算符都是有狀態的。在處理過程中可以用狀態來存儲數據,這樣可以利用狀態來構建複雜操作。為了讓狀態容錯,Flink需要設置checkpoint狀態。Flink程式是通過checkpoint來保證容錯,通過checkpoint機制,Flink可恢復作業的狀態和計算位置。
checkpoint檢查點
前提條件
Flink的checkpoin機制需要與流和狀態的持久化存儲交互,一般它要求:
- 一個持久化的數據源
- 當Flink程式出現問題時,可以通過checkpoint持久化存儲中恢復,然後從出錯的地方開始重新消費數據
- 該數據源可以在一定時間內重跑數據,例如:Kafka、RabbitMQ或者文件系統HDFS、S3、…
- 狀態的持久存儲
- 狀態需要永久的保存下來,通常是分散式文件系統(例如:HDFS、S3、GFS、…)
啟用和配置檢查點
預設情況,Flink是禁用檢查點。要啟用檢查點,調用
// 啟用檢查點// 單位:毫秒
env.enableCheckpointing(1000);
在啟用檢查點時,還可以配置檢查點的其他參數。
- exactly-one or at-least-once(僅一次或者至少一次)
- 大多數程式都是設置為exactly-once,只有在某些超低延遲的應用(例如:始終要求是毫秒級的應用)
- 通過查看源碼,我們看到,Flink預設是 exactly-once
public static final CheckpointingMode DEFAULT_MODE = CheckpointingMode.EXACTLY_ONCE;
- 檢查點超過規定的時間就會自動終止
- 檢查點之間的最小時間
- 下一個檢查點將在上一個檢查點完成後5秒鐘啟動
- 檢查點最小間隔時間不會受檢查點間隔更容易配置
- 檢查點的併發數目。預設情況一個檢查點在運行時不會觸發另一個檢查點,這樣可以確保Flink不會花太多時間在checkpoint上,並確保流可以有效進行。
- 可以設置多個重疊的checkpoint,這對容許有一定延遲,並希望較頻繁的檢查(100ms)來重新處理故障是有用的
- 外部檢查點
- 可以將檢查點設置為外部持久化,這樣檢查點的元數據將寫入持久存儲,並且但作業運行失敗是不會自動清理
- 這樣可以做雙重保險
- 檢查點執行發生錯誤,是否執行任務。
- 預設情況,如果checkpoint失敗,任務也將失敗
- 即時最近有更多的savepoint可用於恢復,flink依然會選擇使用最近一次的checkpoint來進行錯誤恢復
參考配置:
// -------- // 配置checkpoint // 啟用檢查點 env.enableCheckpointing(1000); env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500); env.getCheckpointConfig().setCheckpointTimeout(60000); env.getCheckpointConfig().setMaxConcurrentCheckpoints(1); env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); env.getCheckpointConfig().setPreferCheckpointForRecovery(true);
選擇狀態後端存儲
Flink的checkpoint機制可以存儲計時器和有狀態operation的所有快照,包括:連接器、視窗或者用戶自定義狀態。具體checkpoint存儲在哪兒(例如:是JobManager記憶體、文件系統或者資料庫),依賴於狀態後端的配置。
預設情況,狀態保存在TaskManager的記憶體中,檢查點存儲在TM的記憶體中。為了適當地保存大狀態,Flink支持其他的存儲。我們可以通過:
StreamExecutionEnvironment.setStateBackend(…)
來指定存儲方式
Flink狀態管理
狀態的應用場景:
- 當應用程式想要按照某種模式搜索某些事件時,狀態可以保存迄今所有的事件序列
- 當每分鐘/小時/天需要對流數據進行聚合,狀態可以保存掛起的聚合
- 當在數據流上訓練機器學習模型時,狀態可以用來保存某一類參數的版本
- 當需要管理歷史數據時,狀態允許訪問過去歷史數據
Flink狀態可以保存在堆內、或者是堆外。Flink也可以管理應用程式的狀態,必要時也可以溢出到磁碟,如果應用要保持非常大的狀態,可以不修改程式邏輯情況下配置狀態後端存儲。
Flink狀態分類
Flink中有兩種基本的狀態:
- Keyed State
- Operator State
Keyed State
Keyed State通常和key相關,僅僅在KeyedStream的方法和運算元中使用。可以把 Keyed State看作是分區,而且每一個key僅出現在一個分區內。邏輯上每個 keyed-state和唯一元組<運算元併發實例, key>綁定,由於每個key僅屬於運算元的一個併發,因此可以簡化為<運算元, key>
Operator State
對於 Operator State來說,每個Operator State和一個併發實例綁定。Kafka connector是Flink中使用operator state的一個很好的示例。每個Kafka消費者的併發在Operator State中維護一個 topic partition到offset的映射關係。
Operator state在Flink作業的併發改變後,會重新分髮狀態,分發的策略和keyed stated不一樣。
Raw State與Managed State
Keyed Stated和Operator State分別有兩種形式:managed 和 raw
Managed State是由Flink運行時管理的數據結構來表示的,例如:內部的Hash Table或者RocksDB。例如:ValueState、ListState等。Flink運行時會對這些狀態進行編碼並寫入Checkpoint。
Raw State則保存在自己的數據結構中。checkpoint的時候,Flink並不知道狀態裡面具體的內容,僅僅寫入一串位元組序列到checkpoint中。
所有的DataStream的function都可以使用managed state,但raw state只能在實現運算元時使用。由於Flink可以在修改併發時更好的分髮狀態數據,並且能夠更好的管理記憶體,因為講義使用 managed state.
使用Managed Keyed State
Managed keyed state介面提供不同類型的狀態訪問介面,這些狀態都作用在當前輸入數據的key下。這些狀態僅可在KeyedStream上使用,可以通過 stream.keyBy(…)得到KeyedStream。
所有支持的狀態類型如下:
- ValueState<T>
- 保存一個可以更新和獲取的值,運算元接收到的每個key都可能對應一個值
- 可以通過update(T)進行更新,通過value()獲取
- ListState<T>
- 保存一個元素的列表,可以往這個列表中追加數據,併在當前列表上檢索
- 可以通過 add(T)或者addAll(List<T>)進行追加元素
- 通過get()獲取整個列表
- 通過 update(List<T>)覆蓋當前列表
- ReducingState<T>
- 保存一個單值,表示添加到狀態的所有值的聚合。介面與ListState類似
- AggregatingState<IN, OUT>
- 保存一個單值,表示添加到狀態的所有值的聚合
- 與ReducingState相反的是,聚合類型可能與添加到狀態的元素類型不同。介面與ListState類似
- FoldingState<T, ACC>(後續將過期)
- 保存一個單值,白搜狐添加到狀態的所有值的集合
- 與ReducingState相反的是,聚合類型可能與添加到狀態的元素類型不同。介面與ListState類似
- MapState<UK, UV>
- 維護一個映射列表,可以添加鍵值到狀態中,可以獲取當前映射的迭代器
- 使用put、putAll添加映射,使用 get檢索特定key
註意:
- 這些狀態對象僅用於狀態交互。狀態本身不一定存儲在記憶體中,還有可能保存在磁碟或者其他位置
- 從狀態中獲取的值取決於輸入元素說代表的key,因此,在不同key上調用同一個介面,可能得到不同的值
使用Managed Operator State
可以通過實現 CheckpointedFunction 或者 ListCheckpointed<T extends Serialized>介面來使用Managed Operator State。
CheckpointedFunction介面:
void snapshotState(FunctionSnapshotContext context) throws Exception;void initializeState(FunctionInitializationContext context) throws Exception;
在Flink進行checkpoint時,會調用snapshotstate(),用戶自定義函數初始化時會調用 initializeState。初始化包括第一次自定義函數初始化和從之前的 checkpoint 回覆。因此,initializeState 中應該也包括狀態恢復的邏輯。
Managed Operator State以list的形式存在,這些狀態是一個可序列化對象的集合List,彼此獨立,方便在改變併發後進行狀態的重新分派。換句話說,這些對象是重新分配 non-keyed state的最細粒度。根據狀態的不同訪問方式,有以下兩種分配模式:
- Even-split redistribution
- 每個運算元都存儲一個列表形式的狀態集合,整個狀態由所有的列表拼接而成
- 但作業恢復或者重新分配時,整個狀態按照運算元的並行度均勻分配
- Union redistribution
- 每個運算元保存一個列表形式的狀態集合,整個狀態由所有的列表拼接而成
- 但作業恢復或者重新分配時,每個運算元都將獲得所有的狀態數據
ListCheckpointed介面:
ListCheckpointed介面是CheckpointedFunction介面的精簡版,僅支持 even-split redistribution的list state
List<T> snapshotState(long checkpointId, long timestamp) throws Exception;void restoreState(List<T> state) throws Exception;
snapshotState()需要返回一個將寫入到checkpoint的對象列表, restoreState則需要處理恢復回來的對象列表。
參考文獻:
Flink官方文檔:
https://ci.apache.org/projects/flink/flink-docs-release-1.9/zh/dev/stream/state/checkpointing.html
https://ci.apache.org/projects/flink/flink-docs-release-1.9/zh/ops/state/checkpoints.html
https://ci.apache.org/projects/flink/flink-docs-release-1.9/zh/dev/stream/state/state.html