Spark RPC 框架源碼分析(三)Spark 心跳機制分析

来源:https://www.cnblogs.com/listenfwind/archive/2019/01/17/10284444.html
-Advertisement-
Play Games

前兩次講了 Spark RPC 的基礎內容以及源碼時序分析。這次我們來看看Spark 如何用 RPC 實現心跳。 ...


一. Spark 心跳概述

前面兩節中介紹了 Spark RPC 的基本知識,以及深入剖析了 Spark RPC 中一些源碼的實現流程。

具體可以看這裡:

這一節我們來看看一個 Spark RPC 中的運用實例 -- Spark 的心跳機制。這次主要還是從代碼的角度來看。

我們首先要知道 Spark 的心跳有什麼用。心跳是分散式技術的基礎,我們知道在 Spark 中,是有一個 Master 和眾多的 Worker,那麼 Master 怎麼知道每個 Worker 的情況呢,這就需要藉助心跳機制了。心跳除了傳輸信息,另一個主要的作用就是 Worker 告訴 Master 它還活著,當心跳停止時,方便 Master 進行一些容錯操作,比如數據轉移備份等等。

我們同樣分成兩部分來分析 Spark 的心跳機制,分為服務端(Spark Context)和客戶端(Executor)。

二. Spark 心跳服務端 heartbeatReceiver 解析

我們可以發現,SparkContext 中有關於心跳的類以及 RpcEndpoint 註冊代碼。

class SparkContext(config: SparkConf) extends Logging {
    ......
    private var _heartbeatReceiver: RpcEndpointRef = _
    ......
    //向 RpcEnv 註冊 Endpoint。
    _heartbeatReceiver = env.rpcEnv.setupEndpoint(HeartbeatReceiver.ENDPOINT_NAME, new HeartbeatReceiver(this))
    ......
      val (sched, ts) = SparkContext.createTaskScheduler(this, master, deployMode)
    _schedulerBackend = sched
    _taskScheduler = ts
    _dagScheduler = new DAGScheduler(this)
    _heartbeatReceiver.ask[Boolean](TaskSchedulerIsSet)
    ......
}

這裡 rpcEnv 已經在上下文中創建好,通過 setupEndpoint 向 rpcEnv 註冊一個心跳的 Endpoint。還記得上一節中 HelloworldServer 的例子嗎,在 setupEndpoint 方法中,會去調用 Dispatcher 創建這個 Endpoint(這裡就是HeartbeatReceiver) 對應的 Inbox 和 EndpointRef ,然後在 Inbox 監聽是否有新消息,有新消息則處理它。註冊完會返回一個 EndpointRef (註意這裡有 Refer,即是客戶端,用來發送消息的)。

所以這一句

_heartbeatReceiver = env.rpcEnv.setupEndpoint(HeartbeatReceiver.ENDPOINT_NAME, new HeartbeatReceiver(this))

就已經完成了心跳服務端監聽的功能。
那麼這條代碼的作用呢?

_heartbeatReceiver.ask[Boolean](TaskSchedulerIsSet)

這裡我們要看上面那句 val (sched, ts) = SparkContext.createTaskScheduler(this, master, deployMode) ,它會根據 master url 創建 SchedulerBackend 和 TaskScheduler。這兩個類都是和資源調度有關的,所以需要藉助心跳機制來傳送消息。其中 TaskScheduler 負責任務調度資源分配,SchedulerBackend 負責與 Master、Worker 通信收集 Worker 上分配給該應用使用的資源情況。

這裡主要是告訴 HeartbeatReceiver(心跳) 的監聽端 ,告訴它 TaskScheduler 這個東西已經設置好啦。HeartbeatReceiver 就會回應你說好的,我知道的,並持有這個 TaskScheduler。

到這裡服務端 heartbeatReceiver 就差不多完了,我們可以發現,HeartbeatReceiver 除了向 RpcEnv 註冊並監聽消息之外,還會去持有一些資源調度相關的類 ,比如 TaskSchedulerIsSet 。

三. Spark 心跳客戶端發送心跳解析

發送心跳發送在 Worker ,每個 Worker 都會有一個 Executor ,所以我們可以發現在 Executor 中發送心跳的代碼。

private[spark] class Executor(
    executorId: String,
    executorHostname: String,
    env: SparkEnv,
    userClassPath: Seq[URL] = Nil,
    isLocal: Boolean = false)
  extends Logging {
  ......
  // must be initialized before running startDriverHeartbeat()
  //創建心跳的 EndpointRef
  private val heartbeatReceiverRef = RpcUtils.makeDriverRef(HeartbeatReceiver.ENDPOINT_NAME, conf, env.rpcEnv)
  ......
  startDriverHeartbeater()
  ......
    /**
   * Schedules a task to report heartbeat and partial metrics for active tasks to driver.
   * 用一個 task 來報告活躍任務的信息以及發送心跳。
   */
  private def startDriverHeartbeater(): Unit = {
    val intervalMs = conf.getTimeAsMs("spark.executor.heartbeatInterval", "10s")

    // Wait a random interval so the heartbeats don't end up in sync
    val initialDelay = intervalMs + (math.random * intervalMs).asInstanceOf[Int]

    val heartbeatTask = new Runnable() {
      override def run(): Unit = Utils.logUncaughtExceptions(reportHeartBeat())
    }
    //heartbeater是一個單線程線程池,scheduleAtFixedRate 是定時執行任務用的,和 schedule 類似,只是一些策略不同。
    heartbeater.scheduleAtFixedRate(heartbeatTask, initialDelay, intervalMs, TimeUnit.MILLISECONDS)
  }
  ......
}

可以看到,在 Executor 中會創建心跳的 EndpointRef ,變數名為 heartbeatReceiverRef 。

然後我們主要看 startDriverHeartbeater() 這個方法,它是關鍵。
我們可以看到最後部分代碼

    val heartbeatTask = new Runnable() {
      override def run(): Unit = Utils.logUncaughtExceptions(reportHeartBeat())
    }
    heartbeater.scheduleAtFixedRate(heartbeatTask, initialDelay, intervalMs, TimeUnit.MILLISECONDS)

heartbeatTask 是一個 Runaable,即一個線程任務。scheduleAtFixedRate 則是 java concurrent 包中用來執行定時任務的一個類,這裡的意思是每隔 10s 跑一次 heartbeatTask 中的線程任務,超時時間 30s 。

為什麼到這裡還是沒看到 heartbeatReceiverRef 呢,說好的發送心跳呢?別急,其實在 heartbeatTask 線程任務中又調用了另一個方法,我們到裡面去一探究竟。

private[spark] class Executor(
    executorId: String,
    executorHostname: String,
    env: SparkEnv,
    userClassPath: Seq[URL] = Nil,
    isLocal: Boolean = false)
  extends Logging {
  ......
  private def reportHeartBeat(): Unit = {
    // list of (task id, accumUpdates) to send back to the driver
    val accumUpdates = new ArrayBuffer[(Long, Seq[AccumulatorV2[_, _]])]()
    val curGCTime = computeTotalGcTime()

    for (taskRunner <- runningTasks.values().asScala) {
      if (taskRunner.task != null) {
        taskRunner.task.metrics.mergeShuffleReadMetrics()
        taskRunner.task.metrics.setJvmGCTime(curGCTime - taskRunner.startGCTime)
        accumUpdates += ((taskRunner.taskId, taskRunner.task.metrics.accumulators()))
      }
    }

    val message = Heartbeat(executorId, accumUpdates.toArray, env.blockManager.blockManagerId)
    try {
      //終於看到 heartbeatReceiverRef 的身影了
      val response = heartbeatReceiverRef.askWithRetry[HeartbeatResponse](
          message, RpcTimeout(conf, "spark.executor.heartbeatInterval", "10s"))
      if (response.reregisterBlockManager) {
        logInfo("Told to re-register on heartbeat")
        env.blockManager.reregister()
      }
      heartbeatFailures = 0
    } catch {
      case NonFatal(e) =>
        logWarning("Issue communicating with driver in heartbeater", e)
        heartbeatFailures += 1
        if (heartbeatFailures >= HEARTBEAT_MAX_FAILURES) {
          logError(s"Exit as unable to send heartbeats to driver " +
            s"more than $HEARTBEAT_MAX_FAILURES times")
          System.exit(ExecutorExitCode.HEARTBEAT_FAILURE)
        }
    }
  }
  ......
  
}

可以看到,這裡 heartbeatReceiverRef 和我們上一節的例子, HelloworldClient 類似,核心也是調用了 askWithRetry() 方法,這個方法是通過同步的方式發送 Rpc 消息。而這個方法里其他代碼其實就是獲取 task 的信息啊,或者是一些容錯處理。核心就是調用 askWithRetry() 方法來發送消息。

看到這你就明白了吧。Executor 初始化便會用一個定時任務不斷發送心跳,同時當有 task 的時候,會獲取 task 的信息一併發送。這就是心跳的大概內容了。

OK,Spark RPC 三部曲完畢。如果你能看到這裡那不容易呀,給自己點個贊吧!!


推薦閱讀 :
從分治演算法到 MapReduce
大數據存儲的進化史 --從 RAID 到 Hadoop Hdfs
一個故事告訴你什麼才是好的程式員


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 字元串,是Java中最重要的類。這句肯定的推斷不是Java之父詹姆斯·高斯林說的,而是沉默王二說的,因此你不必懷疑它的準確性。 關於字元串,有很多的面試題,但我總覺得理論知識繞來繞去沒多大意思。你比如說:String cmower = new String("沉默王二");定義了幾個對象? 我總覺得 ...
  • 首先嘗試網友們的方法 按照網上大部分的教程仍無法解決自己的問題,不管是更改.project文件還是使用其他修改配置的方法,始終不能解決問題。 嘗試自己解決問題 想到了可能是eclipse版本的問題,我之前使用的是eclipse mars版本。我隨即去官網下載了最新版本的eclipse。下載後導入項目 ...
  • 背景:併發知識是一個程式員段位升級的體現,同樣也是進入BAT的必經之路,有必要把併發知識重新梳理一遍。 說到併發concurrent,肯定首先想到了線程,創建線程有兩種方法:1、從Java.lang.Thread類派生一個新的線程類,重載它的run()方法;2、實現Runnalbe介面,重載Runn ...
  • 本系列來自《編寫高質量代碼 改善python程式的91個建議》的讀書筆記整理。 ...
  • Shell是一個命令解釋器。它不僅是操作系統內核與用戶之間的絕緣層,同時也是一種功能相當強大的編程語言。一個Shell程式,通常稱為腳本,它是一個由系統調用,命令工具,軟體包和已編譯的二進位包"粘合" 起來的極易使用的工具。事實上,整個UNIX系統命令,軟體包和工具都能由一個shell腳本調用。如果 ...
  • 一、文件處理基本形式 二、打開文件的模式 三、操作文件的方法 ...
  • 哈哈,其實很簡單,寥寥幾行代碼網頁爬一部小說,不賣關子,立刻開始。 首先安裝所需的包,requests,BeautifulSoup4 控制台執行 pip install requests pip install BeautifulSoup4 如果不能正確安裝,請檢查你的環境變數,至於環境變數配置,在 ...
  • 因為http,https是無狀態的,也就是當我們連續兩次訪問同一個web網站,網站是無法分辨這兩次訪問是來自同一個人。對於它來說,這兩次訪問是沒有關係的。 就像是,我們進入了澡堂洗澡,中途要從入口出來接電話,可是當我們再次進去的時候,人家就不認識你了,還管著問你要貴賓卡呢!那麼怎麼讓這個門口迎賓能認 ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...