實時計算大數據處理的基石-Google Dataflow

来源:https://www.cnblogs.com/tree1123/archive/2019/08/22/11394923.html
-Advertisement-
Play Games

​ 此文選自Google大神Tyler Akidau的另一篇文章:Streaming 102: The world beyond batch ​ 歡迎回來!如果您錯過了我以前的帖子, "Streaming 大數據的未來" ,強烈建議您先花時間閱讀那篇文章。 簡要回顧一下,上一篇我們介紹了Stream ...


​ 此文選自Google大神Tyler Akidau的另一篇文章:Streaming 102: The world beyond batch

​ 歡迎回來!如果您錯過了我以前的帖子,Streaming-大數據的未來,強烈建議您先花時間閱讀那篇文章。

簡要回顧一下,上一篇我們介紹了Streaming,批量與流式計算,正確性與推理時間的工具,數據處理模式,事件事件與處理時間,視窗化。

在這篇文章中,我想進一步關註上次的數據處理模式,但更詳細。

​ 這裡會用到一些Google Cloud Dataflow的代碼片段,這是谷歌的一個框架,類似於Spark Streaming或Storm

這裡還有再說三個概念:

Watermarks:水印是關於事件時間的輸入完整性的概念。如果到某一個時間的水印,應該是已經獲取到了小於該時間的所有數據。在處理無界數據時,水印就作為處理進度的標準。

Triggers: 觸發器是一種機制,用於聲明視窗何時應該輸出,觸發器可靈活選擇何時應發出輸出。我們可以隨著時間的推移不斷改進結果,也可以處理那些比水印晚到達的數據,改進結果。

Accumulation: 累積模式指定在同一視窗中觀察到的多個結果之間的關係。這些結果可能是完全脫節的,即隨著時間的推移表示獨立的增量,或者它們之間可能存在重疊。

四個新的問題: what? where? when? How?

計算什麼? 希望通過數據計算的結果,和批處理類似,構建直方圖,計算總和,訓練機器學習等等。

在哪裡計算? 事件時間視窗可以回答這個問題,比如之前提到的(固定,滑動,會話),當然這個時間也可能是處理時間。

什麼時候處理產生結果?通過水印和觸發器來回答。可能有無限的變化,常見的模式是使用水印描述給定視窗的輸入是否完整,觸發器指定早期和後期結果。

結果如何相關? 通過累計模式來回答,丟棄不同的,累積產生的結果。

一、Streaming 101 Redux

詳細介紹Streaming 101的一些概念,並提供一些例子。

What:transformations

計算的結果是什麼?熟悉批處理的應該很熟悉這個。

舉一個例子,計算由10個值組成的簡單數據集的整數和。您可以想象為求一組人的分數和,或者是計費,監控等場景。

如果您瞭解Spark Streaming或Flink之類的東西,那麼您應該相對容易地瞭解Dataflow代碼正在做什麼。

Dataflow Java SDK 模型:

  • PCollections,表示可以執行並行轉換的數據集(可能是大量的數據集)。

  • PTransforms,將PCollections創建成新的PCollections。PTransforms可以執行逐元素變換,它們可以將多個元素聚合在一起,或者它們可以是多個PTransforms的組合。

file

圖二 轉換類型

我們從IO源中獲取消息,以KV的形式轉換,最後求出分數和。示例代碼如下:

PCollection<String> raw = IO.read(...);
PCollection<KV<String, Integer>> input = raw.apply(ParDo.of(new ParseFn());
PCollection<KV<String, Integer>> scores = input
  .apply(Sum.integersPerKey());

這個過程可以是在多個機器分散式執行的,分佈的將不同時間情況的數據進行累加,輸出得到最終的結果,我們不用關心分散式的問題,只要把所有的結果集轉換累加即可。

file
圖三 x為事件時間 y為處理時間

這裡我們計算的是所有事件時間,沒有進行視窗轉換,因此輸出矩形覆蓋整個X軸,但是我們處理無界數據時,這就不夠了,我們不能等到結束了再處理,因為永遠不會結束。所有我們需要考慮在哪裡計算呢?這就需要視窗。

Where:windowing

還記得我們之前提過的三種視窗,固定,滑動,會話。

file

圖四 三種視窗

我們用剛纔的例子,將其固定為兩分鐘的視窗。

PCollection<KV<String, Integer>> scores = input
  .apply(Window.into(FixedWindows.of(Duration.standardMinutes(2))))
  .apply(Sum.integersPerKey());

Dataflow提供了一個統一的模型,可以在批處理和流式處理中同時工作,因為批處理實際上只是流的一個子集。

file

圖五 視窗處理

和以前一樣,輸入的數據在累積,直到它們被完全處理,然後產生輸出。在這種情況下,我們得到四個輸出而不是一個輸出:四個基於這個兩分鐘事件時間視窗中的單個輸出。

現在我們可以通過更具體的水印,觸發器和累計來解決更多的問題了。

二、Streaming 102

剛纔的處理還是通用的批處理方式,延遲很大,但我們已經成功把每個視窗的輸入都計算了,我們目前缺乏一種對無限數據處理方法,還要能保證其完整性。

When

Watermarks

水印是什麼時候處理產生結果?其實也就是我們之前研究事件時間和處理時間的那張圖。

file

上文圖 事件時間 處理時間 水印

這條紅色曲線就是水印,它隨著處理時間的推移不斷的去捕獲事件時間。從概念上講,我們將其視為從處理時間到事件時間的映射。水印可以有兩種類型:

完美水印:這要求我們對的輸入數據全部瞭解。也就沒有了後期數據,所有的數據準時到達。

啟髮式水印:對於大部分分散式輸入源,完整的瞭解輸入數據是不可能的,這就需要啟髮式水印。啟髮式水印通過分區,分區排序等提供儘可能準確的估計。所以是有可能錯誤的,這就需要觸發器在後期解決,這個一會會講。

下麵是兩個使用了不同水印的流處理引擎:

file

圖六 左完美 右啟發

在這兩種情況下,當水印通過視窗的末端時,視窗被實現。兩次執行之間的主要區別在於右側水印計算中使用的啟髮式演算法未考慮9的值,這極大地改變了水印的形狀。這些例子突出了水印的兩個缺點:

太慢:如果因為網路等原因導致有數據未處理時,只能延遲輸出結果。左圖比較明顯,遲到的9影響了整體的進度,這對於第二個視窗[12:02,12:04]尤為明顯,從視窗中的第一個值開始到我們看到視窗的任何結果為止需要將近7分鐘。而啟髮式水印要好一點只用了兩分鐘。

太快:當啟髮式水印錯誤地提前超過應有的水平時,水印之前的事件時間數據可能會在一段時間後到達,從而產生延遲數據。這就是右邊示例中發生的情況:在觀察到該視窗的所有輸入數據之前,水印超過了第一個視窗的末尾,導致輸出值不正確,正確的應該是14。這個缺點嚴格來說是啟髮式水印的問題, 他們的啟發性意味著他們有時會出錯。因此,如果您關心正確性,單靠它們來確定何時實現輸出是不夠的。

這時候我們就需要觸發器。

triggers

觸發器用於聲明視窗何時應該輸出。

觸發的信號包括:水印進度,處理時間進度,計數,數據觸發,重覆,邏輯與AND,邏輯或OR,序列。

還是用上面的例子,我們增加一個觸發器:

PCollection<KV<String, Integer>> scores = input
  .apply(Window.into(FixedWindows.of(Duration.standardMinutes(2)))
               .triggering(AtWatermark()))
  .apply(Sum.integersPerKey());

這裡規定了觸發的情況,我們可以考慮水印太快和太慢的情況。

太慢時,我們假設任何給定視窗都存在穩定的傳入,我們可以周期性的觸發。

太快時,可以在後期數據到達後去修正結果。如果後期數據不頻繁,並不會影響性能。

最後我們可以綜合考慮,協調早期,準時,晚期的情況:

PCollection<KV<String, Integer>> scores = input
  .apply(Window.into(FixedWindows.of(Duration.standardMinutes(2)))
               .triggering(
                 AtWatermark()
                   .withEarlyFirings(AtPeriod(Duration.standardMinutes(1)))
                   .withLateFirings(AtCount(1))))
  .apply(Sum.integersPerKey());

生成結果如下,這個版本有了明顯的改進:

file

圖七 增加早期晚期

對於[12:02,12:04]視窗太慢的情況,每分鐘定時更新。延遲時間從七分鐘減少到三分半。

對於[12:00,12:02]視窗太快的情況,當值9顯示較晚時,我們立即將其合併到一個值為14的新的已更正窗格中。

但是這裡有一個問題,視窗要保持多長時間呢?這裡我們需要垃圾收集機制。

Garbage collection

在[啟髮式水印示例中,每個視窗的持久狀態在示例的整個生命周期,這是必要的,這樣我們才能夠在他們到達時適當處理遲到的數據。但是,雖然能夠保持所有持久狀態直到時間結束是很棒的,但實際上,在處理無限數據源時,保持給定視窗的狀態通常是不切實際的。無限, 我們最終會耗盡磁碟空間。

因此,任何真實的無序處理系統都需要提供一些方法來限制它正在處理的視窗的生命周期。

我們可以定義一個範圍,當超出這個範圍後,我們就丟棄無用的數據。

PCollection<KV<String, Integer>> scores = input
  .apply(Window.into(FixedWindows.of(Duration.standardMinutes(2)))
               .triggering(
                 AtWatermark()
                   .withEarlyFirings(AtPeriod(Duration.standardMinutes(1)))
                   .withLateFirings(AtCount(1)))
               .withAllowedLateness(Duration.standardMinutes(1)))
  .apply(Sum.integersPerKey());

一旦水印通過視窗的延遲範圍,該視窗就會關閉,這意味著視窗的所有狀態都將被丟棄。

file

圖八 垃圾收集

這裡的6在允許遲到的範圍內,可以被收集,而9不在這個範圍,就被丟棄了。

有兩點要註意:

如果您正在使用可獲得完美水印的數據源的數據,就不需要處理延遲數據。

即使在使用啟髮式水印時,如果是將有限數量聚合,而且能保證一直可控,也不用考慮視窗的壽命問題。

現在時間的問題解決了,下麵我們討論如何累積數據。

How:Accumulation

有三種不同的累積模式:

丟棄:當下游的消費者進行累積計算時,直接相加所要的,就可以得到最終結果。

累積:比如未來的可以覆蓋之前的,一直要保持最新狀態,例如Hbase這種鍵值對的存儲。

累積和撤回:和累積類似,但更複雜。比如重新分組的情況,可能不只是覆蓋那麼簡單,需要先刪掉之前的,再加入最新的;還有動態視窗的情況,新視窗會替換舊視窗,但數據要放在不同的位置。

比如上圖中事件時間範圍[12:02,12:04],下表顯示了三種累積模式:

丟棄 累積 累積和收回
窗格1:[7] 7 7 7
第2頁:[3,4] 7 14 14,-7
第3頁:[8] 8 22 22,-14
觀察到最後的價值 8 22 22
總和 22 51 22

丟棄:每個窗格僅包含在該特定窗格期間到達的值。因此,觀察到的最終值並未完全捕獲總和。但是,如果您要自己對所有獨立窗格求和,那麼您將得到22的正確答案。

累積:每個窗格結合了特定窗格期間到達的值,加上從先前的窗格中的所有值。因此,正確觀察到的最終值可以捕獲22的總和。

累積和撤回:每個窗格都包含新的累積模式值以及前一個窗格值的縮進。因此,觀察到的最後一個(非回縮)值以及所有物化窗格的總和(包括撤回)都為您提供了22的正確答案。這就是撤回如此強大的原因。

file

圖九 三種累積模式

隨著丟棄,累積,累積和撤回的順序,存儲和計算成本在提高,因此累積模式的選擇要在正確性,延遲和成本中做出選擇。

When/Where: Processing-time windows

我們已經解決了所有四個問題,What,Where,When,How。但我們都是再事件時間的固定視窗。

所以我們還要討論一下處理時間中的固定視窗和事件時間中的會話視窗。

先討論處理時間中的固定視窗,處理時間視窗很重要,原因有兩個:

  • 對於某些用例,例如使用監控(例如,Web服務流量QPS),您希望在觀察到的情況下分析傳入的數據流,處理時視窗絕對是適當的方法。
  • 對於事件發生的時間很重要的用例(例如,分析用戶行為趨勢,計費,評分等),處理時間視窗絕對是錯誤的方法,並且能夠識別這些情況是至關重要的。

有兩種方法可用於實現處理時視窗:

觸發器:忽略事件時間(即,使用跨越所有事件時間的全局視窗)並使用觸發器在處理時間軸上提供該視窗的快照。

入口時間:將入口時間指定為數據到達時的事件時間,並使用正常的事件時間視窗。這基本上就像Spark Streaming目前所做的那樣。

處理時間視窗的一個重大缺點是,當輸入的觀察順序發生變化時,視窗的內容會發生變化。為了以更具體的方式展示,我們將看看這三個用例:

這裡我們將兩種事件時間相同而處理時間不同的情況比較。

事件時間視窗

file

圖10 事件時間視窗

四個視窗最終結果依然相同。

通過觸發器處理時間視窗

使用全局事件時間視窗,在處理時間域定期觸發,使用丟棄模式進行

file

圖11 觸發器處理時間視窗

  • 由於我們通過事件時間窗格模擬處理時間視窗,因此在處理時間軸中描繪了“視窗”,這意味著它們的寬度是在Y軸而不是X軸上測量的。
  • 由於處理時間視窗對遇到輸入數據的順序敏感,因此每個“視窗”的結果對於兩個觀察訂單中的每一個都不同,即使事件本身在技術上在每個版本中同時發生。在左邊我們得到12,21,18,而在右邊我們得到7,36,4。

通過入口時間處理時間視窗

當元素到達時,它們的事件時間需要在入口時被覆蓋。返回使用標準的固定事件時間視窗。由於入口時間提供了計算完美水印的能力,我們可以使用預設觸發器,在這種情況下,當水印通過視窗末端時,它會隱式觸發一次。由於每個視窗只有一個輸出,因此累積模式無關緊要。

file

圖12 入口時間處理時間視窗

  • 與其他處理時間視窗示例一樣,即使輸入的值和事件時間保持不變,當輸入的順序發生變化時,我們也會得到不同的結果。
  • 與其他示例不同,視窗在事件時域中再次描繪(因此沿X軸)。儘管如此,它們並不是真正的事件時間視窗; 我們只是簡單地將處理時間映射到事件時間域,刪除每個輸入的原始記錄,並用新的輸入替換它,而不是表示管道首次觀察數據的時間。
  • 儘管如此,由於水印,觸發器發射仍然與前一個處理時間示例完全相同。此外,產生的輸出值與該示例相同,如預測的那樣:左側為12,21,18,右側為7,36,4。

如果您關心事件實際發生的時間,您必須使用事件時間視窗,否則您的結果將毫無意義。

Where: session windows

動態的,數據驅動的視窗,稱為會話。

會話是一種特殊類型的視窗,它捕獲數據中的一段活動,它們在數據分析中特別有用。

  • 會話是數據驅動視窗的一個示例:視窗的位置和大小是輸入數據本身的直接結果,而不是基於某些預定義模式在時間內,如固定視窗和滑動視窗。
  • 會話也是未對齊視窗的示例,即,不是均勻地跨數據應用的視窗,而是僅對數據的特定子集(例如,每個用戶)。這與固定視窗和滑動視窗等對齊視窗形成對比,後者通常均勻地應用於數據。

file

圖13 會話

我們來構建一個會話:

PCollection<KV<String, Integer>> scores = input
  .apply(Window.into(Sessions.withGapDuration(Duration.standardMinutes(1)))
               .triggering(
                 AtWatermark()
                   .withEarlyFirings(AtPeriod(Duration.standardMinutes(1)))
                   .withLateFirings(AtCount(1)))
               .accumulatingAndRetractingFiredPanes())
  .apply(Sum.integersPerKey());

我們得到結果如下:

file

圖14 會話視窗

當遇到值為5的第一個記錄時,它被放置在一個原始會話視窗中。

到達的第二個記錄是7,它同樣被放入它自己的原始會話視窗,因為它不與5的視窗重疊。

同時,水印已經過了第一個視窗的末尾,所以5的值在12:06之前被實現為準時結果。此後不久,第二個視窗也被實現為具有值7的推測結果,正如處理時間達到12:06那樣。

我們接下來觀察一系列記錄,3,4和3,原始會話都重疊。結果,它們全部合併在一起,並且在12:07觸發的早期觸發時,發出值為10的單個視窗。

當8在此後不久到達時,它與具有值7的原始會話和具有值10的會話重疊。因此所有三個被合併在一起,形成具有值25的新組合會話。

當9到達時,將值為5的原始會話和值為25的會話加入到值為39的單個較大會話中。

這個非常強大的功能,Spark Streaming已經做了實現。

簡單回顧一下,我們討論了事件時間與處理時間,視窗化,水印,觸發器,累積。探索了What,When,Where,How四個問題。而最終,我們將平衡正確性,延遲和成本問題,得到最適合自己的實時流式處理方案。

更多實時計算,Kafka等相關技術博文,歡迎關註實時流式計算

file


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 轉自Clement-Xu的csdn博客 http://blog.csdn.net/clementad/article/details/41620631 Apache在Linux系統中,其實叫“httpd”,可以使用yum來安裝。 1、查看httpd包是否可用: # yum list | grep h ...
  • 1、某進程1執行 SETNX lock 以嘗試獲取鎖 2、由於某進程2已獲得了鎖,所以進程1執行 SETNX lock 返回0,即獲取鎖失敗 3、進程1執行 GET lock 來檢測鎖是否已超時,如果沒超時,則線程等待一段時間,再次檢測 4、如果進程1檢測到鎖已超時,即當前的時間大於鍵 lock 的 ...
  • [20190821]關於CPU成本計算.txt--//有人問鏈接http://blog.itpub.net/267265/viewspace-2653964/中CPU成本如何計算的,實際上一般在優化中考慮這個細節很少,--//因為CPU COST占整個COST的比例很少,至於如何計算說複雜很複雜,說 ...
  • MySql學習筆記1 1.SQL 分類 1. DDL(Data Definition Language)數據定義語言 2. DML(Data Manipulation Language)數據操作語言 3. DQL(Data Query Language)數據查詢語言 4. DCL(Data Cont ...
  • SQL概要與表的創建 1.表的結構 ​ 關係資料庫通過類似Excel 工作表那樣的、由行和列組成的二維表來管理數據。用來管理數據的二維表在關係資料庫中簡稱為表。 ​ 根據 SQL 語句的內容返回的數據同樣必須是二維表的形式 ,這也是關係資料庫的特征之一 。返回結果如果不是二維表的SQL 語句則無法執 ...
  • 語法:DELETE FROM 表名 WHERE id NOT IN ( SELECT temp.min_id FROM ( SELECT MIN(id) min_id FROM 表名 GROUP BY 欄位名 )AS temp ); 備註:id:也是表欄位 實例:DELETE FROM pro_co ...
  • 參考資料:https://stackoverflow.com/questions/22008859/the-name-is-not-a-valid-identifier-error-in-function 執行存儲過程中sql字元串: ...
  • 我們都知道 sql語句中的排序有desc(降序)、asc(升序),這兩個都是按順序排列的,最近有一個需求是不按順序排序了 ,抽出個別的排在前面,並且這種需求是應對的問題中的數據是比較少的,而且沒有規律可循,用程式處理的話雖然能實現,但是處理起來會複雜很多,下麵我們就通過對order by 後面的排序 ...
一周排行
    -Advertisement-
    Play Games
  • Timer是什麼 Timer 是一種用於創建定期粒度行為的機制。 與標準的 .NET System.Threading.Timer 類相似,Orleans 的 Timer 允許在一段時間後執行特定的操作,或者在特定的時間間隔內重覆執行操作。 它在分散式系統中具有重要作用,特別是在處理需要周期性執行的 ...
  • 前言 相信很多做WPF開發的小伙伴都遇到過表格類的需求,雖然現有的Grid控制項也能實現,但是使用起來的體驗感並不好,比如要實現一個Excel中的表格效果,估計你能想到的第一個方法就是套Border控制項,用這種方法你需要控制每個Border的邊框,並且在一堆Bordr中找到Grid.Row,Grid. ...
  • .NET C#程式啟動閃退,目錄導致的問題 這是第2次踩這個坑了,很小的編程細節,容易忽略,所以寫個博客,分享給大家。 1.第一次坑:是windows 系統把程式運行成服務,找不到配置文件,原因是以服務運行它的工作目錄是在C:\Windows\System32 2.本次坑:WPF桌面程式通過註冊表設 ...
  • 在分散式系統中,數據的持久化是至關重要的一環。 Orleans 7 引入了強大的持久化功能,使得在分散式環境下管理數據變得更加輕鬆和可靠。 本文將介紹什麼是 Orleans 7 的持久化,如何設置它以及相應的代碼示例。 什麼是 Orleans 7 的持久化? Orleans 7 的持久化是指將 Or ...
  • 前言 .NET Feature Management 是一個用於管理應用程式功能的庫,它可以幫助開發人員在應用程式中輕鬆地添加、移除和管理功能。使用 Feature Management,開發人員可以根據不同用戶、環境或其他條件來動態地控制應用程式中的功能。這使得開發人員可以更靈活地管理應用程式的功 ...
  • 在 WPF 應用程式中,拖放操作是實現用戶交互的重要組成部分。通過拖放操作,用戶可以輕鬆地將數據從一個位置移動到另一個位置,或者將控制項從一個容器移動到另一個容器。然而,WPF 中預設的拖放操作可能並不是那麼好用。為瞭解決這個問題,我們可以自定義一個 Panel 來實現更簡單的拖拽操作。 自定義 Pa ...
  • 在實際使用中,由於涉及到不同編程語言之間互相調用,導致C++ 中的OpenCV與C#中的OpenCvSharp 圖像數據在不同編程語言之間難以有效傳遞。在本文中我們將結合OpenCvSharp源碼實現原理,探究兩種數據之間的通信方式。 ...
  • 一、前言 這是一篇搭建許可權管理系統的系列文章。 隨著網路的發展,信息安全對應任何企業來說都越發的重要,而本系列文章將和大家一起一步一步搭建一個全新的許可權管理系統。 說明:由於搭建一個全新的項目過於繁瑣,所有作者將挑選核心代碼和核心思路進行分享。 二、技術選擇 三、開始設計 1、自主搭建vue前端和. ...
  • Csharper中的表達式樹 這節課來瞭解一下表示式樹是什麼? 在C#中,表達式樹是一種數據結構,它可以表示一些代碼塊,如Lambda表達式或查詢表達式。表達式樹使你能夠查看和操作數據,就像你可以查看和操作代碼一樣。它們通常用於創建動態查詢和解析表達式。 一、認識表達式樹 為什麼要這樣說?它和委托有 ...
  • 在使用Django等框架來操作MySQL時,實際上底層還是通過Python來操作的,首先需要安裝一個驅動程式,在Python3中,驅動程式有多種選擇,比如有pymysql以及mysqlclient等。使用pip命令安裝mysqlclient失敗應如何解決? 安裝的python版本說明 機器同時安裝了 ...