什麼是流式處理呢?這個問題其實我們大部分時候是沒有考慮過的,大多數,我們是把流式處理和實時計算放在一起來說的。我們先來瞭解下,什麼是數據流。數據流(事件流)數據流是無邊界數據集的抽象我們之前接觸的數據處理,大多都都是有界的。例如:處理某天的數據、某個季度的數據等無界意味著數據是無限地、持續增長的數據... ...
什麼是流式處理呢?
這個問題其實我們大部分時候是沒有考慮過的,大多數,我們是把流式處理和實時計算放在一起來說的。我們先來瞭解下,什麼是數據流。
數據流(事件流)
- 數據流是無邊界數據集的抽象
- 我們之前接觸的數據處理,大多都都是有界的。例如:處理某天的數據、某個季度的數據等
- 無界意味著數據是無限地、持續增長的
- 數據流會隨著時間的推移,源源不斷地加入進來
- 數據流無處不再
- 信息卡交易
- 電商購物
- 快遞
- 網路交換機的流向數據
- 設備感測器發出的數據
- …
- 這些數據都是無窮無盡的
- 每一件事情,都可以看成事件序列
- 數據流是有序的
- 數據的到來總是有個先後順序
- 數據流是不可變的
- 事件一旦發生,就不能被改變
- 它陳述了某一個時刻的事實
- 數據流是可以重播的
- 為了處理的一些問題、糾正過去的錯誤,可以重跑數據流
- 藉助於Kafka,我們可以重新消費幾個月之前的原始數據流
流式處理
流式處理就是指實時地處理一個或多個事件流。它是一種編程範式。其他編程領域,主要有3種編程範式:
- 請求與響應
- 延遲最小的一種方式,響應時間要求亞毫秒級到毫秒之間
- 響應時間一般分穩定
- 發出請求,等待響應(大部分的JavaEE同學,都是開發這一類編程範式的應用),其實就是OLTP
- 批處理
- 特點:高延遲、高吞吐
- 一般是固定某個時刻開始啟動執行,讀取所有的數據,然後輸出介面
- 每次讀取到的都是舊數據
- 主要應用在DWH或BI中
- 流式處理
- 特點:介於上述兩者之間
- 流式處理可以讓業務報告保持更新,持續響應
流的定義不依賴某個框架,只要儲蓄從一個無邊界數據集中讀取數據,並對它們進行處理生成結果,就是進行流式處理。重點是:整個過程必須是持續的。
流式處理中的時間
上述我們已經說過了,數據流都是有序的。某一時刻的數據是確定的。時間是流式處理中非常重要的概念。大部分流式應用的操作都是基於時間視窗的。
流式系統一般包含以下幾個時間概念(熟悉Flink的同學應該會很熟悉):
- 事件時間(Eventtime)
- 事件實際發生的時間
- 用戶一般只對事件發生時間感興趣
- 日誌追加時間
- 日誌追加時間是指事件保存到事件存儲源的時間
- 例如:數據是什麼到達Kafka的(Kafka是可以啟用自動添加時間戳功能的)
- 處理時間
- 流式處理應用接收到事件後,要對齊進行處理的時間
- 處理時間取決於流式處理應用何時讀取到這個時間
- 如果應用程式使用了兩個線程來讀取同一個事件,這個時間戳可能會不一樣
- 這個時間戳非常不可靠,應該避免使用它
狀態
如果流式處理是來一個事件就處理一個事件,那麼流式處理就很簡單。但如果操作中包含了多個事件,流式處理就有意思了。例如:我們想在流式處理中統計北京用戶的訂單數量、消費金額等等。此時,就不能光處理單個事件了,我們需要獲取更多的事件。事件與事件之間的信息就稱之為狀態。例如簡單的,求某個類型的訂單數等。
這些狀態一般就保存在流式處理程式本地變數(本地記憶體)中,例如:使用HashMap來保存計數。但這種做法是很不可靠的,流式處理處理的是無界數據集,一旦應用程式出現異常,就會出現狀態丟失,這是我們說不能接受的。所以,每一種流式計算框架都會很小心地持久化狀態。如果應用程式重啟,需要將這些數據恢復。
流式處理一般包含兩種狀態:
- 本地狀態
- 這種狀態只能被應用程式實例訪問(不過Flink 1.9版本是可以外部來訪問本地狀態的)
- 內嵌到應用程式的資料庫中進行維護和管理
- 特點:速度快,但受記憶體大小的限制,所以,很多流式處理系統都將數據拆分到多個子流中處理
- 外部狀態
- 用外部存儲來處理,一般使用NoSQL系統,例如:Cassadra
- 特點:沒有大小限制,可以被應用程式多個實例訪問、甚至外部應用訪問,但引入額外的系統會造成延遲、複雜性(例如:要維護內部和外部狀態一致性問題)
時間視窗
大部分針對流的操作都是基於時間視窗的。例如:計算一周內銷量最好的產品。兩個流的合併也是基於時間視窗的。流式系統會合併發生在相同時間段上的事件。視窗是有類型的。以下幾點是我們設計視窗需要考慮的:
- 視窗的大小
- 是基於5分鐘計算還是基於15分鐘、甚至是一天
- 視窗越小,就能越快地發現變更,不過雜訊也就越多
- 視窗越大,變更就跟平滑,不過延遲也越嚴重
- 視窗的移動頻率(移動間隔)
- 5分鐘的視窗,可以1分鐘計算一次,或者每秒鐘計算一次,或者每當有新事件到達時計算一次
- 如果“移動頻率”與視窗大小相等,這種稱為滾動視窗(tumbling window)
- 如果視窗隨著每一條記錄移動,這種情況稱為滑動視窗(sliding window)
- 視窗的可更新時長
- 假設:計算了 00:00 – 00:05 之間的訂單總數,一個小時後,又得到了一些“事件時間”是 00:02的事件(例如:因為網路通信故障,這個消息晚到了一段時間),這種情況,是否需要更新 00:00 – 00:05 這個視窗的結果呢?或者就不處理了?
- 理想情況下,可以定義一個時間段,只要在這個時間段內,事件可以被添加到對應的時間片段里。例如:如果事件處於4個小時以內,就更新,否則,就忽略掉。
- 視窗時間對齊
- 視窗可以與時間對齊,例如:5分鐘的視窗如果每分鐘移動一次,那麼第一個分片可以是:00:00 – 00:05,第二個就是 00:01 – 00:06
- 視窗也可以不與時間對齊,例如:應用可以在任何時間啟動,那麼第一個分片有可能是03:17 – 03:22
- 滑動視窗永遠不會與時間對齊,只要有新的記錄到達,就會發生移動
下麵這張圖,說明瞭滾動視窗與滑動視窗的區別。
滾動視窗:假設視窗的大小為5分鐘,這裡確定的3個時間視窗
滑動視窗:假設每分鐘滑動一次,那麼這個時候會有5個時間視窗,計算結果會發生重疊
流式處理的設計模式
單個事件處理
這是流式處理最基本的模式。這種模式也叫:map或filter模式。經常被用來過濾無用的事件或者用於轉換事件。
這種模式,應用程式讀取流中的數據,修改數據,然後把事件生成到另一個流上。這一類應用程式無需在程式內部維護狀態,每一個事件都是獨立處理的。這種錯誤恢復和進行負載均衡都很容易。因為無需進行狀態恢復操作。
使用本地狀態
大部分流式處理應用關係如何聚合數據。特別是:基於時間視窗進行聚合。例如:找到每天最低、最高的交易價格。要實現這種操作,就需要維護流的狀態。例如:我們需要將最小值、最大值保存下來,用它們與每一個新值對比。這類操作,可以通過本地狀態來實現。例如:每一個分組都維護自己分組的狀態。
一旦流式處理中包含了本地狀態,就需要解決以下問題。
- 記憶體使用
- 必須要有足夠的記憶體來保存本地狀態
- 持久化
- 確保應用程式關閉時,不會丟失狀態
- 例如:我們可以使用RocksDB將本地狀態保存到記憶體里、同時持久化到磁碟上,以便重啟後恢復。而且需要將本地狀態的變更發送到Kafka的主題上
- 重新負載均衡
- 有時候,分區被重新分配給不同的消費者。這種情況,失去分區的實例必須把最後的狀態保存下來,或得分區的實例必須要知道如何恢復到正確的狀態
多階段處理和重分區
有些時候,我們要通過所有可用的數據來獲得結果。例如:要發佈每天的“前10支”股票,這10支股票需要從每天的交易股票中挑選出來。如果僅僅在單個實例上處理是不夠的,因為10支股票分佈在多個實例上。
此種,我們分為多個階段來處理。
1、計算每支股票當天的漲跌。這個計算可以在每個實例上執行
2、將結果寫入到單個分區
3、再用一個實例找出當天的前10支股票
這一類操作就與MapReduce很像了。
使用外部查找——流和表的連接
有時候,流式處理需要將外部數據和流集成在一日。例如:外部數據中保存了一些規則、或者將完整完整地用戶信息拉取到流中。
這種case最大的問題,外部查找會帶來嚴重的延遲,一般在 5-15 ms之間,這在很多情況下是不可行的。而且,外部系統也無法承受這種額外的負載——流式處理系統每秒可以處理10-50W個事件,而資料庫正常情況下每秒只能處理1W個事件,所以需要伸縮性更強的解決方案。
為了獲取更好的性能和更強的伸縮性,需要將外部資料庫的信息緩存到流式處理應用中。但考慮以下問題:
如何保證緩存里的數據是最新的?
如果刷新太頻繁,仍然會對資料庫造成很大壓力,緩存也就無用了。
如果刷新不及時,那麼流式處理中所用的數據就會過時。
如果能夠捕捉資料庫的變更事件,並形成事件流,流式處理作業就可以監聽事件流,並及時更新緩存。捕捉資料庫的變更事件並形成數據流,這個過程稱為CDC(Change Data Capture)。例如:我們可以通過Canal來捕獲MySQL資料庫的變化、可以通過ogg來捕獲Oracle資料庫的變化
流與流的連接
有時候需要連接兩個真實的事件流。要連接兩個流,就是連接所有的歷史事件(將兩個妞中具有相同鍵、發生在相同時間視窗內的事件匹配起來),這種流和流的連接稱為:基於時間視窗的連接(windowed-join)。連接兩個流,通常包含一個滑動時間視窗。
亂序事件
不管對於流式處理、還是傳統的ETL系統,處理亂序事件都是一個挑戰。物聯網領域經常發生亂序事件:一個移動設備斷開Wifi連接幾個小時,在重新連上WiFi後,將幾個小時堆積的事件一併發出去。要讓流式處理應用處理好這些場景,需要做到幾下:
- 識別亂序事件
- 應用程式需要檢查事件的時間,並將其與當前時間進行比較
- 規定一個時間段用於重排亂序事件
- 例如:3個小時以內的事件可以重排,但3個小時以外的事件就可以直接扔掉
- 具有一定時間段內重排事件的能力
- 這是流式處理應用和批處理的重要不同點
- 假設有一個每天運行的作業,一些事件在作業結束之後才到達,那麼可以重新運行昨天的作業來更新
- 而在流式處理中,重新運行昨天的作業是不存在的,亂序事件和新到達的事件必須一起處理
- 具備更新結果的能力
- 如果處理的結果保存在資料庫你,那麼可以通過put或update對結果進行更新
重新處理
該重要模式是重新處理事件:
- 流式處理應用更新了,要使用新版本應用處理同一個事件流,生成新的結果,並比較兩種版本的結果,然後某個時間點將客戶端切換到新的結果流
- 現有的流式處理出現了缺陷,修複後,需要重新處理並重新計算結果
第一種情況,需要Kafka將事件流長時間地保存在可伸縮的數據存儲中
- 將新版本的應用作為一個新的消費者組
- 新的版本從輸入主題的第一個偏移量開始讀取數據
- 檢查結果流,在新版本的處理作業趕上進度時,將客戶端應用程式切換到新的結果流上
第二種情況,需要應用程式回到輸入流的起始位置開始處理,同時重置本地狀態,還要清理之前的輸出流。這種方式處理起來比較困難。建議還是使用第一種方案。
參考文獻:
《Kafka全文指南》