spark RDD,reduceByKey vs groupByKey

来源:https://www.cnblogs.com/listenfwind/archive/2018/10/28/9860204.html
-Advertisement-
Play Games

Spark 中有兩個類似的api,分別是 reduceByKey 和 groupByKey 。這兩個的功能類似,但底層實現卻有些不同,那麼為什麼要這樣設計呢?我們來從源碼的角度分析一下。 先看兩者的調用順序(都是使用預設的Partitioner,即defaultPartitioner) 所用 spa ...


Spark 中有兩個類似的api,分別是 reduceByKey 和 groupByKey 。這兩個的功能類似,但底層實現卻有些不同,那麼為什麼要這樣設計呢?我們來從源碼的角度分析一下。

先看兩者的調用順序(都是使用預設的Partitioner,即defaultPartitioner)

所用 spark 版本:spark 2.1.0

先看reduceByKey

Step1

  def reduceByKey(func: (V, V) => V): RDD[(K, V)] = self.withScope {
    reduceByKey(defaultPartitioner(self), func)
  }

Setp2

  def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = self.withScope {
    combineByKeyWithClassTag[V]((v: V) => v, func, func, partitioner)
  }

Setp3

def combineByKeyWithClassTag[C](
      createCombiner: V => C,
      mergeValue: (C, V) => C,
      mergeCombiners: (C, C) => C,
      partitioner: Partitioner,
      mapSideCombine: Boolean = true,
      serializer: Serializer = null)(implicit ct: ClassTag[C]): RDD[(K, C)] = self.withScope {
    require(mergeCombiners != null, "mergeCombiners must be defined") // required as of Spark 0.9.0
    if (keyClass.isArray) {
      if (mapSideCombine) {
        throw new SparkException("Cannot use map-side combining with array keys.")
      }
      if (partitioner.isInstanceOf[HashPartitioner]) {
        throw new SparkException("HashPartitioner cannot partition array keys.")
      }
    }
    val aggregator = new Aggregator[K, V, C](
      self.context.clean(createCombiner),
      self.context.clean(mergeValue),
      self.context.clean(mergeCombiners))
    if (self.partitioner == Some(partitioner)) {
      self.mapPartitions(iter => {
        val context = TaskContext.get()
        new InterruptibleIterator(context, aggregator.combineValuesByKey(iter, context))
      }, preservesPartitioning = true)
    } else {
      new ShuffledRDD[K, V, C](self, partitioner)
        .setSerializer(serializer)
        .setAggregator(aggregator)
        .setMapSideCombine(mapSideCombine)
    }
  }

姑且不去看方法裡面的細節,我們會只要知道最後調用的是 combineByKeyWithClassTag 這個方法。這個方法有兩個參數我們來重點看一下,

def combineByKeyWithClassTag[C](
      createCombiner: V => C,
      mergeValue: (C, V) => C,
      mergeCombiners: (C, C) => C,
      partitioner: Partitioner,
      mapSideCombine: Boolean = true,
      serializer: Serializer = null)

首先是 partitioner 參數 ,這個即是 RDD 的分區設置。除了預設的 defaultPartitioner,Spark 還提供了 RangePartitioner 和 HashPartitioner 外,此外用戶也可以自定義 partitioner 。通過源碼可以發現如果是 HashPartitioner 的話,那麼是會拋出一個錯誤的。

然後是 mapSideCombine 參數 ,這個參數正是 reduceByKey 和 groupByKey 最大不同的地方,它決定是是否會先在節點上進行一次 Combine 操作,下麵會有更具體的例子來介紹。

然後是groupByKey

Step1

  def groupByKey(): RDD[(K, Iterable[V])] = self.withScope {
    groupByKey(defaultPartitioner(self))
  }

Step2

  def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])] = self.withScope {
    // groupByKey shouldn't use map side combine because map side combine does not
    // reduce the amount of data shuffled and requires all map side data be inserted
    // into a hash table, leading to more objects in the old gen.
    val createCombiner = (v: V) => CompactBuffer(v)
    val mergeValue = (buf: CompactBuffer[V], v: V) => buf += v
    val mergeCombiners = (c1: CompactBuffer[V], c2: CompactBuffer[V]) => c1 ++= c2
    val bufs = combineByKeyWithClassTag[CompactBuffer[V]](
      createCombiner, mergeValue, mergeCombiners, partitioner, mapSideCombine = false)
    bufs.asInstanceOf[RDD[(K, Iterable[V])]]
  }

Setp3

def combineByKeyWithClassTag[C](
      createCombiner: V => C,
      mergeValue: (C, V) => C,
      mergeCombiners: (C, C) => C,
      partitioner: Partitioner,
      mapSideCombine: Boolean = true,
      serializer: Serializer = null)(implicit ct: ClassTag[C]): RDD[(K, C)] = self.withScope {
    require(mergeCombiners != null, "mergeCombiners must be defined") // required as of Spark 0.9.0
    if (keyClass.isArray) {
      if (mapSideCombine) {
        throw new SparkException("Cannot use map-side combining with array keys.")
      }
      if (partitioner.isInstanceOf[HashPartitioner]) {
        throw new SparkException("HashPartitioner cannot partition array keys.")
      }
    }
    val aggregator = new Aggregator[K, V, C](
      self.context.clean(createCombiner),
      self.context.clean(mergeValue),
      self.context.clean(mergeCombiners))
    if (self.partitioner == Some(partitioner)) {
      self.mapPartitions(iter => {
        val context = TaskContext.get()
        new InterruptibleIterator(context, aggregator.combineValuesByKey(iter, context))
      }, preservesPartitioning = true)
    } else {
      new ShuffledRDD[K, V, C](self, partitioner)
        .setSerializer(serializer)
        .setAggregator(aggregator)
        .setMapSideCombine(mapSideCombine)
    }
  }

結合上面 reduceByKey 的調用鏈,可以發現最終其實都是調用 combineByKeyWithClassTag 這個方法的,但調用的參數不同。
reduceByKey的調用

combineByKeyWithClassTag[V]((v: V) => v, func, func, partitioner)

groupByKey的調用

combineByKeyWithClassTag[CompactBuffer[V]](
      createCombiner, mergeValue, mergeCombiners, partitioner, mapSideCombine = false)

正是兩者不同的調用方式導致了兩個方法的差別,我們分別來看

  • reduceByKey的泛型參數直接是[V],而groupByKey的泛型參數是[CompactBuffer[V]]。這直接導致了 reduceByKey 和 groupByKey 的返回值不同,前者是RDD[(K, V)],而後者是RDD[(K, Iterable[V])]

  • 然後就是mapSideCombine = false 了,這個mapSideCombine 參數的預設是true的。這個值有什麼用呢,上面也說了,這個參數的作用是控制要不要在map端進行初步合併(Combine)。可以看看下麵具體的例子。

從功能上來說,可以發現 ReduceByKey 其實就是會在每個節點先進行一次合併的操作,而 groupByKey 沒有。

這麼來看 ReduceByKey 的性能會比 groupByKey 好很多,因為有些工作在節點已經處理了。那麼 groupByKey 為什麼存在,它的應用場景是什麼呢?我也不清楚,如果觀看這篇文章的讀者知道的話不妨在評論里說出來吧。非常感謝!


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 1. 購買了VPS,CentOS系統,發現伺服器時間與北京時間往往不一致,存在時差。 2. 可以執行tzselect命令按提示更換時區,依次選擇5 Asia,9 China,1 Beijing Time。 3. 將 這行添加到/etc/profile(所有用戶生效)或者~/.bash_profile ...
  • 一。win10系統下c盤,program 文件下 軟體一般為32 或者 64位,但是現在win10系統有些C盤會顯示program x86 向這種情況的話我們的軟體預設安裝在這個盤的話可能會造成很多文件消失,但是功能依舊存在!!! 從文件所在位置可以看到安裝目錄部分文件,但是返回上一級則會消失(位數 ...
  • nbtstat -A ip 對方136到139其中一個埠開了的話,就可查看對方最近登陸的用戶名(03前的為用戶名)-註意:參數-A要大寫 tracert -參數 ip(或電腦名) 跟蹤路由(數據包),參數:“-w數字”用於設置超時間隔。 ping ip(或功能變數名稱) 向對方主機發送預設大小為32位元組 ...
  • 第1節 查看當前系統版本及內核 cat /etc/redhat-release (查看系統版本) CentOS Linux release 7.4.1708 (Core) uname -r (查看系統內核) 3.10.0-693.el7.x86_64 uname -a (查看當前操作系統) x86_ ...
  • 安裝redis 部署集群 參考資料 https://www.cnblogs.com/it-cen/p/4295984.html https://blog.csdn.net/naixiyi/article/details/51346880 https://www.cnblogs.com/Patrick ...
  • mkfs 在磁碟分區上創建ext2、ext3、ext4、ms-dos、vfat文件系統,預設情況下會創建ext2。mkfs用於在設備上構建Linux文件系統,通常是硬碟分區。文件要麼是設備名稱(例如/dev/hda1,/dev/sdb2),要麼是包含文件系統的常規文件。成功返回0,失敗返回1。 實際 ...
  • 有時候,管理員終端登陸了系統,如果離開沒有退出賬戶,則會有安全隱患存在,因此需要優化終端超時。 設置終端超時: export TMOUT=10 永久生效: echo "export TMOUT=600" >>/etc/profile source /etc/profile 檢查是否生效: ...
  • 目的:表操作(表維護) 一、一對一(略過) 二、一對 1、建表原則:在多的一方創建外鍵指向一的一方的外鍵 2、建表:實體中添加 3、操作 1、參數: name屬性:集合屬性名 column屬性: 外鍵列名 class屬性: 與我關聯的對象完整類名 2、級聯操作: cascade save-updat ...
一周排行
    -Advertisement-
    Play Games
  • 前言 本文介紹一款使用 C# 與 WPF 開發的音頻播放器,其界面簡潔大方,操作體驗流暢。該播放器支持多種音頻格式(如 MP4、WMA、OGG、FLAC 等),並具備標記、實時歌詞顯示等功能。 另外,還支持換膚及多語言(中英文)切換。核心音頻處理採用 FFmpeg 組件,獲得了廣泛認可,目前 Git ...
  • OAuth2.0授權驗證-gitee授權碼模式 本文主要介紹如何筆者自己是如何使用gitee提供的OAuth2.0協議完成授權驗證並登錄到自己的系統,完整模式如圖 1、創建應用 打開gitee個人中心->第三方應用->創建應用 創建應用後在我的應用界面,查看已創建應用的Client ID和Clien ...
  • 解決了這個問題:《winForm下,fastReport.net 從.net framework 升級到.net5遇到的錯誤“Operation is not supported on this platform.”》 本文內容轉載自:https://www.fcnsoft.com/Home/Sho ...
  • 國內文章 WPF 從裸 Win 32 的 WM_Pointer 消息獲取觸摸點繪製筆跡 https://www.cnblogs.com/lindexi/p/18390983 本文將告訴大家如何在 WPF 裡面,接收裸 Win 32 的 WM_Pointer 消息,從消息裡面獲取觸摸點信息,使用觸摸點 ...
  • 前言 給大家推薦一個專為新零售快消行業打造了一套高效的進銷存管理系統。 系統不僅具備強大的庫存管理功能,還集成了高性能的輕量級 POS 解決方案,確保頁面載入速度極快,提供良好的用戶體驗。 項目介紹 Dorisoy.POS 是一款基於 .NET 7 和 Angular 4 開發的新零售快消進銷存管理 ...
  • ABP CLI常用的代碼分享 一、確保環境配置正確 安裝.NET CLI: ABP CLI是基於.NET Core或.NET 5/6/7等更高版本構建的,因此首先需要在你的開發環境中安裝.NET CLI。這可以通過訪問Microsoft官網下載並安裝相應版本的.NET SDK來實現。 安裝ABP ...
  • 問題 問題是這樣的:第三方的webapi,需要先調用登陸介面獲取Cookie,訪問其它介面時攜帶Cookie信息。 但使用HttpClient類調用登陸介面,返回的Headers中沒有找到Cookie信息。 分析 首先,使用Postman測試該登陸介面,正常返回Cookie信息,說明是HttpCli ...
  • 國內文章 關於.NET在中國為什麼工資低的分析 https://www.cnblogs.com/thinkingmore/p/18406244 .NET在中國開發者的薪資偏低,主要因市場需求、技術棧選擇和企業文化等因素所致。歷史上,.NET曾因微軟的閉源策略發展受限,儘管後來推出了跨平臺的.NET ...
  • 在WPF開發應用中,動畫不僅可以引起用戶的註意與興趣,而且還使軟體更加便於使用。前面幾篇文章講解了畫筆(Brush),形狀(Shape),幾何圖形(Geometry),變換(Transform)等相關內容,今天繼續講解動畫相關內容和知識點,僅供學習分享使用,如有不足之處,還請指正。 ...
  • 什麼是委托? 委托可以說是把一個方法代入另一個方法執行,相當於指向函數的指針;事件就相當於保存委托的數組; 1.實例化委托的方式: 方式1:通過new創建實例: public delegate void ShowDelegate(); 或者 public delegate string ShowDe ...