Spark作為分散式的大數據處理框架必然或涉及到大量的作業調度,如果能夠理解Spark中的調度對我們編寫或優化Spark程式都是有很大幫助的; 在Spark中存在 轉換操作(Transformation Operation) 與 行動操作(Action Operation) 兩種;而轉換操作只是會從 ...
Spark作為分散式的大數據處理框架必然或涉及到大量的作業調度,如果能夠理解Spark中的調度對我們編寫或優化Spark程式都是有很大幫助的;
在Spark中存在轉換操作(Transformation Operation)與 行動操作(Action Operation)兩種;而轉換操作只是會從一個RDD中生成另一個RDD且是lazy的,Spark中只有行動操作(Action Operation)才會觸發作業的提交,從而引發作業調度;在一個計算任務中可能會多次調用 轉換操作這些操作生成的RDD可能存在著依賴關係,而由於轉換都是lazy所以當行動操作(Action Operation )觸發時才會有真正的RDD生成,這一系列的RDD中就存在著依賴關係形成一個DAG(Directed Acyclc Graph),在Spark中DAGScheuler是基於DAG的頂層調度模塊;
相關名詞
Application:使用Spark編寫的應用程式,通常需要提交一個或多個作業;
Job:在觸發RDD Action操作時產生的計算作業
Task:一個分區數據集中最小處理單元也就是真正執行作業的地方
TaskSet:由多個Task所組成沒有Shuffle依賴關係的任務集
Stage:一個任務集對應的調度階段 ,每個Job會被拆分成諾幹個Stage
1.1 作業調度關係圖
RDD Action作業提交流程
這裡根據Spark源碼跟蹤觸發Action操作時觸發的Job提交流程,Count()是RDD中的一個Action操作所以調用Count時會觸發Job提交;
在RDD源碼count()調用SparkContext的runJob,在runJob方法中根據partitions(分區)大小創建Arrays存放返回結果;
RDD.scala
/**
* Return the number of elements in the RDD.
*/
def count(): Long = sc.runJob(this, Utils.getIteratorSize _).sum
SparkContext.scala
def runJob[T, U: ClassTag](
rdd: RDD[T],
func: (TaskContext, Iterator[T]) => U,
partitions: Seq[Int],
resultHandler: (Int, U) => Unit): Unit = {
val callSite = getCallSite
val cleanedFunc = clean(func)
logInfo("Starting job: " + callSite.shortForm)
if (conf.getBoolean("spark.logLineage", false)) {
logInfo("RDD's recursive dependencies:\n" + rdd.toDebugString)
}
dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)
}
在SparkContext中將調用DAGScheduler的runJob方法提交作業,DAGScheduler主要任務是計算作業與任務依賴關係,處理調用邏輯;DAGScheduler提供了submitJob與runJob方法用於 提交作業,runJob方法會一直等待作業完成,submitJob則返回JobWaiter對象可以用於判斷作業執行結果;
在runJob方法中將調用submitJob,在submitJob中把提交操作放入到事件迴圈隊列(DAGSchedulerEventProcessLoop)中;
def submitJob[T, U](
rdd: RDD[T],
func: (TaskContext, Iterator[T]) => U,
partitions: Seq[Int],
callSite: CallSite,
resultHandler: (Int, U) => Unit,
properties: Properties): JobWaiter[U] = {
......
eventProcessLoop.post(JobSubmitted(
jobId, rdd, func2, partitions.toArray, callSite, waiter,
SerializationUtils.clone(properties)))
......
}
在事件迴圈隊列中將調用eventprocessLoop的onReceive方法;
Stage拆分
提交作業時DAGScheduler會從RDD依賴鏈尾部開始,遍歷整個依賴鏈劃分調度階段;劃分階段以ShuffleDependency為依據,當沒有ShuffleDependency時整個Job 只會有一個Stage;在事件迴圈隊列中將會調用DAGScheduler的handleJobSubmitted方法,此方法會拆分Stage、提交Stage;
private[scheduler] def handleJobSubmitted(jobId: Int,
finalRDD: RDD[_],
func: (TaskContext, Iterator[_]) => _,
partitions: Array[Int],
callSite: CallSite,
listener: JobListener,
properties: Properties) {
var finalStage: ResultStage = null
......
finalStage = newResultStage(finalRDD, func, partitions, jobId, callSite)
......
val job = new ActiveJob(jobId, finalStage, callSite, listener, properties)
......
val jobSubmissionTime = clock.getTimeMillis()
jobIdToActiveJob(jobId) = job
activeJobs += job
finalStage.setActiveJob(job)
val stageIds = jobIdToStageIds(jobId).toArray
val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo))
listenerBus.post(
SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties))
submitStage(finalStage)
submitWaitingStages()
}
調度階段提交
在提交Stage時會先調用getMissingParentStages獲取父階段Stage,迭代該階段所依賴的父調度階段如果存在則先提交該父階段的Stage 當不存在父Stage或父Stage執行完成時會對當前Stage進行提交;
private def submitStage(stage: Stage) {
val jobId = activeJobForStage(stage)
if (jobId.isDefined) {
if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) {
val missing = getMissingParentStages(stage).sortBy(_.id)
if (missing.isEmpty) {
submitMissingTasks(stage, jobId.get)
} else {
for (parent <- missing) {
submitStage(parent)
}
waitingStages += stage
}
}
}
......
}
參考資料:
http://spark.apache.org/docs/latest/
文章首發地址:Solinx
http://www.solinx.co/archives/579