「Flink」理解流式處理重要概念

来源:https://www.cnblogs.com/ilovezihan/archive/2020/02/03/12254479.html
-Advertisement-
Play Games

什麼是流式處理呢?這個問題其實我們大部分時候是沒有考慮過的,大多數,我們是把流式處理和實時計算放在一起來說的。我們先來瞭解下,什麼是數據流。數據流(事件流)數據流是無邊界數據集的抽象我們之前接觸的數據處理,大多都都是有界的。例如:處理某天的數據、某個季度的數據等無界意味著數據是無限地、持續增長的數據... ...


什麼是流式處理呢?

這個問題其實我們大部分時候是沒有考慮過的,大多數,我們是把流式處理和實時計算放在一起來說的。我們先來瞭解下,什麼是數據流。

數據流(事件流)

  • 數據流是無邊界數據集的抽象
    • 我們之前接觸的數據處理,大多都都是有界的。例如:處理某天的數據、某個季度的數據等
    • 無界意味著數據是無限地、持續增長的
    • 數據流會隨著時間的推移,源源不斷地加入進來
  • 數據流無處不再
    • 信息卡交易
    • 電商購物
    • 快遞
    • 網路交換機的流向數據
    • 設備感測器發出的數據
    • 這些數據都是無窮無盡的
    • 每一件事情,都可以看成事件序列
  • 數據流是有序的
    • 數據的到來總是有個先後順序
  • 數據流是不可變的
    • 事件一旦發生,就不能被改變
    • 它陳述了某一個時刻的事實
  • 數據流是可以重播的
    • 為了處理的一些問題、糾正過去的錯誤,可以重跑數據流
    • 藉助於Kafka,我們可以重新消費幾個月之前的原始數據流

流式處理

流式處理就是指實時地處理一個或多個事件流。它是一種編程範式。其他編程領域,主要有3種編程範式:

  1. 請求與響應
    • 延遲最小的一種方式,響應時間要求亞毫秒級到毫秒之間
    • 響應時間一般分穩定
    • 發出請求,等待響應(大部分的JavaEE同學,都是開發這一類編程範式的應用),其實就是OLTP
  2. 批處理
    • 特點:高延遲、高吞吐
    • 一般是固定某個時刻開始啟動執行,讀取所有的數據,然後輸出介面
    • 每次讀取到的都是舊數據
    • 主要應用在DWH或BI中
  3. 流式處理
    • 特點:介於上述兩者之間
    • 流式處理可以讓業務報告保持更新,持續響應

流的定義不依賴某個框架,只要儲蓄從一個無邊界數據集中讀取數據,並對它們進行處理生成結果,就是進行流式處理。重點是:整個過程必須是持續的。

流式處理中的時間

上述我們已經說過了,數據流都是有序的。某一時刻的數據是確定的。時間是流式處理中非常重要的概念。大部分流式應用的操作都是基於時間視窗的。

流式系統一般包含以下幾個時間概念(熟悉Flink的同學應該會很熟悉):

  • 事件時間(Eventtime)
    • 事件實際發生的時間
    • 用戶一般只對事件發生時間感興趣
  • 日誌追加時間
    • 日誌追加時間是指事件保存到事件存儲源的時間
    • 例如:數據是什麼到達Kafka的(Kafka是可以啟用自動添加時間戳功能的)
  • 處理時間
    • 流式處理應用接收到事件後,要對齊進行處理的時間
    • 處理時間取決於流式處理應用何時讀取到這個時間
    • 如果應用程式使用了兩個線程來讀取同一個事件,這個時間戳可能會不一樣
    • 這個時間戳非常不可靠,應該避免使用它

狀態

如果流式處理是來一個事件就處理一個事件,那麼流式處理就很簡單。但如果操作中包含了多個事件,流式處理就有意思了。例如:我們想在流式處理中統計北京用戶的訂單數量、消費金額等等。此時,就不能光處理單個事件了,我們需要獲取更多的事件。事件與事件之間的信息就稱之為狀態。例如簡單的,求某個類型的訂單數等。


這些狀態一般就保存在流式處理程式本地變數(本地記憶體)中,例如:使用HashMap來保存計數。但這種做法是很不可靠的,流式處理處理的是無界數據集,一旦應用程式出現異常,就會出現狀態丟失,這是我們說不能接受的。所以,每一種流式計算框架都會很小心地持久化狀態。如果應用程式重啟,需要將這些數據恢復。


流式處理一般包含兩種狀態:

  • 本地狀態
    • 這種狀態只能被應用程式實例訪問(不過Flink 1.9版本是可以外部來訪問本地狀態的)
    • 內嵌到應用程式的資料庫中進行維護和管理
    • 特點:速度快,但受記憶體大小的限制,所以,很多流式處理系統都將數據拆分到多個子流中處理
  • 外部狀態
    • 用外部存儲來處理,一般使用NoSQL系統,例如:Cassadra
    • 特點:沒有大小限制,可以被應用程式多個實例訪問、甚至外部應用訪問,但引入額外的系統會造成延遲、複雜性(例如:要維護內部和外部狀態一致性問題)

時間視窗

大部分針對流的操作都是基於時間視窗的。例如:計算一周內銷量最好的產品。兩個流的合併也是基於時間視窗的。流式系統會合併發生在相同時間段上的事件。視窗是有類型的。以下幾點是我們設計視窗需要考慮的:

  • 視窗的大小
    • 是基於5分鐘計算還是基於15分鐘、甚至是一天
    • 視窗越小,就能越快地發現變更,不過雜訊也就越多
    • 視窗越大,變更就跟平滑,不過延遲也越嚴重
  • 視窗的移動頻率(移動間隔)
    • 5分鐘的視窗,可以1分鐘計算一次,或者每秒鐘計算一次,或者每當有新事件到達時計算一次
    • 如果“移動頻率”與視窗大小相等,這種稱為滾動視窗(tumbling window)
    • 如果視窗隨著每一條記錄移動,這種情況稱為滑動視窗(sliding window)
  • 視窗的可更新時長
    • 假設:計算了 00:00 – 00:05 之間的訂單總數,一個小時後,又得到了一些“事件時間”是 00:02的事件(例如:因為網路通信故障,這個消息晚到了一段時間),這種情況,是否需要更新 00:00 – 00:05 這個視窗的結果呢?或者就不處理了?
    • 理想情況下,可以定義一個時間段,只要在這個時間段內,事件可以被添加到對應的時間片段里。例如:如果事件處於4個小時以內,就更新,否則,就忽略掉。
  • 視窗時間對齊
    • 視窗可以與時間對齊,例如:5分鐘的視窗如果每分鐘移動一次,那麼第一個分片可以是:00:00 – 00:05,第二個就是 00:01 – 00:06
    • 視窗也可以不與時間對齊,例如:應用可以在任何時間啟動,那麼第一個分片有可能是03:17 – 03:22
    • 滑動視窗永遠不會與時間對齊,只要有新的記錄到達,就會發生移動


下麵這張圖,說明瞭滾動視窗與滑動視窗的區別。

滾動視窗:假設視窗的大小為5分鐘,這裡確定的3個時間視窗

滑動視窗:假設每分鐘滑動一次,那麼這個時候會有5個時間視窗,計算結果會發生重疊

image

流式處理的設計模式

單個事件處理

這是流式處理最基本的模式。這種模式也叫:map或filter模式。經常被用來過濾無用的事件或者用於轉換事件。


這種模式,應用程式讀取流中的數據,修改數據,然後把事件生成到另一個流上。這一類應用程式無需在程式內部維護狀態,每一個事件都是獨立處理的。這種錯誤恢復和進行負載均衡都很容易。因為無需進行狀態恢復操作。


使用本地狀態

大部分流式處理應用關係如何聚合數據。特別是:基於時間視窗進行聚合。例如:找到每天最低、最高的交易價格。要實現這種操作,就需要維護流的狀態。例如:我們需要將最小值、最大值保存下來,用它們與每一個新值對比。這類操作,可以通過本地狀態來實現。例如:每一個分組都維護自己分組的狀態。


一旦流式處理中包含了本地狀態,就需要解決以下問題。

  • 記憶體使用
    • 必須要有足夠的記憶體來保存本地狀態
  • 持久化
    • 確保應用程式關閉時,不會丟失狀態
    • 例如:我們可以使用RocksDB將本地狀態保存到記憶體里、同時持久化到磁碟上,以便重啟後恢復。而且需要將本地狀態的變更發送到Kafka的主題上
  • 重新負載均衡
    • 有時候,分區被重新分配給不同的消費者。這種情況,失去分區的實例必須把最後的狀態保存下來,或得分區的實例必須要知道如何恢復到正確的狀態


多階段處理和重分區

有些時候,我們要通過所有可用的數據來獲得結果。例如:要發佈每天的“前10支”股票,這10支股票需要從每天的交易股票中挑選出來。如果僅僅在單個實例上處理是不夠的,因為10支股票分佈在多個實例上。


此種,我們分為多個階段來處理。

1、計算每支股票當天的漲跌。這個計算可以在每個實例上執行

2、將結果寫入到單個分區

3、再用一個實例找出當天的前10支股票


這一類操作就與MapReduce很像了。


使用外部查找——流和表的連接

有時候,流式處理需要將外部數據和流集成在一日。例如:外部數據中保存了一些規則、或者將完整完整地用戶信息拉取到流中。

這種case最大的問題,外部查找會帶來嚴重的延遲,一般在 5-15 ms之間,這在很多情況下是不可行的。而且,外部系統也無法承受這種額外的負載——流式處理系統每秒可以處理10-50W個事件,而資料庫正常情況下每秒只能處理1W個事件,所以需要伸縮性更強的解決方案。


為了獲取更好的性能和更強的伸縮性,需要將外部資料庫的信息緩存到流式處理應用中。但考慮以下問題:

如何保證緩存里的數據是最新的?

如果刷新太頻繁,仍然會對資料庫造成很大壓力,緩存也就無用了。

如果刷新不及時,那麼流式處理中所用的數據就會過時。

如果能夠捕捉資料庫的變更事件,並形成事件流,流式處理作業就可以監聽事件流,並及時更新緩存。捕捉資料庫的變更事件並形成數據流,這個過程稱為CDC(Change Data Capture)。例如:我們可以通過Canal來捕獲MySQL資料庫的變化、可以通過ogg來捕獲Oracle資料庫的變化


流與流的連接

有時候需要連接兩個真實的事件流。要連接兩個流,就是連接所有的歷史事件(將兩個妞中具有相同鍵、發生在相同時間視窗內的事件匹配起來),這種流和流的連接稱為:基於時間視窗的連接(windowed-join)。連接兩個流,通常包含一個滑動時間視窗

image


亂序事件

不管對於流式處理、還是傳統的ETL系統,處理亂序事件都是一個挑戰。物聯網領域經常發生亂序事件:一個移動設備斷開Wifi連接幾個小時,在重新連上WiFi後,將幾個小時堆積的事件一併發出去。要讓流式處理應用處理好這些場景,需要做到幾下:

  • 識別亂序事件
    • 應用程式需要檢查事件的時間,並將其與當前時間進行比較
  • 規定一個時間段用於重排亂序事件
    • 例如:3個小時以內的事件可以重排,但3個小時以外的事件就可以直接扔掉
  • 具有一定時間段內重排事件的能力
    • 這是流式處理應用和批處理的重要不同點
    • 假設有一個每天運行的作業,一些事件在作業結束之後才到達,那麼可以重新運行昨天的作業來更新
    • 而在流式處理中,重新運行昨天的作業是不存在的,亂序事件和新到達的事件必須一起處理
  • 具備更新結果的能力
    • 如果處理的結果保存在資料庫你,那麼可以通過put或update對結果進行更新


重新處理

該重要模式是重新處理事件:

  • 流式處理應用更新了,要使用新版本應用處理同一個事件流,生成新的結果,並比較兩種版本的結果,然後某個時間點將客戶端切換到新的結果流
  • 現有的流式處理出現了缺陷,修複後,需要重新處理並重新計算結果

第一種情況,需要Kafka將事件流長時間地保存在可伸縮的數據存儲中

  • 將新版本的應用作為一個新的消費者組
  • 新的版本從輸入主題的第一個偏移量開始讀取數據
  • 檢查結果流,在新版本的處理作業趕上進度時,將客戶端應用程式切換到新的結果流上

第二種情況,需要應用程式回到輸入流的起始位置開始處理,同時重置本地狀態,還要清理之前的輸出流。這種方式處理起來比較困難。建議還是使用第一種方案。


參考文獻:

《Kafka全文指南》


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • " 返回《C 併發編程》" "1. 調度到線程池" "2. 任務調度器" "2.1. Default 調度器" "2.2. 捕獲當前同步上下文 調度器" "2.3. ConcurrentExclusiveSchedulerPair 調度器" "3. 調度並行代碼" "4. 用調度器實現數據流的同步" ...
  • 場景 一個對象A,希望它的某些狀態在發生改變時通知到B(或C、D),常見的做法是在A中定義一個事件(或直接用委托),當狀態改變時A去觸發這個事件。而B直接訂閱這個事件 這種設計有點問題B由於要訂閱A的事件,所以B得完全引用A,其實有時候沒必要,因為我只關心A的狀態變化而已狀態變更通知這種場景很多,有 ...
  • " 返回《C 併發編程》" "1. 取消請求" "2. 超時後取消" "3. 取消並行" "4. 取消響應式代碼" "5. 與其他取消體系的互操作" 是一個等同於 預設 的特殊值,表示這個方法是永遠不會被取消的。 實例代碼 輸出: 1. 取消請求 2. 超時後取消 輸出: 只要執行代碼時用到了超時, ...
  • " 返回《C 併發編程》" "1. 簡介" "2. 不可變棧和隊列" "3. 不可變列表" "4. 不可變Set集合" "5. 不可變字典" "6. 線程安全字典" "7. 阻塞隊列" "8. 阻塞棧和包" "9. 非同步隊列" "10. 非同步棧和包" "11. 阻塞/非同步隊列" 1. 簡介 + 不可 ...
  • 匿名方法:通過匿名委托 、lamada表達式定義的函數具體操作並複製給委托類型;匿名委托:委托的一種簡單化聲明方式通過delegate關鍵字聲明;內置泛型委托:系統已經內置的委托類型主要是不帶返回值的Action和帶返回值的Func實例代碼(運行環境netcoreapp3.1)class demoF... ...
  • " 返回《C 併發編程》" "1. 用 async 代碼封裝非同步方法與 Completed 事件" "2. 用 async 代碼封裝 Begin/End 方法" "3. 用 async 代碼封裝並行代碼" "4. 用 async 代碼封裝 Rx Observable 對象" "5. 用 Rx Obs ...
  • " 返回《C 併發編程》" "1. 轉換.NET事件" "1.1. 進度通知" "1.2. 定時器示例" "1.3. 錯誤傳遞" "2. 發通知給上下文" "3. 用視窗和緩衝對事件分組" "4. 用限流和抽樣抑制事件流" "4.1. Throttle" "4.2. Sample" "5. 超時" ...
  • " 返回《C 併發編程》" "1. 簡介" "2. 鏈接數據流塊" "3. 傳遞出錯信息" "4. 斷開鏈接" "5. 限制流量" "6. 數據流塊的並行處理" "7. 創建自定義數據流塊" 1. 簡介 TPL 數據流(dataflow)庫的功能很強大,可用來創建 網格 (mesh)和 管道 (pi ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...