These notes are organized from Zhao Xing's walkthrough of the Spark 1.3.1 source. Honestly, the source-code layout below is not great; if you can't follow it, just read the summaries.
1.3.1 is a bit old, but it is still very helpful for understanding the standalone Master, the Worker, and how stages are divided.
=====================================================
Summary:
Both Master and Worker create an ActorSystem and use it to create their own Actor. The Master keeps a HashSet of WorkerInfo
and a HashMap whose keys are workerIds and whose values are WorkerInfo.
After the Master's constructor runs, it starts a timer that periodically checks for timed-out workers.
After the Worker's constructor runs, it connects to the Master and sends a registration message; the Master wraps the worker info, persists it, and replies.
On receiving the reply, the Worker starts a timer that sends heartbeats to the Master, and the Master updates the worker's last-heartbeat time on each one.
new SparkContext() runs the primary constructor, which creates a SparkEnv (the env creates an ActorSystem for communication),
then creates the TaskScheduler and the DAGScheduler. The TaskScheduler ends up with two actors, one talking to the Master and one talking to the executors.
Before the ClientActor is created, a pile of parameters is prepared: Spark conf, Java options, the executor implementation class, and so on.
They are wrapped into an AppClient; the ClientActor then connects to the Master and sends the registration message, and the Master stores the app info and replies.
The Master then schedules resources and launches executors on the workers. There are two strategies: spread out and consolidate; spread out is the default.
The Master sends messages to the Workers, and each Worker assembles a Java command and starts a child process.
(The TaskScheduler creates a backend; when backend.start() is called it first calls the parent class's start(), which creates the DriverActor, and then runs its own start(), which creates the ClientActor.)
When an action operator runs, SparkContext.runJob() is called, which calls DAGScheduler.runJob(); stages are divided using 2 HashSets and 1 Stack and then submitted.
Each stage is turned into multiple Tasks (ShuffleMapTask or ResultTask) that form a TaskSet, which the TaskScheduler submits to the executors through the DriverActor.
The DAG / stage-division logic:
val parents = new HashSet[Stage]
val visited = new HashSet[RDD[_]]
val waitingForVisit = new Stack[RDD[_]]
Push the final RDD onto waitingForVisit. While waitingForVisit is non-empty, pop an RDD and check whether it is already in visited.
If not, add it to visited and loop over its dependencies: for a narrow (non-shuffle) dependency, push the parent RDD onto waitingForVisit;
for a shuffle dependency, get the parent stage and add it to parents. Getting that parent stage recursively re-enters this same procedure. A sketch follows.
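A minimal, self-contained sketch of this traversal over a toy lineage graph (Node, its deps field, and the Boolean shuffle flag are made-up stand-ins, not Spark API):

import scala.collection.mutable.{HashSet, Stack}

object ToyStageDivision {
  // Hypothetical stand-in for an RDD: each dependency is (parent, isShuffleDependency).
  case class Node(id: Int, deps: Seq[(Node, Boolean)] = Nil)

  // Mirrors the HashSet/HashSet/Stack traversal described above.
  def parentStages(finalRdd: Node): List[Int] = {
    val parents = new HashSet[Int]          // ids standing in for parent stages
    val visited = new HashSet[Node]
    val waitingForVisit = new Stack[Node]   // explicit stack, avoids StackOverflowError

    waitingForVisit.push(finalRdd)
    while (waitingForVisit.nonEmpty) {
      val r = waitingForVisit.pop()
      if (!visited(r)) {
        visited += r
        for ((dep, isShuffle) <- r.deps) {
          if (isShuffle) parents += dep.id   // shuffle boundary: a parent stage starts here
          else waitingForVisit.push(dep)     // narrow dependency: keep walking the lineage
        }
      }
    }
    parents.toList
  }
}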
=====================================================
object Master
|--def main()
|--parse the command-line args and config files
|--//create the ActorSystem and the Actor
|--def startSystemAndActor()
|--//create the ActorSystem via the AkkaUtils helper
|--AkkaUtils.createActorSystem()
|--//define a function that creates the ActorSystem
|--val startService: Int => (ActorSystem, Int) = { doCreateActorSystem() }
|--val (actorSystem, boundPort) = doCreateActorSystem()
|--//create the ActorSystem
|--//prepare the Akka config
|--val akkaConf = xxxx
|--//create the ActorSystem
|--val actorSystem = ActorSystem(name, akkaConf)
|--return (actorSystem, boundPort)
|--//invoke the function
|--Utils.startServiceOnPort(startService())
|--starts the service on a free port by calling the startService function
|--//create the Actor (master) from the ActorSystem
|--val actor(master) = actorSystem.actorOf(master)//create the master; the Master itself is an Actor
|--//member field: holds the WorkerInfo of registered workers
|--val workers = new HashSet[WorkerInfo]
|--//member field: maps workerId -> WorkerInfo
|--val idToWorker = new HashMap[String, WorkerInfo]
|--//runs after the constructor, before receive
|--def preStart()
|--//start a timer that periodically checks for timed-out workers
|--context.system.scheduler.schedule(0 millis, WORKER_TIMEOUT millis, self, CheckForWorkerTimeOut)//sends a message to itself; handled in its own receive
|--receives the registration message a worker sends to the master
|--case RegisterWorker()
|--//wrap the worker info
|--val worker = new WorkerInfo()
|--//persist it (e.g. to ZooKeeper)
|--persistenceEngine.addWorker(worker)
|--//reply to the worker
|--sender ! RegisteredWorker(masterUrl)
|--//schedule resources
|--schedule()
|--case Heartbeat(workerId)//heartbeat sent by a worker
|--//update its last heartbeat time
|--workerInfo.lastHeartbeat = System.currentTimeMillis()
--------Continues from SparkContext: the Driver's ClientActor registers the application with the Master-----------
|--case RegisterApplication(description)
|--//wrap the message
|--val app = createApplication(description, sender)
|--//register the app, i.e. put it into the collections
|--registerApplication(app) //internally just puts the app into maps/sets
|--idToApp(app.id) = app, waitingApps += app
|--//persist it
|--persistenceEngine.addApplication(app)
|--//the Master replies to the ClientActor that registration succeeded
|--sender ! RegisteredApplication(app.id, masterUrl)
|--//the Master then schedules resources and launches executors on the workers
|--//scheduling happens in two situations:
|--//1. an application is submitted or killed
|--//2. a worker joins or leaves
|--schedule()
--------The Master schedules resources-------------
|--//two strategies: spread out vs. consolidate; spread out is the default
|--def schedule()
|--//spread out
|--//a series of filters, e.g. does the worker still have enough free cores/memory for the app
|--//core-assignment logic:
|--//say the app needs 10 cores and there are 4 workers with 8 cores each
|--//create an array of length 4 and an index pos; pos = (pos + 1) % 4, so pos cycles through 0..3
|--//each pass assigns one core: cores still needed -1, assigned(pos) +1 (see the sketch after this block)
|--//the Master tells the worker to launch an executor
|--launchExecutor(usableWorkers(pos), exec)
|--//consolidate
|--//use up all of one worker's free resources before moving on to the next worker
|--//the Master tells the worker to launch an executor
|--launchExecutor(worker, exec)
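A minimal sketch of that spread-out core assignment (assignCores, freeCores and coresNeeded are illustrative names, not the 1.3.1 field names):

object ToyCoreAssignment {
  // Round-robin ("spread out") core assignment across usable workers.
  def assignCores(coresNeeded: Int, freeCores: Array[Int]): Array[Int] = {
    val assigned = new Array[Int](freeCores.length)   // cores handed to each worker
    var toAssign = math.min(coresNeeded, freeCores.sum)
    var pos = 0
    while (toAssign > 0) {
      if (freeCores(pos) - assigned(pos) > 0) {       // this worker still has a spare core
        assigned(pos) += 1
        toAssign -= 1
      }
      pos = (pos + 1) % freeCores.length              // move to the next worker, wrapping around
    }
    assigned
  }
  // assignCores(10, Array(8, 8, 8, 8)) == Array(3, 3, 2, 2)
}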
--------The Master tells a worker to launch an executor-------------
|--def launchExecutor(worker, exec)
|--//record the resources now in use on the worker
|--worker.addExecutor(exec)
|--//the Master sends the worker a message carrying the parameters so it can launch the executor
|--worker.actor ! LaunchExecutor(.....)
|--//the Master also tells the ClientActor that an executor has been added
|--exec.application.driver ! ExecutorAdded(......)
-----------------------------------------------------
object Worker
|--def main()
|--//create the ActorSystem and the Actor
|--def startSystemAndActor()
|--same as on the Master side
|--//create the Actor (worker) from the ActorSystem
|--val actor(worker) = actorSystem.actorOf(worker)
|--//runs after the constructor, before receive
|--def preStart()
|--//connect to the master and send the registration message
|--registerWithMaster()
|--//try to register; retries several times on failure
|--tryRegisterAllMasters()
|--//establish the connection
|--val actor(master) = context.actorSelection(masterAkkaUrl)
|--//send the registration message
|--actor ! RegisterWorker(workId, host, port, cores, memory...)
|--//the Master's reply that registration succeeded
|--case RegisteredWorker(masterUrl)
|--//start a timer that periodically sends heartbeats
|--context.system.scheduler.schedule(0 millis, HEARTBEAT_MILLIS millis, self, SendHeartbeat)//sends a message to itself; handled in its own receive
|--case SendHeartbeat
|--//send the heartbeat
|--master ! Heartbeat(workid)
-------------Continues from: the Master tells a worker to launch an executor-----------
|--case LaunchExecutor(...)
|--create an ExecutorRunner, hand it the parameters, then use it to start the executor
|--val manager = new ExecutorRunner(...)
|--//ExecutorRunner.start() launches the executor as a Java child process
|--manager.start()
class ExecutorRunner
|--def start()
|--//create a thread; the thread's start() launches the Java child process
|--workerThread = new Thread(){def run(){fetchAndRunExecutor()}}
|--workerThread.start()
|--def fetchAndRunExecutor()
|--//launch the child process
|--//assembles the java command for the concrete class and launches it
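A minimal sketch of what "assemble a java command and launch a child process" amounts to (the method, its parameters and the redirect choice are illustrative; the exact command Spark 1.3.1 builds comes from the Command sent by the Master):

import java.io.File
import scala.collection.JavaConverters._

object ToyChildLauncher {
  // Launch a JVM child process, the way a worker launches an executor backend.
  def launchJavaChild(mainClass: String, classpath: String,
                      jvmOpts: Seq[String], args: Seq[String]): Process = {
    val javaBin = new File(new File(System.getProperty("java.home"), "bin"), "java").getPath
    val command = Seq(javaBin) ++ jvmOpts ++ Seq("-cp", classpath, mainClass) ++ args
    val builder = new ProcessBuilder(command.asJava)
    builder.redirectErrorStream(true)   // merge stdout/stderr (the real worker redirects to log files)
    builder.start()
  }
}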
Summary: communication between Master and Worker:
Both Master and Worker create an ActorSystem and use it to create their own Actor. The Master keeps a HashSet of WorkerInfo
and a HashMap whose keys are workerIds and whose values are WorkerInfo.
After the Master's constructor runs, it starts a timer that periodically checks for timed-out workers.
After the Worker's constructor runs, it connects to the Master and sends a registration message; the Master wraps the worker info, persists it, and replies.
On receiving the reply, the Worker starts a timer that sends heartbeats to the Master, and the Master updates the last-heartbeat time on each one.
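A minimal Akka sketch of the heartbeat pattern described above (classic Akka actors; ToyWorker and the 15-second interval are illustrative choices, not Spark's actual Worker code):

import akka.actor.{Actor, ActorRef, ActorSystem, Props}
import scala.concurrent.duration._

case object SendHeartbeat
case class Heartbeat(workerId: String)

// Schedules a message to itself; receive() turns it into a heartbeat to the master.
class ToyWorker(master: ActorRef, workerId: String) extends Actor {
  import context.dispatcher
  override def preStart(): Unit =
    context.system.scheduler.schedule(0.seconds, 15.seconds, self, SendHeartbeat)

  def receive = {
    case SendHeartbeat => master ! Heartbeat(workerId)
  }
}

// Wiring, assuming a master ActorRef is at hand:
//   val system = ActorSystem("toy")
//   system.actorOf(Props(new ToyWorker(masterRef, "worker-1")))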
=====================================================
class SparkContext //i.e. the Driver side
|--//primary constructor
|--def this()
|--//create the SparkEnv, which contains an ActorSystem
|--val env = createSparkEnv()
|--//method that creates the SparkEnv (and its ActorSystem)
|--def createSparkEnv()
|--//call SparkEnv's static method to create the SparkEnv
|--SparkEnv.createDriverEnv()
|--//create the TaskScheduler
|--val (schedulerBackend, taskScheduler) = SparkContext.createTaskScheduler(this, master)
|--//create the actor that handles heartbeats between the executors and the DriverActor
|--val heartbeatReceiver = env.actorSystem.actorOf(new HeartbeatReceiver(taskScheduler),...)
|--//create the DAGScheduler
|--var dagScheduler = new DAGScheduler(this)
|--//start the TaskScheduler
|--taskScheduler.start()
|--//method that creates the TaskScheduler;
|--//picks the matching TaskScheduler from the master URL given at submit time (local / yarn / standalone)
|--def createTaskScheduler()
|--//Spark standalone mode
|--case SPARK_REGEX(sparkUrl)
|--//create the TaskSchedulerImpl
|--val scheduler = new TaskSchedulerImpl(sc)
|--//create the SparkDeploySchedulerBackend
|--val backend = new SparkDeploySchedulerBackend(scheduler, sc, masterUrls)
|--//call initialize to create the scheduling pool; FIFO by default
|--scheduler.initialize(backend)
class TaskSchedulerImpl
|--def initialize(backend)
|--this.backend = backend
|--def start()
|--//first call SparkDeploySchedulerBackend's start()
|--backend.start()
-----------★★★taskScheduler.submitTasks is called to submit a TaskSet-------------
|--def submitTasks(taskSet)
|--//tell the DriverActor to offer resources to the tasks
|--backend.reviveOffers()
class SparkDeploySchedulerBackend extends CoarseGrainedSchedulerBackend
|--def start()
|--//call the parent's start() to create the DriverActor
|--super.start() //CoarseGrainedSchedulerBackend's start()
|--//prepare a pile of parameters (Spark conf, Java opts, ...) on the Driver; later they are sent to the Master, which hands them to the executors
|--conf......
|--//wrap the parameters into a Command; this names the executor implementation class (the same class is launched under YARN, so it is not YarnChild)
|--val command = Command("org.apache.spark.executor.CoarseGrainedExecutorBackend",conf,...)
|--//wrap the parameters into an ApplicationDescription
|--val appDesc = new ApplicationDescription(sc.appName, command, ....)
|--create the AppClient
|--client = new AppClient(sc.actorSystem, masters, appDesc, ...)
|--//AppClient.start() creates the ClientActor used to talk to the Master
|--client.start()
class CoarseGrainedSchedulerBackend
|--def start()
|--//create the DriverActor from the actorSystem
|--driverActor = actorSystem.actorOf(new DriverActor(..)) //waits for executors to connect
----------Continues from: the Executor registers with the Driver ("connect to the Driver and register the executor")------------------------
|--def receiveWithLogging
|--//the Driver receives an executor's registration message
|--case RegisterExecutor()
|--//reply that registration succeeded
|--//★★★check whether there are tasks to submit
|--makeOffers()//no tasks yet, the DAG has not been built
-----------Continues from: submitting the earlier stages-------------------------
|--def makeOffers()
|--//call launchTask to push Tasks to the executors
|--launchTask(tasks)
|--def launchTask(tasks)
|--//serialize the task
|--val serializedTask = ser.serialize(task)
|--//send the serialized task to the executor
|--executorData.executorActor ! LaunchTask(new SerializableBuffer(serializedTask))
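A minimal sketch of "serialize the task, then ship the bytes": it uses plain Java serialization and a made-up ToyTask, whereas Spark really goes through its pluggable SerializerInstance:

import java.io.{ByteArrayOutputStream, ObjectOutputStream}
import java.nio.ByteBuffer

object ToyTaskSerialization {
  case class ToyTask(stageId: Int, partition: Int)   // case classes are Serializable

  def serialize(task: ToyTask): ByteBuffer = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(task)                  // the task must be serializable to cross the wire
    out.close()
    ByteBuffer.wrap(bytes.toByteArray)     // the DriverActor wraps such a buffer into LaunchTask
  }
}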
-----------Continues from: backend.reviveOffers()------------------
|--def reviveOffers()
|--driverActor ! ReviveOffers
class DriverActor
|--★★★call makeOffers to submit Tasks to the executors
|--case ReviveOffers => makeOffers()
class AppClient
|--def start()
|--//create the ClientActor used to talk to the Master
|--actor = actorSystem.actorOf(new ClientActor)
|--//runs after the constructor, before receive
|--def preStart()
|--//the ClientActor registers with the Master
|--registerWithMaster()
|--def registerWithMaster()
|--//register with the Master
|--tryRegisterAllMasters()
|--def tryRegisterAllMasters()
|--//loop over all Masters and connect
|--val actor = context.actorSelection(masterAkkaUrl)
|--//with the Master reference in hand, register; only the active master replies, standby masters stay silent
|--//all the parameters live in the appDescription: cores, memory, java opts, the executor implementation class
|--actor ! RegisterApplication(appDescription)
|--def receiveWithLogging
|--//the ClientActor receives the Master's registration-success reply
|--case RegisteredApplication
|--//update the master address
|--changeMaster(masterUrl)
object SparkEnv
|--def createDriverEnv()
|--//call create to build the env (and its ActorSystem)
|--create
|--//create the ActorSystem
|--val (actorSystem, boundPort) = AkkaUtils.createActorSystem()
Summary: new SparkContext() runs the primary constructor, which creates a SparkEnv (the env creates an ActorSystem for communication),
then creates the TaskScheduler and the DAGScheduler. The TaskScheduler ends up with two actors, one talking to the Master
and one talking to the executors. (The TaskScheduler creates a backend; when backend.start() is called it first
calls the parent class's start(), which creates the DriverActor, and then runs its own start(), which creates the ClientActor.)
Before the ClientActor is created, a pile of parameters is prepared: Spark conf, Java options, the executor implementation class, and so on.
They are wrapped into an AppClient; the ClientActor then connects to the Master and sends the registration message, and the Master stores the app info and replies.
The Master then schedules resources and launches executors on the workers. There are two strategies: spread out and consolidate; spread out is the default.
The Master sends messages to the Workers, and each Worker assembles a Java command and starts a child process.
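A tiny sketch of that start() ordering with toy classes (CoarseBackend/DeployBackend are stand-ins, just to show why the DriverActor exists before the ClientActor):

// Toy stand-ins just to show the call ordering, not Spark classes.
class CoarseBackend {                        // plays CoarseGrainedSchedulerBackend
  def start(): Unit = println("1. parent start(): create the DriverActor")
}
class DeployBackend extends CoarseBackend {  // plays SparkDeploySchedulerBackend
  override def start(): Unit = {
    super.start()                            // parent runs first, so the DriverActor exists...
    println("2. own start(): create the ClientActor and register the app with the Master")
  }
}
object StartOrder extends App {
  new DeployBackend().start()
  // prints:
  // 1. parent start(): create the DriverActor
  // 2. own start(): create the ClientActor and register the app with the Master
}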
=====================================================
Source walk-through of the spark-submit submission flow:
spark-submit script
|--/bin/spark-class org.apache.spark.deploy.SparkSubmit "$@"
spark-class腳本
|--1.3.1 echo "$RUNNER" -cp "$CLASSPATH" $JAVA_OPTS "$@" 1>&2 //org.apache.spark.deploy.SparkSubmit
|--1.6.1/2.0 "$RUNNER" -Xmx128m -cp "$LAUNCH_CLASSPATH" org.apache.spark.launcher.Main "$@"
-----------------------------------------------------
object org.apache.spark.deploy.SparkSubmit
|--def main()
|--//match on the requested action
|--appArgs.action match{case SparkSubmitAction.SUBMIT => submit(appArgs)}
|--def submit()
|--def doRunMain()
|--
|--//call doRunMain
|--doRunMain()
|--proxyUser.doAs(new xxxAction(){
override def run():Unit = {
runMain(...,childMainClass,...)
}
})
|--def runMain(...,childMainClass,...)
|--//reflectively load the user's Spark application class
|--mainClass = Class.forName(childMainClass,...)
|--//look up the main method and invoke it
|--val mainMethod = mainClass.getMethod("main",...)
|--mainMethod.invoke(null, childArgs.toArray)
Summary: spark-submit starts Spark's own SparkSubmit program, which then uses reflection to call our user-defined Spark application.
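A minimal sketch of that reflective call (invokeUserMain and the example class name are made up; the pattern mirrors runMain above):

object ToySubmit {
  // Load a user-provided main class and invoke its main(Array[String]) reflectively.
  def invokeUserMain(childMainClass: String, childArgs: Array[String]): Unit = {
    val mainClass = Class.forName(childMainClass, true, Thread.currentThread().getContextClassLoader)
    val mainMethod = mainClass.getMethod("main", classOf[Array[String]])
    mainMethod.invoke(null, childArgs)     // main is static, so the receiver is null
  }
  // invokeUserMain("com.example.MyApp", Array("input", "output"))  // hypothetical class
}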
=====================================================
Source walk-through of the communication between the Executor and the Driver
org.apache.spark.executor.CoarseGrainedExecutorBackend
|--def main()
|--//parse a pile of arguments
|--//call run
|--run(....)
|--def run()
|--//create an ActorSystem on the executor side
|--val fetcher = AkkaUtils.createActorSystem(...)
|--//connect to the Driver
|--env.actorSystem.actorOf(new CoarseGrainedExecutorBackend)
|--def preStart()
|--//connect to the Driver and register the executor
|--.....
|--def receiveWithLogging
|--//the Driver replies that registration succeeded
|--case RegisteredExecutor
|--//create the Executor instance, which actually runs the tasks
|--executor = new Executor(....)
Executor
|--//initialize the thread pool
|--val threadPool = Utils.newDaemonCachedThreadPool()
Summary: once started, the Executor backend creates an actor, registers with the Driver, and then creates an Executor instance to run the actual work.
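A minimal sketch of a daemon cached thread pool like the one the Executor keeps for running tasks (plain java.util.concurrent rather than Spark's Utils helper; the name prefix is illustrative):

import java.util.concurrent.{ExecutorService, Executors, ThreadFactory}
import java.util.concurrent.atomic.AtomicInteger

object ToyThreadPool {
  // Cached thread pool whose threads are daemons, so they never block JVM shutdown.
  def newDaemonCachedThreadPool(prefix: String): ExecutorService = {
    val counter = new AtomicInteger(0)
    val factory = new ThreadFactory {
      override def newThread(r: Runnable): Thread = {
        val t = new Thread(r, s"$prefix-${counter.incrementAndGet()}")
        t.setDaemon(true)
        t
      }
    }
    Executors.newCachedThreadPool(factory)
  }
  // val threadPool = newDaemonCachedThreadPool("Executor task launch worker")
}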
=====================================================
Source walk-through of the task-submission flow: how the DAGScheduler runs
sc.textFile --> hadoopFile --> HadoopRDD --> MapPartitionsRDD --> ShuffledRDD
rdd.saveAsTextFile() --> MapPartitionsRDD
The Driver submits the job by calling self.context.runJob(....)
class SparkContext
|--def runJob()
|--//the DAGScheduler divides the job into stages, turns each stage into a TaskSet for the TaskScheduler, which then submits it to the executors
|--DAGScheduler.runJob(.....)
class DAGScheduler
|--//runJob divides the stages
|--def runJob()
|--//call submitJob, which returns a waiter to block on
|--val waiter = submitJob(rdd, ...)
|--//pattern-match on the result
|--waiter.awaitResult() match
|--case JobSucceeded
|--case JobFailed
|--def submitJob(rdd, ...)
|--//wrap the data into an event and post it onto eventProcessLoop's blocking queue
|--eventProcessLoop.post(JobSubmitted(...))
|--val eventProcessLoop = new DAGSchedulerEventProcessLoop(this)
class DAGSchedulerEventProcessLoop extends EventLoop
|--def onReceive()
|--//a job was submitted
|--case JobSubmitted(jobId,...)
|--//delegate to DAGScheduler.handleJobSubmitted
|--dagScheduler.handleJobSubmitted(jobId,...)
|--//divides the stages
|--def handleJobSubmitted(jobId,...)
|--★★★divide the stages
|--finalStage = newStage(finalRDD, partitions.size, None, jobId, ...)
|--//start submitting stages
|--submitStage(finalStage)
|--def submitStage(finalStage)
|--//get the missing parent stages
|--val missing = getMissingParentStages(stage).sortBy(_.id)
|--if(missing == Nil){
//no missing parents: submit this stage's tasks
submitMissingTasks(stage, jobId.get)
}else{
//there are parent stages: recurse into this method for each of them
for(parent <- missing){
submitStage(parent)
}
|--//and park this stage in waitingStages until its parents finish
|--waitingStages += stage
}
|--def submitMissingTasks(stage, jobId.get)
|--//turn the stage into multiple Tasks: ShuffleMapTask or ResultTask
|--new ShuffleMapTask(stage.id, taskBinary, part, locs)
|--new ResultTask(stage.id, taskBinary, part, locs, id)
|--//★★★call taskScheduler.submitTasks to submit the TaskSet
|--taskScheduler.submitTasks(new TaskSet(tasks.toArray, stage.id, ..., stage.jobId, properties))
|--def newStage
|--//get the parent stages
|--val parentStages = getParentStages(rdd, jobId)
|--val stage = new Stage(...,parentStages,...)
|--def getParentStages
|--//three data structures are used to find the parent stages
|--val parents = new HashSet[Stage]
|--val visited = new HashSet[RDD]
|--val waitingForVisit = new Stack[RDD]
|--//idea: walk the lineage with an explicit stack, recursing only at shuffle boundaries
|--//see the source at the end
|--def getMissingParentStages
|--//finds parent stages using the same data structures as getParentStages
class EventLoop
|--//blocking queue of events
|--val eventQueue = new LinkedBlockingDeque[E]()
|--//a daemon thread keeps taking events off the queue
|--val eventThread = new Thread(name){
override def run(): Unit = {
while (!stopped.get) {
val event = eventQueue.take()   // blocks until an event is available
onReceive(event)
}
}
}
Summary: an action operator triggers SparkContext.runJob(), which calls DAGScheduler.runJob();
stages are divided using 2 HashSets and 1 Stack, and then submitted.
=======================Stage-division source==============================
/**
* Create a Stage -- either directly for use as a result stage, or as part of the (re)-creation
* of a shuffle map stage in newOrUsedStage. The stage will be associated with the provided
* jobId. Production of shuffle map stages should always use newOrUsedStage, not newStage
* directly.
*/
//★★★used to create a Stage
private def newStage(
rdd: RDD[_],
numTasks: Int,
shuffleDep: Option[ShuffleDependency[_, _, _]],
jobId: Int,
callSite: CallSite)
: Stage =
{
//★★★get its parent Stages
val parentStages = getParentStages(rdd, jobId)
val id = nextStageId.getAndIncrement()
val stage = new Stage(id, rdd, numTasks, shuffleDep, parentStages, jobId, callSite)
stageIdToStage(id) = stage
updateJobIdStageIdMaps(jobId, stage)
stage
}
------------------------------------------------------
/**
* Get or create the list of parent stages for a given RDD. The stages will be assigned the
* provided jobId if they haven't already been created with a lower jobId.
*/
//TODO used to get the parent Stages
private def getParentStages(rdd: RDD[_], jobId: Int): List[Stage] = {
val parents = new HashSet[Stage]
val visited = new HashSet[RDD[_]]
// We are manually maintaining a stack here to prevent StackOverflowError
// caused by recursively visiting
val waitingForVisit = new Stack[RDD[_]]
def visit(r: RDD[_]) {
if (!visited(r)) {
visited += r
// Kind of ugly: need to register RDDs with the cache here since
// we can't do it in its constructor because # of partitions is unknown
for (dep <- r.dependencies) {
dep match {
case shufDep: ShuffleDependency[_, _, _] =>
//★★★pass the shuffle (wide) dependency in to get the parent Stage
parents += getShuffleMapStage(shufDep, jobId)
case _ =>
waitingForVisit.push(dep.rdd)
}
}
}
}
waitingForVisit.push(rdd)
while (!waitingForVisit.isEmpty) {
visit(waitingForVisit.pop())
}
parents.toList
}
------------------------------------------------------
/**
* Get or create a shuffle map stage for the given shuffle dependency's map side.
* The jobId value passed in will be used if the stage doesn't already exist with
* a lower jobId (jobId always increases across jobs.)
*/
private def getShuffleMapStage(shuffleDep: ShuffleDependency[_, _, _], jobId: Int): Stage = {
shuffleToMapStage.get(shuffleDep.shuffleId) match {
case Some(stage) => stage
case None =>
// We are going to register ancestor shuffle dependencies
registerShuffleDependencies(shuffleDep, jobId)
// Then register current shuffleDep
val stage =
//★★★create the parent Stage
newOrUsedStage(
shuffleDep.rdd, shuffleDep.rdd.partitions.size, shuffleDep, jobId,
shuffleDep.rdd.creationSite)
shuffleToMapStage(shuffleDep.shuffleId) = stage
stage
}
}
------------------------------------------------------
/**
* Create a shuffle map Stage for the given RDD. The stage will also be associated with the
* provided jobId. If a stage for the shuffleId existed previously so that the shuffleId is
* present in the MapOutputTracker, then the number and location of available outputs are
* recovered from the MapOutputTracker
*/
private def newOrUsedStage(
rdd: RDD[_],
numTasks: Int,
shuffleDep: ShuffleDependency[_, _, _],
jobId: Int,
callSite: CallSite)
: Stage =
{
//★★★recursion: calls back into newStage
val stage = newStage(rdd, numTasks, Some(shuffleDep), jobId, callSite)
if (mapOutputTracker.containsShuffle(shuffleDep.shuffleId)) {
val serLocs = mapOutputTracker.getSerializedMapOutputStatuses(shuffleDep.shuffleId)
val locs = MapOutputTracker.deserializeMapStatuses(serLocs)
for (i <- 0 until locs.size) {
stage.outputLocs(i) = Option(locs(i)).toList // locs(i) will be null if missing
}
stage.numAvailableOutputs = locs.count(_ != null)
} else {
// Kind of ugly: need to register RDDs with the cache and map output tracker here
// since we can't do it in the RDD constructor because # of partitions is unknown
logInfo("Registering RDD " + rdd.id + " (" + rdd.getCreationSite + ")")
mapOutputTracker.registerShuffle(shuffleDep.shuffleId, rdd.partitions.size)
}
stage
}