Spark作業調度階段分析

来源:http://www.cnblogs.com/softlin/archive/2016/03/26/5321933.html
-Advertisement-
Play Games

Spark作為分散式的大數據處理框架必然或涉及到大量的作業調度,如果能夠理解Spark中的調度對我們編寫或優化Spark程式都是有很大幫助的; 在Spark中存在 轉換操作(Transformation Operation) 與 行動操作(Action Operation) 兩種;而轉換操作只是會從 ...


Spark作為分散式的大數據處理框架必然或涉及到大量的作業調度,如果能夠理解Spark中的調度對我們編寫或優化Spark程式都是有很大幫助的;
  在Spark中存在轉換操作(Transformation Operation)行動操作(Action Operation)兩種;而轉換操作只是會從一個RDD中生成另一個RDD且是lazy的,Spark中只有行動操作(Action Operation)才會觸發作業的提交,從而引發作業調度;在一個計算任務中可能會多次調用 轉換操作這些操作生成的RDD可能存在著依賴關係,而由於轉換都是lazy所以當行動操作(Action Operation )觸發時才會有真正的RDD生成,這一系列的RDD中就存在著依賴關係形成一個DAG(Directed Acyclc Graph),在Spark中DAGScheuler是基於DAG的頂層調度模塊;

相關名詞

  Application:使用Spark編寫的應用程式,通常需要提交一個或多個作業;
  Job:在觸發RDD Action操作時產生的計算作業
  Task:一個分區數據集中最小處理單元也就是真正執行作業的地方
  TaskSet:由多個Task所組成沒有Shuffle依賴關係的任務集
  Stage:一個任務集對應的調度階段 ,每個Job會被拆分成諾幹個Stage

    作業調度關係圖
          1.1 作業調度關係圖

RDD Action作業提交流程

  這裡根據Spark源碼跟蹤觸發Action操作時觸發的Job提交流程,Count()是RDD中的一個Action操作所以調用Count時會觸發Job提交;
  在RDD源碼count()調用SparkContext的runJob,在runJob方法中根據partitions(分區)大小創建Arrays存放返回結果;

RDD.scala

/**
* Return the number of elements in the RDD.
*/
def count(): Long = sc.runJob(this, Utils.getIteratorSize _).sum

SparkContext.scala

def runJob[T, U: ClassTag](
  rdd: RDD[T],
  func: (TaskContext, Iterator[T]) => U,
  partitions: Seq[Int],
  resultHandler: (Int, U) => Unit): Unit = {

  val callSite = getCallSite
  val cleanedFunc = clean(func)
  logInfo("Starting job: " + callSite.shortForm)
  if (conf.getBoolean("spark.logLineage", false)) {
    logInfo("RDD's recursive dependencies:\n" + rdd.toDebugString)
  }
  dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)
}

  在SparkContext中將調用DAGScheduler的runJob方法提交作業,DAGScheduler主要任務是計算作業與任務依賴關係,處理調用邏輯;DAGScheduler提供了submitJob與runJob方法用於 提交作業,runJob方法會一直等待作業完成,submitJob則返回JobWaiter對象可以用於判斷作業執行結果;
  在runJob方法中將調用submitJob,在submitJob中把提交操作放入到事件迴圈隊列(DAGSchedulerEventProcessLoop)中;

def submitJob[T, U](
  rdd: RDD[T],
  func: (TaskContext, Iterator[T]) => U,
  partitions: Seq[Int],
  callSite: CallSite,
  resultHandler: (Int, U) => Unit,
  properties: Properties): JobWaiter[U] = {
      ......  
      eventProcessLoop.post(JobSubmitted(
      jobId, rdd, func2, partitions.toArray, callSite, waiter,
      SerializationUtils.clone(properties)))
      ......
  }  

  在事件迴圈隊列中將調用eventprocessLoop的onReceive方法;

Stage拆分

  提交作業時DAGScheduler會從RDD依賴鏈尾部開始,遍歷整個依賴鏈劃分調度階段;劃分階段以ShuffleDependency為依據,當沒有ShuffleDependency時整個Job 只會有一個Stage;在事件迴圈隊列中將會調用DAGScheduler的handleJobSubmitted方法,此方法會拆分Stage、提交Stage;

 private[scheduler] def handleJobSubmitted(jobId: Int,
  finalRDD: RDD[_],
  func: (TaskContext, Iterator[_]) => _,
  partitions: Array[Int],
  callSite: CallSite,
  listener: JobListener,
  properties: Properties) {
var finalStage: ResultStage = null
......
  finalStage = newResultStage(finalRDD, func, partitions, jobId, callSite)
......

val job = new ActiveJob(jobId, finalStage, callSite, listener, properties)
......
val jobSubmissionTime = clock.getTimeMillis()
jobIdToActiveJob(jobId) = job
activeJobs += job
finalStage.setActiveJob(job)
val stageIds = jobIdToStageIds(jobId).toArray
val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo))
listenerBus.post(
  SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties))
submitStage(finalStage)

submitWaitingStages()
}

調度階段提交

  在提交Stage時會先調用getMissingParentStages獲取父階段Stage,迭代該階段所依賴的父調度階段如果存在則先提交該父階段的Stage 當不存在父Stage或父Stage執行完成時會對當前Stage進行提交;

 private def submitStage(stage: Stage) {
  val jobId = activeJobForStage(stage)
  if (jobId.isDefined) {
    if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) {
      val missing = getMissingParentStages(stage).sortBy(_.id)
      if (missing.isEmpty) {
        submitMissingTasks(stage, jobId.get)
      } else {
        for (parent <- missing) {
          submitStage(parent)
        }
        waitingStages += stage
      }
    }
  }
  ......
}

參考資料:
http://spark.apache.org/docs/latest/

文章首發地址:Solinx
http://www.solinx.co/archives/579


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 本篇之所以起這樣一個名字,是因為重點並非如何自定義控制項,不涉及創建CustomControl和UserControl使用的Template和XAML概念。而是通過繼承的方法來擴展一個現有的類,在繼承的子類中增加屬性和擴展行為。 我們在《UWP開發入門(七)——下拉刷新》中提到過嵌套ScrollVie ...
  • 一、Git簡介 1.Git是什麼 Git是分散式版本控制系統 2.Git有什麼特點 (1)Git是分散式的SCM,SVN是集中式的 (2)Git每個歷史版本存儲完整的文件,SVN存儲文件差異 (3)Git可離線完成大部分操作,SVN則相反 (4)Git有著更優雅的分支和合併實現 (5)Git有著更強... ...
  • 學習如何在MVC項目中配置AutoMapper。 一:首先在MVC項目中引用AutoMapper的DLL文件,接著創建一個介面,這裡面我們需要定義兩個方法,介面裡面的方法只能定義不能實現,也沒有什麼修飾符,實現介面的類必須實現裡面全部的方法。 定義介面IStartupTask,裡面有兩個方法。 pu... ...
  • 一、開發環境 編譯器:VS2013 .Net版本:4.5 二、開發過程 1.畫一條直線 private void btnDrawLine_Click(object sender, EventArgs e) { //創建一個畫圖圖面 Graphics g = this.CreateGraphics()... ...
  • 由於項目升級到了.NetFramework 4.6.1,開發工具轉向了vs2015,趁機嘗試下C#6.0.結果在網上搜的一些教程總結的不是太完整,有的代碼隨著vs正式版的發佈也有所修改.那些個教程也沒更新.所以把自己學習到的記錄一下. 1.自動屬性初始化(Auto-property initiali ...
  • 生日悖論,指如果一個房間里有23個或23個以上的人,那麼至少有兩個人的生日相同的概率要大於50%,準確的說是50.7左右,這就意味著在一個典型的標準小學班級(30人)中,存在兩人生日相同的可能性更高。對於60或者更多的人,這種概率要大於99%。從引起邏輯矛盾的角度來說生日悖論並不是一種悖論,從這個數 ...
  • 使用spring的jdbcTemplate 使用具名參數 在JDBC用法中,SQL參數是用占位符?表示,並且受到位置的限制,定位參數的問題在於,一旦參數的位置發生變化,必須改變參數的綁定,在Spring JDBC中,綁定SQL參數的另一種選擇是使用具名參數,SQL具名參數是按照名稱綁定,而不是位置綁 ...
  • refresh用於刷新與跳轉(重定向)頁面 refresh出現在http-equiv屬性中,使用content屬性表示刷新或跳轉的開始時間與跳轉的網址 refresh示例 5秒之後刷新本頁面: <meta http-equiv="refresh" content="5"/> 5秒之後轉到夢之都首頁: ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...