Flink計算TopN

来源:https://www.cnblogs.com/toycon/archive/2023/12/26/17927099.html
-Advertisement-
Play Games

在 Apache Flink 中實現高效的 Top N 數據處理,尤其是涉及時間視窗和多條件排序時,需要精細地控制數據流和狀態管理。 普通計算TopN: 1. 定義數據源(Source) 首先,我們需要定義數據源。這可能是 Kafka 流、文件、資料庫或任何其他支持的數據源。 val stream: ...


在 Apache Flink 中實現高效的 Top N 數據處理,尤其是涉及時間視窗和多條件排序時,需要精細地控制數據流和狀態管理。


普通計算TopN:

1. 定義數據源(Source)

首先,我們需要定義數據源。這可能是 Kafka 流、文件、資料庫或任何其他支持的數據源。

val stream: DataStream[YourType] = env.addSource(...)

2. 定義業務邏輯(Transformation)

接下來,我們需要根據業務需求對數據進行轉換。這可能包括映射、過濾、聚合等操作。

val transformedStream: DataStream[YourTransformedType] = stream
  .map(...)       // 例如,映射操作
  .filter(...)    // 例如,過濾操作
  // 其他轉換操作...

3. 計算 Top N

在 Flink 中,計算 Top N 可以通過使用 KeyedProcessFunction 或 Window(視窗)實現。以下是兩種常見的方法:

方法 A:使用 KeyedProcessFunction

定義一個狀態來存儲當前 Top N 的元素。這通常是一個 ListState 或 MapState。

使用 keyBy 函數對數據進行分組。這是根據某個鍵(如用戶 ID、產品類別等)進行分組。

使用 process 函數處理每個元素。在這個函數中,會去更新狀態,並保留當前的 Top N 元素。

stream
  .keyBy(...)  // 分組鍵
  .process(new KeyedProcessFunction[KeyType, InputType, OutputType] {
    // 定義狀態
    private var state: ListState[YourType] = ...

    override def processElement(value: InputType, ctx: Context, out: Collector[OutputType]): Unit = {
      // 更新狀態
      // 計算 Top N
    }
  })

方法 B:使用 Time Window(時間視窗)

如果我們的 Top N 計算是基於某個時間範圍內的數據(例如,每5分鐘的 Top N),則可以使用時間視窗。

使用 window 函數定義視窗。這可以是滾動視窗(Tumbling Window)、滑動視窗(Sliding Window)等。

在視窗上應用聚合或其他操作,以計算每個視窗的 Top N。

stream
  .keyBy(...)  // 分組鍵
  .timeWindow(Time.minutes(5))  // 定義時間視窗
  .process(new ProcessWindowFunction[...] {
    override def process(key: KeyType, context: Context, elements: Iterable[InputType], out: Collector[OutputType]): Unit = {
      // 計算每個視窗的 Top N
    }
  })

4. 輸出結果(Sink)

最後,將計算得到的 Top N 結果輸出到所需的目的地,如 Kafka、資料庫、控制台等。

topNStream.addSink(...)

關於劃分視窗計算TopN:

1. 數據預處理與視窗劃分

首先,對數據流進行 keyBy 操作,根據特定的鍵(如分類ID、事件ID等)對數據進行分組。然後,定義一個滑動視窗來平滑數據,併在這個視窗內應用增量聚合函數。

val aggregatedStream = dataStream
  .keyBy(...)  // 分組鍵
  .timeWindow(Time.minutes(5), Time.minutes(1))  // 定義滑動視窗
  .aggregate(new MyAggregateFunction, new MyWindowFunction)

其中,MyAggregateFunction 是一個實現了 AggregateFunction 介面的類,用於增量聚合視窗內的數據。MyWindowFunction 是一個實現了 WindowFunction 介面的類,用於在視窗觸發時獲取視窗信息並輸出中間結果。

2. 定義 ProcessFunction 以處理視窗數據

在得到每個視窗的聚合結果後,使用 ProcessFunction 來處理這些數據。在這個函數中,可以使用 ValueState 來存儲每個視窗的數據,並註冊一個定時器來控制何時輸出 Top N 結果。

val topNStream = aggregatedStream
  .keyBy(...)  // 根據需要的鍵進行二次分組
  .process(new KeyedProcessFunction[KeyType, InputType, OutputType] {
    // 定義狀態
    private var windowDataState: ValueState[List[YourType]] = ...

    override def processElement(value: InputType, ctx: Context, out: Collector[OutputType]): Unit = {
      // 更新狀態
      // 註冊定時器
      ctx.timerService().registerEventTimeTimer(value.windowEndTime + 1)
    }

    override def onTimer(timestamp: Long, ctx: OnTimerContext, out: Collector[OutputType]): Unit = {
      // 排序並輸出 Top N
    }
  })

3. 數據排序與 Top N 輸出

在 onTimer 方法中,當定時器觸發時,對存儲在 ValueState 中的數據進行排序,並輸出每個視窗的 Top N 結果。這一步驟確保了只有當一個視窗的所有數據都已經到達時,才進行排序和輸出。

4. 狀態和容錯處理

由於在實時計算中,狀態管理和容錯是關鍵考慮因素,確保狀態管理策略(如使用 ListState 或 ValueState)與容錯需求(如檢查點和保存點)相匹配。

5. 考慮 Watermarks 和數據延遲

由於 Flink 中的時間管理很重要,確保合理地設置 Watermarks,以處理亂序事件和數據延遲。這對於確保定時器準確觸發和視窗正確計算至關重要。


兩次 keyBy 解釋:

在上面的思路中做了兩次 keyBy ,主要是為了在不同維度上進一步細化數據處理邏輯,原因:

1. 針對不同維度的數據處理

初次 keyBy 通常是根據主要維度(如用戶 ID、商品類別等)進行分組,以便在這些維度上進行聚合或其他處理。而在某些情況下,聚合後的數據可能需要根據額外的維度進行進一步的處理。例如,可能需要根據聚合結果的時間視窗或其他業務邏輯相關的維度進行分析和處理。

2. 更精細的數據管理

二次分組允許對數據流進行更精細的切分,使得每個子流可以根據不同的業務邏輯進行獨立處理。這樣可以更靈活地應對複雜的業務需求,例如在不同時間視窗或不同事件類型上實施不同的邏輯。

3. 優化資源利用和性能

通過在不同的維度上分組,可以更有效地利用 Flink 的資源,比如任務的並行度和狀態管理。這種方式有助於提高整體的處理效率,減少不必要的資源浪費。

4. Top N 計算的特殊需求

在進行 Top N 計算時,尤其是當需要根據多個維度(如時間視窗、分類ID、事件ID等)進行排序和選擇時,二次分組變得尤為重要。這樣可以確保每個獨立的子流都有自己的 Top N 計算邏輯,更加精確地反映不同維度組合下的數據特性。

5. 提高容錯性和可維護性

二次分組有助於隔離不同數據流的處理,使得系統更加容錯,易於維護和調試。當處理複雜的數據流時,這種隔離可以使問題定位和解決變得更加容易。


現在我們來看一個例子:
假設我們要計算過去一小時內,每5分鐘更新一次,每個類別中銷售額最高的Top 3產品,並要求這些產品在不同地區的銷售額進行對比。

1. 定義案例類

case class Sale(productId: String, category: String, amount: Double, region: String, timestamp: Long)

2. 環境配置

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

3. 使用集合來模擬數據源

    // 示例數據源
    class SampleSource extends SourceFunction[Sale] {
      override def run(ctx: SourceFunction.SourceContext[Sale]): Unit = {
        // 第一個小時的數據
        ctx.collect(Sale("product1", "category1", 100.0, "region1", 1609459200L)) // 2021-01-01 00:00:00
        ctx.collect(Sale("product2", "category1", 150.0, "region1", 1609459800L)) // 2021-01-01 00:10:00
        ctx.collect(Sale("product3", "category1", 120.0, "region2", 1609460400L)) // 2021-01-01 00:20:00
        ctx.collect(Sale("product4", "category2", 200.0, "region1", 1609461000L)) // 2021-01-01 00:30:00
        ctx.collect(Sale("product5", "category2", 250.0, "region2", 1609461600L)) // 2021-01-01 00:40:00
        ctx.collect(Sale("product6", "category3", 300.0, "region1", 1609462200L)) // 2021-01-01 00:50:00

        // 第二個小時的數據
        ctx.collect(Sale("product7", "category1", 130.0, "region3", 1609462800L)) // 2021-01-01 01:00:00
        ctx.collect(Sale("product8", "category2", 210.0, "region1", 1609463400L)) // 2021-01-01 01:10:00
        ctx.collect(Sale("product9", "category3", 350.0, "region2", 1609464000L)) // 2021-01-01 01:20:00
        ctx.collect(Sale("product10", "category3", 320.0, "region3", 1609464600L)) // 2021-01-01 01:30:00
        ctx.collect(Sale("product11", "category1", 140.0, "region1", 1609465200L)) // 2021-01-01 01:40:00
        ctx.collect(Sale("product12", "category2", 230.0, "region2", 1609465800L)) // 2021-01-01 01:50:00
      }

      override def cancel(): Unit = {}
    }

4. 設置 WatermarkStrategy

    val watermarkStrategy = WatermarkStrategy
      .forBoundedOutOfOrderness[Sale](Duration.ofSeconds(10))
      .withTimestampAssigner(new SerializableTimestampAssigner[Sale] {
        override def extractTimestamp(element: Sale, recordTimestamp: Long): Long = element.timestamp * 1000
      })

5. 添加數據源並應用時間戳和水印

    val timedSalesStream = env.addSource(new SampleSource())
      .assignTimestampsAndWatermarks(watermarkStrategy)

6. 處理數據流的數據

    val topSales = timedSalesStream
      .keyBy(_.category)
      .window(SlidingEventTimeWindows.of(Time.hours(1), Time.minutes(5)))
      .aggregate(new TopSalesAggregateFunction())
      .map { result =>
        result.map { case (category, salesList) =>
          val topSalesByRegion = salesList
            .groupBy(_.region) // 按地區分組
            .mapValues(_.sortBy(-_.amount).take(3)) // 每個地區取銷售額最高的Top 3
            .toList
            .sortBy(_._1) // 按地區名稱排序

          val formattedSales = topSalesByRegion.map { case (region, sales) =>
            val salesInfo = sales.map(sale => s"${sale.productId} (amount: ${sale.amount})").mkString(", ")
            s"Region: $region, Top Sales: $salesInfo"
          }.mkString("; ")

          s"Category: $category, $formattedSales"
        }.mkString("\n")
      }

7. 定義TopSalesAggregateFunction

class TopSalesAggregateFunction extends AggregateFunction[Sale, mutable.Map[String, mutable.PriorityQueue[Sale]], Map[String, List[Sale]]] {
  override def createAccumulator(): mutable.Map[String, mutable.PriorityQueue[Sale]] = mutable.Map[String, mutable.PriorityQueue[Sale]]()

  override def add(value: Sale, accumulator: mutable.Map[String, mutable.PriorityQueue[Sale]]): mutable.Map[String, mutable.PriorityQueue[Sale]] = {
    accumulator.getOrElseUpdate(value.category, mutable.PriorityQueue[Sale]()(Ordering.by(-_.amount))).enqueue(value)
    if (accumulator(value.category).size > 3) accumulator(value.category).dequeue()
    accumulator
  }

  override def getResult(accumulator: mutable.Map[String, mutable.PriorityQueue[Sale]]): Map[String, List[Sale]] = {
    accumulator.mapValues(_.toList.sortBy(-_.amount)).toMap
  }

  override def merge(a: mutable.Map[String, mutable.PriorityQueue[Sale]], b: mutable.Map[String, mutable.PriorityQueue[Sale]]): mutable.Map[String, mutable.PriorityQueue[Sale]] = {
    for ((category, sales) <- b) {
      val mergedQueue = a.getOrElseUpdate(category, mutable.PriorityQueue[Sale]()(Ordering.by(-_.amount)))
      sales.foreach(mergedQueue.enqueue(_))
      if (mergedQueue.size > 3) mergedQueue.dequeue()
    }
    a
  }
}

8. 輸出與執行作業

    topSales.print()

    env.execute("Sales Top 3 Analysis")

現在我們來逐行解釋一下代碼:

案例類定義

case class Sale(productId: String, category: String, amount: Double, region: String, timestamp: Long)

case class Sale 定義了一個樣例類 Sale,用於表示銷售數據。它包含以下欄位:
productId:產品的ID。
category:產品所屬的類別。
amount:銷售額。
region:銷售發生的地區。
timestamp:銷售發生的時間戳。

環境配置

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)

env 是 Flink 流執行環境的實例,它用於設置和執行流處理作業。
env.setParallelism(1) 設置作業的並行度為 1。這意味著作業將在單個任務槽中運行。

數據源定義

class SampleSource extends SourceFunction[Sale] {
  // ...
}

這段代碼定義了一個名為 SampleSource 的自定義數據源,它實現了 SourceFunction 介面。這個數據源產生 Sale 類型的數據。

數據源的 run 方法

override def run(ctx: SourceFunction.SourceContext[Sale]): Unit = {
  // 示例數據
}

run 方法負責在流執行期間不斷發送 Sale 數據。這裡通過調用 ctx.collect 方法來發出數據。

示例數據生成

ctx.collect(Sale("product1", "category1", 100.0, "region1", 1609459200L))
// ...更多數據

這些行創建 Sale 對象,並通過 ctx.collect 發送它們。每個對象包含示例數據,例如產品ID、類別、銷售額、地區和時間戳。

水印策略

val watermarkStrategy = WatermarkStrategy
  .forBoundedOutOfOrderness[Sale](Duration.ofSeconds(10))
  .withTimestampAssigner(new SerializableTimestampAssigner[Sale] {
    override def extractTimestamp(element: Sale, recordTimestamp: Long): Long = element.timestamp * 1000
  })

設置了一個水印策略,允許一定程度的亂序(最多10秒延遲)。這對於基於事件時間的視窗操作至關重要。
withTimestampAssigner 定義瞭如何從 Sale 對象中提取時間戳。

數據流配置

val timedSalesStream = env.addSource(new SampleSource())
  .assignTimestampsAndWatermarks(watermarkStrategy)

這裡,env.addSource(new SampleSource()) 從自定義源 SampleSource 添加數據到流中。
.assignTimestampsAndWatermarks(watermarkStrategy) 將之前定義的水印策略應用於數據流。

聚合操作

val topSales = timedSalesStream
  .keyBy(_.category)
  .window(SlidingEventTimeWindows.of(Time.hours(1), Time.minutes(5)))
  .aggregate(new TopSalesAggregateFunction())

數據流根據 category 欄位進行分組(keyBy(_.category))。
應用了滑動時間視窗(每小時一窗,每5分鐘滑動一次)。
使用自定義的聚合函數 TopSalesAggregateFunction 來計算每個類別中銷售額最高的 Top 3 產品。

聚合函數的定義

class TopSalesAggregateFunction extends AggregateFunction[Sale, mutable.Map[String, mutable.PriorityQueue[Sale]], Map[String, List[Sale]]] {
  // 方法實現
}

這個類擴展了 AggregateFunction,它是 Flink API 中用於自定義聚合邏輯的一部分。
泛型參數解釋:
Sale:輸入類型,表示每個元素的類型。
mutable.Map[String, mutable.PriorityQueue[Sale]]:累加器的類型,用於聚合中間結果。
Map[String, List[Sale]]:聚合結果的類型。

createAccumulator 方法

override def createAccumulator(): mutable.Map[String, mutable.PriorityQueue[Sale]] = mutable.Map[String, mutable.PriorityQueue[Sale]]()

此方法初始化累加器,它是存儲中間聚合狀態的數據結構。
在這裡,累加器是一個映射,它將每個類別映射到一個優先隊列(PriorityQueue)。優先隊列用於保持每個類別銷售額最高的產品。

add 方法

override def add(value: Sale, accumulator: mutable.Map[String, mutable.PriorityQueue[Sale]]): mutable.Map[String, mutable.PriorityQueue[Sale]] = {
  accumulator.getOrElseUpdate(value.category, mutable.PriorityQueue[Sale]()(Ordering.by(-_.amount))).enqueue(value)
  if (accumulator(value.category).size > 3) accumulator(value.category).dequeue()
  accumulator
}

add 方法定義瞭如何將一個新的 Sale 元素添加到累加器中。
對於每個 Sale 元素,它首先檢查累加器中是否已經有該類別的隊列。如果沒有,它會創建一個新的隊列。
隊列根據銷售額進行排序,最高的銷售額在隊列前面。
元素被添加到相應類別的隊列中。
如果隊列的大小超過 3(即我們只關心銷售額最高的前三個產品),則從隊列中移除銷售額最低的產品。

getResult 方法

override def getResult(accumulator: mutable.Map[String, mutable.PriorityQueue[Sale]]): Map[String, List[Sale]] = {
  accumulator.mapValues(_.toList.sortBy(-_.amount)).toMap
}

getResult 方法提取聚合的最終結果。
它將每個類別的 PriorityQueue 轉換為一個列表,並按銷售額降序排序。
最終結果是一個映射,將每個類別映射到其銷售額最高的 Top 3 產品列表。

merge 方法

override def merge(a: mutable.Map[String, mutable.PriorityQueue[Sale]], b: mutable.Map[String, mutable.PriorityQueue[Sale]]): mutable.Map[String, mutable.PriorityQueue[Sale]] = {
  for ((category, sales) <- b) {
    val mergedQueue = a.getOrElseUpdate(category, mutable.PriorityQueue[Sale]()(Ordering.by(-_.amount)))
    sales.foreach(mergedQueue.enqueue(_))
    if (mergedQueue.size > 3) mergedQueue.dequeue()
  }
  a
}

merge 方法定義瞭如何合併兩個累加器的結果,這在 Flink 的分散式計算環境中是必要的。
對於累加器 b 中的每個類別和其對應的銷售記錄隊列,方法將這些記錄合併到累加器 a 的對應隊列中。
合併後,如果某個類別的隊列大小超過 3,則移除多餘的元素,確保隊列只包含銷售額最高的 Top 3 產品。
最後,返回合併後的累加器 a

以上就是本文全部內容啦५꒰۶⁎⁼̴̀ω⁼̴́⁎꒱۶


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 大家好,我是R哥。 昨天受邀參加了《知識星球深圳星主交流會》,從早上 10:30 ~ 下午 17:30 左右,幾乎聽了一天的星球分享會,連中午聚餐都是頭腦風暴,一整天讓我醍醐灌頂。 有上百萬粉絲的星主,有短視頻帶貨破 1 億+的星主,有管理 1000+ 位員工的星主,各種行業大佬,雖然R哥也有小小幾 ...
  • 創建名為springboot_springmvc的新module,過程參考3.1節 4.1、重要的配置參數 在 spring boot 中,提供了許多和 web 相關的配置參數(詳見官方文檔),其中有三個比較重要: 4.1.1、server.port 該配置參數用於設置 web 應用程式的服務埠號 ...
  • 前言: 繼上篇:Taurus .Net Core 微服務開源框架:Admin 插件【4-4】 - 配置管理-Mvc【Plugin-CORS 跨域】 本篇繼續介紹下一個內容: 系統配置節點:Mvc - Plugin - Admin 後臺界面: 配置界面如下: 配置說明如下: 1、Admin.IsEna ...
  • 什麼是鍵控服務依賴註入? 在之前的依賴註入中,服務是根據其類型進行註冊和解析的。如果出現同一介面有多個實現怎麼辦呢?這時候就可以使用.NET 8的新功能“鍵控服務依賴註入”。它允許您註冊介面的多個實現,每個實現都與一個唯一鍵相關聯,然後基於該鍵解析所需的實現。 在.NET 8 中的實現 接下來介紹如 ...
  • 一:背景 1. 講故事 為什麼要提 宇宙射線, 太陽耀斑 導致的程式崩潰呢?主要是昨天在知乎上看了這篇文章:莫非我遇到了傳說中的bug? ,由於 rip 中的0x41變成了0x61出現了bit位翻轉導致程式崩潰,截圖如下: 下麵的評論大多是說由於 宇宙射線,這個太玄乎了,說實話看到這個 傳說bug ...
  • 在.NET Core中,UseStaticFiles、UseDefaultFiles、UseDirectoryBrowser和UseFileServer中間件用於處理靜態文件和目錄瀏覽。下麵我將為你提供一個簡單的例子,演示它們的用法。 首先,確保你的項目已經安裝了Microsoft.AspNetCo ...
  • 路是腳踏出來的,歷史是人寫出來的。人的每一步行動都在書寫自己的歷史。 Linux 基礎命令 open:打開文件操作,如環境配置文件。 open ~/.zshrc vi:vi (visual interface), linux 中最經典的文本編輯器 vim(vi improved)是 vi 發展出來的 ...
  • 在運行程式時有時候會需要查看資源占用,以方便部署在其他伺服器上時進行參考。以下是總結了我在linux上查找程式進程資源的兩種方法(cpu和gpu都有)。 ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...