最近因為手抖,在Spark中給自己挖了一個數據傾斜的坑。為瞭解決這個問題,順便研究了下Spark分區器的原理,趁著周末加班總結一下~ 先說說數據傾斜 數據傾斜是指Spark中的RDD在計算的時候,每個RDD內部的分區包含的數據不平均。比如一共有5個分區,其中一個占有了90%的數據,這就導致本來5個分 ...
最近因為手抖,在Spark中給自己挖了一個數據傾斜的坑。為瞭解決這個問題,順便研究了下Spark分區器的原理,趁著周末加班總結一下~
先說說數據傾斜
數據傾斜是指Spark中的RDD在計算的時候,每個RDD內部的分區包含的數據不平均。比如一共有5個分區,其中一個占有了90%的數據,這就導致本來5個分區可以5個人一起並行幹活,結果四個人不怎麼幹活,工作全都壓到一個人身上了。遇到這種問題,網上有很多的解決辦法:
比如這篇寫的就不錯:http://www.cnblogs.com/jasongj/p/6508150.html
但是如果是底層數據的問題,無論怎麼優化,還是無法解決數據傾斜的。
比如你想要對某個rdd做groupby,然後做join操作,如果分組的key就是分佈不均勻的,那麼真樣都是無法優化的。因為一旦這個key被切分,就無法完整的做join了,如果不對這個key切分,必然會造成對應的分區數據傾斜。
不過,瞭解數據為什麼會傾斜還是很重要的,繼續往下看吧!
分區的作用
在PairRDD即(key,value)這種格式的rdd中,很多操作都是基於key的,因此為了獨立分割任務,會按照key對數據進行重組。比如groupbykey
重組肯定是需要一個規則的,最常見的就是基於Hash,Spark還提供了一種稍微複雜點的基於抽樣的Range分區方法。
下麵我們先看看分區器在Spark計算流程中是怎麼使用的:
Paritioner的使用
就拿groupbykey來說:
def groupByKey(): JavaPairRDD[K, JIterable[V]] =
fromRDD(groupByResultToJava(rdd.groupByKey()))
它會調用PairRDDFunction的groupByKey()方法
def groupByKey(): RDD[(K, Iterable[V])] = self.withScope {
groupByKey(defaultPartitioner(self))
}
在這個方法裡面創建了預設的分區器。預設的分區器是這樣定義的:
def defaultPartitioner(rdd: RDD[_], others: RDD[_]*): Partitioner = {
val bySize = (Seq(rdd) ++ others).sortBy(_.partitions.size).reverse
for (r <- bySize if r.partitioner.isDefined && r.partitioner.get.numPartitions > 0) {
return r.partitioner.get
}
if (rdd.context.conf.contains("spark.default.parallelism")) {
new HashPartitioner(rdd.context.defaultParallelism)
} else {
new HashPartitioner(bySize.head.partitions.size)
}
}
首先獲取當前分區的分區個數,如果沒有設置spark.default.parallelism
參數,則創建一個跟之前分區個數一樣的Hash分區器。
當然,用戶也可以自定義分區器,或者使用其他提供的分區器。API裡面也是支持的:
// 傳入分區器對象
def groupByKey(partitioner: Partitioner): JavaPairRDD[K, JIterable[V]] =
fromRDD(groupByResultToJava(rdd.groupByKey(partitioner)))
// 傳入分區的個數
def groupByKey(numPartitions: Int): JavaPairRDD[K, JIterable[V]] =
fromRDD(groupByResultToJava(rdd.groupByKey(numPartitions)))
HashPatitioner
Hash分區器,是最簡單也是預設提供的分區器,瞭解它的分區規則,對我們處理數據傾斜或者設計分組的key時,還是很有幫助的。
class HashPartitioner(partitions: Int) extends Partitioner {
require(partitions >= 0, s"Number of partitions ($partitions) cannot be negative.")
def numPartitions: Int = partitions
// 通過key計算其HashCode,並根據分區數取模。如果結果小於0,直接加上分區數。
def getPartition(key: Any): Int = key match {
case null => 0
case _ => Utils.nonNegativeMod(key.hashCode, numPartitions)
}
// 對比兩個分區器是否相同,直接對比其分區個數就行
override def equals(other: Any): Boolean = other match {
case h: HashPartitioner =>
h.numPartitions == numPartitions
case _ =>
false
}
override def hashCode: Int = numPartitions
}
這裡最重要的是這個Utils.nonNegativeMod(key.hashCode, numPartitions)
,它決定了數據進入到哪個分區。
def nonNegativeMod(x: Int, mod: Int): Int = {
val rawMod = x % mod
rawMod + (if (rawMod < 0) mod else 0)
}
說白了,就是基於這個key獲取它的hashCode,然後對分區個數取模。由於HashCode可能為負,這裡直接判斷下,如果小於0,再加上分區個數即可。
因此,基於hash的分區,只要保證你的key是分散的,那麼最終數據就不會出現數據傾斜的情況。
RangePartitioner
這個分區器,適合想要把數據打散的場景,但是如果相同的key重覆量很大,依然會出現數據傾斜的情況。
每個分區器,最核心的方法,就是getPartition
def getPartition(key: Any): Int = {
val k = key.asInstanceOf[K]
var partition = 0
if (rangeBounds.length <= 128) {
// If we have less than 128 partitions naive search
while (partition < rangeBounds.length && ordering.gt(k, rangeBounds(partition))) {
partition += 1
}
} else {
// Determine which binary search method to use only once.
partition = binarySearch(rangeBounds, k)
// binarySearch either returns the match location or -[insertion point]-1
if (partition < 0) {
partition = -partition-1
}
if (partition > rangeBounds.length) {
partition = rangeBounds.length
}
}
if (ascending) {
partition
} else {
rangeBounds.length - partition
}
}
在range分區中,會存儲一個邊界的數組,比如[1,100,200,300,400],然後對比傳進來的key,返回對應的分區id。
那麼這個邊界是怎麼確定的呢?
這就是Range分區最核心的演算法了,大概描述下,就是遍歷每個paritiion,對裡面的數據進行抽樣,把抽樣的數據進行排序,並按照對應的權重確定邊界。
有幾個比較重要的地方:
- 1 抽樣
- 2 確定邊界
關於抽樣,有一個很常見的演算法題,即在不知道數據規模的情況下,如何以等概率的方式,隨機選擇一個值。
最笨的辦法,就是遍歷一次數據,知道數據的規模,然後隨機一個數,取其對應的值。其實這樣相當於遍歷了兩次(第二次的取值根據不同的存儲介質,可能不同)。
在Spark中,是使用水塘抽樣
這種演算法。即首先取第一個值,然後依次往後遍歷;第二個值有二分之一的幾率替換選出來的值;第三個值有三分之一的幾率替換選出來的值;...;直到遍歷到最後一個值。這樣,通過依次遍歷就取出來隨機的數值了。
演算法參考源碼:
private var rangeBounds: Array[K] = {
if (partitions <= 1) {
Array.empty
} else {
// This is the sample size we need to have roughly balanced output partitions, capped at 1M.
// 最大採樣數量不能超過1M。比如,如果分區是5,採樣數為100
val sampleSize = math.min(20.0 * partitions, 1e6)
// Assume the input partitions are roughly balanced and over-sample a little bit.
// 每個分區的採樣數為平均值的三倍,避免數據傾斜造成的數據量過少
val sampleSizePerPartition = math.ceil(3.0 * sampleSize / rdd.partitions.size).toInt
// 真正的採樣演算法(參數1:rdd的key數組, 採樣個數)
val (numItems, sketched) = RangePartitioner.sketch(rdd.map(_._1), sampleSizePerPartition)
if (numItems == 0L) {
Array.empty
} else {
// If a partition contains much more than the average number of items, we re-sample from it
// to ensure that enough items are collected from that partition.
// 如果有的分區包含的數量遠超過平均值,那麼需要對它重新採樣。每個分區的採樣數/採樣返回的總的記錄數
val fraction = math.min(sampleSize / math.max(numItems, 1L), 1.0)
//保存有效的採樣數
val candidates = ArrayBuffer.empty[(K, Float)]
//保存數據傾斜導致的採樣數過多的信息
val imbalancedPartitions = mutable.Set.empty[Int]
sketched.foreach { case (idx, n, sample) =>
if (fraction * n > sampleSizePerPartition) {
imbalancedPartitions += idx
} else {
// The weight is 1 over the sampling probability.
val weight = (n.toDouble / sample.size).toFloat
for (key <- sample) {
candidates += ((key, weight))
}
}
}
if (imbalancedPartitions.nonEmpty) {
// Re-sample imbalanced partitions with the desired sampling probability.
val imbalanced = new PartitionPruningRDD(rdd.map(_._1), imbalancedPartitions.contains)
val seed = byteswap32(-rdd.id - 1)
//基於RDD獲取採樣數據
val reSampled = imbalanced.sample(withReplacement = false, fraction, seed).collect()
val weight = (1.0 / fraction).toFloat
candidates ++= reSampled.map(x => (x, weight))
}
RangePartitioner.determineBounds(candidates, partitions)
}
}
}
def sketch[K : ClassTag](
rdd: RDD[K],
sampleSizePerPartition: Int): (Long, Array[(Int, Long, Array[K])]) = {
val shift = rdd.id
// val classTagK = classTag[K] // to avoid serializing the entire partitioner object
val sketched = rdd.mapPartitionsWithIndex { (idx, iter) =>
val seed = byteswap32(idx ^ (shift << 16))
val (sample, n) = SamplingUtils.reservoirSampleAndCount(
iter, sampleSizePerPartition, seed)
//包裝成三元組,(索引號,分區的內容個數,抽樣的內容)
Iterator((idx, n, sample))
}.collect()
val numItems = sketched.map(_._2).sum
//返回(數據條數,(索引號,分區的內容個數,抽樣的內容))
(numItems, sketched)
}
真正的抽樣演算法在SamplingUtils中,由於在Spark中是需要一次性取多個值的,因此直接去前n個數值,然後依次概率替換即可:
def reservoirSampleAndCount[T: ClassTag](
input: Iterator[T],
k: Int,
seed: Long = Random.nextLong())
: (Array[T], Long) = {
//創建臨時數組
val reservoir = new Array[T](k)
// Put the first k elements in the reservoir.
// 取出前k個數,並把對應的rdd中的數據放入對應的序號的數組中
var i = 0
while (i < k && input.hasNext) {
val item = input.next()
reservoir(i) = item
i += 1
}
// If we have consumed all the elements, return them. Otherwise do the replacement.
// 如果全部的元素,比要抽取的採樣數少,那麼直接返回
if (i < k) {
// If input size < k, trim the array to return only an array of input size.
val trimReservoir = new Array[T](i)
System.arraycopy(reservoir, 0, trimReservoir, 0, i)
(trimReservoir, i)
// 否則開始抽樣替換
} else {
// If input size > k, continue the sampling process.
// 從剛纔的序號開始,繼續遍歷
var l = i.toLong
// 隨機數
val rand = new XORShiftRandom(seed)
while (input.hasNext) {
val item = input.next()
// 隨機一個數與當前的l相乘,如果小於採樣數k,就替換。(越到後面,替換的概率越小...)
val replacementIndex = (rand.nextDouble() * l).toLong
if (replacementIndex < k) {
reservoir(replacementIndex.toInt) = item
}
l += 1
}
(reservoir, l)
}
}
確定邊界
最後就可以通過獲取的樣本數據,確定邊界了。
def determineBounds[K : Ordering : ClassTag](
candidates: ArrayBuffer[(K, Float)],
partitions: Int): Array[K] = {
val ordering = implicitly[Ordering[K]]
// 數據格式為(key,權重)
val ordered = candidates.sortBy(_._1)
val numCandidates = ordered.size
val sumWeights = ordered.map(_._2.toDouble).sum
val step = sumWeights / partitions
var cumWeight = 0.0
var target = step
val bounds = ArrayBuffer.empty[K]
var i = 0
var j = 0
var previousBound = Option.empty[K]
while ((i < numCandidates) && (j < partitions - 1)) {
val (key, weight) = ordered(i)
cumWeight += weight
if (cumWeight >= target) {
// Skip duplicate values.
if (previousBound.isEmpty || ordering.gt(key, previousBound.get)) {
bounds += key
target += step
j += 1
previousBound = Some(key)
}
}
i += 1
}
bounds.toArray
}
直接看代碼,還是有些晦澀難懂,我們舉個例子,一步一步解釋下:
按照上面的演算法流程,大致可以理解:
抽樣-->確定邊界(排序)
首先對spark有一定瞭解的都應該知道,在spark中每個RDD可以理解為一組分區,這些分區對應了記憶體塊block,他們才是數據最終的載體。那麼一個RDD由不同的分區組成,這樣在處理一些map,filter等運算元的時候,就可以直接以分區為單位並行計算了。直到遇到shuffle的時候才需要和其他的RDD配合。
在上面的圖中,如果我們不特殊設置的話,一個RDD由3個分區組成,那麼在對它進行groupbykey的時候,就會按照3進行分區。
按照上面的演算法流程,如果分區數為3,那麼採樣的大小為:
val sampleSize = math.min(20.0 * partitions, 1e6)
即採樣數為60,每個分區取60個數。但是考慮到數據傾斜的情況,有的分區可能數據很多,因此在實際的採樣時,會按照3倍大小採樣:
val sampleSizePerPartition = math.ceil(3.0 * sampleSize / rdd.partitions.size).toInt
也就是說,最多會取60個樣本數據。
然後就是遍歷每個分區,取對應的樣本數。
val sketched = rdd.mapPartitionsWithIndex { (idx, iter) =>
val seed = byteswap32(idx ^ (shift << 16))
val (sample, n) = SamplingUtils.reservoirSampleAndCount(
iter, sampleSizePerPartition, seed)
//包裝成三元組,(索引號,分區的內容個數,抽樣的內容)
Iterator((idx, n, sample))
}.collect()
然後檢查,是否有分區的樣本數過多,如果多於平均值,則繼續採樣,這時直接用sample 就可以了
sketched.foreach { case (idx, n, sample) =>
if (fraction * n > sampleSizePerPartition) {
imbalancedPartitions += idx
} else {
// The weight is 1 over the sampling probability.
val weight = (n.toDouble / sample.size).toFloat
for (key <- sample) {
candidates += ((key, weight))
}
}
}
if (imbalancedPartitions.nonEmpty) {
// Re-sample imbalanced partitions with the desired sampling probability.
val imbalanced = new PartitionPruningRDD(rdd.map(_._1), imbalancedPartitions.contains)
val seed = byteswap32(-rdd.id - 1)
//基於RDD獲取採樣數據
val reSampled = imbalanced.sample(withReplacement = false, fraction, seed).collect()
val weight = (1.0 / fraction).toFloat
candidates ++= reSampled.map(x => (x, weight))
}
取出樣本後,就到了確定邊界的時候了。
註意每個key都會有一個權重,這個權重是 【分區的數據總數/樣本數】
RangePartitioner.determineBounds(candidates, partitions)
首先排序val ordered = candidates.sortBy(_._1)
,然後確定一個權重的步長
val sumWeights = ordered.map(_._2.toDouble).sum
val step = sumWeights / partitions
基於該步長,確定邊界,最後就形成了幾個範圍數據。
然後分區器形成二叉樹,遍歷該數確定每個key對應的分區id
partition = binarySearch(rangeBounds, k)
實踐 —— 自定義分區器
自定義分區器,也是很簡單的,只需要實現對應的兩個方法就行:
public class MyPartioner extends Partitioner {
@Override
public int numPartitions() {
return 1000;
}
@Override
public int getPartition(Object key) {
String k = (String) key;
int code = k.hashCode() % 1000;
System.out.println(k+":"+code);
return code < 0?code+1000:code;
}
@Override
public boolean equals(Object obj) {
if(obj instanceof MyPartioner){
if(this.numPartitions()==((MyPartioner) obj).numPartitions()){
return true;
}
return false;
}
return super.equals(obj);
}
}
使用的時候,可以直接new一個對象即可。
pairRdd.groupbykey(new MyPartitioner())
這樣自定義分區器就完成了。
參考
- https://www.iteblog.com/archives/1368.html
- https://www.iteblog.com/archives/1522.html
- https://www.iteblog.com/archives/1525.html
- http://blog.csdn.net/zongzhiyuan/article/details/49965021