Spark源碼分析之分區器的作用

来源:http://www.cnblogs.com/xing901022/archive/2017/04/16/6718642.html
-Advertisement-
Play Games

最近因為手抖,在Spark中給自己挖了一個數據傾斜的坑。為瞭解決這個問題,順便研究了下Spark分區器的原理,趁著周末加班總結一下~ 先說說數據傾斜 數據傾斜是指Spark中的RDD在計算的時候,每個RDD內部的分區包含的數據不平均。比如一共有5個分區,其中一個占有了90%的數據,這就導致本來5個分 ...


最近因為手抖,在Spark中給自己挖了一個數據傾斜的坑。為瞭解決這個問題,順便研究了下Spark分區器的原理,趁著周末加班總結一下~

先說說數據傾斜

數據傾斜是指Spark中的RDD在計算的時候,每個RDD內部的分區包含的數據不平均。比如一共有5個分區,其中一個占有了90%的數據,這就導致本來5個分區可以5個人一起並行幹活,結果四個人不怎麼幹活,工作全都壓到一個人身上了。遇到這種問題,網上有很多的解決辦法:

比如這篇寫的就不錯:http://www.cnblogs.com/jasongj/p/6508150.html

但是如果是底層數據的問題,無論怎麼優化,還是無法解決數據傾斜的。

比如你想要對某個rdd做groupby,然後做join操作,如果分組的key就是分佈不均勻的,那麼真樣都是無法優化的。因為一旦這個key被切分,就無法完整的做join了,如果不對這個key切分,必然會造成對應的分區數據傾斜。

不過,瞭解數據為什麼會傾斜還是很重要的,繼續往下看吧!

分區的作用

在PairRDD即(key,value)這種格式的rdd中,很多操作都是基於key的,因此為了獨立分割任務,會按照key對數據進行重組。比如groupbykey

重組肯定是需要一個規則的,最常見的就是基於Hash,Spark還提供了一種稍微複雜點的基於抽樣的Range分區方法。

下麵我們先看看分區器在Spark計算流程中是怎麼使用的:

Paritioner的使用

就拿groupbykey來說:

def groupByKey(): JavaPairRDD[K, JIterable[V]] =
    fromRDD(groupByResultToJava(rdd.groupByKey()))

它會調用PairRDDFunction的groupByKey()方法

def groupByKey(): RDD[(K, Iterable[V])] = self.withScope {
    groupByKey(defaultPartitioner(self))
  }

在這個方法裡面創建了預設的分區器。預設的分區器是這樣定義的:

def defaultPartitioner(rdd: RDD[_], others: RDD[_]*): Partitioner = {
    val bySize = (Seq(rdd) ++ others).sortBy(_.partitions.size).reverse
    for (r <- bySize if r.partitioner.isDefined && r.partitioner.get.numPartitions > 0) {
      return r.partitioner.get
    }
    if (rdd.context.conf.contains("spark.default.parallelism")) {
      new HashPartitioner(rdd.context.defaultParallelism)
    } else {
      new HashPartitioner(bySize.head.partitions.size)
    }
  }

首先獲取當前分區的分區個數,如果沒有設置spark.default.parallelism參數,則創建一個跟之前分區個數一樣的Hash分區器。

當然,用戶也可以自定義分區器,或者使用其他提供的分區器。API裡面也是支持的:

// 傳入分區器對象
def groupByKey(partitioner: Partitioner): JavaPairRDD[K, JIterable[V]] =
    fromRDD(groupByResultToJava(rdd.groupByKey(partitioner)))
// 傳入分區的個數
def groupByKey(numPartitions: Int): JavaPairRDD[K, JIterable[V]] =
    fromRDD(groupByResultToJava(rdd.groupByKey(numPartitions)))

HashPatitioner

Hash分區器,是最簡單也是預設提供的分區器,瞭解它的分區規則,對我們處理數據傾斜或者設計分組的key時,還是很有幫助的。

class HashPartitioner(partitions: Int) extends Partitioner {
  require(partitions >= 0, s"Number of partitions ($partitions) cannot be negative.")

  def numPartitions: Int = partitions

  // 通過key計算其HashCode,並根據分區數取模。如果結果小於0,直接加上分區數。
  def getPartition(key: Any): Int = key match {
    case null => 0
    case _ => Utils.nonNegativeMod(key.hashCode, numPartitions)
  }

  // 對比兩個分區器是否相同,直接對比其分區個數就行
  override def equals(other: Any): Boolean = other match {
    case h: HashPartitioner =>
      h.numPartitions == numPartitions
    case _ =>
      false
  }

  override def hashCode: Int = numPartitions
}

這裡最重要的是這個Utils.nonNegativeMod(key.hashCode, numPartitions),它決定了數據進入到哪個分區。

def nonNegativeMod(x: Int, mod: Int): Int = {
    val rawMod = x % mod
    rawMod + (if (rawMod < 0) mod else 0)
  }

說白了,就是基於這個key獲取它的hashCode,然後對分區個數取模。由於HashCode可能為負,這裡直接判斷下,如果小於0,再加上分區個數即可。

因此,基於hash的分區,只要保證你的key是分散的,那麼最終數據就不會出現數據傾斜的情況。

RangePartitioner

這個分區器,適合想要把數據打散的場景,但是如果相同的key重覆量很大,依然會出現數據傾斜的情況。

每個分區器,最核心的方法,就是getPartition

def getPartition(key: Any): Int = {
    val k = key.asInstanceOf[K]
    var partition = 0
    if (rangeBounds.length <= 128) {
      // If we have less than 128 partitions naive search
      while (partition < rangeBounds.length && ordering.gt(k, rangeBounds(partition))) {
        partition += 1
      }
    } else {
      // Determine which binary search method to use only once.
      partition = binarySearch(rangeBounds, k)
      // binarySearch either returns the match location or -[insertion point]-1
      if (partition < 0) {
        partition = -partition-1
      }
      if (partition > rangeBounds.length) {
        partition = rangeBounds.length
      }
    }
    if (ascending) {
      partition
    } else {
      rangeBounds.length - partition
    }
  }

在range分區中,會存儲一個邊界的數組,比如[1,100,200,300,400],然後對比傳進來的key,返回對應的分區id。

那麼這個邊界是怎麼確定的呢?

這就是Range分區最核心的演算法了,大概描述下,就是遍歷每個paritiion,對裡面的數據進行抽樣,把抽樣的數據進行排序,並按照對應的權重確定邊界。

有幾個比較重要的地方:

  • 1 抽樣
  • 2 確定邊界

關於抽樣,有一個很常見的演算法題,即在不知道數據規模的情況下,如何以等概率的方式,隨機選擇一個值。

最笨的辦法,就是遍歷一次數據,知道數據的規模,然後隨機一個數,取其對應的值。其實這樣相當於遍歷了兩次(第二次的取值根據不同的存儲介質,可能不同)。

在Spark中,是使用水塘抽樣這種演算法。即首先取第一個值,然後依次往後遍歷;第二個值有二分之一的幾率替換選出來的值;第三個值有三分之一的幾率替換選出來的值;...;直到遍歷到最後一個值。這樣,通過依次遍歷就取出來隨機的數值了。

演算法參考源碼:

private var rangeBounds: Array[K] = {
    if (partitions <= 1) {
      Array.empty
    } else {
      // This is the sample size we need to have roughly balanced output partitions, capped at 1M.
      // 最大採樣數量不能超過1M。比如,如果分區是5,採樣數為100
      val sampleSize = math.min(20.0 * partitions, 1e6)
      // Assume the input partitions are roughly balanced and over-sample a little bit.
      // 每個分區的採樣數為平均值的三倍,避免數據傾斜造成的數據量過少
      val sampleSizePerPartition = math.ceil(3.0 * sampleSize / rdd.partitions.size).toInt

      // 真正的採樣演算法(參數1:rdd的key數組, 採樣個數)
      val (numItems, sketched) = RangePartitioner.sketch(rdd.map(_._1), sampleSizePerPartition)
      if (numItems == 0L) {
        Array.empty
      } else {
        // If a partition contains much more than the average number of items, we re-sample from it
        // to ensure that enough items are collected from that partition.
        // 如果有的分區包含的數量遠超過平均值,那麼需要對它重新採樣。每個分區的採樣數/採樣返回的總的記錄數
        val fraction = math.min(sampleSize / math.max(numItems, 1L), 1.0)
        //保存有效的採樣數
        val candidates = ArrayBuffer.empty[(K, Float)]
        //保存數據傾斜導致的採樣數過多的信息
        val imbalancedPartitions = mutable.Set.empty[Int]

        sketched.foreach { case (idx, n, sample) =>
          if (fraction * n > sampleSizePerPartition) {
            imbalancedPartitions += idx
          } else {
            // The weight is 1 over the sampling probability.
            val weight = (n.toDouble / sample.size).toFloat
            for (key <- sample) {
              candidates += ((key, weight))
            }
          }
        }
        if (imbalancedPartitions.nonEmpty) {
          // Re-sample imbalanced partitions with the desired sampling probability.
          val imbalanced = new PartitionPruningRDD(rdd.map(_._1), imbalancedPartitions.contains)
          val seed = byteswap32(-rdd.id - 1)
          //基於RDD獲取採樣數據
          val reSampled = imbalanced.sample(withReplacement = false, fraction, seed).collect()
          val weight = (1.0 / fraction).toFloat
          candidates ++= reSampled.map(x => (x, weight))
        }
        RangePartitioner.determineBounds(candidates, partitions)
      }
    }
  }
  
  def sketch[K : ClassTag](
      rdd: RDD[K],
      sampleSizePerPartition: Int): (Long, Array[(Int, Long, Array[K])]) = {
    val shift = rdd.id
    // val classTagK = classTag[K] // to avoid serializing the entire partitioner object
    val sketched = rdd.mapPartitionsWithIndex { (idx, iter) =>
      val seed = byteswap32(idx ^ (shift << 16))
      val (sample, n) = SamplingUtils.reservoirSampleAndCount(
        iter, sampleSizePerPartition, seed)
      //包裝成三元組,(索引號,分區的內容個數,抽樣的內容)
      Iterator((idx, n, sample))
    }.collect()
    val numItems = sketched.map(_._2).sum
    //返回(數據條數,(索引號,分區的內容個數,抽樣的內容))
    (numItems, sketched)
  }
  

真正的抽樣演算法在SamplingUtils中,由於在Spark中是需要一次性取多個值的,因此直接去前n個數值,然後依次概率替換即可:

def reservoirSampleAndCount[T: ClassTag](
      input: Iterator[T],
      k: Int,
      seed: Long = Random.nextLong())
    : (Array[T], Long) = {
    //創建臨時數組
    val reservoir = new Array[T](k)
    // Put the first k elements in the reservoir.
    // 取出前k個數,並把對應的rdd中的數據放入對應的序號的數組中
    var i = 0
    while (i < k && input.hasNext) {
      val item = input.next()
      reservoir(i) = item
      i += 1
    }

    // If we have consumed all the elements, return them. Otherwise do the replacement.
    // 如果全部的元素,比要抽取的採樣數少,那麼直接返回
    if (i < k) {
      // If input size < k, trim the array to return only an array of input size.
      val trimReservoir = new Array[T](i)
      System.arraycopy(reservoir, 0, trimReservoir, 0, i)
      (trimReservoir, i)

    // 否則開始抽樣替換
    } else {
      // If input size > k, continue the sampling process.
      // 從剛纔的序號開始,繼續遍歷
      var l = i.toLong
      // 隨機數
      val rand = new XORShiftRandom(seed)
      while (input.hasNext) {
        val item = input.next()
        // 隨機一個數與當前的l相乘,如果小於採樣數k,就替換。(越到後面,替換的概率越小...)
        val replacementIndex = (rand.nextDouble() * l).toLong
        if (replacementIndex < k) {
          reservoir(replacementIndex.toInt) = item
        }
        l += 1
      }
      (reservoir, l)
    }
  }

確定邊界

最後就可以通過獲取的樣本數據,確定邊界了。

def determineBounds[K : Ordering : ClassTag](
      candidates: ArrayBuffer[(K, Float)],
      partitions: Int): Array[K] = {
    val ordering = implicitly[Ordering[K]]
    // 數據格式為(key,權重)
    val ordered = candidates.sortBy(_._1)
    val numCandidates = ordered.size
    val sumWeights = ordered.map(_._2.toDouble).sum
    val step = sumWeights / partitions
    var cumWeight = 0.0
    var target = step
    val bounds = ArrayBuffer.empty[K]
    var i = 0
    var j = 0
    var previousBound = Option.empty[K]
    while ((i < numCandidates) && (j < partitions - 1)) {
      val (key, weight) = ordered(i)
      cumWeight += weight
      if (cumWeight >= target) {
        // Skip duplicate values.
        if (previousBound.isEmpty || ordering.gt(key, previousBound.get)) {
          bounds += key
          target += step
          j += 1
          previousBound = Some(key)
        }
      }
      i += 1
    }
    bounds.toArray
  }

直接看代碼,還是有些晦澀難懂,我們舉個例子,一步一步解釋下:

按照上面的演算法流程,大致可以理解:

抽樣-->確定邊界(排序)

首先對spark有一定瞭解的都應該知道,在spark中每個RDD可以理解為一組分區,這些分區對應了記憶體塊block,他們才是數據最終的載體。那麼一個RDD由不同的分區組成,這樣在處理一些map,filter等運算元的時候,就可以直接以分區為單位並行計算了。直到遇到shuffle的時候才需要和其他的RDD配合。

在上面的圖中,如果我們不特殊設置的話,一個RDD由3個分區組成,那麼在對它進行groupbykey的時候,就會按照3進行分區。

按照上面的演算法流程,如果分區數為3,那麼採樣的大小為:

val sampleSize = math.min(20.0 * partitions, 1e6)

即採樣數為60,每個分區取60個數。但是考慮到數據傾斜的情況,有的分區可能數據很多,因此在實際的採樣時,會按照3倍大小採樣:

val sampleSizePerPartition = math.ceil(3.0 * sampleSize / rdd.partitions.size).toInt

也就是說,最多會取60個樣本數據。

然後就是遍歷每個分區,取對應的樣本數。

val sketched = rdd.mapPartitionsWithIndex { (idx, iter) =>
      val seed = byteswap32(idx ^ (shift << 16))
      val (sample, n) = SamplingUtils.reservoirSampleAndCount(
        iter, sampleSizePerPartition, seed)
      //包裝成三元組,(索引號,分區的內容個數,抽樣的內容)
      Iterator((idx, n, sample))
    }.collect()

然後檢查,是否有分區的樣本數過多,如果多於平均值,則繼續採樣,這時直接用sample 就可以了

sketched.foreach { case (idx, n, sample) =>
          if (fraction * n > sampleSizePerPartition) {
            imbalancedPartitions += idx
          } else {
            // The weight is 1 over the sampling probability.
            val weight = (n.toDouble / sample.size).toFloat
            for (key <- sample) {
              candidates += ((key, weight))
            }
          }
        }
        if (imbalancedPartitions.nonEmpty) {
          // Re-sample imbalanced partitions with the desired sampling probability.
          val imbalanced = new PartitionPruningRDD(rdd.map(_._1), imbalancedPartitions.contains)
          val seed = byteswap32(-rdd.id - 1)
          //基於RDD獲取採樣數據
          val reSampled = imbalanced.sample(withReplacement = false, fraction, seed).collect()
          val weight = (1.0 / fraction).toFloat
          candidates ++= reSampled.map(x => (x, weight))
        }

取出樣本後,就到了確定邊界的時候了。

註意每個key都會有一個權重,這個權重是 【分區的數據總數/樣本數】

RangePartitioner.determineBounds(candidates, partitions)

首先排序val ordered = candidates.sortBy(_._1),然後確定一個權重的步長

val sumWeights = ordered.map(_._2.toDouble).sum
val step = sumWeights / partitions

基於該步長,確定邊界,最後就形成了幾個範圍數據。

然後分區器形成二叉樹,遍歷該數確定每個key對應的分區id

partition = binarySearch(rangeBounds, k)

實踐 —— 自定義分區器

自定義分區器,也是很簡單的,只需要實現對應的兩個方法就行:

public class MyPartioner extends Partitioner {
    @Override
    public int numPartitions() {
        return 1000;
    }

    @Override
    public int getPartition(Object key) {
        String k = (String) key;
        int code = k.hashCode() % 1000;
        System.out.println(k+":"+code);
        return  code < 0?code+1000:code;
    }

    @Override
    public boolean equals(Object obj) {
        if(obj instanceof MyPartioner){
            if(this.numPartitions()==((MyPartioner) obj).numPartitions()){
                return true;
            }
            return false;
        }
        return super.equals(obj);
    }
}

使用的時候,可以直接new一個對象即可。

pairRdd.groupbykey(new MyPartitioner())

這樣自定義分區器就完成了。

參考


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • CUBE:CUBE 生成的結果集顯示了所選列中值的所有組合的聚合。 ROLLUP:ROLLUP 生成的結果集顯示了所選列中值的某一層次結構的聚合。 GROUPING:當行由 CUBE 或 ROLLUP 運算符添加時,該函數將導致附加列的輸出值為 1;當行不由 CUBE 或 ROLLUP 運算符添加時 ...
  • 說到視窗框架就不得不提起開窗函數。 開窗函數支持分區、排序和框架三種元素,其語法格式如下: 視窗分區: 就是將視窗指定列具有相同值的那些行進行分區,分區與分組比較類似,但是分組指定後對於整個SELECT語句只能按照這個分組,不過 分區可以在一條語句中指定不同的分區。 1 <PARTITION BY ...
  • 先說點題外話,因為後面我會用到這個函數。 前兩天自定義了一個 sql 的字元串分割函數(Split),不過後來發現有點問題,例如: 我之前只處理了截取的最後一個為空的字元串,所以會出現以上的結果,現在我做了一些修改。代碼如下: 紅色部分的代碼為添加或修改的部分,下麵再看一下效果。 之前自定義 Spl ...
  • 導讀: 隨著大數據概念的火熱,啤酒與尿布的故事廣為人知。我們如何發現買啤酒的人往往也會買尿布這一規律?數據挖掘中的用於挖掘頻繁項集和關聯規則的Apriori演算法可以告訴我們。本文首先對Apriori演算法進行簡介,而後進一步介紹相關的基本概念,之後詳細的介紹Apriori演算法的具體策略和步驟,最後給出 ...
  • 最近在資料庫優化的時候,看到一些表在設計上使用了text或者blob的欄位,單表的存儲空間已經達到了近100G,這種情況再去改變和優化就非常難了 一、簡介 為了清楚大欄位對性能的影響,我們必須要知道innodb存儲引擎的處理方式: 1、一些知識點 1.1 在InnoDB 1.0.x版本之前,Inno ...
  • MapReduce是Hadoop2.x的一個計算框架,利用分治的思想,將一個計算量很大的作業分給很多個任務,每個任務完成其中的一小部分,然後再將結果合併到一起。將任務分開處理的過程為map階段,將每個小任務的結果合併到一起的過程為reduce階段。下麵先從巨集觀上介紹一下客戶端提交一個作業時,Hado ...
  • 在Eclipse下新建一個Map/Reduce項目,並將以下jar添加到Build path: 程式代碼: 以上程式調用HBAse的API,實現了新建一張表,並隨機向表裡插入數據。 ...
  • 有時候我們需要對mysql的資料庫數據進行整體遷移,數據路徑如下 C:\Documents and Settings\All Users 數據全在如何所示的路徑下,只要進行copy遷移,新的資料庫中就有對應的資料庫信息 ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...