在 Apache Flink 中實現高效的 Top N 數據處理,尤其是涉及時間視窗和多條件排序時,需要精細地控制數據流和狀態管理。 普通計算TopN: 1. 定義數據源(Source) 首先,我們需要定義數據源。這可能是 Kafka 流、文件、資料庫或任何其他支持的數據源。 val stream: ...
在 Apache Flink 中實現高效的 Top N 數據處理,尤其是涉及時間視窗和多條件排序時,需要精細地控制數據流和狀態管理。
普通計算TopN:
1. 定義數據源(Source)
首先,我們需要定義數據源。這可能是 Kafka 流、文件、資料庫或任何其他支持的數據源。
val stream: DataStream[YourType] = env.addSource(...)
2. 定義業務邏輯(Transformation)
接下來,我們需要根據業務需求對數據進行轉換。這可能包括映射、過濾、聚合等操作。
val transformedStream: DataStream[YourTransformedType] = stream
.map(...) // 例如,映射操作
.filter(...) // 例如,過濾操作
// 其他轉換操作...
3. 計算 Top N
在 Flink 中,計算 Top N 可以通過使用 KeyedProcessFunction 或 Window(視窗)實現。以下是兩種常見的方法:
方法 A:使用 KeyedProcessFunction
定義一個狀態來存儲當前 Top N 的元素。這通常是一個 ListState 或 MapState。
使用 keyBy 函數對數據進行分組。這是根據某個鍵(如用戶 ID、產品類別等)進行分組。
使用 process 函數處理每個元素。在這個函數中,會去更新狀態,並保留當前的 Top N 元素。
stream
.keyBy(...) // 分組鍵
.process(new KeyedProcessFunction[KeyType, InputType, OutputType] {
// 定義狀態
private var state: ListState[YourType] = ...
override def processElement(value: InputType, ctx: Context, out: Collector[OutputType]): Unit = {
// 更新狀態
// 計算 Top N
}
})
方法 B:使用 Time Window(時間視窗)
如果我們的 Top N 計算是基於某個時間範圍內的數據(例如,每5分鐘的 Top N),則可以使用時間視窗。
使用 window 函數定義視窗。這可以是滾動視窗(Tumbling Window)、滑動視窗(Sliding Window)等。
在視窗上應用聚合或其他操作,以計算每個視窗的 Top N。
stream
.keyBy(...) // 分組鍵
.timeWindow(Time.minutes(5)) // 定義時間視窗
.process(new ProcessWindowFunction[...] {
override def process(key: KeyType, context: Context, elements: Iterable[InputType], out: Collector[OutputType]): Unit = {
// 計算每個視窗的 Top N
}
})
4. 輸出結果(Sink)
最後,將計算得到的 Top N 結果輸出到所需的目的地,如 Kafka、資料庫、控制台等。
topNStream.addSink(...)
關於劃分視窗計算TopN:
1. 數據預處理與視窗劃分
首先,對數據流進行 keyBy 操作,根據特定的鍵(如分類ID、事件ID等)對數據進行分組。然後,定義一個滑動視窗來平滑數據,併在這個視窗內應用增量聚合函數。
val aggregatedStream = dataStream
.keyBy(...) // 分組鍵
.timeWindow(Time.minutes(5), Time.minutes(1)) // 定義滑動視窗
.aggregate(new MyAggregateFunction, new MyWindowFunction)
其中,MyAggregateFunction
是一個實現了 AggregateFunction
介面的類,用於增量聚合視窗內的數據。MyWindowFunction
是一個實現了 WindowFunction
介面的類,用於在視窗觸發時獲取視窗信息並輸出中間結果。
2. 定義 ProcessFunction 以處理視窗數據
在得到每個視窗的聚合結果後,使用 ProcessFunction 來處理這些數據。在這個函數中,可以使用 ValueState 來存儲每個視窗的數據,並註冊一個定時器來控制何時輸出 Top N 結果。
val topNStream = aggregatedStream
.keyBy(...) // 根據需要的鍵進行二次分組
.process(new KeyedProcessFunction[KeyType, InputType, OutputType] {
// 定義狀態
private var windowDataState: ValueState[List[YourType]] = ...
override def processElement(value: InputType, ctx: Context, out: Collector[OutputType]): Unit = {
// 更新狀態
// 註冊定時器
ctx.timerService().registerEventTimeTimer(value.windowEndTime + 1)
}
override def onTimer(timestamp: Long, ctx: OnTimerContext, out: Collector[OutputType]): Unit = {
// 排序並輸出 Top N
}
})
3. 數據排序與 Top N 輸出
在 onTimer 方法中,當定時器觸發時,對存儲在 ValueState 中的數據進行排序,並輸出每個視窗的 Top N 結果。這一步驟確保了只有當一個視窗的所有數據都已經到達時,才進行排序和輸出。
4. 狀態和容錯處理
由於在實時計算中,狀態管理和容錯是關鍵考慮因素,確保狀態管理策略(如使用 ListState 或 ValueState)與容錯需求(如檢查點和保存點)相匹配。
5. 考慮 Watermarks 和數據延遲
由於 Flink 中的時間管理很重要,確保合理地設置 Watermarks,以處理亂序事件和數據延遲。這對於確保定時器準確觸發和視窗正確計算至關重要。
兩次 keyBy 解釋:
在上面的思路中做了兩次 keyBy ,主要是為了在不同維度上進一步細化數據處理邏輯,原因:
1. 針對不同維度的數據處理
初次 keyBy 通常是根據主要維度(如用戶 ID、商品類別等)進行分組,以便在這些維度上進行聚合或其他處理。而在某些情況下,聚合後的數據可能需要根據額外的維度進行進一步的處理。例如,可能需要根據聚合結果的時間視窗或其他業務邏輯相關的維度進行分析和處理。
2. 更精細的數據管理
二次分組允許對數據流進行更精細的切分,使得每個子流可以根據不同的業務邏輯進行獨立處理。這樣可以更靈活地應對複雜的業務需求,例如在不同時間視窗或不同事件類型上實施不同的邏輯。
3. 優化資源利用和性能
通過在不同的維度上分組,可以更有效地利用 Flink 的資源,比如任務的並行度和狀態管理。這種方式有助於提高整體的處理效率,減少不必要的資源浪費。
4. Top N 計算的特殊需求
在進行 Top N 計算時,尤其是當需要根據多個維度(如時間視窗、分類ID、事件ID等)進行排序和選擇時,二次分組變得尤為重要。這樣可以確保每個獨立的子流都有自己的 Top N 計算邏輯,更加精確地反映不同維度組合下的數據特性。
5. 提高容錯性和可維護性
二次分組有助於隔離不同數據流的處理,使得系統更加容錯,易於維護和調試。當處理複雜的數據流時,這種隔離可以使問題定位和解決變得更加容易。
現在我們來看一個例子:
假設我們要計算過去一小時內,每5分鐘更新一次,每個類別中銷售額最高的Top 3產品,並要求這些產品在不同地區的銷售額進行對比。
1. 定義案例類
case class Sale(productId: String, category: String, amount: Double, region: String, timestamp: Long)
2. 環境配置
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
3. 使用集合來模擬數據源
// 示例數據源
class SampleSource extends SourceFunction[Sale] {
override def run(ctx: SourceFunction.SourceContext[Sale]): Unit = {
// 第一個小時的數據
ctx.collect(Sale("product1", "category1", 100.0, "region1", 1609459200L)) // 2021-01-01 00:00:00
ctx.collect(Sale("product2", "category1", 150.0, "region1", 1609459800L)) // 2021-01-01 00:10:00
ctx.collect(Sale("product3", "category1", 120.0, "region2", 1609460400L)) // 2021-01-01 00:20:00
ctx.collect(Sale("product4", "category2", 200.0, "region1", 1609461000L)) // 2021-01-01 00:30:00
ctx.collect(Sale("product5", "category2", 250.0, "region2", 1609461600L)) // 2021-01-01 00:40:00
ctx.collect(Sale("product6", "category3", 300.0, "region1", 1609462200L)) // 2021-01-01 00:50:00
// 第二個小時的數據
ctx.collect(Sale("product7", "category1", 130.0, "region3", 1609462800L)) // 2021-01-01 01:00:00
ctx.collect(Sale("product8", "category2", 210.0, "region1", 1609463400L)) // 2021-01-01 01:10:00
ctx.collect(Sale("product9", "category3", 350.0, "region2", 1609464000L)) // 2021-01-01 01:20:00
ctx.collect(Sale("product10", "category3", 320.0, "region3", 1609464600L)) // 2021-01-01 01:30:00
ctx.collect(Sale("product11", "category1", 140.0, "region1", 1609465200L)) // 2021-01-01 01:40:00
ctx.collect(Sale("product12", "category2", 230.0, "region2", 1609465800L)) // 2021-01-01 01:50:00
}
override def cancel(): Unit = {}
}
4. 設置 WatermarkStrategy
val watermarkStrategy = WatermarkStrategy
.forBoundedOutOfOrderness[Sale](Duration.ofSeconds(10))
.withTimestampAssigner(new SerializableTimestampAssigner[Sale] {
override def extractTimestamp(element: Sale, recordTimestamp: Long): Long = element.timestamp * 1000
})
5. 添加數據源並應用時間戳和水印
val timedSalesStream = env.addSource(new SampleSource())
.assignTimestampsAndWatermarks(watermarkStrategy)
6. 處理數據流的數據
val topSales = timedSalesStream
.keyBy(_.category)
.window(SlidingEventTimeWindows.of(Time.hours(1), Time.minutes(5)))
.aggregate(new TopSalesAggregateFunction())
.map { result =>
result.map { case (category, salesList) =>
val topSalesByRegion = salesList
.groupBy(_.region) // 按地區分組
.mapValues(_.sortBy(-_.amount).take(3)) // 每個地區取銷售額最高的Top 3
.toList
.sortBy(_._1) // 按地區名稱排序
val formattedSales = topSalesByRegion.map { case (region, sales) =>
val salesInfo = sales.map(sale => s"${sale.productId} (amount: ${sale.amount})").mkString(", ")
s"Region: $region, Top Sales: $salesInfo"
}.mkString("; ")
s"Category: $category, $formattedSales"
}.mkString("\n")
}
7. 定義TopSalesAggregateFunction
class TopSalesAggregateFunction extends AggregateFunction[Sale, mutable.Map[String, mutable.PriorityQueue[Sale]], Map[String, List[Sale]]] {
override def createAccumulator(): mutable.Map[String, mutable.PriorityQueue[Sale]] = mutable.Map[String, mutable.PriorityQueue[Sale]]()
override def add(value: Sale, accumulator: mutable.Map[String, mutable.PriorityQueue[Sale]]): mutable.Map[String, mutable.PriorityQueue[Sale]] = {
accumulator.getOrElseUpdate(value.category, mutable.PriorityQueue[Sale]()(Ordering.by(-_.amount))).enqueue(value)
if (accumulator(value.category).size > 3) accumulator(value.category).dequeue()
accumulator
}
override def getResult(accumulator: mutable.Map[String, mutable.PriorityQueue[Sale]]): Map[String, List[Sale]] = {
accumulator.mapValues(_.toList.sortBy(-_.amount)).toMap
}
override def merge(a: mutable.Map[String, mutable.PriorityQueue[Sale]], b: mutable.Map[String, mutable.PriorityQueue[Sale]]): mutable.Map[String, mutable.PriorityQueue[Sale]] = {
for ((category, sales) <- b) {
val mergedQueue = a.getOrElseUpdate(category, mutable.PriorityQueue[Sale]()(Ordering.by(-_.amount)))
sales.foreach(mergedQueue.enqueue(_))
if (mergedQueue.size > 3) mergedQueue.dequeue()
}
a
}
}
8. 輸出與執行作業
topSales.print()
env.execute("Sales Top 3 Analysis")
現在我們來逐行解釋一下代碼:
案例類定義
case class Sale(productId: String, category: String, amount: Double, region: String, timestamp: Long)
case class Sale 定義了一個樣例類 Sale,用於表示銷售數據。它包含以下欄位:
productId
:產品的ID。
category
:產品所屬的類別。
amount
:銷售額。
region
:銷售發生的地區。
timestamp
:銷售發生的時間戳。
環境配置
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
env
是 Flink 流執行環境的實例,它用於設置和執行流處理作業。
env.setParallelism(1) 設置作業的並行度為 1。這意味著作業將在單個任務槽中運行。
數據源定義
class SampleSource extends SourceFunction[Sale] {
// ...
}
這段代碼定義了一個名為 SampleSource
的自定義數據源,它實現了 SourceFunction
介面。這個數據源產生 Sale 類型的數據。
數據源的 run 方法
override def run(ctx: SourceFunction.SourceContext[Sale]): Unit = {
// 示例數據
}
run 方法負責在流執行期間不斷發送 Sale 數據。這裡通過調用 ctx.collect
方法來發出數據。
示例數據生成
ctx.collect(Sale("product1", "category1", 100.0, "region1", 1609459200L))
// ...更多數據
這些行創建 Sale 對象,並通過 ctx.collect
發送它們。每個對象包含示例數據,例如產品ID、類別、銷售額、地區和時間戳。
水印策略
val watermarkStrategy = WatermarkStrategy
.forBoundedOutOfOrderness[Sale](Duration.ofSeconds(10))
.withTimestampAssigner(new SerializableTimestampAssigner[Sale] {
override def extractTimestamp(element: Sale, recordTimestamp: Long): Long = element.timestamp * 1000
})
設置了一個水印策略,允許一定程度的亂序(最多10秒延遲)。這對於基於事件時間的視窗操作至關重要。
withTimestampAssigner
定義瞭如何從 Sale 對象中提取時間戳。
數據流配置
val timedSalesStream = env.addSource(new SampleSource())
.assignTimestampsAndWatermarks(watermarkStrategy)
這裡,env.addSource(new SampleSource())
從自定義源 SampleSource 添加數據到流中。
.assignTimestampsAndWatermarks(watermarkStrategy) 將之前定義的水印策略應用於數據流。
聚合操作
val topSales = timedSalesStream
.keyBy(_.category)
.window(SlidingEventTimeWindows.of(Time.hours(1), Time.minutes(5)))
.aggregate(new TopSalesAggregateFunction())
數據流根據 category 欄位進行分組(keyBy(_.category))。
應用了滑動時間視窗(每小時一窗,每5分鐘滑動一次)。
使用自定義的聚合函數 TopSalesAggregateFunction 來計算每個類別中銷售額最高的 Top 3 產品。
聚合函數的定義
class TopSalesAggregateFunction extends AggregateFunction[Sale, mutable.Map[String, mutable.PriorityQueue[Sale]], Map[String, List[Sale]]] {
// 方法實現
}
這個類擴展了 AggregateFunction
,它是 Flink API 中用於自定義聚合邏輯的一部分。
泛型參數解釋:
Sale
:輸入類型,表示每個元素的類型。
mutable.Map[String, mutable.PriorityQueue[Sale]]
:累加器的類型,用於聚合中間結果。
Map[String, List[Sale]]
:聚合結果的類型。
createAccumulator 方法
override def createAccumulator(): mutable.Map[String, mutable.PriorityQueue[Sale]] = mutable.Map[String, mutable.PriorityQueue[Sale]]()
此方法初始化累加器,它是存儲中間聚合狀態的數據結構。
在這裡,累加器是一個映射,它將每個類別映射到一個優先隊列(PriorityQueue)。優先隊列用於保持每個類別銷售額最高的產品。
add 方法
override def add(value: Sale, accumulator: mutable.Map[String, mutable.PriorityQueue[Sale]]): mutable.Map[String, mutable.PriorityQueue[Sale]] = {
accumulator.getOrElseUpdate(value.category, mutable.PriorityQueue[Sale]()(Ordering.by(-_.amount))).enqueue(value)
if (accumulator(value.category).size > 3) accumulator(value.category).dequeue()
accumulator
}
add
方法定義瞭如何將一個新的 Sale
元素添加到累加器中。
對於每個 Sale
元素,它首先檢查累加器中是否已經有該類別的隊列。如果沒有,它會創建一個新的隊列。
隊列根據銷售額進行排序,最高的銷售額在隊列前面。
元素被添加到相應類別的隊列中。
如果隊列的大小超過 3(即我們只關心銷售額最高的前三個產品),則從隊列中移除銷售額最低的產品。
getResult 方法
override def getResult(accumulator: mutable.Map[String, mutable.PriorityQueue[Sale]]): Map[String, List[Sale]] = {
accumulator.mapValues(_.toList.sortBy(-_.amount)).toMap
}
getResult
方法提取聚合的最終結果。
它將每個類別的 PriorityQueue
轉換為一個列表,並按銷售額降序排序。
最終結果是一個映射,將每個類別映射到其銷售額最高的 Top 3 產品列表。
merge 方法
override def merge(a: mutable.Map[String, mutable.PriorityQueue[Sale]], b: mutable.Map[String, mutable.PriorityQueue[Sale]]): mutable.Map[String, mutable.PriorityQueue[Sale]] = {
for ((category, sales) <- b) {
val mergedQueue = a.getOrElseUpdate(category, mutable.PriorityQueue[Sale]()(Ordering.by(-_.amount)))
sales.foreach(mergedQueue.enqueue(_))
if (mergedQueue.size > 3) mergedQueue.dequeue()
}
a
}
merge
方法定義瞭如何合併兩個累加器的結果,這在 Flink 的分散式計算環境中是必要的。
對於累加器 b
中的每個類別和其對應的銷售記錄隊列,方法將這些記錄合併到累加器 a
的對應隊列中。
合併後,如果某個類別的隊列大小超過 3,則移除多餘的元素,確保隊列只包含銷售額最高的 Top 3 產品。
最後,返回合併後的累加器 a
。
以上就是本文全部內容啦५꒰۶⁎⁼̴̀ω⁼̴́⁎꒱۶