【Flink入門修煉】2-2 Flink State 狀態

来源:https://www.cnblogs.com/shuofxz/p/18057385
-Advertisement-
Play Games

- 什麼是狀態?狀態有什麼作用? - 如果你來設計,對於一個流式服務,如何根據不斷輸入的數據計算呢? - 又如何做故障恢復呢? ...


  • 什麼是狀態?狀態有什麼作用?
  • 如果你來設計,對於一個流式服務,如何根據不斷輸入的數據計算呢?
  • 又如何做故障恢復呢?

一、為什麼要管理狀態

流計算不像批計算,數據是持續流入的,而不是一個確定的數據集。在進行計算的時候,不可能把之前已經輸入的數據全都保存下來,然後再和新數據合併計算。效率低下不說,記憶體也扛不住。
另外,如果程式出現故障重啟,沒有之前計算過的狀態保存,那麼也就無法再繼續計算了。

因此,就需要一個東西來記錄各個運算元之前已經計算過值的結果,當有新數據來的時候,直接在這個結果上計算更新。這個就是狀態

常見的流處理狀態功能如下:

  • 數據流中的數據有重覆,我們想對重覆數據去重,需要記錄哪些數據已經流入過應用,當新數據流入時,根據已流入過的數據來判斷去重。
  • 檢查輸入流是否符合某個特定的模式,需要將之前流入的元素以狀態的形式緩存下來。比如,判斷一個溫度感測器數據流中的溫度是否在持續上升。
  • 對一個時間視窗內的數據進行聚合分析,分析一個小時內某項指標的75分位或99分位的數值。
  • 線上機器學習場景下,需要根據新流入數據不斷更新機器學習的模型參數。

二、state 簡介

Flink的狀態是由運算元的子任務來創建和管理的。一個狀態更新和獲取的流程如下圖所示,一個運算元子任務接收輸入流,獲取對應的狀態,根據新的計算結果更新狀態。
image.png

狀態的保存:
需要考慮的問題:

  • container 異常後,狀態不丟
  • 狀態可能越來越大

因此,狀態不能直接放在記憶體中,以上兩點問題都無法保證。
需要有一個外部持久化存儲方式,常見的如放到 HDFS 中。(此部分讀者感興趣可自行搜索資料探索)

image.png

一)Managed State 和 Raw State

  • Managed State 是由 Flink 管理的。Flink幫忙存儲、恢復和優化。
  • Raw State 是開發者自己管理的,需要自己序列化(較少用到)。

在 Flink 中推薦用戶使用Managed State管理狀態數據 ,主要原因是 Managed State 能夠更好地支持狀態數據的重平衡以及更加完善的記憶體管理。

Managed State Raw State
狀態管理方式 Flink Runtime 管理,自動存儲,自動恢復,記憶體管理方式上優化明顯 用戶自己管理,需要用戶自己序列化
狀態數據結構 已知的數據結構 value , list ,map flink不知道你存的是什麼結構,都轉換為二進位位元組數據
使用場景 大多數場景適用 需要滿足特殊業務,自定義operator時使用,flink滿足不了你的需求時候,使用複雜

下文將重點介紹Managed State。

二)Keyed State 和 Operator State

Managed State 又有兩種類型:Keyed State 和 Operator State。

keyed state operator state
適用場景 只能應用在 KeyedSteam 上 可以用於所有的運算元
State 處理方式 每個 key 對應一個 state,一個 operator 處理多個 key ,會訪問相應的多個 state 一個 operator 對應一個 state
併發改變 併發改變時,state隨著key在實例間遷移 併發改變時需要你選擇分配方式,內置:1.均勻分配 2.所有state合併後再分發給每個實例
訪問方式 通過RuntimeContext訪問,需要operator是一個richFunction 需要你實現CheckPointedFunction或ListCheckPointed介面
支持數據結構 ValuedState, ListState, Reducing State, Aggregating State, MapState, FoldingState(1.4棄用) 只支持 listState

Keyed State

簡單來說,通過 keyBy 分組的就會用到 Keyed State。就是按照分組來的狀態。(Keyed State 是Operator State的特例,區別在於 Keyed State 事先按照 key 對數據集進行了分區,每個 Key State 僅對應ー個Operator和 Key 的組合。)
image.png

Keyed State可以通過 Key Groups 進行管理,主要用於當運算元並行度發生變化時,自動重新分佈Keyed State數據 。分配代碼如下:

// KeyGroupRangeAssignment.java
    public static int computeKeyGroupForKeyHash(int keyHash, int maxParallelism) {
		return MathUtils.murmurHash(keyHash) % maxParallelism;
	}

Operator State

Operator State 可以用在所有運算元上,每個運算元子任務或者說每個運算元實例共用一個狀態,流入這個運算元子任務的數據可以訪問和更新這個狀態。
例如 Kafka Connector 中,每一個並行的 Kafka Consumer 都在 Operator State 中維護當前 Consumer 訂閱的 partiton 和 offset。
image.png

image.png

在開發中,需要保存的狀態也有不同的數據結構,那麼 Flink 也提供了相應的類。
如上圖所示:

  • ValueState[T] 保存單一變數狀態
  • MapState[K, V] 同 java map,保存 kv 型狀態
  • ListState[T] 數組類型狀態
  • ReducingState[T] 單一狀態,將原狀態和新狀態合併後再更新
  • AggregatingState[IN, OUT] 同樣是合併更新,只不過前後數據類型可以不一樣

四、實踐

實現一個簡單的計數視窗。
輸入數據是一個元組 Tuple2.of(1L, 3L),把元組的第一個元素當作 key(在示例中都 key 都是 “1”),第二個元素當 value。
該函數將出現的次數以及總和存儲在 ValueState 中。 一旦出現次數達到 2,則將平均值發送到下游,並清除狀態重新開始。 請註意,我們會為每個不同的 key(元組中第一個元素)保存一個單獨的值。

public class CountWindowAverage extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {

    /**
     * The ValueState handle. The first field is the count, the second field a running sum.
     */
    private transient ValueState<Tuple2<Long, Long>> sum;

    @Override
    public void flatMap(Tuple2<Long, Long> input, Collector<Tuple2<Long, Long>> out) throws Exception {

        // access the state value
        Tuple2<Long, Long> currentSum = sum.value();

        // update the count
        currentSum.f0 += 1;

        // add the second field of the input value
        currentSum.f1 += input.f1;

        // update the state
        sum.update(currentSum);

        // if the count reaches 2, emit the average and clear the state
        if (currentSum.f0 >= 2) {
            out.collect(new Tuple2<>(input.f0, currentSum.f1 / currentSum.f0));
            sum.clear();
        }
    }

    @Override
    public void open(Configuration config) {
        ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
                new ValueStateDescriptor<>(
                        "average", // the state name
                        TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}), // type information
                        Tuple2.of(0L, 0L)); // default value of the state, if nothing was set
        sum = getRuntimeContext().getState(descriptor);
    }
}

// this can be used in a streaming program like this (assuming we have a StreamExecutionEnvironment env)
env.fromElements(Tuple2.of(1L, 3L), Tuple2.of(1L, 5L), Tuple2.of(1L, 7L), Tuple2.of(1L, 4L), Tuple2.of(1L, 2L))
        .keyBy(value -> value.f0)
        .flatMap(new CountWindowAverage())
        .print();

// the printed output will be (1,4) and (1,5)

四、小結

本節我們介紹了 Flink 狀態,是用於流式計算中中間數據存儲和故障恢復的。
Flink 狀態分為 Raw State 和 Manage State,其中 Manage State 中又包含 Keyed State 和 Operator State。最重要的是 Keyed State 要重點理解和掌握。
在編程開發過程中,針對不同的數據結構,Flink 提供了對應的 State 類。並提供了一個 state demo 代碼供學習。


參考文章:
七、Flink入門--狀態管理_flink流式任務如何保證7*24小時運行-CSDN博客
Flink狀態管理詳解:Keyed State和Operator List State深度解析
爆肝 3 月,3w 字、15 章節詳解 Flink 狀態管理!(建議收藏)-騰訊雲開發者社區-騰訊雲(較詳細)
Flink 筆記二 Flink的State--狀態原理及原理剖析_flink key state是每個key對應一個state還是每個分區對應一個state-CSDN博客(源碼剖析)
Flink 狀態管理詳解(State TTL、Operator state、Keyed state)-騰訊雲開發者社區-騰訊雲
Flink 源碼閱讀筆記(10)- State 管理


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 什麼是訂單履約系統? 訂單履約系統用來管理從接收客戶訂單到將商品送達客戶手中的全過程。 它連接了上游交易(客戶在銷售平臺下單環)和下游倉儲配送(如庫存管理、物流配送),確保信息流順暢、操作協同,提升整個供應鏈的效率和響應速度。 系統定位 訂單履約系統的目標是讓訂單處理更快、更清晰,提高客戶體驗。 履 ...
  • Spring Boot允許外部化項目配置,以便您可以在不同的環境中使用相同的應用程式代碼。您可以使用各種外部配置源,包括Java屬性文件、YAML文件、環境變數和命令行參數。 屬性值可以通過使用@Value註釋直接註入到bean中,通過Spring的環境抽象進行訪問,或者通過@Configurati ...
  • druid-spring-boot-3-starter目前最新版本是1.2.20,雖然適配了SpringBoot3,但缺少自動裝配的配置文件,會導致載入時報載入驅動異常。 <dependency> <groupId>com.alibaba</groupId> <artifactId>druid-sp ...
  • 獨立樣本T檢驗適用於比較兩組獨立樣本的均值差異,而配對T檢驗則適用於比較同一組樣本在不同條件下的均值差異。在Python中,我們可以利用scipy庫進行T檢驗的實現和結果判斷。通過比較P值與顯著性水平,我們可以判斷兩組樣本均值是否存在顯著差異。T值的大小也對判斷兩組樣本均值差異的統計學意義起著重要作... ...
  • STL STL提供了六大組件,彼此之間可以組合套用,這六大組件分別是:容器、演算法、迭代器、仿函數、適配器、空間配置器。 數據結構和容器管理:STL 提供了多種數據結構和容器,如向量(vector)、鏈表(list)、集合(set)、映射(map)等。這些容器可以幫助程式員方便地存儲和管理數據,根據需 ...
  • import java.awt.Color; import java.awt.Font; import java.io.IOException; import java.io.OutputStream; import java.util.ArrayList; import java.util.Lis ...
  • 介面 在 Java 中,實現抽象的另一種方式是使用介面。 介面定義 介面是一個完全抽象的類,用於將具有空方法體的相關方法分組: // 介面 interface Animal { public void animalSound(); // 介面方法(沒有具體實現體) public void run() ...
  • 既然Python是一門全球流行的語言,那麼對於網路通信的HTTP的支持肯定也是非常的優秀的。Python中原生的urllib模塊也有對HTTP的支持,雖然也可以用來發送 HTTP 請求,但使用起來相對繁瑣,並且 API 設計不夠直觀。 requests 庫的出現填補了 Python 在 HTTP 請 ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...