Spark Core 1.3.1源碼解析及個人總結

来源:http://www.cnblogs.com/pojishou/archive/2017/01/17/6275055.html
-Advertisement-
Play Games

本篇源碼基於趙星對Spark 1.3.1解析進行整理。話說,我不認為我這下文源碼的排版很好,不能適應的還是看總結吧。 雖然1.3.1有點老了,但對於standalone模式下的Master、Worker和劃分stage的理解是很有幫助的。 總結: master和worker都要創建ActorSyst ...


本篇源碼基於趙星對Spark 1.3.1解析進行整理。話說,我不認為我這下文源碼的排版很好,不能適應的還是看總結吧。

雖然1.3.1有點老了,但對於standalone模式下的Master、Worker和劃分stage的理解是很有幫助的。
=====================================================
總結:

master和worker都要創建ActorSystem來創建自身的Actor對象,master內部維護了一個保存workerinfo的hashSet和一個key為workerid,
value為workerInfo的HashMap。
master構造方法執行後會啟動一個定時器,定期檢查超時的worker。
worker構造方法執行後會嘗試與master建立連接併發送註冊消息,master收到消息後,封裝worker並持久化,再給worker反饋,
worker收到反饋後,啟動定時任務向master發送心跳,master收到心跳後更新心跳時間。

new SparkContext(),執行主構造器,創建SparkEnv,env里創建了ActorSystem用於通信,
然後創建TaskScheduler,創建DAGScheduler。TaskScheduler里創建了2個actor分別負責與master和executors進行通信。
ClientActor創建之前,會準備一大堆的參數,包括spark參數,java參數,executor的實現類等,
封裝進AppClient,然後創建ClientActor與Master建立連接發送註冊信息,Master收到後保存app的信息並反饋。
這時Master開始調度資源並啟動worker,有兩種調度方式:儘量打散,儘量集中,預設打散。
Master發消息給Worker,worker拼接Java命令,啟動子進程。
(TaskScheduler 里會創建一個backend,backend調用start方法後,會先調用父類的start方法,父類的start方法會創建DriverActor,再執行自己的start方法創建ClientActor)

執行到Action運算元會執行sparkContext里的runJob(),再調用DAGScheduler的runJob(),通過2個HashSet和1個Stack劃分stage,然後提交stage。
將stage創建成多個Task,分為shuffleMapTask和ResultTask,組成taskSet,由taskScheduler通過DriverActor向Executor進行提交。

DAG的邏輯:
val parents = new HashSet[Stage]
val visited = new HashSet[RDD[_]]
val waitingForVisit = new Stack[RDD[_]]
將最後一個rdd壓棧waitingForVisit,當waitingForVisit非空時while迴圈,waitingForVisit彈棧出的rdd判斷是否在visited中,
否,則rdd添加進visited,迴圈rdd的父rdd,如果不是shuffleMapStage,將rdd壓棧waitingForVisit,是shuffleMapStage,則再
求父stage加入parents,求父stage是調用本方法的遞歸過程。
=====================================================
object Master
  |--def main()
    |--載入配置文件並解析。
    |--//創建ActorSystem和Actor
    |--def startSyatemAndActor()
      |--//通過AkkaUtils工具類創建ActorSystem
      |--AkkaUtils.createActorSystem()
        |--//定義一個函數,創建ActorSystem
        |--val startService: Int => (返回值) = {doCreateActorSystem()}
          |--val (actorSystem, boundPort) = doCreateActorSystem()
            |--//創建ActorSystem
            |--//準備Akka參數
            |--val akkaConf = xxxx
            |--//創建ActorSystem
            |--val actorSystem = ActorSyatem(name, akkaConf)
            |--return (actorSystem, boundPort)
        |--//調用函數
        |--Utils.startServiceOnPort(startService())
          |--從一個沒有被占用的埠啟動服務,調用startService函數
    |--//通過ActorSystem創建Actor:master
    |--val actor(master) = actorSystem.actorOf(master)//創建master,master也是一個actor
  |--//成員變數:保存workerInfo
  |--val workers = new HashSet[WorkerInfo]
  |--//成員變數:保存(workerId,workInfo)
  |--val idToWorker = new HashMap[String, WorkerInfo]
  |--//構造方法之後,receive方法之前
  |--def preStart()
    |--//啟動一個定時器,定時檢測超時的worker
    |--context.system.scheduler.scheduler(self,checkxxx)//自己給自己發消息,發送到自己的recevice方法,啟動任務
  |--接收worker向master註冊的消息
  |--case RegisterWorker()
    |--//封裝worker信息
    |--val worker = new WorkerInfo()
    |--//持久化到zk
    |--persistenceEngine.addWorker(worker)
    |--//向worker反饋信息
    |--sender ! RegisteredWorker(masterUrl)
    |--//任務調度
    |--schedule()
  |--case Heartbeat(workerId)//worker發來的心跳
    |--//更新上一次心跳時間
    |--workerInfo.lastHeartbeat = Syatem.currentTimeMillis()
--------接SparkContext,Driver創建ClientActor向Master註冊應用信息-----------
  |--case RegisterApplication(description)
    |--//封裝消息
    |--val app = createApplication(description, sender)
    |--//註冊消息,即存入集合
    |--registerApplication(app) //方法內部就是把app放進map等
      |--HashMap waitingApps(appid, app)
    |--//持久化保存
    |--persistenceEngine.addApplication(app)
    |--//Master向ClientActor發送註冊成功的消息
    |--sender ! RegisterApplication(app.id, masterUrl)
    |--//Master開始調度資源,將任務啟動到worker上
    |--//兩種情況下會進行調度:
    |--//1、提交任務,殺死任務
    |--//2、worker新增或減少
    |--schedule()
--------Master進行資源的調度-------------
  |--//兩種調度方式:儘量打散,儘量集中
  |--def schedule()
    |--//儘量打散
      |--//進行一系列的判斷過濾,例如worker上剩餘的核數或記憶體是否大於app所需資源
      |--//分核數的邏輯:
      |--//假設需要10個核心,現有4台機器,各有8個核心
      |--//創建一個長度為4的數組,角標為0,角標=(角標+1)%4,那角標只會在0~3之間迴圈,
      |--//迴圈一次需要的核心-1,worker(角標)的核心+1
      |--//Master發信息讓worker啟動executor
      |--launchExecutor(usableWorkers(pos), exec)
    |--//儘量集中
      |--//一下子把worker剩餘的資源全部分配完在分配下一個worker
      |--//Master發信息讓worker啟動executor
      |--launchExecutor(worker, exec)
--------Master發信息讓worker啟動executor-------------
  |--def launchExecutor(worker, exec)
    |--//記錄worker使用資源
    |--worker.addExecutor(exec)
    |--//master發消息給worker,將參數傳遞給worker,讓他啟動executor
    |--worker.actor ! LaunchExecutor(.....)
    |--//Master向ClientActor發消息,告訴他executor已經啟動了
    |--exec.application.driver ! ExecutorAdded(......)
-----------------------------------------------------
object Worker
  |--def main()
    |--//創建ActorSystem和Actor
    |--def startSyatemAndActor()
      |--與Master過程相同
    |--//通過ActorSystem創建Actor:worker
    |--val actor(worker) = actorSystem.actorOf(worker)
  |--//構造方法之後,receive方法之前
  |--def preStart()
    |--//與master建立連接,發送註冊消息
    |--registerWithMaster()
      |--//嘗試註冊,如果失敗嘗試多次
      |--tryRegidterAllMasters()
        |--//建立連接
        |--val actor(master) = context.actorSelection(masterAkkaUrl)
        |--//發送註冊消息
        |--actor ! RegisterWorker(workId, host, port, cores, memory...)
  |--//Master發給Worker註冊成功的消息
  |--case RegisteredWorker(masterUrl)
    |--//啟動定時器,定期發送心跳
    |--context.system.scheduler.scheduler(self,SendHeartbeat)//自己給自己發消息,發送到自己的recevice方法,啟動任務
  |--case SendHeartbeat
    |--//發送心跳
    |--master ! Heartbeat(workid)
-------------上接:Master發信息讓worker啟動executor-----------
  |--case LaunchExecutor(...)
    |--創建ExecutorRunner,將參數放入其中,然後再通過他啟動Executor
    |--val manager = new ExecutorRunner(...)
    |--//調用ExecutorRunner的start方法來啟動executor java子進程
    |--manager.start()
class ExecutorRunner
  |--def start()
    |--//創建線程,通過線程的start來啟動java子進程
    |--workerThread = new Thread(){def run(){fetchAndRunExecutor()}}
    |--workerThread.start()
  |--def fetchAndRunExecutor()
    |--//啟動子進程
    |--//有具體的類,拼接java命令啟動相應的類

總結:Master和Worker之間的通信:
master和worker都要創建ActorSystem來創建自身的Actor對象,master內部維護了一個保存workerinfo的hashSet和一個key為workerid,
value為workerInfo的HashMap。
master構造方法執行後會啟動一個定時器,定期檢查超時的worker。
worker構造方法執行後會嘗試與master建立連接併發送註冊消息,master收到消息後,封裝worker並持久化,再給worker反饋,
worker收到反饋後,啟動定時任務向master發送心跳,master收到心跳後更新心跳時間。
=====================================================
class SparkContext//即Driver端
  |--//主構造器
    |--def this()
    |--//創建SparkEnv,包含了一個ActorSyatem
    |--val env = createSparkEnv()
    |--//創建ActorSyatem的方法
    |--def createSparkEnv()
      |--//調用 SparkEnv 的靜態方法創建SparkEnv
      |--SparkEnv.createDriverEnv()
    |--//創建 TaskScheduler
    |--var taskScheduler(schedulerBackend, taskScheduler) = SparkContext.createTaskScheduler(this, master)
    |--//創建 executors 和 DriverActor 的心跳Actor
    |--val heartbeatReceiver = env.actorSystem.actorOf(new HeartbeatReceiver(taskScheduler),...)
    |--//創建DAGScheduler
    |--var dagScheduler = new DAGScheduler(this)
    |--//啟動TaskSecheduler
    |--taskScheduler.start()
  |--//創建 TaskScheduler 方法,
  |--//根據提交任務時指定的url(本地/yarn/standalone)創建相應的 TaskScheduler
  |--def createTaskScheduler()
    |--//spark的standalone模式
    |--case SPARK_REGEX(sparkUrl)
      |--//創建 TaskSchedulerImpl
      |--val scheduler = new TaskSchedulerImpl(sc)
      |--//創建 SparkDeploySchedulerBackend
      |--val backend = new SparkDeploySchedulerBackend(scheduler, sc, masterUrls)
      |--//調用 initialize 創建調度器,預設使用先進先出的調度器
      |--scheduler.initialize(backend)

class TaskSchedulerImpl
  |--def initialize(backend)
    |--val backend = backend
  |--def start()
    |--//首先調用 SparkDeploySchedulerBackend 的start()
    |--backend.start()
-----------★★★調用taskScheduler的submitTasks方法來提交TaskSet-------------
  |--def submitTasks(taskSet)
    |--//Driver發消息任務
    |--backend.reviveOffers()
class SparkDeploySchedulerBackend extends CoarseGrainedSchedulerBackend
  |--def start()
    |--//調用父類的 start 來創建 DriverActor
    |--super.start() //CoarseGrainedSchedulerBackend 的 start 方法
    |--//準備一大堆的參數,例如spark的參數,java的參數,在Driver端都準備好,屆時直接發給master,master拿到後發給executor執行即可
    |--conf......
    |--//將參數封裝成Command,這是以後executor的實現類,類名也封裝好了,yarn中啟動的也是這個,所以不是yarnChild
    |--val command = Command("org.apache.executor.CoarseGrainedExecutorBackend",conf,...)
    |--//將參數封裝到ApplicationDescription
    |--val appDesc = new ApplicationDescription(sc.appName, command, ....)
    |--創建AppClient
    |--client = new AppClient(sc.actorSystem, masters, appDesc, ...)
    |--//調用AppClient的start方法,創建ClientActor用於與Master通信
    |--client.start()
class CoarseGrainedSchedulerBackend
  |--def start()
    |--//通過 actorSystem 創建 DriverActor
    |--driverActor = actorSystem.actorOf(new DriverActor(..)) //等待 executor 過來通信
----------上接:Executor向Driver註冊"|--//Driver建立連接,註冊exectuor"------------------------
  |--def receiveWithLogging
    |--//Driver收到executor發來的註冊消息
    |--case RegisterExecutor()
      |--//反饋註冊成功
      |--//★★★查看是否有任務需要提交
      |--makeOffers()//暫時沒有任務,還沒有構建DAG
-----------上接:提交前面的stage-------------------------
  |--def makeOffers()
    |--//調用launchTask向Executor提交Task
    |--launchTask(tasks)
  |--def launchTask(tasks)
    |--//序列化task
    |--val serializedTask = ser.serialize(task)
    |--//向Executor發送序列化好的Task
    |--executorData.executorActor ! LaunchTask(new SerializableBuffer(serializedTask))
-----------上接:backend.reviveOffers()------------------
  |--def reviveOffers()
    |--driverActor ! ReviveOffers
class DriverActor
  |--★★★調用makeOffers向Executor提交Task
  |--case ReviveOffers => makeOffers()
class AppClient
  |--def start()
    |--//創建ClientActor用於與Master通信
    |--actor = actorSystem.actorOf(new ClientActor)
  |--//主構造器
    |--def preStart()
      |--//ClientActor向Master註冊
      |--registerWithMaster()
  |--def registerWithMaster()
    |--//向Master註冊
    |--tryRegidterAllMasters()
  |--def tryRegidterAllMasters()
    |--//迴圈所有Master,建立連接
    |--val actor = context.actorSelection(masterAkkaUrl)
    |--//拿到Master的引用,向master註冊,備用的master不給反饋,活躍的才給
    |--//參數都保存在appDescription中,例如核數,記憶體大小,java參數,executor實現類
    |--actor ! RegisterApplication(appDescription)
  |--def receiveWithLogging
    |--//ClientActor收到Master發來的註冊成功的消息
    |--case RegisterApplication
      |--//更新Master地址
      |--changeMaster(masterUrl)
object SparkEnv
  |--def createDriverEnv()
    |//調用 create 創建 Actor
    |--create
      |--//創建 ActorSystem
      |--val (actorSystem, boundPort) = AkkaUtils.createActorSystem()
總結:new SparkContext(),執行主構造器,創建SparkEnv,env里創建了ActorSystem用於通信,
然後創建TaskScheduler,創建AGScheduler。TaskScheduler里創建了2個actor分別負責與master
和executors進行通信。(TaskScheduler 里會創建一個backend,backend調用start方法後,會
先調用父類的start方法,父類的start方法會創建DriverActor,再執行自己的start方法創建ClientActor)
ClientActor創建之前,會準備一大堆的參數,包括spark參數,java參數,executor的實現類等,
封裝進AppClient,然後創建ClientActor與Master建立連接發送註冊信息,Master收到後保存app的信息並反饋。
這時Master開始調度資源並啟動worker,有兩種調度方式:儘量打散,儘量集中,預設打散。
Master發消息給Worker,worker拼接Java命令,啟動子進程。
=====================================================
spark-submit腳本提交流程源碼分析:
spark-submit腳本
|--/bin/spark-class org.apache.spark.deploy.SparkSubmit "$@"
spark-class腳本
|--1.3.1 echo "$RUNNER" -cp "$CLASSPATH" $JAVA_OPTS "$@" 1>&2 //org.apache.spark.deploy.SparkSubmit
|--1.6.1/2.0 "$RUNNER" -Xmx128m -cp "$LAUNCH_CLASSPATH" org.apache.spark.launcher.Main "$@"
-----------------------------------------------------
object org.apache.spark.deploy.SparkSubmit
  |--def main()
    |--//進行匹配
    |--appArgs.action match{case SparkSubmitAction.SUBMIT => submit(appArgs)}
  |--def submit()
    |--def doRunMain()
      |--
    |--//調用doRunMain
    |--doRunMain()
      |--proxyUser.doAs(new xxxAction(){
        override def run():Unit = {
          runMain(...,childMainClass,...)
        }
      })
    |--def runMain(...,childMainClass,...)
      |--//反射自定義的spark程式 class
      |--mainClass = Class.forName(childMainClass,...)
      |--//調用main方法
      |--val mainMethod = mainClass.getMethod("main",...)
      |--mainMethod.invoke(null, childArgs.toArray)
總結: spark-submit啟動了一個spark自己的submit程式,通過反射調用我們自定義的spark程式
=====================================================
Executor跟Driver通信過程源碼分析
org.apache.executor.CoarseGrainedExecutorBackend
  |--def main()
    |--//解析一大堆參數
    |--//調用run方法
    |--run(....)
  |--def run()
    |--//在executor里創建ActorSystem
    |--val fetcher = AkkaUtils.createActorSystem(...)
    |--//跟Driver建立連接
    |--env.actorSystem.actorOf(new CoarseGrainedExecutorBackend)
  |--def preStart()
    |--//Driver建立連接,註冊exectuor
    |--.....
  |--def receiveWithLogging
    |--//Driver反饋註冊成功
    |--case RegisteredExecutor
      |--//創建Executor實例,執行業務邏輯
      |--executor = new Executor(....)
Executor
|--//初始化線程池
|--val threadPool = Utils.newDaemonCachedThreadPoll()
總結:Executor啟動後,創建actor向driver註冊,創建Executor實例執行業務邏輯
=====================================================
任務提交流源碼分析,DAScheduler執行過程
sc.textFile-->hadoopFile-->hadoopRDD-->MapParitionsRDD-->shuffleRDD
rdd.saveAsTextFile()-->MapPartitionsRDD
Driver端提交任務,執行self.context.runJob(....)

class SparkContext
  |--def runJob()
    |--//DAGScheduler切分Stage,轉成TaskSet給TaskScheduler再提交給Executor
    |--DAGScheduler.runJob(.....)
class DAGScheduler
  |--//runjob切分stage
  |--def runJob()
    |--//調用submitJob返回一個回調器
    |--val waiter = submitJob(rdd, ...)
    |--//進行模式匹配
    |--waiter.awaitResult() match
      |--case JobSuccesded
      |--case JobFailed
  |--def submitJob(rdd, ...)
    |--//將數據封裝到事件中放入eventProcessLoop的阻塞隊列中
    |--eventProcessLoop.post(JobSubmitted(...))
  |--val eventProcessLoop = new DAGSchedulerEventProcessLoop(this)
class DAGSchedulerEventProcessLoop extends EventLoop
  |--def onReceive()
    |--//提交計算任務
    |--case JobSubmitted(jobId,...)
      |--//調用DAGScheduler的handleJobSubmitted方法處理
      |--dagScheduler.handleJobSubmitted(jobId,...)
  |--//切分stage
  |--def handleJobSubmitted(jobId,...)
    |--★★★劃分stage
    |--finalStage = newStage(finalRDD, partitons.size, None, jobId, ...)
    |--//開始提交Stage
    |--submitStage(finalStage)
  |--def submitStage(finalStage)
    |--//獲取父stage
    |--val missing = getMissingParentStages(stage).sortBy(_.id)
    |--if(missing == null){
        //提交前面的stage
        submitMissingTasks(stage, jobId.get)
      }else{
        //有父stage,遞歸執行本方法
        for(parent <- missing){
          submitStage(parent)
        }
        |--//放進waitingStages
        |--waitingStages += stage
      }
  |--def submitMissingTasks(stage, jobId.get)
    |--//將stage創建成多個Task,分為shuffleMapTask和ResultTask
    |--new ShuffleMapTask(stage.id, taskBinary, part, locs)
    |--new ResultTask(stage.id, taskBinary, part, locs, id)
    |--//★★★調用taskScheduler的submitTasks方法來提交TaskSet
    |--taskScheduler.submitTasks(new TaskSet(tasks.toArray, stage.id, ..., stage.jobId, properties))
  |--def newStage
    |--//獲取父stage
    |--val parentStages = getParentStages(rdd, jobId)
    |--val stage = new Stage(...,parentStages,...)
  |--def getParentStages
    |--//使用了3個數據結構來處理父類stage
    |--val parents = new HashSet[Stage]
    |--val visited = new HashSet[RDD]
    |--val waitingForVisit = new Stack[RDD]
    |--//思路:通過遞歸,壓棧彈棧
    |--//見最後源碼
  |--def getMissingParentStages
    |--//與getParentStages一樣的數據結構找父stage
class EventLoop
  |--//阻塞隊列
  |--val eventQueue = new LinkedBlockingDeque()
  |--//不停的取事件
  |--val eventThread = new Thread(name){
      def run(){
        while(){
          val event = eventQueue.take()
          onReceive(event)
        }
      }
    }

總結:Action運算元會執行sparkContext里的runJob(),再調用DAGScheduler的runJob(),
通過2個HashSet和1個Stack劃分stage,然後提交stage

=======================劃分stage源碼==============================
/**
* Create a Stage -- either directly for use as a result stage, or as part of the (re)-creation
* of a shuffle map stage in newOrUsedStage. The stage will be associated with the provided
* jobId. Production of shuffle map stages should always use newOrUsedStage, not newStage
* directly.
*/
//★★★用於創建Stage
private def newStage(
  rdd: RDD[_],
  numTasks: Int,
  shuffleDep: Option[ShuffleDependency[_, _, _]],
  jobId: Int,
  callSite: CallSite)
  : Stage =
{
  //★★★獲取他的父Stage
  val parentStages = getParentStages(rdd, jobId)
  val id = nextStageId.getAndIncrement()
  val stage = new Stage(id, rdd, numTasks, shuffleDep, parentStages, jobId, callSite)
  stageIdToStage(id) = stage
  updateJobIdStageIdMaps(jobId, stage)
  stage
}
------------------------------------------------------
/**
* Get or create the list of parent stages for a given RDD. The stages will be assigned the
* provided jobId if they haven't already been created with a lower jobId.
*/
//TODO 用戶獲取父Stage
private def getParentStages(rdd: RDD[_], jobId: Int): List[Stage] = {
  val parents = new HashSet[Stage]
  val visited = new HashSet[RDD[_]]
  // We are manually maintaining a stack here to prevent StackOverflowError
  // caused by recursively visiting
  val waitingForVisit = new Stack[RDD[_]]
  def visit(r: RDD[_]) {
    if (!visited(r)) {
      visited += r
      // Kind of ugly: need to register RDDs with the cache here since
      // we can't do it in its constructor because # of partitions is unknown
    for (dep <- r.dependencies) {
      dep match {
        case shufDep: ShuffleDependency[_, _, _] =>
          //★★★把寬依賴傳進去,獲得父Stage
          parents += getShuffleMapStage(shufDep, jobId)
        case _ =>
          waitingForVisit.push(dep.rdd)
        }
      }
    }
  }
  waitingForVisit.push(rdd)
  while (!waitingForVisit.isEmpty) {
    visit(waitingForVisit.pop())
  }
  parents.toList
}
------------------------------------------------------
/**
* Get or create a shuffle map stage for the given shuffle dependency's map side.
* The jobId value passed in will be used if the stage doesn't already exist with
* a lower jobId (jobId always increases across jobs.)
*/
private def getShuffleMapStage(shuffleDep: ShuffleDependency[_, _, _], jobId: Int): Stage = {
  shuffleToMapStage.get(shuffleDep.shuffleId) match {
    case Some(stage) => stage
    case None =>
      // We are going to register ancestor shuffle dependencies
      registerShuffleDependencies(shuffleDep, jobId)
      // Then register current shuffleDep

      val stage =
      //★★★創建服父Stage
        newOrUsedStage(
          shuffleDep.rdd, shuffleDep.rdd.partitions.size, shuffleDep, jobId,
          shuffleDep.rdd.creationSite)
      shuffleToMapStage(shuffleDep.shuffleId) = stage

      stage
    }
  }
------------------------------------------------------
/**
* Create a shuffle map Stage for the given RDD. The stage will also be associated with the
* provided jobId. If a stage for the shuffleId existed previously so that the shuffleId is
* present in the MapOutputTracker, then the number and location of available outputs are
* recovered from the MapOutputTracker
*/
private def newOrUsedStage(
  rdd: RDD[_],
  numTasks: Int,
  shuffleDep: ShuffleDependency[_, _, _],
  jobId: Int,
  callSite: CallSite)
  : Stage =
{
  //★★★遞歸
  val stage = newStage(rdd, numTasks, Some(shuffleDep), jobId, callSite)
  if (mapOutputTracker.containsShuffle(shuffleDep.shuffleId)) {
    val serLocs = mapOutputTracker.getSerializedMapOutputStatuses(shuffleDep.shuffleId)
    val locs = MapOutputTracker.deserializeMapStatuses(serLocs)
    for (i <- 0 until locs.size) {
      stage.outputLocs(i) = Option(locs(i)).toList // locs(i) will be null if missing
    }
    stage.numAvailableOutputs = locs.count(_ != null)
  } else {
    // Kind of ugly: need to register RDDs with the cache and map output tracker here
    // since we can't do it in the RDD constructor because # of partitions is unknown
    logInfo("Registering RDD " + rdd.id + " (" + rdd.getCreationSite + ")")
    mapOutputTracker.registerShuffle(shuffleDep.shuffleId, rdd.partitions.size)
  }
  stage
}


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 編輯 用戶: SYSTEM 1.1, 1.2, 2.1, 2.2, 3.1, 3.2, 4.1, 4.2, 5.1, 5.2, 6.1, 6.2, 7.1, 7.2, 8.1, ...
  • 執行計劃顯示SQL執行的開銷 工具→ SQL Server Profiler : SQL Server 分析器,監視系統調用的SQL Server查詢 Top查詢 -- Top Percent 選擇百分比 SELECT top 30 percent * FROM [SchoolDB].[dbo].[... ...
  • 前言 最近連續接觸了4個OA系統,均存在著不同的性能問題,本文記述對某移動OA系統的優化全過程,讓看官們對資料庫優化流程有一個瞭解,並揭開隱式轉換這無情殺手的神秘面紗。 本文使用的工具:SQL專家雲平臺專業體檢工具 :www.zhuancloud.com 系統情況 硬體配置 軟體情況 資料庫情況 系 ...
  • 遇到的問題 1、最初階段 系統中做了一個監控功能,用於記錄所有的請求數據,數據插入頻繁,量非常大,比如一天1000萬條。考慮到數據插入的效率,就使用記憶體KV緩存來保存。寫入過程是在接收到請求後放入到線程池中,然後線程池非同步處理後寫入。到這問題基本上沒什麼事情。 2、新的需求 後面數據保存了,就需要在 ...
  • httpd yum install httpd -y systemctl status httpd systemctl start httpd systemctl stop firewalld Mariadb yum install mariadb-server mariadb-client -y ...
  • SQL Server 其實從SQL Server 2005開始,也提供了類似ORACLE中固定執行計劃的功能,只是好像很少人使用這個功能。當然在SQL Server中不叫"固定執行計劃"這個概念,而是叫"執行計劃指南"(Plan Guide 很多翻譯是計劃指南,個人覺得執行計劃指南稍好一些)。當然兩... ...
  • 最近在做oracle相關的項目,剛接觸oracle,與sqlserver語法上還是有區別的 sqlserver : 示例:FX+當前年月日+00001 如下圖流水號實力所示 原理: 首先 'FX'是固定的,獲取當前年月日方法在sqlserver中分別是: 1.年:YEAR(GETDATE()) 2. ...
  • 一、博客前言 自接觸學習MySQL已有一段時間了,對於MySQL的基礎知識還是有一定的瞭解的。在這一路學習過來,每次不管看書還是網上看的資料,對於MySQL數據類型中的時間日期類型總是一掃而過,不曾停下來認認真真的研究學習。最近在圖書館借了一本關於MysQL的書籍,打算全面的學習研究一遍。 在之前, ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...