一、Flink中的狀態 官方文檔 有狀態的計算是流處理框架要實現的重要功能,因為稍複雜的流處理場景都需要記錄狀態,然後在新流入數據的基礎上不斷更新狀態。下麵的幾個場景都需要使用流處理的狀態功能: 數據流中的數據有重覆,想對重覆數據去重,需要記錄哪些數據已經流入過應用,當新數據流入時,根據已流入過的數 ...
目錄
一、Flink中的狀態
有狀態的計算是流處理框架要實現的重要功能,因為稍複雜的流處理場景都需要記錄狀態,然後在新流入數據的基礎上不斷更新狀態。下麵的幾個場景都需要使用流處理的狀態功能:
- 數據流中的數據有重覆,想對重覆數據去重,需要記錄哪些數據已經流入過應用,當新數據流入時,根據已流入過的數據來判斷去重。
- 檢查輸入流是否符合某個特定的模式,需要將之前流入的元素以狀態的形式緩存下來。比如,判斷一個溫度感測器數據流中的溫度是否在持續上升。
- 對一個時間視窗內的數據進行聚合分析,分析一個小時內某項指標的75分位或99分位的數值。
個狀態更新和獲取的流程如下圖所示,一個運算元子任務接收輸入流,獲取對應的狀態,根據新的計算結果更新狀態。
1)鍵控狀態(Keyed State)
keyed state 介面提供不同類型狀態的訪問介面,這些狀態都作用於當前輸入數據的 key 下。換句話說,這些狀態僅可在 KeyedStream 上使用,在Java/Scala API上可以通過
stream.keyBy(...)
得到 KeyedStream,在Python API上可以通過stream.key_by(...)
得到 KeyedStream。
1、控制項狀態特點
- 鍵控狀態是根據輸入數據流中定義的鍵(key)來維護和訪問的
- Flink 為每個 key 維護一個狀態實例,並將具有相同鍵的所有數據,都分區到同一個運算元任務中,這個任務會維護和處理這個 key 對應的狀態
- 當任務處理一條數據時,它會自動將狀態的訪問範圍限定為當前數據的 key
2、鍵控狀態類型
鍵控狀態類型 | 說明 | 方法 |
---|---|---|
ValueState[T] | 值狀態,保存一個可以更新和檢索的值 | ValueState.update(value: T) ValueState.value() |
ListState[T] | 列表狀態,保存一個元素的列表可以往這個列表中追加數據,併在當前的列表上進行檢索。 | ListState.add(value: T) ListState.addAll(values: java.util.List[T]) ListState.update(values: java.util.List[T]) ListState.get()(註意:返回的是Iterable[T]) |
ReducingState |
聚合狀態,保存一個單值,表示添加到狀態的所有值的聚合,介面與 ListState 類似,但使用 add(T) 增加元素,會使用提供的 ReduceFunction 進行聚合。 | ReducingState.add(value: T) ReducingState.get() |
AggregatingState<IN, OUT> | 聚合狀態,保留一個單值,表示添加到狀態的所有值的聚合。和 ReducingState 相反的是, 聚合類型可能與 添加到狀態的元素的類型不同。 介面與 ListState 類似,但使用 add(IN) 添加的元素會用指定的 AggregateFunction 進行聚合。 | AggregatingState.add(value: T) AggregatingState.get() |
MapState<UK, UV> | 映射狀態,維護了一個映射列表,保存Key-Value對。 | MapState.get(key: K) MapState.put(key: K, value: V) MapState.contains(key: K) MapState.remove(key: K) |
【溫馨提示】所有類型的狀態還有一個
clear()
方法,清除當前 key 下的狀態數據,也就是當前輸入元素的 key。
3、狀態有效期 (TTL)
任何類型的 keyed state 都可以有 有效期 (TTL)。所有狀態類型都支持單元素的 TTL。 這意味著列表元素和映射元素將獨立到期。
【官網示例】
package com
import org.apache.flink.api.common.state.StateTtlConfig
import org.apache.flink.api.common.state.ValueStateDescriptor
import org.apache.flink.api.common.time.Time
object StateTest001 {
def main(args: Array[String]): Unit = {
val ttlConfig = StateTtlConfig
.newBuilder(Time.seconds(1))
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
.build
val stateDescriptor = new ValueStateDescriptor[String]("text state", classOf[String])
stateDescriptor.enableTimeToLive(ttlConfig)
}
}
TTL 配置有以下幾個選項:
- newBuilder 的第一個參數表示數據的有效期,是【必選項】。
- TTL 的更新策略(預設是 OnCreateAndWrite)
- StateTtlConfig.UpdateType.OnCreateAndWrite - 僅在創建和寫入時更新
- StateTtlConfig.UpdateType.OnReadAndWrite - 讀取時也更新
1)過期數據的清理
預設情況下,過期數據會在讀取的時候被刪除,例如 ValueState#value,同時會有後臺線程定期清理(如果 StateBackend 支持的話)。可以通過 StateTtlConfig 配置關閉後臺清理:
import org.apache.flink.api.common.state.StateTtlConfig
val ttlConfig = StateTtlConfig
.newBuilder(Time.seconds(1))
.disableCleanupInBackground
.build
2)全量快照時進行清理
可以啟用全量快照時進行清理的策略,這可以減少整個快照的大小。當前實現中不會清理本地的狀態,但從上次快照恢復時,不會恢復那些已經刪除的過期數據。 該策略可以通過 StateTtlConfig 配置進行配置:
import org.apache.flink.api.common.state.StateTtlConfig
import org.apache.flink.api.common.time.Time
val ttlConfig = StateTtlConfig
.newBuilder(Time.seconds(1))
.cleanupFullSnapshot
.build
【溫馨提示】這種策略在 RocksDBStateBackend 的增量
checkpoint
模式下無效。
3)增量數據清理
另外可以選擇增量式清理狀態數據,在狀態訪問或/和處理時進行。如果某個狀態開啟了該清理策略,則會在存儲後端保留一個所有狀態的惰性全局迭代器。 每次觸發增量清理時,從迭代器中選擇已經過期的數進行清理。
4)在 RocksDB 壓縮時清理
如果使用 RocksDB state backend,則會啟用 Flink 為 RocksDB 定製的壓縮過濾器。RocksDB 會周期性的對數據進行合併壓縮從而減少存儲空間。 Flink 提供的 RocksDB 壓縮過濾器會在壓縮時過濾掉已經過期的狀態數據。該特性可以通過 StateTtlConfig 進行配置:
import org.apache.flink.api.common.state.StateTtlConfig
val ttlConfig = StateTtlConfig
.newBuilder(Time.seconds(1))
.cleanupInRocksdbCompactFilter(1000)
.build
【註意】
- 如果沒有 state 訪問,也沒有處理數據,則不會清理過期數據。
- 增量清理會增加數據處理的耗時。
- 現在僅 Heap state backend 支持增量清除機制。在 RocksDB state backend 上啟用該特性無效。
- 如果 Heap state backend 使用同步快照方式,則會保存一份所有 key 的拷貝,從而防止併發修改問題,因此會增加記憶體的使用。但非同步快照則沒有這個問題。
- 對已有的作業,這個清理方式可以在任何時候通過 StateTtlConfig 啟用或禁用該特性,比如從 savepoint 重啟後。
4、鍵控狀態的使用
除了上面描述的介面之外,Scala API 還在 KeyedStream 上對 map() 和 flatMap() 訪問 ValueState 提供了一個更便捷的介面
mapWithState
。 用戶函數能夠通過 Option 獲取當前 ValueState 的值,並且返回即將保存到狀態的值。
val stream: DataStream[(String, Int)] = ...
val counts: DataStream[(String, Int)] = stream
.keyBy(_._1)
.mapWithState((in: (String, Int), count: Option[Int]) =>
count match {
case Some(c) => ( (in._1, c), Some(c + in._2) )
case None => ( (in._1, 0), Some(in._2) )
})
2)運算元狀態(Operatior State)
運算元狀態(或者非 keyed 狀態)是綁定到一個並行運算元實例的狀態。Kafka Connector 是 Flink 中使用運算元狀態一個很具有啟發性的例子。Kafka consumer 每個並行實例維護了 topic partitions 和偏移量的 map 作為它的運算元狀態。
【溫馨提示】 Python DataStream API 仍無法支持運算元狀態。
1、運算元狀態特點
- 運算元狀態的作用範圍限定為運算元任務,由同一併行任務所處理的所有數據都可以訪問到相同的狀態
- 狀態對於同一子任務而言是共用的
- 運算元狀態不能由相同或不同運算元的另一個子任務訪問
2、運算元狀態類型
鍵控狀態類型 | 說明 |
---|---|
列表狀態(ListState) | 將狀態表示為一組數據的列表 |
聯合列表狀態(UnionListState) | 也將狀態表示為數據的列表。它與常規列表狀態的區別在於,在發生故障時,或者從保存點(savepoint)啟動應用程式時如何恢復 |
廣播狀態(BroadcastState) | 如果一個運算元有多項任務,而它的每項任務狀態又都相同,那麼這種特殊情況最適合應用廣播狀態。 |
3)廣播狀態 (Broadcast State)
廣播狀態是一種特殊的運算元狀態。引入它的目的在於支持一個流中的元素需要廣播到所有下游任務的使用情形。在這些任務中廣播狀態用於保持所有子任務狀態相同。 該狀態接下來可在第二個處理記錄的數據流中訪問。廣播狀態和其他運算元狀態的不同之處在於:
- 它具有 map 格式,
- 它僅在一些特殊的運算元中可用。這些運算元的輸入為一個廣播數據流和非廣播數據流,
- 這類運算元可以擁有不同命名的多個廣播狀態 。
【溫馨提示】 Python DataStream API 仍無法支持運算元狀態。
二、狀態後端(State Backends)
狀態的存儲、訪問以及維護,由一個可插入的組件決定,這個組件就
叫做狀態後端(state backend) ,狀態後端主要負責兩件事:本地的狀態管理,以及將檢查點(checkpoint)狀態寫入遠程存儲。
1)三種狀態存儲方式
存儲方式 | 說明 |
---|---|
MemoryStateBackend | 【預設模式】狀將鍵控狀態作為記憶體中的對象進行管理,將它們存儲在TaskManager的JVM堆上,將checkpoint存儲在JobManager的記憶體中。主要適用於本地開發和調試。 |
FsStateBackend | 基於文件系統進行存儲,可以是本地文件系統,也可以是 HDFS 等分散式文件系統。 需要註意而是雖然選擇使用了 FsStateBackend ,但正在進行的數據仍然是存儲在 TaskManager 的記憶體中的,只有在 checkpoint 時,才會將狀態快照寫入到指定文件系統上。 |
RocksDBStateBackend | 將所有狀態序列化後,存入本地的RocksDB中存儲。 |
【溫馨提示】特別在
MemoryStateBackend
內使用HeapKeyedStateBackend
時,Checkpoint 序列化數據階段預設有最大 5 MB數據的限制。
對於HeapKeyedStateBackend,有兩種實現:
- 支持非同步 Checkpoint(預設):存儲格式 CopyOnWriteStateMap
- 僅支持同步 Checkpoint:存儲格式 NestedStateMap
2)配置方式
Flink 支持使用兩種方式來配置後端管理器:
1、【第一種方式】基於代碼方式進行配置
【溫馨提示】只對當前作業生效
// 配置 FsStateBackend
env.setStateBackend(new FsStateBackend("hdfs://namenode:port/flink/checkpoints"));
// 配置 RocksDBStateBackend
env.setStateBackend(new RocksDBStateBackend("hdfs://namenode:port/flink/checkpoints"));
配置 RocksDBStateBackend 時,需要額外導入下麵的依賴:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-statebackend-rocksdb_2.11</artifactId>
<version>1.9.0</version>
</dependency>
2、【第二種方式】基於 flink-conf.yaml 配置文件的方式進行配置
【溫馨提示】對所有部署在該集群上的作業都生效
state.backend: filesystem
state.checkpoints.dir: hdfs://namenode:port/flink/checkpoints
三、容錯機制(checkpoint)
checkpoint是Flink容錯的核心機制。它可以定期地將各個Operator處理的數據進行快照存儲( Snapshot )。如果Flink程式出現宕機,可以重新從這些快照中恢複數據。
1)一致性
談到容錯性,就沒法避免一致性這個概念。所謂一致性就是:成功處理故障並恢復之後得到的結果與沒有發生任何故障是得到的結果相比,前者的正確性。換句大白話,就是故障的發生是否影響得到的結果。在流處理過程,一致性分為3個級別:
at-most-once
:至多一次。故障發生之後,計算結果可能丟失,就是無法保證結果的正確性;at-least-once
:至少一次。計算結果可能大於正確值,但絕不會小於正確值,就是計算程式發生故障後可能多算,但是絕不可能少算;exactly-once
:精確一次。系統保證發生故障後得到的計算結果的值和正確值一致;
Flink的容錯機制保證了exactly-once,也可以選擇at-least-once。Flink的容錯機制是通過對數據流不停的做快照(snapshot)實現的。
2)檢查點(checkpoint)
Flink 中的每個方法或運算元都能夠是有狀態的,為了讓狀態容錯,Flink 需要為狀態添加
checkpoint(檢查點)
。Checkpoint 使得 Flink 能夠恢復狀態和在流中的位置,從而嚮應用提供和無故障執行時一樣的語義。官方文檔
1、開啟與配置 Checkpoint
預設情況下 checkpoint 是禁用的。通過調用
StreamExecutionEnvironment
的enableCheckpointing(n)
來啟用 checkpoint,裡面的 n 是進行 checkpoint 的間隔,單位毫秒。
2、Checkpoint 屬性
屬性 | 說明 |
---|---|
精確一次(exactly-once) | 你可以選擇向 enableCheckpointing(long interval, CheckpointingMode mode) 方法中傳入一個模式來選擇保證等級級別。 |
checkpoint 超時 | 如果 checkpoint 執行的時間超過了該配置的閾值,還在進行中的 checkpoint 操作就會被拋棄。 |
checkpoints 之間的最小時間 | 該屬性定義在 checkpoint 之間需要多久的時間,以確保流應用在 checkpoint 之間有足夠的進展。如果值設置為了 5000, 無論 checkpoint 持續時間與間隔是多久,在前一個 checkpoint 完成時的至少五秒後會才開始下一個 checkpoint。 |
checkpoint 可容忍連續失敗次數 | 該屬性定義可容忍多少次連續的 checkpoint 失敗。超過這個閾值之後會觸發作業錯誤 fail over。 預設次數為“0”,這意味著不容忍 checkpoint 失敗,作業將在第一次 checkpoint 失敗時fail over。 |
併發 checkpoint 的數目 | 預設情況下,在上一個 checkpoint 未完成(失敗或者成功)的情況下,系統不會觸發另一個 checkpoint。這確保了拓撲不會在 checkpoint 上花費太多時間,從而影響正常的處理流程。 不過允許多個 checkpoint 並行進行是可行的,對於有確定的處理延遲(例如某方法所調用比較耗時的外部服務),但是仍然想進行頻繁的 checkpoint 去最小化故障後重跑的 pipelines 來說,是有意義的。 |
externalized checkpoints | 你可以配置周期存儲 checkpoint 到外部系統中。Externalized checkpoints 將他們的元數據寫到持久化存儲上並且在 job 失敗的時候不會被自動刪除。 這種方式下,如果你的 job 失敗,你將會有一個現有的 checkpoint 去恢復。更多的細節請看 Externalized checkpoints 的部署文檔。 |
【官網示例】
val env = StreamExecutionEnvironment.getExecutionEnvironment()
// 每 1000ms 開始一次 checkpoint
env.enableCheckpointing(1000)
// 高級選項:
// 設置模式為精確一次 (這是預設值)
env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
// 確認 checkpoints 之間的時間會進行 500 ms
env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500)
// Checkpoint 必須在一分鐘內完成,否則就會被拋棄
env.getCheckpointConfig.setCheckpointTimeout(60000)
// 允許兩個連續的 checkpoint 錯誤
env.getCheckpointConfig().setTolerableCheckpointFailureNumber(2)
// 同一時間只允許一個 checkpoint 進行
env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
// 使用 externalized checkpoints,這樣 checkpoint 在作業取消後仍就會被保留
env.getCheckpointConfig().enableExternalizedCheckpoints(
ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
// 開啟實驗性的 unaligned checkpoints
env.getCheckpointConfig.enableUnalignedCheckpoints()
3)從檢查點恢復狀態
- 【第一步】遇到故障之後,第一步就是重啟應用
- 【第二步】是從 checkpoint 中讀取狀態,將狀態重置,從檢查點重新啟動應用程式後,其內部狀態與檢查點完成時的狀態完全相同
- 【第三步】開始消費並處理檢查點到發生故障之間的所有數據,這種檢查點的保存和恢復機制可以為應用程式狀態提供
“精確一次”(exactly- once)
的一致性,因為所有運算元都會保存檢查點並恢復其所有狀態,這樣一來所有的輸入流就都會被重置到檢查點完成時的位置
4)檢查點的實現演算法
- 【一種簡單的想法】:暫停應用,保存狀態到檢查點,再重新恢復應用
- 【Flink 的改進實現】:
- 基於 Chandy-Lamport 演算法的分散式快照
- 將檢查點的保存和數據處理分離開,不暫停整個應用
5)檢查點演算法
基於Chandy-Lamport演算法實現的分散式快照
1、檢查點分界線(Checkpoint Barrier)
-
將barrier插入到數據流中,作為數據流的一部分和數據一起向下流動。Barrier不會幹擾正常數據,數據流嚴格有序。
-
一個barrier把數據流分割成兩部分:一部分進入到當前快照,另一部分進入到下一個快照。
-
每一個barrier都帶有快照ID,並且barrier之前的數據都進入了此快照。Barrier不會幹擾數據流處理,所以非常輕量。
-
多個不同快照的多個barrier會在流中同時出現,即多個快照可能同時創建。
2、Barrier對齊
當一個opeator有多個輸入流的時候,checkpoint barrier n 會進行對齊,就是已到達的會先緩存到buffer里等待其他未到達的,一旦所有流都到達,則會向下游廣播,exactly-once 就是利用這一特性實現的,at least once 因為不會進行對齊,就會導致有的數據被重覆處理。
3、執行一次檢查點步驟
- jobManager會向每個source任務發送一條帶有新檢查點ID的消息,通過這種方式來啟動檢查點。
- 數據源將他們各自的狀態寫入檢查點後,並向下游所有分區發出一個檢查點barrier。狀態後端在狀態存入檢查點之後,會返回通知給source任務,source任務再向jobmanager確認檢查點完成。
- barrier向下游傳遞,下游任務會等待所有輸入分區的barrier的到達後再做狀態保存通知jobmanager狀態保存完成,並再向下游所有分區發送收到的檢查點barrier。
【溫馨提示】對於barrier已經到達的分區,繼續到達的數據會被緩存;對於barrier未到達的分區,數據會被正常處理所有barrier都到達後,做完狀態保存且向下游發送檢查點barrier後,當前任務繼續處理緩存的數據和後面到來的數據。
- sink任務向jobmanager確認狀態保存到checkpoint完成。即所有任務都確認已成功將狀態保存到檢查點時,檢查點就真正完成了。
6)保存點(savepoint)
1、概述
- Flink 還提供了可以自定義的鏡像保存功能,就是保存點(savepoints);
- 原則上,創建保存點使用的演算法與檢查點完全相同,因此保存點可以認為就是具有一些額外元數據的檢查點;
- Flink不會自動創建保存點,因此用戶(或者外部調度程式)必須明確地觸發創建操作;
- 保存點是一個強大的功能。除了故障恢復外,保存點可以用於:有計劃的手動備份,更新應用程式,版本遷移,暫停和重啟應用,等等。
2、savepoint觸發的三種方式
-
使用 flink savepoint 命令觸發 Savepoint,其是在程式運行期間觸發 savepoint。
-
使用 flink cancel -s 命令,取消作業時,並觸發 Savepoint。
-
使用 Rest API 觸發 Savepoint,格式為:/jobs/:jobid /savepoints
7)檢查點(checkpoint)與 保存點(savepoint)的區別與聯繫
- checkpoint的側重點是“容錯”,即Flink作業意外失敗並重啟之後,能夠直接從早先打下的checkpoint恢復運行,且不影響作業邏輯的準確性。而savepoint的側重點是“維護”,即Flink作業需要在人工干預下手動重啟、升級、遷移或A/B測試時,先將狀態整體寫入可靠存儲,維護完畢之後再從savepoint恢復現場。
- savepoint是“通過checkpoint機制”創建的,所以savepoint本質上是特殊的checkpoint。
- checkpoint面向Flink Runtime本身,由Flink的各個TaskManager定時觸發快照並自動清理,一般不需要用戶干預;savepoint面向用戶,完全根據用戶的需要觸發與清理。
- checkpoint的頻率往往比較高(因為需要儘可能保證作業恢復的準確度),所以checkpoint的存儲格式非常輕量級,但作為trade-off犧牲了一切可移植(portable)的東西,比如不保證改變並行度和升級的相容性。savepoint則以二進位形式存儲所有狀態數據和元數據,執行起來比較慢而且“貴”,但是能夠保證portability,如並行度改變或代碼升級之後,仍然能正常恢復。
未完待續,請耐心等待~