spark源碼解析--Shuffle輸出追蹤者--MapOutputTracker

来源:https://www.cnblogs.com/zhuge134/archive/2019/06/19/11048963.html
-Advertisement-
Play Games

Shuffle輸出追蹤者 MapOutputTracker 這個組件作為shuffle的一個輔助組件,在整個shuffle模塊中具有很重要的作用。我們在前面一系列的分析中,或多或少都會提到這個組件,比如在DAGScheduler提交一個stage時會將這個stage封裝成一個任務集(TaskSet) ...


Shuffle輸出追蹤者--MapOutputTracker

這個組件作為shuffle的一個輔助組件,在整個shuffle模塊中具有很重要的作用。我們在前面一系列的分析中,或多或少都會提到這個組件,比如在DAGScheduler提交一個stage時會將這個stage封裝成一個任務集(TaskSet),但是可能有的分區已經計算過了,有了結果(stage由於失敗可能會多次提交,其中有部分task可能已經計算完成),這些分區就不需要再次計算,而只需要計算那些失敗的分區,那麼很顯然需要有一個組件來維護shuffle過程中的任務失敗成功的狀態,以及計算結果的位置信息。
此外,在shuffle讀取階段,我們知道一個reduce端的分區會依賴於多個map端的分區的輸出數據,那麼我們在讀取一個reduce分區對應的數據時,就需要知道這個reduce分區依賴哪些map分區,每個block的物理位置是什麼,blockId是什麼,這個block中屬於這個reduce分區的數據量大小是多少,這些信息的記錄維護都是靠MapOutputTracker來實現的,所以我們現在知道MapOutputTracker的重要性了。

MapOutputTracker.scala

MapOutputTracker組件的主要功能類和輔助類全部在這個文件中,我先大概說一下各個類的主要作用,然後重點分析關鍵的類。

  • ShuffleStatus,這個類是對一個stage的shuffle輸出狀態的封裝,它內部的一個主要的成員mapStatuses是一個數組,這個數組的下標就是map的分區序號,存放了每個map分區的輸出情況,關於MapStatus具體可以看MapStatus.scala,這裡不打算展開。
  • MapOutputTrackerMessage,用於rpc請求的消息類,有兩個實現類:GetMapOutputStatuses用於獲取某次shuffle的所有輸出狀態;StopMapOutputTracker用於向driver端的發送停止MapOutputTrackerMasterEndpoint端點的請求。
  • MapOutputTrackerMasterEndpoint,如果熟悉spark的rpc模塊的話,對這個類應該就很熟悉,它就是一個rpc服務端,通過向RpcEnv註冊自己,通過一個名稱標識自己,從而接收到特定一些消息,也就是上面說的兩種消息。
  • MapOutputTracker,這個類是一個抽象類,只是定義了一些操作介面,它的一個最重要的作用可能就是內部維護了一個序列值epoch,這個值表示某一個一致的全局map輸出狀態,一旦有map輸出發生變更,這個值就要加一,executor端會同步最新的epoch以判斷自己的map輸出狀態的緩存是否過期。
  • MapOutputTrackerMaster,運行在driver端,實現類MapOutputTracker的大部分功能,是最核心的類
  • MapOutputTrackerWorker,運行在executor端,主要作用是封裝了rpc調用的邏輯。

總的來看,最核心的類是MapOutputTrackerMaster,其他的類都是圍繞這個類的一些輔助類,所以我們重點分析MapOutputTrackerMaster,其他的類我不打算深入展開,相信讀者自己也能夠較為輕鬆地理解。

MapOutputTrackerMaster

findMissingPartitions

這個方法在上面已經提到了,會在DAGScheduler封裝任務集的時候查找一個stage需要計算的分區時會調用到。

   def findMissingPartitions(shuffleId: Int): Option[Seq[Int]] = {
   shuffleStatuses.get(shuffleId).map(_.findMissingPartitions())
   }
  • ShuffleStatus.findMissingPartitions

      def findMissingPartitions(): Seq[Int] = synchronized {
      val missing = (0 until numPartitions).filter(id => mapStatuses(id) == null)
      assert(missing.size == numPartitions - _numAvailableOutputs,
        s"${missing.size} missing, expected ${numPartitions - _numAvailableOutputs}")
      missing
      }

這兩段代碼很簡單,不用多說,就是從map結構中查找。

此外,像registerShuffle,registerMapOutput,unregisterMapOutput,unregisterShuffle,removeOutputsOnHost等等,我們可以看到這幾個方法本身都是很簡答的,無非就是對內部map結構的插入,更新和查找,關鍵的是你要清楚這些方法的調用時機是什麼?弄清這一點,會讓我們對MapOutputTracker在整個spark框架中的作用和充當的角色有更深的理解。方法的調用地點,通過Idea這類IDE工具其實都可以很簡單地定位到,這裡我不做過多展開,僅僅簡單地概括一下:

  • registerShuffle, DAGScheduler在創建一個ShuffleMapStage時會順便把這個stage對應的shuffle註冊進來。
  • registerMapOutput, 在一個shuffleMapTask任務完成後,會把map輸出的信息註冊進來。
  • removeOutputsOnHost,將某個host上的相關map輸出信息全部移除,一般在主機丟失時調用此操作
  • removeOutputsOnExecutor,同樣地,將某個executor上的相關map輸出信息全部移除,一般在executor丟失時調用此操作

getMapSizesByExecutorId

我們來看另一個比較重要的方法,在reduce階段讀取數據時,一個task首先需要知道它依賴於哪些map輸出,這時它回想driver端的MapOutputTrackerMasterEndpoint組件發送一個獲取map輸出的消息,經過一系列方法調用最終會調用這個方法:

def getMapSizesByExecutorId(shuffleId: Int, startPartition: Int, endPartition: Int)
  : Seq[(BlockManagerId, Seq[(BlockId, Long)])] = {
logDebug(s"Fetching outputs for shuffle $shuffleId, partitions $startPartition-$endPartition")
shuffleStatuses.get(shuffleId) match {
  case Some (shuffleStatus) =>
    // 將所有的mapStatus數組轉換成(BlockManagerId, Seq[(BlockId, Long)])對象
    shuffleStatus.withMapStatuses { statuses =>
      MapOutputTracker.convertMapStatuses(shuffleId, startPartition, endPartition, statuses)
    }
  case None =>
    Seq.empty
}
}

我們看一下:MapOutputTracker.convertMapStatuses,這個方法也很簡單,其實就是將每個map分區輸出切分成reduce分區數量,最後產生的(BlockId, Long)元組數量等於map分區數量*reduce分區數量。

def convertMapStatuses(
  shuffleId: Int,
  startPartition: Int,
  endPartition: Int,
  statuses: Array[MapStatus]): Seq[(BlockManagerId, Seq[(BlockId, Long)])] = {
assert (statuses != null)
// 用於存放結果
val splitsByAddress = new HashMap[BlockManagerId, ArrayBuffer[(BlockId, Long)]]
// 最後產生的(BlockId, Long)元組數量等於map分區數量*reduce分區數量
for ((status, mapId) <- statuses.zipWithIndex) {
  if (status == null) {
    val errorMessage = s"Missing an output location for shuffle $shuffleId"
    logError(errorMessage)
    throw new MetadataFetchFailedException(shuffleId, startPartition, errorMessage)
  } else {
    for (part <- startPartition until endPartition) {
      splitsByAddress.getOrElseUpdate(status.location, ArrayBuffer()) +=
        ((ShuffleBlockId(shuffleId, mapId, part), status.getSizeForBlock(part)))
    }
  }
}

splitsByAddress.toSeq
}

getPreferredLocationsForShuffle

我們來看另外一個比較重要的方法。我們知道reduce端的分區一般會依賴於多個map端分區輸出,但是對於每個map分區依賴的數據量是不同的,舉個極端的例子,假設reduce端某個分區依賴於10個map端的輸出分區,但是其中一個分區依賴的數據有10000條,而其他分區依賴的數據只有1條,這種情況下,顯然我們應該吧這個reduce任務優先調度到那個依賴了10000條的executor上。當然這個例子舉得很簡單,可能也不是什麼準確,但是也足夠說明這個方法的作用。

def getPreferredLocationsForShuffle(dep: ShuffleDependency[_, _, _], partitionId: Int)
  : Seq[String] = {
// 首先判斷幾個參數配置,如果都符合條件,那麼再進行偏向位置的計算
if (shuffleLocalityEnabled && dep.rdd.partitions.length < SHUFFLE_PREF_MAP_THRESHOLD &&
    dep.partitioner.numPartitions < SHUFFLE_PREF_REDUCE_THRESHOLD) {
  // 關鍵調用
  val blockManagerIds = getLocationsWithLargestOutputs(dep.shuffleId, partitionId,
    dep.partitioner.numPartitions, REDUCER_PREF_LOCS_FRACTION)
  if (blockManagerIds.nonEmpty) {
    blockManagerIds.get.map(_.host)
  } else {
    Nil
  }
} else {
  Nil
}
}

可以看出來,關鍵的方法是getLocationsWithLargestOutputs,接下來,我們就來看一下這個方法:
註釋已經說得很清楚,這個方法的邏輯很簡單,比如一個reduce端分區要讀取的總數據量是100m, 某個executor上的所有map輸出中與這個reduce分區相關的數據加起來有20m,即超過了總量的0.2,這時這個executor就能夠成為偏向位置,是不是很簡單。但是這裡應該註意到一個問題,這個方法是以executor為最小單位計算偏向位置,而在前一個方法getPreferredLocationsForShuffle中,獲取到成為偏向位置的那些BlockManagerId後,僅僅是取出了host作為偏向位置返回給上層調用者,問題在於一個host(即物理節點)上可能有多個executor,這就會造成返回的結果中會有重覆的host,;另外,既然返回host作為偏向位置,那為什麼不直接以host作為最小單位來計算偏向位置呢,比如將一個host上所有與這個reduce分區相關的數據加起來,如果超過0.2的占比就認為這個host能夠作為偏向位置,這樣好像更合理,也更容易產生偏向位置。舉個極端的例子,一個host上運行了5個executor,每個executor與分區相關的數據占比0.1,另外有5個host上每個都只運行了一個executor,他們的數據占比均為0.1,這種情況下是不會產生偏向位置的,但是實際上顯然應該將那個擁有5個executor的host作為偏向位置。

def getLocationsWithLargestOutputs(
  shuffleId: Int,
  reducerId: Int,
  numReducers: Int,
  fractionThreshold: Double)
: Option[Array[BlockManagerId]] = {

val shuffleStatus = shuffleStatuses.get(shuffleId).orNull
// 對shuffleStatus非空檢查
if (shuffleStatus != null) {
  shuffleStatus.withMapStatuses { statuses =>
    // 對mapStatus數組的非空檢查
    if (statuses.nonEmpty) {
      // HashMap to add up sizes of all blocks at the same location
      // 記錄每個executor上的所有map輸出的block中屬於這個reduce端分區的數據量
      val locs = new HashMap[BlockManagerId, Long]
      var totalOutputSize = 0L
      var mapIdx = 0
      while (mapIdx < statuses.length) {
        val status = statuses(mapIdx)
        // status may be null here if we are called between registerShuffle, which creates an
        // array with null entries for each output, and registerMapOutputs, which populates it
        // with valid status entries. This is possible if one thread schedules a job which
        // depends on an RDD which is currently being computed by another thread.
        if (status != null) {
          val blockSize = status.getSizeForBlock(reducerId)
          if (blockSize > 0) {
            locs(status.location) = locs.getOrElse(status.location, 0L) + blockSize
            totalOutputSize += blockSize
          }
        }
        mapIdx = mapIdx + 1
      }
      // 最後,判斷一個executor能否成為偏向位置的條件是:
      // 這個executor上所有與這個reduce分區相關的數據大小與這個分區數據總量的比值是否大於一個閾值
      // 這個閾值預設是0.2
      val topLocs = locs.filter { case (loc, size) =>
        size.toDouble / totalOutputSize >= fractionThreshold
      }
      // Return if we have any locations which satisfy the required threshold
      if (topLocs.nonEmpty) {
        return Some(topLocs.keys.toArray)
      }
    }
  }
}
None
}

總結

國際慣例,再晚也要總結一下。我們簡單總結一下map輸出追蹤器的作用:

  • 維護所有shuffle的map輸出狀態信息,位置信息等
  • 查找某個stage還有哪些未計算的分區
  • 獲取reduce分區的偏向位置
  • 獲取reduce分區依賴哪些map輸出,他們的位置,每個map輸出中相關數據的大小

您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • [2019.06.20 學習筆記1] 1.父子元素 2.兄弟元素 ...
  • HTTP協議 1 HTTP請求狀態碼 當用戶試圖通過 HTTP 訪問一臺正在運行 Internet 信息服務 (IIS) 的伺服器上的內容時,IIS 返回一個表示該請求的狀態的數字代碼。狀態代碼可以指明具體請求是否已成功,還可以揭示請求失敗的確切原因。 1.1 1xx - 信息提示 這些狀態代碼表示 ...
  • 一、瀏覽器介紹 瀏覽器是網頁運行的平臺,常用的瀏覽器有 IE、火狐(Firefox)、谷歌(Chrome)、Safari和Opera等。我們平時稱為五大瀏覽器。 可以通過這個網址 http://tongji.baidu.com/data/browser 查看瀏覽器的占有的市場份額 二、瀏覽器內核 1 ...
  • [2019.06.19 學習筆記4] 1.標簽格式 <開始標簽></結束標簽> 不區分大小寫,而且規則寬鬆。推薦小寫,並且要求成對編寫。 2.閉合標簽 <h1>內容</h1> 3.自閉合標簽(沒有內容的標簽) <img/> ...
  • 技術人員在構思一項功能時,會同時在大腦中編織其實現細節! 不利的一面 這種編織過程就像本能難以避免,它有優勢,對現實可行性更有判斷,但不利的影響也很明顯! 1、容易陷入細節,破壞思維的整體性。 2、構思的實現複雜度會直接影響決策。 這種構思本身受個人經驗所限。事實上實際複雜度有不確定性,再加上複雜度 ...
  • 今天,早早的起床,被外面火辣辣的太陽曬紅了臉。洗漱完發現已經遲到了,又不能吃早餐,來到了教室,面對著操蛋的課程,很令人蛋疼。啊,,,,美好的一上午就這麼結束了。 ...
  • 本章描述了團隊為準備Contoso會議管理系統的第一個產品版本所做的更改。這項工作包括對前兩章介紹的訂單(Order)和註冊(Registrations)限界上下文的一些重構和功能添加,以及一個新的會議管理(Conference Management)限界上下文和一個新的支付(Payment)限界上... ...
  • [toc] 基於Coravel定時任務之計算總頁數 1 應用背景 在物聯網系統中,需要計算底端所有設備的總數,除以分頁每頁顯示數量,進行一個總頁數的顯示。包括狀態,告警,日誌等等數據都需要對應的總頁數的顯示。 2 對比各家定時庫 2.1 TaskScheduler TaskScheduler庫只支持 ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...