使用Flink完成流數據統計

来源:https://www.cnblogs.com/Jcloud/archive/2023/12/11/17893709.html
-Advertisement-
Play Games

Flink程式構建的基本單元是stream和transformation(DataSet實質上也是stream)。stream是一個中間結果數據,transformation對數據的加工和操作,該操作以一個或多個stream為輸入,計算輸出一個或多個stream為結果,最後可以sink來存儲數據。 ...


一、統計流程

所有流計算統計的流程都是:

1、接入數據源

2、進行多次數據轉換操作(過濾、拆分、聚合計算等)

3、計算結果的存儲 其中數據源可以是多個、數據轉換的節點處理完數據可以發送到一個和多個下一個節點繼續處理數據

Flink程式構建的基本單元是stream和transformation(DataSet實質上也是stream)。stream是一個中間結果數據,transformation對數據的加工和操作,該操作以一個或多個stream為輸入,計算輸出一個或多個stream為結果,最後可以sink來存儲數據。

包括數據源,每一次發射出來的數據結果都通過DataStream來傳遞給下一級繼續處理

每一個Transformation要有2步:

1、處理數據

2、將處理完的數據發射出去

二、Flink的數據源

Flink提供數據源只需要實現SourceFunction介面即可。 SourceFunction有一個抽象實現類RichParallelSourceFunction 繼承該實現類,實現3個方法,既可以自定義Source public void open(Configuration parameters) //初始化時調用,可以初始化一些參數 public void run(SourceContext ctx)//發送數據 在該方法里調用ctx的collect方法將數據發射出去。

該例子中是每20秒發送出去一個Order類型的實體。

三、Flink的數據轉換操作

Flink針對於不同的場景提供了不同的解決方案,減少了用戶去關註處理過程中的效率問題。

常見的操作有下麵這些:“map”就是做一些映射,比如我們把兩個字元串合併成一個字元串,把一個字元串拆成兩個或者三個字元串。

“flatMap”類似於把一個記錄拆分成兩條、三條、甚至是四條記錄,例如把一個字元串分割成一個字元數組。

“Filter”就類似於過濾。

“keyBy”就等效於SQL里的group by。

“aggregate”是一個聚合操作,如計數、求和、求平均等。

“reduce”就類似於MapReduce里的reduce。

“join”操作就有點類似於我們資料庫裡面的join。

“connect”實現把兩個流連成一個流。

“repartition”是一個重新分區操作(還沒研究)。

“project”操作就類似於SQL裡面的snacks(還沒研究)。

常見的操作有filter、map、flatMap、keyBy(分組)、aggregate(聚合) 具體的使用方式後面的例子中會體現。

三、視窗

流數據的計算可以把連續不斷的數據按照一定的規則拆分成大量的片段,在片段內進行統計和計算。比如可以把一小時內的數據保存到一個小的資料庫表裡,然後對這部分數據進行計算和統計,而流計算只不過是實時進行的。

常見的視窗有:

1、以時間為單位的Time Window,例如:每1秒鐘、每1個小時等

2、以數據的數量為單位的Count Window,例如:每一百個元素

Flink給我們提供了一些通用的時間視窗模型。

1、Tumbling Windows(不重疊的)

數據流中的每一條數據僅屬於一個視窗。每一個都有固定的大小,同時視窗間彼此之間不會出現重疊的部分。如果指定一個大小為5分鐘的tumbling視窗,那麼每5分鐘便會啟動一個視窗,如下圖所示:

2、Sliding Windows(重疊的)

與Tumbling視窗不同的是,在構建Sliding視窗時不僅需要指定視窗大小,還會指定一個視窗滑動參數(window slide parameter)來確定視窗的開始位置。因此當視窗滑動參數小於視窗大小時,視窗之間可能會出現重覆的區域。 例如,當你指定視窗大小為10分鐘,滑動參數為5分鐘時,如下圖所示:

3、Session Windows (會話視窗)

當數據流中一段時間沒有數據,則Session視窗會關閉。因此,Session Windows沒有固定的大小,無法計算Session視窗的開始位置。

四、Flink中的時間概念

Flink中有3中不同的時間概念

  1. 處理時間 Processing Time指的是我們上面進行Transformation操作時,當時的系統時間。

2.事件時間 Event Time指的是業務發生時間,每一條業務記錄上會攜帶一個時間戳,我們需要指定數據中那一個屬性中獲取。在按業務發生時間統計數據時,我們面臨一個問題,當我們接收的數據的時間是無序的時候,我們什麼時間去觸發聚合計算,我們不可能無限制的等待。Flink引入了Watermark的概念,這個Watermark是添加在視窗上的,是告訴視窗我們最長等待的時間是多久,超過這個時間的數據就拋棄不再處理。

  1. 提取時間 Ingestion Time指的是數據進入Flink當時的系統時間。

五、訂單統計的例子

第四步:設置時間戳和Watermarks

 DataStream<Order> marksSource = vilidatedSource
.assignTimestampsAndWatermarks(
new BoundedOutOfOrdernessTimestampExtractor<Order>(Time.minutes(1)){
            @Override
            public long extractTimestamp(Order o) {
                return o.getTimestamp().getTime();
            }
        });


前面已經設置了使用EventTime來處理數據,那麼在進行時間視窗計算前必須給數據分配獲取時間戳的欄位,這裡設置了Order的timestamp欄位為EventTime,同時這裡也設置了一個1分鐘的Watermarks,表示最多等待1分鐘,業務發生時間超過系統時間1分鐘的數據都不進行統計。

第五步:數據分組

KeyedStream<Order, Tuple> keyedStream =
                marksSource.keyBy("biz");//先以biz來Group


這裡設置了以Order中biz欄位進行分組,這就意味著所有biz相同的數據會進入到同一個時間視窗中進行計算。

第六步:指定時間視窗、聚合計算

DataStream<List<Tuple2<String, String>>> results = keyedStream
                .window(TumblingEventTimeWindows.of(Time.minutes(1)))
                .aggregate(new OrderSumAggregator()).setParallelism(1);


這裡設置了一個以1分鐘為單位的不重疊的TumblingEventTimeWindow。 然後使用OrderSumAggregator來進行聚合計算。 需要註意的是如果最前面設置的是使用ProcessTime來處理數據,這裡的視窗就會變成TumblingProcessTimeWinwow,前後必須一一對應,之前就因為前後不對應,統計結果不正確一直招不到原因。

六、聚合計算

上面例子中比較核心的部分就是聚合計算,也就是我們的OrderSumAggregator 聚合計算我們只需要實現Flink給我們提供的AggregateFunction介面,重寫其方法即可。

ACC createAccumulator();//創建一個數據統計的容器,提供給後續操作使用。

ACC add(IN in, ACC acc);//每個元素被添加進視窗的時候調用。 第一個參數是添加進視窗的元素,第二個參數是統計的容器(上面創建的那個)。

OUT getResult(ACC acc);//視窗統計事件觸發時調用來返回出統計的結果。

ACC merge(ACC acc1, ACC acc2);//只有在當視窗合併的時候調用,合併2個容器

其中這個容器根據情況也可以是在記憶體里提供,也可以是在其他存儲設備中提供。

通過上面的例子我們就實現了按照業務時間來統計每分鐘內的訂單數量,訂單最多可以延遲1分鐘上報。 但是我們為了等待1分鐘內上報的數據,造成了數據會延遲1分鐘進行統計,例如8點02分我們才能統計到8點到8點01分上報的數據。 為瞭解決這個問題,我們可以給window再增加一個自定義的統計觸發器,這個觸發器可以在整點觸發統計事件(也就是調用上面的getResults方法),這樣就達到了8點到8點01分這個時間段的數據,在8點01分統計一次,在8點02分再重新統計一次(加上後面1分鐘上報的數據)。

作者:京東科技 梁發文

來源:京東雲開發者社區 轉載請註明來源


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 前言 windows 電腦 連接藍牙耳機沒有麥克風,明明已經顯示麥克風圖標,為什麼錄製不到聲音 原因 電腦連藍牙耳機有兩個模式:Hand-free和Stereo。Hand-Free是可以語音通話的,但是音質沒有那麼好;Stereo是立體聲模式,音效很好,但是無法使用麥克風。 解決辦法 依次打開:控制 ...
  • 普通用戶許可權超算集群安裝OpenFOAM-LIGGGHTS-CFDEM,由於缺少root許可權,無法根據CFDEM官方instruction進行安裝。 本文以CSU超算平臺為例,利用普通用戶許可權進行編譯安裝。文章修正了已有教程中部分問題,特別感謝希望先生與記得小蘋初見教程。 1. 安裝必要依賴包 此部 ...
  • 1 準備工作 1.1 環境準備 操作系統:Microsoft Windows 10 專業工作站版 軟體版本:Python 3.9.6 第三方包: pip install pandas2.1.0 pip install pymysql1.1.0 pip install sqlalchemy==2.0. ...
  • 目錄JDBC建表用法示例JDBC表函數資料分享參考文章 JDBC 允許CH通過JDBC連接到外部資料庫。 要實現JDBC連接,CH需要使用以後臺進程運行的程式 clickhouse-jdbc-bridge。 該引擎支持Nullable數據類型。 建表 CREATE TABLE [IF NOT EXI ...
  • 本文將介紹使用DataX讀出Cos的Orc文件往StarRocks裡面寫。 需求: 需要將騰訊雲cos上84TB的數據, 同步到StarRocks某個大表。正常每個分區數據量20~30億,600GB。 工具:DataX插件:hdfsreader、starrockswriter對象存儲COS:非融合 ...
  • 瞭解Arch Linux Arch Linux是一個輕量、靈活、基於x86-64架構的Linux發行版,遵循K.I.S.S.原則。註重代碼正確、優雅和極簡主義,期待用戶能夠願意去理解系統的操作。 1.簡潔 Arch Linux將簡潔定義為:避免任何不必要的添加、修改和複雜增加。簡單來說,archli ...
  • GeminiDB Cassandra介面在綜合備份成本、恢復時效和粒度得到充分驗證的情況下,推出新特性PITR(Point-In-Time-Recover)支持任意時間點恢復。 ...
  • 回顧MySQL的執行過程,幫助介紹如何進行sql優化。 (1)客戶端發送一條查詢語句到伺服器; (2)伺服器先查詢緩存,如果命中緩存,則立即返回存儲在緩存中的數據; (3)未命中緩存後,MySQL通過關鍵字將SQL語句進行解析,並生成一顆對應的解析樹,MySQL解析器將使用MySQL語法進行驗證和解 ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...