Spark調度管理(讀書筆記) 轉載請註明出處: "http://www.cnblogs.com/BYRans/" Spark調度管理 本文主要介紹在單個任務內Spark的調度管理,Spark調度相關概念如下: Task(任務):單個分區數據及上的最小處理流程單元。 TaskSet(任務集):由一組 ...
Spark調度管理(讀書筆記)
轉載請註明出處:http://www.cnblogs.com/BYRans/
Spark調度管理
本文主要介紹在單個任務內Spark的調度管理,Spark調度相關概念如下:
- Task(任務):單個分區數據及上的最小處理流程單元。
- TaskSet(任務集):由一組關聯的,但互相之間沒有Shuffle依賴關係的任務所組成的任務集。
- Stage(調度階段):一個任務集對應的調度階段。
- Job(作業):有一個RDD Action生成的一個或多個調度階段所組成的一次計算作業。
- Application(應用程式):Spark應用程式,由一個或多個作業組成。
各概念間的邏輯關係如下圖所示:
Spark的調度管理模塊中,最重要的類是DAGScheduler和TaskScheduler,TaskScheduler負責每個具體任務的實際物理調度,DAGScheduler負責將作業拆分成不同階段的具有依賴關係的多批任務,可以理解為DAGScheduler負責任務的邏輯調度。Spark調度管理示意圖如下:
調度階段的拆分
一個Spark任務提交後,DAGScheduler從RDD依賴鏈末端的RDD出發,遍歷整個RDD依賴鏈,將Job分解成具有前後依賴關係的多個stage。DAGScheduler是根據ShuffleDependency劃分stage的,也就是說當某個RDD的運算需要將數據進行shuffle操作時,這個包含了shuffle依賴關係的RDD將被用來作為輸入信息,構建一個新的調度階段。以此為依據劃分調度階段,可以確保有依賴關係的數據能夠按照正確的順序得到處理和運算。
調度階段的提交
在劃分Stage的步驟中會得到一個或多個有依賴關係的Stage,其中直接觸發作業的RDD關聯的調度階段被稱為FinalStage,DAGScheduler從FinalStage開始生成一個Job。Job和Stage的關係存儲在一個映射表中,用於在該調度階段全部完成時做一些後續處理,如報告狀態、清理作業相關數據等。
具體提交一個Stage時,首先判斷其依賴的所有父Stage的結果是否可用。如果所有父Stage的結果都可用,則提交該Stage。如果有任何一個父Stage的結果不可用,則嘗試迭代提交當前不可用的父Stage。在迭代過程中,父Stage還未運行的Stage都被放到等待隊列中,等待將來被提交。
下圖是一個具有四個調度階段的Job的Stage提交順序:
當一個屬於中間過程調度階段的任務(這種類型的任務所對應的類為ShuffleMapTask)完成後,DAGScheduler會檢查對應調度階段的所有任務是否都完成了。如果完成了,則DAGScheduler將重新掃描一次等待列表中所有的Stage,檢查它們是否還有依賴的Stage沒有完成。如果所有依賴的Stage都已執行完畢,則提交該Stage。
在這裡,博主有一個疑問:能否按照DAG劃分的Stage的拓撲順序提交執行Stage?求大家指點迷津。
任務結果的獲取
根據任務結果的大小不同,ResultTask返回的結果分為兩中形式:
- 如果結果足夠小,則直接放在DirectTaskResult對象內。
- 如果超過特定尺寸(預設約10MB),則在Executor端會將DirectTaskResult序列化,將序列化的結果作為一個數據塊存放在BlockManager中,然後將BlockManager返回的BlockId放在IndirectTaskResult對象中返回給TaskScheduler,TaskScheduler進而調用TaskResultGetter將IndirectTaskResult中的BlockId取出並通過BlockManager最終取得對應的DirectTaskResult。