這次向大家分享Microsoft發表在SOSP'13的另一篇關於流處理系統論文Naiad,TimelyDataflow是它的開源實現。該論文促進了後續的流圖系統的設計與創新,從其調度框架設計中也可以看到TuGraph Analytics調度器的影子。 ...
前面通過文章《論文圖譜當如是:Awesome-Graphs用200篇圖系統論文打個樣》向大家介紹了論文圖譜項目Awesome-Graphs,並分享了Google的Pregel、OSDI'12的PowerGraph、SOSP'13的X-Stream。這次向大家分享Microsoft發表在SOSP'13的另一篇關於流處理系統論文Naiad,TimelyDataflow是它的開源實現。該論文促進了後續的流圖系統的設計與創新,從其調度框架設計中也可以看到TuGraph Analytics調度器的影子。
對圖計算技術感興趣的同學可以多做瞭解,也非常歡迎大家關註和參與論文圖譜的開源項目:
- Awesome-Graphs:https://github.com/TuGraph-family/Awesome-Graphs
- OSGraph:https://github.com/TuGraph-family/OSGraph
提前感謝給項目點Star的小伙伴,接下來我們直接進入正文!
摘要
Naiad是一個可執行有環數據流的分散式數據並行系統,提供了高吞吐的批處理、低延遲的流處理,以及迭代和增量計算的能力。
1. 介紹
支持特性:
- 迴圈結構化,支持反向邊(feedback)。
- 有狀態的數據流節點,支持無需全局協調的生產消費能力。
- 節點收齊特定輪次/迭代的輸入後的通知機制。
2. 及時數據流
數據流圖可以包含嵌套的迴圈結構,時間戳用於區分數據是由哪個輪次/迭代產生的。
2.1 圖結構
及時數據流圖包含輸入/輸出節點,輸入節點從外部的生產者接受消息序列,輸出節點將消息序列發送到外部消費者。
外部的生產者為每個消息打標了一個輪次(epoch),當沒有消息需要輸入時,會主動通知輸入節點。
生產者也可以關閉輸入節點,表示輸入節點將不會再收到任何消息。
輸出節點的消息也會打標這個輪次,同樣當沒有消息需要輸出時,也會通知外部消費者。
及時數據流圖裡可以包含嵌套的迴圈上下文(loop contexts):
- 入口點(ingress vertex):數據流圖的邊進入迴圈上下文必須經過入口點,如I。
- 出口點(egress vertex):數據流圖的邊離開迴圈上下文必須經過出口點,如E。
- 反饋點(feedback vertex):迴圈上下文內必須包含反饋點,如F。
針對上圖所表達的計算語義解釋:
關鍵概念:邏輯時間戳(logical timestamp):
- e:消息的輪次。
- k:迴圈嵌套的深度。
- c:向量,每層迴圈的迭代次數。
邏輯時間戳變化規則:
- 經過入口點:c增加一個維度,初始化為0,表示迴圈開始。
- 經過反饋點:c的最後一個維度+1,表示迴圈次數累計。
- 經過出口點:c的最後一個維度提出,恢覆成與入口點一致。
邏輯時間戳大小比較,t1=(e1, <c1, ..., cm>),t2=(e2, <c1, ..., cn>):
- 條件1:整數比較,e1 < e2。
- 條件2:字元串比較,c1 + ... + cm < c1 + ... + cn。
2.2 節點計算
數據流的節點可以接收、發送帶邏輯時間戳的消息(message),以及通知(notification)。
每個節點v實現兩個回調函數:
- v.OnRecieve(Edge e, Message m, Timestamp t):接收消息。
- v.OnNotify(Timestamp t):接收通知。
並可以調用系統提供的兩個函數:
- this.SendBy(Edge e, Message m, Timestamp t):發送消息。
- this.NotifyAt(Timestamp t):發送通知。
對於數據流邊e=(u, v),u.SendBy將觸發v.OnRecieve,u.NotifyAt將觸發v.onNotify。
數據流系統保證v.OnNotify(t)一定發生在v.OnRecieve(e, m, t')之後,其中t' < t,即保證處理完所有t之前的消息後再處理通知,以讓節點具備機會清理t之前的工作狀態。
這種機制保證了消息處理不會發生時光回溯(backwards in time)。
如下示例代碼描述了一個雙出的數據流節點實現distinct、count運算元的邏輯。
class DistinctCount<S,T> : Vertex<T>
{
Dictionary<T, Dictionary<S,int>> counts;
void OnRecv(Edge e, S msg, T time)
{
if (!counts.ContainsKey(time)) {
counts[time] = new Dictionary<S,int>();
this.NotifyAt(time);
}
if (!counts[time].ContainsKey(msg)) {
counts[time][msg] = 0;
this.SendBy(output1, msg, time);
}
counts[time][msg]++;
}
void OnNotify(T time)
{
foreach (var pair in counts[time])
this.SendBy(output2, pair, time);
counts.Remove(time);
}
}
2.3 實現及時數據流
數據流處理受限於未處理的事件(events:消息、通知)和數據流圖的結構。
關鍵概念:pointstamp:
- u.SendBy(e, m, t):生成pointstamp (t, e)。
- u.NotifyAt(t):生成pointstamp (t, v)。
單線程調度器實現:
- 維護一個激活pointstamp(active pointstamp) 集合,集合大小至少為1。對於每個pointstamp,有兩個計數器:
- OC(occurrence count):未完成的pointstamp數。
- PC(precursor count):上游激活的pointstamp數。
- 系統初始化時,為輸入節點生成第一個pointstamp,其中t=e,OC=1,PC=0。當e完成後,繼續生成t=e+1的pointstamp。
- 當激活pointstamp p時,初始化PC為上游所有激活的pointstamp數,並遞增下游節點所有pointstamp的PC值。
- 當OC[p]=0時,從active集合刪除p,並遞減下游節點所有pointstamp的PC值。
- 當PC[p]=0時,表示上游沒有激活的pointstamp影響到p,則稱p是frontier,調度器會把所有通知發送給frontier。
OC的計算規則為:
3. 分散式實現
- Naiad集群包含多個進程,每個進程包含多個worker,worker管理數據流節點的一個分區。
- worker之間通過本地的共用記憶體或者遠程TCP連接交換消息。
- 進程遵循分散式進度追蹤協議(Progress Tracking Protocol),用於協調通知的分發。
3.1 數據並行
- 邏輯數據流圖:stages+connectors。
- connectors包含一個分區函數。
- 運行時邏輯數據流圖被展開為物理數據流圖,stage被替換為一組節點,connectors被替換為一組邊。
3.2 Workers
- 分發消息優先於分發通知。
- 分發策略多樣,如基於最早的pointstamp分發降低端到端延遲。
- worker使用共用隊列進行通信。
- 如果分發的目標節點在同一個worker,那麼SendBy會直接調用目標節點的OnRecieve。
- 如果存在環則需要強制進入隊列,或者控制遞歸深度避免系統過載。
3.3 分散式進度追蹤
- 每個worker維護各自的狀態,通過廣播OC進行狀態共用。
- 優化手段:
- 使用映射的pointstamp實現進度跟蹤,以降低併發衝突和更新規模。
- 更新廣播前先進行本地聚合。
3.4 錯誤容忍和可用性
- Checkpoint和Restore介面。
3.5 預防抖動
- 網路。
- 數據結構競爭。
- 垃圾回收。
4. 使用Naiad寫程式
5. 性能評估
6. 現實應用
- 批量迭代圖計算
- 批量迭代機器學習
- 流式無環計算
- 流式迭代圖分析
7. 總結
Naiad通過允許程式按需協調,支持了混合的同步+非同步計算。
作者:Florian 本文版權歸作者和博客園共有,歡迎轉載,但未經作者同意必須保留此段聲明,且在文章頁面明顯位置給出原文鏈接,否則作者保留追究法律責任的權利。若本文對你有所幫助,您的 關註 和 推薦 是我分享知識的動力!