Flink中的處理函數(ProcessFunction和KeyedProcessFunction)在對於數據進行顆粒化的精確計算時使用較多,處理函數提供了一個定時服務(TimerService),可以向未來註冊一個定時服務, ...
Flink中的處理函數(ProcessFunction和KeyedProcessFunction)在對於數據進行顆粒化的精確計算時使用較多,處理函數提供了一個定時服務(TimerService),可以向未來註冊一個定時服務,我們可以把它理解為一個鬧鐘,當鬧鐘響起時,就調用ProcessFunction中的onTimer()方法,會對數據進行一些計算。我們來解析一下這兩個函數。
本文基於Flink1.14版本
ProcessFunction
ProcessFunction是Flink中的較為底層的API,當我們對於DataStream調用process函數的時候,需要在裡面傳入一個對象,即new ProcessFunction[] {}
,ProcessFunction是一個抽象類,我們看一下這個抽象類的源碼:
源碼解析
@PublicEvolving
public abstract class ProcessFunction<I, O> extends AbstractRichFunction {
private static final long serialVersionUID = 1L;
public ProcessFunction() {
}
public abstract void processElement(I var1, ProcessFunction<I, O>.Context var2, Collector<O> var3) throws Exception;
public void onTimer(long timestamp, ProcessFunction<I, O>.OnTimerContext ctx, Collector<O> out) throws Exception {
}
public abstract class OnTimerContext extends ProcessFunction<I, O>.Context {
public OnTimerContext() {
super();
}
public abstract TimeDomain timeDomain();
}
public abstract class Context {
public Context() {
}
public abstract Long timestamp();
public abstract TimerService timerService();
public abstract <X> void output(OutputTag<X> var1, X var2);
}
}
我們可以看到ProcessFunction這個抽象類有兩個泛型參數<I, O>
分別是輸入(Input)和輸出(Output)以及它繼承了AbstractRichFunction。
ProcessFunction中processElement這個方法是一個抽象方法,代表實現這個抽象類時必須實現這個方法。這個方法的參數有三個I var1, ProcessFunction<I, O>.Context var2, Collector<O> var3
,我們來解析一下:var1
的類型是上面抽象類的泛型I,代表輸入的數據,var2
的類型是ProcessFunction<I, O>.Context
,它是 Flink 中 ProcessFunction 的一個內部抽象類,用於為 processElement 方法提供上下文信息。
ProcessFunction<I, O>.Context 的主要功能:
1.獲取時間戳:
timestamp(): 這個方法返回當前正在處理的事件的時間戳。如果您使用的是事件時間(EventTime),這個時間戳就是事件本身的時間戳;如果是處理時間(ProcessingTime),這個值可能是 null。
2.定時服務訪問:
timerService(): 提供對 TimerService 的訪問,您可以用它來註冊定時器和查詢當前時間。這對於需要基於時間做決策的應用來說非常重要。
3.側輸出:
output(OutputTag
最後Collector<O>
是一個泛型介面,其中 O 表示輸出數據類型,可以使用 Collector
來發射任何類型的數據。Collector
允許從一個輸入產生多個輸出,這在處理複雜的邏輯或進行多階段處理時非常有用。
我們來看一個例子:
現在我們有訂單數據,需要處理每個訂單字元串,如果訂單金額超過100,則將其標記為"大訂單",否則標記為"小訂單"。
假設我們的訂單數據是一個簡單的字元串格式,包含訂單ID和訂單金額,格式為 "orderId,amount":
// 示例訂單數據流
val orderDataStream: DataStream[String] = env.fromElements(
"order1,100.5",
"order2,50.5",
"order3,200.0"
)
1.定義樣例類
目的是為了將數據轉為對象,便於操作
case class Order(orderId: String, amount: Double)
2.使用 ProcessFunction 處理訂單數據
val processedStream = orderDataStream
.process(new OrderProcessFunction) //OrderProcessFunction為自己實現的ProcessFunction
3.實現OrderProcessFunction
class OrderProcessFunction extends ProcessFunction[String, String] {
override def processElement(
value: String,
ctx: ProcessFunction[String, String]#Context,
out: Collector[String]): Unit = {
// 解析訂單數據
val parts = value.split(",") //使用逗號分割數據
val order = Order(parts(0), parts(1).toDouble) //將數據轉為對象
// 根據金額分類訂單
val orderType = if (order.amount > 100) "大訂單" else "小訂單"
out.collect(s"訂單ID: ${order.orderId}, 類型: $orderType") //將數據發送輸出
}
}
3.列印處理後的結果
processedStream.print()
結果為:
訂單ID: order1, 類型: 大訂單
訂單ID: order2, 類型: 小訂單
訂單ID: order3, 類型: 大訂單
完整代碼:
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.util.Collector
// 定義訂單數據的樣例類
case class Order(orderId: String, amount: Double)
object OrderProcessing {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
// 示例訂單數據流
val orderDataStream: DataStream[String] = env.fromElements(
"order1,100.5",
"order2,50.5",
"order3,200.0"
)
// 使用 ProcessFunction 處理訂單數據
val processedStream = orderDataStream
.process(new OrderProcessFunction)
// 列印處理後的結果
processedStream.print()
env.execute("Order Processing")
}
}
class OrderProcessFunction extends ProcessFunction[String, String] {
override def processElement(
value: String,
ctx: ProcessFunction[String, String]#Context,
out: Collector[String]): Unit = {
// 解析訂單數據
val parts = value.split(",")
val order = Order(parts(0), parts(1).toDouble)
// 根據金額分類訂單
val orderType = if (order.amount > 100) "大訂單" else "小訂單"
out.collect(s"訂單ID: ${order.orderId}, 類型: $orderType")
}
}
KeyedProcessFunction
KeyedProcessFunction允許對一個鍵控(keyed)流的每個元素進行操作,並提供對狀態和定時器的訪問。這使得它非常適合處理需要細粒度控制的場景。
鍵控流:在 Flink 中,一個鍵控流是通過 keyBy 方法從一個普通流創建的。在鍵控流中,所有的元素都根據指定的鍵分組。
KeyedProcessFunction 有兩個主要方法:
processElement:
輸入:每個流元素,以及上下文信息(Context)。
功能:處理每個元素,可以訪問當前元素的鍵、狀態和定時器。
使用場景:適用於每個事件的獨立處理,如更新狀態、發射輸出、註冊定時器等。
onTimer:
輸入:定時器觸發的時間戳,以及定時器上下文信息(OnTimerContext)。
功能:在特定時間點觸發的邏輯處理。
使用場景:適用於需要基於時間的操作,如超時檢測、定時輸出等。
KeyedProcessFunction 還有附加特性:
狀態管理:KeyedProcessFunction 支持 Flink 的狀態管理,可以為每個鍵保存和更新狀態。這在需要跟蹤每個鍵的歷史信息或執行聚合操作時非常有用。
定時器:可以註冊處理時間(processing time)或事件時間(event time)定時器。定時器在指定的時間點觸發,執行 onTimer 方法中的邏輯。
側輸出(Side Outputs):除了主輸出流外,還可以發射數據到側輸出標簽。這允許您處理如異常數據、監控事件等特殊用途的輸出。
源碼解析
我們來看一下KeyedProcessFunction的源碼:
@PublicEvolving
public abstract class KeyedProcessFunction<K, I, O> extends AbstractRichFunction {
private static final long serialVersionUID = 1L;
public KeyedProcessFunction() {
}
public abstract void processElement(I var1, KeyedProcessFunction<K, I, O>.Context var2, Collector<O> var3) throws Exception;
public void onTimer(long timestamp, KeyedProcessFunction<K, I, O>.OnTimerContext ctx, Collector<O> out) throws Exception {
}
public abstract class OnTimerContext extends KeyedProcessFunction<K, I, O>.Context {
public OnTimerContext() {
super();
}
public abstract TimeDomain timeDomain();
public abstract K getCurrentKey();
}
public abstract class Context {
public Context() {
}
public abstract Long timestamp();
public abstract TimerService timerService();
public abstract <X> void output(OutputTag<X> var1, X var2);
public abstract K getCurrentKey();
}
}
KeyedProcessFunction<K, I, O>
是一個抽象類,用於處理鍵控流中的元素。它允許對每個元素進行複雜的處理,並提供了對定時器和狀態的控制。其中 K (即"Key)表示鍵的類型,I 表示輸入元素的類型,O 表示輸出元素的類型。
KeyedProcessFunction繼承自 AbstractRichFunction,表明 KeyedProcessFunction 可以訪問 Flink 的富函數(rich function)特性,如生命周期方法和運行時上下文。
processElement方法和onTimer
方法與ProcessFunction中的類似
OnTimerContext
類 是 Context
的子類,為定時器提供額外的上下文信息。其中的timeDomain()
能返回當前定時器的時間域(處理時間或事件時間)。getCurrentKey()
可以返回當前處理的鍵。
Context
類中的 timestamp()
用於返回當前處理元素的時間戳。timerService()
提供對定時器服務的訪問,允許註冊和取消定時器。output(OutputTag<X> var1, X var2)
允許將數據發送到側輸出。getCurrentKey()
返回當前處理的鍵。
下麵我們來看一個使用KeyedProcessFunction處理訂單數據的例子:
例子二:
假設我們有訂單數據流,每個訂單包含訂單ID、用戶ID、訂單金額和時間戳。現在我們需要計算每個用戶的訂單總金額,同時對於每個用戶,如果在指定時間內沒有新的訂單,發出一個警告消息,以及對於超過特定金額的訂單,將其發送到一個側輸出。
數據源:
// 示例訂單數據流
val orderDataStream: DataStream[Order] = env.fromElements(
Order("order1", "user1", 50.0, 1000),
Order("order2", "user2", 300.0, 2000),
Order("order3", "user1", 200.0, 3000),
)
1.定義訂單樣例類
case class Order(orderId: String, userId: String, amount: Double, timestamp: Long)
2.定義側輸出標簽
val largeOrderTag = new OutputTag[String]("large-order")
3.使用 ProcessFunction 處理訂單數據
val processedStream = orderDataStream
.keyBy(_.userId)
.process(new ComplexOrderProcessFunction)
4.實現 ComplexOrderProcessFunction
class ComplexOrderProcessFunction extends KeyedProcessFunction[String, Order, String] {
// 每個用戶的訂單總額
val userAmounts = mutable.HashMap[String, Double]()
// 定時器時間
val timerInterval: Long = 60000 // 1分鐘
override def processElement(
order: Order,
ctx: KeyedProcessFunction[String, Order, String]#Context,
out: Collector[String]): Unit = {
// 更新用戶的訂單總額
val totalAmount = userAmounts.getOrElse(order.userId, 0.0) + order.amount
userAmounts(order.userId) = totalAmount
// 檢查訂單是否超過特定金額,如果是,則輸出到側輸出
if (order.amount > 250.0) {
ctx.output(largeOrderTag, s"大額訂單: ${order.orderId} - ${order.amount}")
}
// 註冊定時器,當前時間加上間隔
ctx.timerService().registerEventTimeTimer(order.timestamp + timerInterval)
// 輸出用戶的訂單總額
out.collect(s"用戶 ${order.userId} 的訂單總額: $totalAmount")
}
override def onTimer(
timestamp: Long,
ctx: KeyedProcessFunction[String, Order, String]#OnTimerContext,
out: Collector[String]): Unit = {
// 定時器觸發,發出警告
out.collect(s"用戶 ${ctx.getCurrentKey} 在過去的一分鐘內沒有新的訂單。")
}
}
5.列印輸出
// 列印主輸出
processedStream.print()
// 列印側輸出
processedStream.getSideOutput(largeOrderTag).print("large-orders")
在這個例子中:
- 當一個訂單到達時,我們更新基於用戶ID的訂單總額。
- 如果訂單金額超過 250,則將其作為大額訂單輸出到側輸出。
- 為每個用戶設置一個定時器,如果在一分鐘內沒有新訂單,則發出警告。
- 在主輸出中,我們發射每個用戶的訂單總額。
結果:
large-orders:1> 大額訂單: order2 - 300.0
1> 用戶 user2 的訂單總額: 300.0
1> 用戶 user2 在過去的一分鐘內沒有新的訂單。
12> 用戶 user1 的訂單總額: 50.0
12> 用戶 user1 的訂單總額: 250.0
12> 用戶 user1 在過去的一分鐘內沒有新的訂單。
12> 用戶 user1 在過去的一分鐘內沒有新的訂單。
完整代碼:
// 定義訂單樣例類
case class Order(orderId: String, userId: String, amount: Double, timestamp: Long)
object ComplexOrderProcessing {
// 側輸出標簽
val largeOrderTag = new OutputTag[String]("large-order")
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
// 示例訂單數據流
val orderDataStream: DataStream[Order] = env.fromElements(
Order("order1", "user1", 50.0, 1000),
Order("order2", "user2", 300.0, 2000),
Order("order3", "user1", 200.0, 3000),
)
// 使用 ProcessFunction 處理訂單數據
val processedStream = orderDataStream
.keyBy(_.userId)
.process(new ComplexOrderProcessFunction)
// 列印主輸出
processedStream.print()
// 列印側輸出
processedStream.getSideOutput(largeOrderTag).print("large-orders")
env.execute("Complex Order Processing")
}
}
class ComplexOrderProcessFunction extends KeyedProcessFunction[String, Order, String] {
// 每個用戶的訂單總額
val userAmounts = mutable.HashMap[String, Double]()
// 定時器時間
val timerInterval: Long = 60000 // 1分鐘
override def processElement(
order: Order,
ctx: KeyedProcessFunction[String, Order, String]#Context,
out: Collector[String]): Unit = {
// 更新用戶的訂單總額
val totalAmount = userAmounts.getOrElse(order.userId, 0.0) + order.amount
userAmounts(order.userId) = totalAmount
// 檢查訂單是否超過特定金額,如果是,則輸出到側輸出
if (order.amount > 250.0) {
ctx.output(largeOrderTag, s"大額訂單: ${order.orderId} - ${order.amount}")
}
// 註冊定時器,當前時間加上間隔
ctx.timerService().registerEventTimeTimer(order.timestamp + timerInterval)
// 輸出用戶的訂單總額
out.collect(s"用戶 ${order.userId} 的訂單總額: $totalAmount")
}
override def onTimer(
timestamp: Long,
ctx: KeyedProcessFunction[String, Order, String]#OnTimerContext,
out: Collector[String]): Unit = {
// 定時器觸發,發出警告
out.collect(s"用戶 ${ctx.getCurrentKey} 在過去的一分鐘內沒有新的訂單。")
}
}
以上就是本文的全部內容啦(͏ ˉ ꈊ ˉ)✧˖°