Flink處理函數解析(ProcessFunction和KeyedProcessFunction)

来源:https://www.cnblogs.com/toycon/archive/2023/12/20/17917609.html
-Advertisement-
Play Games

Flink中的處理函數(ProcessFunction和KeyedProcessFunction)在對於數據進行顆粒化的精確計算時使用較多,處理函數提供了一個定時服務(TimerService),可以向未來註冊一個定時服務, ...


Flink中的處理函數(ProcessFunction和KeyedProcessFunction)在對於數據進行顆粒化的精確計算時使用較多,處理函數提供了一個定時服務(TimerService),可以向未來註冊一個定時服務,我們可以把它理解為一個鬧鐘,當鬧鐘響起時,就調用ProcessFunction中的onTimer()方法,會對數據進行一些計算。我們來解析一下這兩個函數。

本文基於Flink1.14版本

ProcessFunction

ProcessFunction是Flink中的較為底層的API,當我們對於DataStream調用process函數的時候,需要在裡面傳入一個對象,即new ProcessFunction[] {},ProcessFunction是一個抽象類,我們看一下這個抽象類的源碼:

源碼解析

@PublicEvolving
public abstract class ProcessFunction<I, O> extends AbstractRichFunction {
    private static final long serialVersionUID = 1L;

    public ProcessFunction() {
    }

    public abstract void processElement(I var1, ProcessFunction<I, O>.Context var2, Collector<O> var3) throws Exception;

    public void onTimer(long timestamp, ProcessFunction<I, O>.OnTimerContext ctx, Collector<O> out) throws Exception {
    }

    public abstract class OnTimerContext extends ProcessFunction<I, O>.Context {
        public OnTimerContext() {
            super();
        }

        public abstract TimeDomain timeDomain();
    }

    public abstract class Context {
        public Context() {
        }

        public abstract Long timestamp();

        public abstract TimerService timerService();

        public abstract <X> void output(OutputTag<X> var1, X var2);
    }
}

我們可以看到ProcessFunction這個抽象類有兩個泛型參數<I, O>分別是輸入(Input)和輸出(Output)以及它繼承了AbstractRichFunction。
ProcessFunction中processElement這個方法是一個抽象方法,代表實現這個抽象類時必須實現這個方法。這個方法的參數有三個I var1, ProcessFunction<I, O>.Context var2, Collector<O> var3,我們來解析一下:var1的類型是上面抽象類的泛型I,代表輸入的數據,var2的類型是ProcessFunction<I, O>.Context,它是 Flink 中 ProcessFunction 的一個內部抽象類,用於為 processElement 方法提供上下文信息。

ProcessFunction<I, O>.Context 的主要功能:

1.獲取時間戳:

timestamp(): 這個方法返回當前正在處理的事件的時間戳。如果您使用的是事件時間(EventTime),這個時間戳就是事件本身的時間戳;如果是處理時間(ProcessingTime),這個值可能是 null。

2.定時服務訪問:

timerService(): 提供對 TimerService 的訪問,您可以用它來註冊定時器和查詢當前時間。這對於需要基於時間做決策的應用來說非常重要。

3.側輸出:

output(OutputTag tag, X value): 允許您將數據發送到非主輸出(即側輸出)。側輸出可以用於將數據發送到多個不同的流,或者用於處理特殊情況的數據,例如錯誤記錄或監控事件。

最後Collector<O> 是一個泛型介面,其中 O 表示輸出數據類型,可以使用 Collector 來發射任何類型的數據。Collector 允許從一個輸入產生多個輸出,這在處理複雜的邏輯或進行多階段處理時非常有用。

我們來看一個例子:

現在我們有訂單數據,需要處理每個訂單字元串,如果訂單金額超過100,則將其標記為"大訂單",否則標記為"小訂單"。

假設我們的訂單數據是一個簡單的字元串格式,包含訂單ID和訂單金額,格式為 "orderId,amount":

    // 示例訂單數據流
    val orderDataStream: DataStream[String] = env.fromElements(
      "order1,100.5",
      "order2,50.5",
      "order3,200.0"
    )
1.定義樣例類

目的是為了將數據轉為對象,便於操作

case class Order(orderId: String, amount: Double)
2.使用 ProcessFunction 處理訂單數據
    val processedStream = orderDataStream
      .process(new OrderProcessFunction)  //OrderProcessFunction為自己實現的ProcessFunction
3.實現OrderProcessFunction
class OrderProcessFunction extends ProcessFunction[String, String] {
  override def processElement(
      value: String,
      ctx: ProcessFunction[String, String]#Context,
      out: Collector[String]): Unit = {

    // 解析訂單數據
    val parts = value.split(",")	//使用逗號分割數據
    val order = Order(parts(0), parts(1).toDouble)	//將數據轉為對象

    // 根據金額分類訂單
    val orderType = if (order.amount > 100) "大訂單" else "小訂單"
    out.collect(s"訂單ID: ${order.orderId}, 類型: $orderType")	//將數據發送輸出
  }
}
3.列印處理後的結果
    processedStream.print()

結果為:

訂單ID: order1, 類型: 大訂單
訂單ID: order2, 類型: 小訂單
訂單ID: order3, 類型: 大訂單
完整代碼:
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.util.Collector

// 定義訂單數據的樣例類
case class Order(orderId: String, amount: Double)

object OrderProcessing {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // 示例訂單數據流
    val orderDataStream: DataStream[String] = env.fromElements(
      "order1,100.5",
      "order2,50.5",
      "order3,200.0"
    )

    // 使用 ProcessFunction 處理訂單數據
    val processedStream = orderDataStream
      .process(new OrderProcessFunction)

    // 列印處理後的結果
    processedStream.print()

    env.execute("Order Processing")
  }
}

class OrderProcessFunction extends ProcessFunction[String, String] {
  override def processElement(
      value: String,
      ctx: ProcessFunction[String, String]#Context,
      out: Collector[String]): Unit = {

    // 解析訂單數據
    val parts = value.split(",")
    val order = Order(parts(0), parts(1).toDouble)

    // 根據金額分類訂單
    val orderType = if (order.amount > 100) "大訂單" else "小訂單"
    out.collect(s"訂單ID: ${order.orderId}, 類型: $orderType")
  }
}



KeyedProcessFunction

KeyedProcessFunction允許對一個鍵控(keyed)流的每個元素進行操作,並提供對狀態和定時器的訪問。這使得它非常適合處理需要細粒度控制的場景。

鍵控流:在 Flink 中,一個鍵控流是通過 keyBy 方法從一個普通流創建的。在鍵控流中,所有的元素都根據指定的鍵分組。

KeyedProcessFunction 有兩個主要方法:

processElement:

輸入:每個流元素,以及上下文信息(Context)。
功能:處理每個元素,可以訪問當前元素的鍵、狀態和定時器。
使用場景:適用於每個事件的獨立處理,如更新狀態、發射輸出、註冊定時器等。

onTimer:

輸入:定時器觸發的時間戳,以及定時器上下文信息(OnTimerContext)。
功能:在特定時間點觸發的邏輯處理。
使用場景:適用於需要基於時間的操作,如超時檢測、定時輸出等。

KeyedProcessFunction 還有附加特性:

狀態管理:KeyedProcessFunction 支持 Flink 的狀態管理,可以為每個鍵保存和更新狀態。這在需要跟蹤每個鍵的歷史信息或執行聚合操作時非常有用。

定時器:可以註冊處理時間(processing time)或事件時間(event time)定時器。定時器在指定的時間點觸發,執行 onTimer 方法中的邏輯。

側輸出(Side Outputs):除了主輸出流外,還可以發射數據到側輸出標簽。這允許您處理如異常數據、監控事件等特殊用途的輸出。

源碼解析

我們來看一下KeyedProcessFunction的源碼:

@PublicEvolving
public abstract class KeyedProcessFunction<K, I, O> extends AbstractRichFunction {
    private static final long serialVersionUID = 1L;

    public KeyedProcessFunction() {
    }

    public abstract void processElement(I var1, KeyedProcessFunction<K, I, O>.Context var2, Collector<O> var3) throws Exception;

    public void onTimer(long timestamp, KeyedProcessFunction<K, I, O>.OnTimerContext ctx, Collector<O> out) throws Exception {
    }

    public abstract class OnTimerContext extends KeyedProcessFunction<K, I, O>.Context {
        public OnTimerContext() {
            super();
        }

        public abstract TimeDomain timeDomain();

        public abstract K getCurrentKey();
    }

    public abstract class Context {
        public Context() {
        }

        public abstract Long timestamp();

        public abstract TimerService timerService();

        public abstract <X> void output(OutputTag<X> var1, X var2);

        public abstract K getCurrentKey();
    }
}

KeyedProcessFunction<K, I, O> 是一個抽象類,用於處理鍵控流中的元素。它允許對每個元素進行複雜的處理,並提供了對定時器和狀態的控制。其中 K (即"Key)表示鍵的類型,I 表示輸入元素的類型,O 表示輸出元素的類型。

KeyedProcessFunction繼承自 AbstractRichFunction,表明 KeyedProcessFunction 可以訪問 Flink 的富函數(rich function)特性,如生命周期方法和運行時上下文。

processElement方法和onTimer方法與ProcessFunction中的類似

OnTimerContext類 是 Context 的子類,為定時器提供額外的上下文信息。其中的timeDomain() 能返回當前定時器的時間域(處理時間或事件時間)。getCurrentKey() 可以返回當前處理的鍵。

Context 類中的 timestamp() 用於返回當前處理元素的時間戳。timerService()提供對定時器服務的訪問,允許註冊和取消定時器。output(OutputTag<X> var1, X var2)允許將數據發送到側輸出。getCurrentKey()返回當前處理的鍵。



下麵我們來看一個使用KeyedProcessFunction處理訂單數據的例子:

例子二:

假設我們有訂單數據流,每個訂單包含訂單ID、用戶ID、訂單金額和時間戳。現在我們需要計算每個用戶的訂單總金額,同時對於每個用戶,如果在指定時間內沒有新的訂單,發出一個警告消息,以及對於超過特定金額的訂單,將其發送到一個側輸出。

數據源:

    // 示例訂單數據流
    val orderDataStream: DataStream[Order] = env.fromElements(
      Order("order1", "user1", 50.0, 1000),
      Order("order2", "user2", 300.0, 2000),
      Order("order3", "user1", 200.0, 3000),
    )
1.定義訂單樣例類
case class Order(orderId: String, userId: String, amount: Double, timestamp: Long)
2.定義側輸出標簽
val largeOrderTag = new OutputTag[String]("large-order")
3.使用 ProcessFunction 處理訂單數據
    val processedStream = orderDataStream
      .keyBy(_.userId)
      .process(new ComplexOrderProcessFunction)
4.實現 ComplexOrderProcessFunction
class ComplexOrderProcessFunction extends KeyedProcessFunction[String, Order, String] {
  // 每個用戶的訂單總額
  val userAmounts = mutable.HashMap[String, Double]()
  // 定時器時間
  val timerInterval: Long = 60000 // 1分鐘

  override def processElement(
      order: Order,
      ctx: KeyedProcessFunction[String, Order, String]#Context,
      out: Collector[String]): Unit = {

    // 更新用戶的訂單總額
    val totalAmount = userAmounts.getOrElse(order.userId, 0.0) + order.amount
    userAmounts(order.userId) = totalAmount

    // 檢查訂單是否超過特定金額,如果是,則輸出到側輸出
    if (order.amount > 250.0) {
      ctx.output(largeOrderTag, s"大額訂單: ${order.orderId} - ${order.amount}")
    }

    // 註冊定時器,當前時間加上間隔
    ctx.timerService().registerEventTimeTimer(order.timestamp + timerInterval)

    // 輸出用戶的訂單總額
    out.collect(s"用戶 ${order.userId} 的訂單總額: $totalAmount")
  }

  override def onTimer(
      timestamp: Long,
      ctx: KeyedProcessFunction[String, Order, String]#OnTimerContext,
      out: Collector[String]): Unit = {
    // 定時器觸發,發出警告
    out.collect(s"用戶 ${ctx.getCurrentKey} 在過去的一分鐘內沒有新的訂單。")
  }
}

5.列印輸出
    // 列印主輸出
    processedStream.print()

    // 列印側輸出
    processedStream.getSideOutput(largeOrderTag).print("large-orders")

在這個例子中:

  • 當一個訂單到達時,我們更新基於用戶ID的訂單總額。
  • 如果訂單金額超過 250,則將其作為大額訂單輸出到側輸出。
  • 為每個用戶設置一個定時器,如果在一分鐘內沒有新訂單,則發出警告。
  • 在主輸出中,我們發射每個用戶的訂單總額。
結果:
large-orders:1> 大額訂單: order2 - 300.0
1> 用戶 user2 的訂單總額: 300.0
1> 用戶 user2 在過去的一分鐘內沒有新的訂單。
12> 用戶 user1 的訂單總額: 50.0
12> 用戶 user1 的訂單總額: 250.0
12> 用戶 user1 在過去的一分鐘內沒有新的訂單。
12> 用戶 user1 在過去的一分鐘內沒有新的訂單。
完整代碼:
// 定義訂單樣例類
case class Order(orderId: String, userId: String, amount: Double, timestamp: Long)

object ComplexOrderProcessing {

  // 側輸出標簽
  val largeOrderTag = new OutputTag[String]("large-order")

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // 示例訂單數據流
    val orderDataStream: DataStream[Order] = env.fromElements(
      Order("order1", "user1", 50.0, 1000),
      Order("order2", "user2", 300.0, 2000),
      Order("order3", "user1", 200.0, 3000),
    )

    // 使用 ProcessFunction 處理訂單數據
    val processedStream = orderDataStream
      .keyBy(_.userId)
      .process(new ComplexOrderProcessFunction)

    // 列印主輸出
    processedStream.print()

    // 列印側輸出
    processedStream.getSideOutput(largeOrderTag).print("large-orders")

    env.execute("Complex Order Processing")
  }
}

class ComplexOrderProcessFunction extends KeyedProcessFunction[String, Order, String] {
  // 每個用戶的訂單總額
  val userAmounts = mutable.HashMap[String, Double]()
  // 定時器時間
  val timerInterval: Long = 60000 // 1分鐘

  override def processElement(
                               order: Order,
                               ctx: KeyedProcessFunction[String, Order, String]#Context,
                               out: Collector[String]): Unit = {

    // 更新用戶的訂單總額
    val totalAmount = userAmounts.getOrElse(order.userId, 0.0) + order.amount
    userAmounts(order.userId) = totalAmount

    // 檢查訂單是否超過特定金額,如果是,則輸出到側輸出
    if (order.amount > 250.0) {
      ctx.output(largeOrderTag, s"大額訂單: ${order.orderId} - ${order.amount}")
    }

    // 註冊定時器,當前時間加上間隔
    ctx.timerService().registerEventTimeTimer(order.timestamp + timerInterval)

    // 輸出用戶的訂單總額
    out.collect(s"用戶 ${order.userId} 的訂單總額: $totalAmount")
  }

  override def onTimer(
                        timestamp: Long,
                        ctx: KeyedProcessFunction[String, Order, String]#OnTimerContext,
                        out: Collector[String]): Unit = {
    // 定時器觸發,發出警告
    out.collect(s"用戶 ${ctx.getCurrentKey} 在過去的一分鐘內沒有新的訂單。")
  }
}

以上就是本文的全部內容啦(͏ ˉ ꈊ ˉ)✧˖°


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • Qt 是一個跨平臺C++圖形界面開發庫,利用Qt可以快速開發跨平臺窗體應用程式,在Qt中我們可以通過拖拽的方式將不同組件放到指定的位置,實現圖形化開發極大的方便了開發效率,本章將重點介紹`TableWidget`表格組件的常用方法及靈活運用。`QTableWidget` 是 Qt 中用於顯示表格數據... ...
  • QMdiArea(Multiple Document Interface Area)是Qt中用於創建多文檔界面的組件。它提供了一種在單個視窗中管理多個文檔的方式,每個文檔通常是一個子視窗(`QMdiSubWindow`)。該組件主要用於設計多文檔界面應用程式,具備有多種窗體展示風格,實現了在父窗體中... ...
  • 代理在電腦網路很常見,比如伺服器群組內部通常只會開一個口進行對外訪問,就可以通過內網代理來進行處理,從而更好的保護內網伺服器。代理讓我們網路更安全,但是警惕非正規的代理可能會竊取您的數據。請用HTTPS內容訪問更安全。 ...
  • 1. 選擇結構 If(...) Begin ​ 語句塊 ​ End ​ else if(...) Begin ​ 語句塊 ​ End ​ Else ​ Begin ​ 語句塊 ​ End; 註意事項 語法中begin..end相當於C#中的{} 執行語句只有一條時,begin..end可以省略 () ...
  • create database step2_unit12; go use step2_unit12; go -- 部門表 CREATE TABLE [dbo].[Department]( [Id] [int] PRIMARY KEY IDENTITY(1,1) NOT NULL, [Name] [v ...
  • create database step2_unit13; go use step2_unit13; go -- 創建數據表 CREATE TABLE account ( id INT PRIMARY KEY identity, NAME VARCHAR(10), balance decimal(1 ...
  • 概述:.NET依賴註入(DI)通過反射自動註冊服務,示例展示了註冊指定類、帶特性類、項目下所有介面實現的類。簡化配置,提高可維護性。 在.NET中,進行依賴註入(DI)的自動註冊,可以通過反射機制和程式集掃描來實現。以下是詳細的步驟以及相應的C#源代碼示例,包括註冊指定類、註冊帶有自定義特性的類、以 ...
  • 問題 在調試接收串口數據的Qt程式中發現,數據存在延遲和粘包現象。下位機發送數據包頻率是100Hz,一包56位元組,波特率115200,在列印port->readAll()的值的時候發現並不是每10ms讀到一包數據,而是大概每50ms左右一次接收到5包數據,在其他電腦上調試,以及下載其他串口助手調試後 ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...