此文選自Google大神Tyler Akidau的另一篇文章:Streaming 102: The world beyond batch 歡迎回來!如果您錯過了我以前的帖子, "Streaming 大數據的未來" ,強烈建議您先花時間閱讀那篇文章。 簡要回顧一下,上一篇我們介紹了Stream ...
此文選自Google大神Tyler Akidau的另一篇文章:Streaming 102: The world beyond batch
歡迎回來!如果您錯過了我以前的帖子,Streaming-大數據的未來,強烈建議您先花時間閱讀那篇文章。
簡要回顧一下,上一篇我們介紹了Streaming,批量與流式計算,正確性與推理時間的工具,數據處理模式,事件事件與處理時間,視窗化。
在這篇文章中,我想進一步關註上次的數據處理模式,但更詳細。
這裡會用到一些Google Cloud Dataflow的代碼片段,這是谷歌的一個框架,類似於Spark Streaming或Storm
。
這裡還有再說三個概念:
Watermarks:水印是關於事件時間的輸入完整性的概念。如果到某一個時間的水印,應該是已經獲取到了小於該時間的所有數據。在處理無界數據時,水印就作為處理進度的標準。
Triggers: 觸發器是一種機制,用於聲明視窗何時應該輸出,觸發器可靈活選擇何時應發出輸出。我們可以隨著時間的推移不斷改進結果,也可以處理那些比水印晚到達的數據,改進結果。
Accumulation: 累積模式指定在同一視窗中觀察到的多個結果之間的關係。這些結果可能是完全脫節的,即隨著時間的推移表示獨立的增量,或者它們之間可能存在重疊。
四個新的問題: what? where? when? How?
計算什麼? 希望通過數據計算的結果,和批處理類似,構建直方圖,計算總和,訓練機器學習等等。
在哪裡計算? 事件時間視窗可以回答這個問題,比如之前提到的(固定,滑動,會話),當然這個時間也可能是處理時間。
什麼時候處理產生結果?通過水印和觸發器來回答。可能有無限的變化,常見的模式是使用水印描述給定視窗的輸入是否完整,觸發器指定早期和後期結果。
結果如何相關? 通過累計模式來回答,丟棄不同的,累積產生的結果。
一、Streaming 101 Redux
詳細介紹Streaming 101的一些概念,並提供一些例子。
What:transformations
計算的結果是什麼?熟悉批處理的應該很熟悉這個。
舉一個例子,計算由10個值組成的簡單數據集的整數和。您可以想象為求一組人的分數和,或者是計費,監控等場景。
如果您瞭解Spark Streaming或Flink之類的東西,那麼您應該相對容易地瞭解Dataflow代碼正在做什麼。
Dataflow Java SDK 模型:
PCollections,表示可以執行並行轉換的數據集(可能是大量的數據集)。
PTransforms,將PCollections創建成新的PCollections。PTransforms可以執行逐元素變換,它們可以將多個元素聚合在一起,或者它們可以是多個PTransforms的組合。
圖二 轉換類型
我們從IO源中獲取消息,以KV的形式轉換,最後求出分數和。示例代碼如下:
PCollection<String> raw = IO.read(...);
PCollection<KV<String, Integer>> input = raw.apply(ParDo.of(new ParseFn());
PCollection<KV<String, Integer>> scores = input
.apply(Sum.integersPerKey());
這個過程可以是在多個機器分散式執行的,分佈的將不同時間情況的數據進行累加,輸出得到最終的結果,我們不用關心分散式的問題,只要把所有的結果集轉換累加即可。
圖三 x為事件時間 y為處理時間
這裡我們計算的是所有事件時間,沒有進行視窗轉換,因此輸出矩形覆蓋整個X軸,但是我們處理無界數據時,這就不夠了,我們不能等到結束了再處理,因為永遠不會結束。所有我們需要考慮在哪裡計算呢?這就需要視窗。
Where:windowing
還記得我們之前提過的三種視窗,固定,滑動,會話。
圖四 三種視窗
我們用剛纔的例子,將其固定為兩分鐘的視窗。
PCollection<KV<String, Integer>> scores = input
.apply(Window.into(FixedWindows.of(Duration.standardMinutes(2))))
.apply(Sum.integersPerKey());
Dataflow提供了一個統一的模型,可以在批處理和流式處理中同時工作,因為批處理實際上只是流的一個子集。
圖五 視窗處理
和以前一樣,輸入的數據在累積,直到它們被完全處理,然後產生輸出。在這種情況下,我們得到四個輸出而不是一個輸出:四個基於這個兩分鐘事件時間視窗中的單個輸出。
現在我們可以通過更具體的水印,觸發器和累計來解決更多的問題了。
二、Streaming 102
剛纔的處理還是通用的批處理方式,延遲很大,但我們已經成功把每個視窗的輸入都計算了,我們目前缺乏一種對無限數據處理方法,還要能保證其完整性。
When
Watermarks
水印是什麼時候處理產生結果?其實也就是我們之前研究事件時間和處理時間的那張圖。
上文圖 事件時間 處理時間 水印
這條紅色曲線就是水印,它隨著處理時間的推移不斷的去捕獲事件時間。從概念上講,我們將其視為從處理時間到事件時間的映射。水印可以有兩種類型:
完美水印:這要求我們對的輸入數據全部瞭解。也就沒有了後期數據,所有的數據準時到達。
啟髮式水印:對於大部分分散式輸入源,完整的瞭解輸入數據是不可能的,這就需要啟髮式水印。啟髮式水印通過分區,分區排序等提供儘可能準確的估計。所以是有可能錯誤的,這就需要觸發器在後期解決,這個一會會講。
下麵是兩個使用了不同水印的流處理引擎:
圖六 左完美 右啟發
在這兩種情況下,當水印通過視窗的末端時,視窗被實現。兩次執行之間的主要區別在於右側水印計算中使用的啟髮式演算法未考慮9的值,這極大地改變了水印的形狀。這些例子突出了水印的兩個缺點:
太慢:如果因為網路等原因導致有數據未處理時,只能延遲輸出結果。左圖比較明顯,遲到的9影響了整體的進度,這對於第二個視窗[12:02,12:04]尤為明顯,從視窗中的第一個值開始到我們看到視窗的任何結果為止需要將近7分鐘。而啟髮式水印要好一點只用了兩分鐘。
太快:當啟髮式水印錯誤地提前超過應有的水平時,水印之前的事件時間數據可能會在一段時間後到達,從而產生延遲數據。這就是右邊示例中發生的情況:在觀察到該視窗的所有輸入數據之前,水印超過了第一個視窗的末尾,導致輸出值不正確,正確的應該是14。這個缺點嚴格來說是啟髮式水印的問題, 他們的啟發性意味著他們有時會出錯。因此,如果您關心正確性,單靠它們來確定何時實現輸出是不夠的。
這時候我們就需要觸發器。
triggers
觸發器用於聲明視窗何時應該輸出。
觸發的信號包括:水印進度,處理時間進度,計數,數據觸發,重覆,邏輯與AND,邏輯或OR,序列。
還是用上面的例子,我們增加一個觸發器:
PCollection<KV<String, Integer>> scores = input
.apply(Window.into(FixedWindows.of(Duration.standardMinutes(2)))
.triggering(AtWatermark()))
.apply(Sum.integersPerKey());
這裡規定了觸發的情況,我們可以考慮水印太快和太慢的情況。
太慢時,我們假設任何給定視窗都存在穩定的傳入,我們可以周期性的觸發。
太快時,可以在後期數據到達後去修正結果。如果後期數據不頻繁,並不會影響性能。
最後我們可以綜合考慮,協調早期,準時,晚期的情況:
PCollection<KV<String, Integer>> scores = input
.apply(Window.into(FixedWindows.of(Duration.standardMinutes(2)))
.triggering(
AtWatermark()
.withEarlyFirings(AtPeriod(Duration.standardMinutes(1)))
.withLateFirings(AtCount(1))))
.apply(Sum.integersPerKey());
生成結果如下,這個版本有了明顯的改進:
圖七 增加早期晚期
對於[12:02,12:04]視窗太慢的情況,每分鐘定時更新。延遲時間從七分鐘減少到三分半。
對於[12:00,12:02]視窗太快的情況,當值9顯示較晚時,我們立即將其合併到一個值為14的新的已更正窗格中。
但是這裡有一個問題,視窗要保持多長時間呢?這裡我們需要垃圾收集機制。
Garbage collection
在[啟髮式水印示例中,每個視窗的持久狀態在示例的整個生命周期,這是必要的,這樣我們才能夠在他們到達時適當處理遲到的數據。但是,雖然能夠保持所有持久狀態直到時間結束是很棒的,但實際上,在處理無限數據源時,保持給定視窗的狀態通常是不切實際的。無限, 我們最終會耗盡磁碟空間。
因此,任何真實的無序處理系統都需要提供一些方法來限制它正在處理的視窗的生命周期。
我們可以定義一個範圍,當超出這個範圍後,我們就丟棄無用的數據。
PCollection<KV<String, Integer>> scores = input
.apply(Window.into(FixedWindows.of(Duration.standardMinutes(2)))
.triggering(
AtWatermark()
.withEarlyFirings(AtPeriod(Duration.standardMinutes(1)))
.withLateFirings(AtCount(1)))
.withAllowedLateness(Duration.standardMinutes(1)))
.apply(Sum.integersPerKey());
一旦水印通過視窗的延遲範圍,該視窗就會關閉,這意味著視窗的所有狀態都將被丟棄。
圖八 垃圾收集
這裡的6在允許遲到的範圍內,可以被收集,而9不在這個範圍,就被丟棄了。
有兩點要註意:
如果您正在使用可獲得完美水印的數據源的數據,就不需要處理延遲數據。
即使在使用啟髮式水印時,如果是將有限數量聚合,而且能保證一直可控,也不用考慮視窗的壽命問題。
現在時間的問題解決了,下麵我們討論如何累積數據。
How:Accumulation
有三種不同的累積模式:
丟棄:當下游的消費者進行累積計算時,直接相加所要的,就可以得到最終結果。
累積:比如未來的可以覆蓋之前的,一直要保持最新狀態,例如Hbase這種鍵值對的存儲。
累積和撤回:和累積類似,但更複雜。比如重新分組的情況,可能不只是覆蓋那麼簡單,需要先刪掉之前的,再加入最新的;還有動態視窗的情況,新視窗會替換舊視窗,但數據要放在不同的位置。
比如上圖中事件時間範圍[12:02,12:04],下表顯示了三種累積模式:
丟棄 | 累積 | 累積和收回 | |
---|---|---|---|
窗格1:[7] | 7 | 7 | 7 |
第2頁:[3,4] | 7 | 14 | 14,-7 |
第3頁:[8] | 8 | 22 | 22,-14 |
觀察到最後的價值 | 8 | 22 | 22 |
總和 | 22 | 51 | 22 |
丟棄:每個窗格僅包含在該特定窗格期間到達的值。因此,觀察到的最終值並未完全捕獲總和。但是,如果您要自己對所有獨立窗格求和,那麼您將得到22的正確答案。
累積:每個窗格結合了特定窗格期間到達的值,加上從先前的窗格中的所有值。因此,正確觀察到的最終值可以捕獲22的總和。
累積和撤回:每個窗格都包含新的累積模式值以及前一個窗格值的縮進。因此,觀察到的最後一個(非回縮)值以及所有物化窗格的總和(包括撤回)都為您提供了22的正確答案。這就是撤回如此強大的原因。
圖九 三種累積模式
隨著丟棄,累積,累積和撤回的順序,存儲和計算成本在提高,因此累積模式的選擇要在正確性,延遲和成本中做出選擇。
When/Where: Processing-time windows
我們已經解決了所有四個問題,What,Where,When,How。但我們都是再事件時間的固定視窗。
所以我們還要討論一下處理時間中的固定視窗和事件時間中的會話視窗。
先討論處理時間中的固定視窗,處理時間視窗很重要,原因有兩個:
- 對於某些用例,例如使用監控(例如,Web服務流量QPS),您希望在觀察到的情況下分析傳入的數據流,處理時視窗絕對是適當的方法。
- 對於事件發生的時間很重要的用例(例如,分析用戶行為趨勢,計費,評分等),處理時間視窗絕對是錯誤的方法,並且能夠識別這些情況是至關重要的。
有兩種方法可用於實現處理時視窗:
觸發器:忽略事件時間(即,使用跨越所有事件時間的全局視窗)並使用觸發器在處理時間軸上提供該視窗的快照。
入口時間:將入口時間指定為數據到達時的事件時間,並使用正常的事件時間視窗。這基本上就像Spark Streaming目前所做的那樣。
處理時間視窗的一個重大缺點是,當輸入的觀察順序發生變化時,視窗的內容會發生變化。為了以更具體的方式展示,我們將看看這三個用例:
這裡我們將兩種事件時間相同而處理時間不同的情況比較。
事件時間視窗
圖10 事件時間視窗
四個視窗最終結果依然相同。
通過觸發器處理時間視窗
使用全局事件時間視窗,在處理時間域定期觸發,使用丟棄模式進行
圖11 觸發器處理時間視窗
- 由於我們通過事件時間窗格模擬處理時間視窗,因此在處理時間軸中描繪了“視窗”,這意味著它們的寬度是在Y軸而不是X軸上測量的。
- 由於處理時間視窗對遇到輸入數據的順序敏感,因此每個“視窗”的結果對於兩個觀察訂單中的每一個都不同,即使事件本身在技術上在每個版本中同時發生。在左邊我們得到12,21,18,而在右邊我們得到7,36,4。
通過入口時間處理時間視窗
當元素到達時,它們的事件時間需要在入口時被覆蓋。返回使用標準的固定事件時間視窗。由於入口時間提供了計算完美水印的能力,我們可以使用預設觸發器,在這種情況下,當水印通過視窗末端時,它會隱式觸發一次。由於每個視窗只有一個輸出,因此累積模式無關緊要。
圖12 入口時間處理時間視窗
- 與其他處理時間視窗示例一樣,即使輸入的值和事件時間保持不變,當輸入的順序發生變化時,我們也會得到不同的結果。
- 與其他示例不同,視窗在事件時域中再次描繪(因此沿X軸)。儘管如此,它們並不是真正的事件時間視窗; 我們只是簡單地將處理時間映射到事件時間域,刪除每個輸入的原始記錄,並用新的輸入替換它,而不是表示管道首次觀察數據的時間。
- 儘管如此,由於水印,觸發器發射仍然與前一個處理時間示例完全相同。此外,產生的輸出值與該示例相同,如預測的那樣:左側為12,21,18,右側為7,36,4。
如果您關心事件實際發生的時間,您必須使用事件時間視窗,否則您的結果將毫無意義。
Where: session windows
動態的,數據驅動的視窗,稱為會話。
會話是一種特殊類型的視窗,它捕獲數據中的一段活動,它們在數據分析中特別有用。
- 會話是數據驅動視窗的一個示例:視窗的位置和大小是輸入數據本身的直接結果,而不是基於某些預定義模式在時間內,如固定視窗和滑動視窗。
- 會話也是未對齊視窗的示例,即,不是均勻地跨數據應用的視窗,而是僅對數據的特定子集(例如,每個用戶)。這與固定視窗和滑動視窗等對齊視窗形成對比,後者通常均勻地應用於數據。
圖13 會話
我們來構建一個會話:
PCollection<KV<String, Integer>> scores = input
.apply(Window.into(Sessions.withGapDuration(Duration.standardMinutes(1)))
.triggering(
AtWatermark()
.withEarlyFirings(AtPeriod(Duration.standardMinutes(1)))
.withLateFirings(AtCount(1)))
.accumulatingAndRetractingFiredPanes())
.apply(Sum.integersPerKey());
我們得到結果如下:
圖14 會話視窗
當遇到值為5的第一個記錄時,它被放置在一個原始會話視窗中。
到達的第二個記錄是7,它同樣被放入它自己的原始會話視窗,因為它不與5的視窗重疊。
同時,水印已經過了第一個視窗的末尾,所以5的值在12:06之前被實現為準時結果。此後不久,第二個視窗也被實現為具有值7的推測結果,正如處理時間達到12:06那樣。
我們接下來觀察一系列記錄,3,4和3,原始會話都重疊。結果,它們全部合併在一起,並且在12:07觸發的早期觸發時,發出值為10的單個視窗。
當8在此後不久到達時,它與具有值7的原始會話和具有值10的會話重疊。因此所有三個被合併在一起,形成具有值25的新組合會話。
當9到達時,將值為5的原始會話和值為25的會話加入到值為39的單個較大會話中。
這個非常強大的功能,Spark Streaming已經做了實現。
簡單回顧一下,我們討論了事件時間與處理時間,視窗化,水印,觸發器,累積。探索了What,When,Where,How四個問題。而最終,我們將平衡正確性,延遲和成本問題,得到最適合自己的實時流式處理方案。
更多實時計算,Kafka等相關技術博文,歡迎關註實時流式計算