Spark的一些重要概念

来源:https://www.cnblogs.com/paopaoT/archive/2023/07/05/17529622.html
-Advertisement-
Play Games

# Shuffle的深入理解 什麼是Shuffle,本意為洗牌,在數據處理領域裡面,意為將數打散。 問題:shuffle一定有網路傳輸嗎?有網路傳輸的一定是Shuffle嗎? ## Shuffle的概念 通過網路將數據傳輸到多台機器,數據被打散,但是有網路傳輸,不一定就有shuffle,Shuffl ...


Shuffle的深入理解

什麼是Shuffle,本意為洗牌,在數據處理領域裡面,意為將數打散。
問題:shuffle一定有網路傳輸嗎?有網路傳輸的一定是Shuffle嗎?

Shuffle的概念

通過網路將數據傳輸到多台機器,數據被打散,但是有網路傳輸,不一定就有shuffle,Shuffle的功能是將具有相同規律的數據按照指定的分區器的分區規則,通過網路,傳輸到指定的機器的一個分區中,需要註意的是,不是上游的Task發送給下游的Task,而是下游的Task到上游拉取數據。

image

reduceByKey一定會Shuffle嗎

不一定,如果一個RDD事先使用了HashPartitioner分區先進行分區,然後再調用reduceByKey方法,使用的也是HashPartitioner,並且沒有改變分區數量,調用redcueByKey就不shuffle
如果自定義分區器,多次使用自定義的分區器,並且沒有改變分區的數量,為了減少shuffle的次數,提高計算效率,需要重新自定義分區器的equals方法
例如:

//創建RDD,並沒有立即讀取數據,而是觸發Action才會讀取數據
val lines = sc.textFile("hdfs://node-1.51doit.cn:9000/words")

val wordAndOne = lines.flatMap(_.split(" ")).map((_, 1))
//先使用HashPartitioner進行partitionBy
val partitioner = new HashPartitioner(wordAndOne.partitions.length)
val partitioned = wordAndOne.partitionBy(partitioner)
//然後再調用reduceByKey
val reduced: RDD[(String, Int)] = partitioned.reduceByKey(_ + _)

reduced.saveAsTextFile("hdfs://node-1.51doit.cn:9000/out-36-82")

image

join一定會Shuffle嗎

不一定,join一般情況會shuffle,但是如果兩個要join的rdd實現都使用相同的分區去進行分區了,並且join時,依然使用相同類型的分區器,並且沒有改變分區數據,那麼不shuffle

//通過並行化的方式創建一個RDD
val rdd1 = sc.parallelize(List(("tom", 1), ("tom", 2), ("jerry", 3), ("kitty", 2)), 2)
//通過並行化的方式再創建一個RDD
val rdd2 = sc.parallelize(List(("jerry", 2), ("tom", 1), ("shuke", 2), ("jerry", 4)), 2)
//該join一定有shuffle,並且是3個Stage
val rdd3: RDD[(String, (Int, Int))] = rdd1.join(rdd2)


val rdd11 = rdd1.groupByKey()
val rdd22 = rdd2.groupByKey()
//下麵的join,沒有shuffle
val rdd33 = rdd11.join(rdd22)

rdd33.saveAsTextFile("hdfs://node-1.51doit.cn:9000/out-36-86")

image

shuffle數據的復用

spark在shuffle時,會應用分區器,當讀取達到一定大小或整個分區的數據被處理完,會將數據溢寫磁碟磁碟(數據文件和索引文件),溢寫持磁碟的數據,會保存在Executor所在機器的本地磁碟(預設是保存在/temp目錄,也可以配置到其他目錄),只要application一直運行,shuffle的中間結果數據就會被保存。如果以後再次觸發Action,使用到了以前shuffle的中間結果,那麼就不會從源頭重新計算而是,而是復用shuffle中間結果,所有說,shuffle是一種特殊的persist,以後再次觸發Action,就會跳過前面的Stage,直接讀取shuffle的數據,這樣可以提高程式的執行效率。

廣播變數

廣播變數的使用場景

在很多計算場景,經常會遇到兩個RDD進行JOIN,如果一個RDD對應的數據比較大,一個RDD對應的數據比較小,如果使用JOIN,那麼會shuffle,導致效率變低。廣播變數就是將相對較小的數據,先收集到Driver,然後再通過網路廣播到屬於該Application對應的每個Executor中,以後處理大量數據對應的RDD關聯數據,就不用shuffle了,而是直接在記憶體中關聯已經廣播好的數據,即通實現mapside join,可以將Driver端的數據廣播到屬於該application的Executor,然後通過Driver廣播變數返回的引用,獲取實現廣播到Executor的數據

廣播變數的特點:廣播出去的數據就無法在改變了,在沒有Executor中是只讀的操作,在每個Executor中,多個Task使用一份廣播變數
image

廣播變數的實現原理

廣播變數是通過BT的方式廣播的(TorrentBroadcast),多個Executor可以相互傳遞數據,可以提高效率
sc.broadcast這個方法是阻塞的(同步的)
廣播變數一但廣播出去就不能改變,為了以後可以定期的改變要關聯的數據,可以定義一個object[單例對象],在函數內使用,並且加一個定時器,然後定期更新數據
廣播到Executor的數據,可以在Driver獲取到引用,然後這個引用會伴隨著每一個Task發送到Executor,然後通過這個引用,獲取到事先廣播好的數據

序列化問題

序列化問題的場景

spark任務在執行過程中,由於編寫的程式不當,任務在執行時,會出序列化問題,通常有以下兩種情況,
• 封裝數據的Bean沒有實現序列化介面(Task已經生成了),在ShuffleWirte之前要將數據溢寫磁碟,會拋出異常
• 函數閉包問題,即函數的內部,使用到了外部沒有實現序列化的引用(Task沒有生成)

數據Bean未實現序列化介面

spark在運算過程中,由於很多場景必須要shuffle,即向數據溢寫磁碟並且在網路間進行傳輸,但是由於封裝數據的Bean沒有實現序列化介面,就會導致出現序列化的錯誤!


object C02_CustomSort {

  def main(args: Array[String]): Unit = {

    val sc = SparkUtil.getContext(this.getClass.getSimpleName, true)
    //使用並行化的方式創建RDD
    val lines = sc.parallelize(
      List(
        "laoduan,38,99.99",
        "nianhang,33,99.99",
        "laozhao,18,9999.99"
      )
    )
    val tfBoy: RDD[Boy] = lines.map(line => {
      val fields = line.split(",")
      val name = fields(0)
      val age = fields(1).toInt
      val fv = fields(2).toDouble
      new Boy(name, age, fv) //將數據封裝到一個普通的class中
    })

    implicit val ord = new Ordering[Boy] {
      override def compare(x: Boy, y: Boy): Int = {
        if (x.fv == y.fv) {
          x.age - y.age
        } else {
          java.lang.Double.compare(y.fv, x.fv)
        }
      }
    }
    //sortBy會產生shuffle,如果Boy沒有實現序列化介面,Shuffle時會報錯
    val sorted: RDD[Boy] = tfBoy.sortBy(bean => bean)

    val res = sorted.collect()

    println(res.toBuffer)
  }
}

//如果以後定義bean,建議使用case class
class Boy(val name: String, var age: Int, var fv: Double)  //extends Serializable 
{
  
  override def toString = s"Boy($name, $age, $fv)"
}

函數閉包問題

閉包的現象

在調用RDD的Transformation和Action時,可能會傳入自定義的函數,如果函數內部使用到了外部未被序列化的引用,就會報Task無法序列化的錯誤。原因是spark的Task是在Driver端生成的,並且需要通過網路傳輸到Executor中,Task本身實現了序列化介面,函數也實現了序列化介面,但是函數內部使用到的外部引用不支持序列化,就會函數導致無法序列化,從而導致Task沒法序列化,就無法發送到Executor中了
image

在調用RDD的Transformation或Action是傳入函數,第一步就進行檢測,即調用sc的clean方法
為了避免錯誤,在Driver初始化的object或class必須實現序列化介面,不然會報錯誤

def map[U: ClassTag](f: T => U): RDD[U] = withScope {
  val cleanF = sc.clean(f) //檢測函數是否可以序列化,如果可以直接將函數返回,如果不可以,拋出異常
  new MapPartitionsRDD[U, T](this, (_, _, iter) => iter.map(cleanF))
}
private def ensureSerializable(func: AnyRef): Unit = {
  try {
    if (SparkEnv.get != null) {
      //獲取spark執行換的的序列化器,如果函數無法序列化,直接拋出異常,程式退出,根本就沒有生成Task
      SparkEnv.get.closureSerializer.newInstance().serialize(func)
    }
  } catch {
    case ex: Exception => throw new SparkException("Task not serializable", ex)
  }
}

在Driver端初始化實現序列化的object

在一個Executor中,多個Task使用同一個object對象,因為在scala中,object就是單例對象,一個Executor中只有一個實例,Task會反序列化多次,但是引用的單例對象只反序列化一次

//從HDFS中讀取數據,創建RDD
//HDFS指定的目錄中有4個小文件,內容如下:
//1,ln
val lines = sc.textFile(args(1))
//函數外部定義的一個引用類型(變數)
//RuleObjectSer是一個靜態對象,實在第一次使用的時候被初始化了(實在Driver被初始化的)
val rulesObj = RuleObjectSer

//函數實在Driver定義的
val func = (line: String) => {
  val fields = line.split(",")
  val id = fields(0).toInt
  val code = fields(1)
  val name = rulesObj.rulesMap.getOrElse(code, "未知") //閉包
  //獲取當前線程ID
  val treadId = Thread.currentThread().getId
  //獲取當前Task對應的分區編號
  val partitiondId = TaskContext.getPartitionId()
  //獲取當前Task運行時的所在機器的主機名
  val host = InetAddress.getLocalHost.getHostName
  (id, code, name, treadId, partitiondId, host, rulesObj.toString)
}

//處理數據,關聯維度
val res = lines.map(func)
res.saveAsTextFile(args(2))

image

在Driver端初始化實現序列化的class
在一個Executor中,每個Task都會使用自己獨享的class實例,因為在scala中,class就是多例,Task會反序列化多次,每個Task引用的class實例也會被序列化

//從HDFS中讀取數據,創建RDD
//HDFS指定的目錄中有4個小文件,內容如下:
//1,ln
val lines = sc.textFile(args(1))
//函數外部定義的一個引用類型(變數)
//RuleClassNotSer是一個類,需要new才能實現(實在Driver被初始化的)
val rulesClass = new RuleClassSer

//處理數據,關聯維度
val res = lines.map(e => {
  val fields = e.split(",")
  val id = fields(0).toInt
  val code = fields(1)
  val name = rulesClass.rulesMap.getOrElse(code, "未知") //閉包
  //獲取當前線程ID
  val treadId = Thread.currentThread().getId
  //獲取當前Task對應的分區編號
  val partitiondId = TaskContext.getPartitionId()
  //獲取當前Task運行時的所在機器的主機名
  val host = InetAddress.getLocalHost.getHostName
  (id, code, name, treadId, partitiondId, host, rulesClass.toString)
})

res.saveAsTextFile(args(2))

image

在函數內部初始化未序列化的object
object沒有實現序列化介面,不會出現問題,因為該object實現函數內部被初始化的,而不是在Driver初始化的

//從HDFS中讀取數據,創建RDD
//HDFS指定的目錄中有4個小文件,內容如下:
//1,ln
val lines = sc.textFile(args(1))
//不再Driver端初始化RuleObjectSer或RuleClassSer
//函數實在Driver定義的
val func = (line: String) => {
  val fields = line.split(",")
  val id = fields(0).toInt
  val code = fields(1)
  //在函數內部初始化沒有實現序列化介面的RuleObjectNotSer
  val name = RuleObjectNotSer.rulesMap.getOrElse(code, "未知") 
  //獲取當前線程ID
  val treadId = Thread.currentThread().getId
  //獲取當前Task對應的分區編號
  val partitiondId = TaskContext.getPartitionId()
  //獲取當前Task運行時的所在機器的主機名
  val host = InetAddress.getLocalHost.getHostName
  (id, code, name, treadId, partitiondId, host, RuleObjectNotSer.toString)
}
//處理數據,關聯維度
val res = lines.map(func)
res.saveAsTextFile(args(2))
sc.stop()

image

在函數內部初始化未序列化的class

這種方式非常不好,因為每來一條數據,new一個class的實例,會導致消耗更多資源,jvm會頻繁GC

//從HDFS中讀取數據,創建RDD
//HDFS指定的目錄中有4個小文件,內容如下:
//1,ln
val lines = sc.textFile(args(1))

//處理數據,關聯維度
val res = lines.map(e => {
  val fields = e.split(",")
  val id = fields(0).toInt
  val code = fields(1)
  //RuleClassNotSer是在Executor中被初始化的
  val rulesClass = new RuleClassNotSer
  //但是如果每來一條數據new一個RuleClassNotSer,不好,效率低,浪費資源,頻繁GC
  val name = rulesClass.rulesMap.getOrElse(code, "未知") 
  //獲取當前線程ID
  val treadId = Thread.currentThread().getId
  //獲取當前Task對應的分區編號
  val partitiondId = TaskContext.getPartitionId()
  //獲取當前Task運行時的所在機器的主機名
  val host = InetAddress.getLocalHost.getHostName
  (id, code, name, treadId, partitiondId, host, rulesClass.toString)
})

res.saveAsTextFile(args(2))

調用mapPartitions在函數內部初始化未序列化的class

一個分區使用一個class的實例,即每個Task都是自己的class實例

//從HDFS中讀取數據,創建RDD
//HDFS指定的目錄中有4個小文件,內容如下:
//1,ln
val lines = sc.textFile(args(1))
//處理數據,關聯維度
val res = lines.mapPartitions(it => {
  //RuleClassNotSer是在Executor中被初始化的
  //一個分區的多條數據,使用同一個RuleClassNotSer實例
  val rulesClass = new RuleClassNotSer
  it.map(e => {
    val fields = e.split(",")
    val id = fields(0).toInt
    val code = fields(1)
    val name = rulesClass.rulesMap.getOrElse(code, "未知") 
    //獲取當前線程ID
    val treadId = Thread.currentThread().getId
    //獲取當前Task對應的分區編號
    val partitiondId = TaskContext.getPartitionId()
    //獲取當前Task運行時的所在機器的主機名
    val host = InetAddress.getLocalHost.getHostName
    (id, code, name, treadId, partitiondId, host, rulesClass.toString)
  })
})
res.saveAsTextFile(args(2))
sc.stop()

image

Task線程安全問題

在一個Executor可以同時運行多個Task,如果多個Task使用同一個共用的單例對象,如果對共用的數據同時進行讀寫操作,會導致線程不安全的問題,為了避免這個問題,可以加鎖,但效率變低了,因為在一個Executor中同一個時間點只能有一個Task使用共用的數據,這樣就變成了串列了,效率低!

定義一個工具類object,格式化日期,因為SimpleDateFormat線程不安全,會出現異常

val conf = new SparkConf()
  .setAppName("WordCount")
  .setMaster("local[*]") //本地模式,開多個線程
//1.創建SparkContext
val sc = new SparkContext(conf)

val lines = sc.textFile("data/date.txt")

val timeRDD: RDD[Long] = lines.map(e => {
  //將字元串轉成long類型時間戳
  //使用自定義的object工具類
  val time: Long = DateUtilObj.parse(e)
  time
})

val res = timeRDD.collect()
println(res.toBuffer)
object DateUtilObj {

  //多個Task使用了一個共用的SimpleDateFormat,SimpleDateFormat是線程不安全

  val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")

  //線程安全的
  //val sdf: FastDateFormat = FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss")

  def parse(str: String): Long = {
    //2022-05-23 11:39:30
    sdf.parse(str).getTime
  }

}

上面的程式會出現錯誤,因為多個Task同時使用一個單例對象格式化日期,報錯,如果加鎖,程式會變慢,改進後的代碼:

val conf = new SparkConf()
  .setAppName("WordCount")
  .setMaster("local[*]") //本地模式,開多個線程
//1.創建SparkContext
val sc = new SparkContext(conf)

val lines = sc.textFile("data/date.txt")

val timeRDD = lines.mapPartitions(it => {
  //一個Task使用自己單獨的DateUtilClass實例,缺點是浪費記憶體資源
  val dataUtil = new DateUtilClass
  it.map(e => {
    dataUtil.parse(e)
  })
})

val res = timeRDD.collect()
println(res.toBuffer)
class DateUtilClass {

  val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")

  def parse(str: String): Long = {
    //2022-05-23 11:39:30
    sdf.parse(str).getTime
  }
}

改進後,一個Task使用一個DateUtilClass實例,不會出現線程安全的問題。

累加器

累加器是Spark中用來做計數功能的,在程式運行過程當中,可以做一些額外的數據指標統計

觸發一次Action,並且將附帶的統計指標計算出來,可以使用Accumulator進行處理,Accumulator的本質數一個實現序列化介面class,每個Task都有自己的累加器,避免累加的數據發送衝突

object C14_AccumulatorDemo3 {

  def main(args: Array[String]): Unit = {

    val conf = new SparkConf()
      .setAppName("WordCount")
      .setMaster("local[*]") //本地模式,開多個線程
    //1.創建SparkContext
    val sc = new SparkContext(conf)

    val rdd1 = sc.parallelize(List(1,2,3,4,5,6,7,8,9), 2)
    //在Driver定義一個特殊的變數,即累加器
    //Accumulator可以將每個分區的計數結果,通過網路傳輸到Driver,然後進行全局求和
    val accumulator: LongAccumulator = sc.longAccumulator("even-acc")
    val rdd2 = rdd1.map(e => {
      if (e % 2 == 0) {
        accumulator.add(1)  //閉包,在Executor中累計的
      }
      e * 10
    })

    //就觸發一次Action
    rdd2.saveAsTextFile("out/113")

    //每個Task中累計的數據會返回到Driver嗎?
    println(accumulator.count) 
  }
}

您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • > 作者:小牛呼嚕嚕 | [https://xiaoniuhululu.com](https://xiaoniuhululu.com/) > 電腦內功、源碼解析、科技故事、項目實戰、面試八股等更多硬核文章,首發於公眾號「[小牛呼嚕嚕](https://www.xiaoniuhululu.com/i ...
  • BeanDefinition在Spring初始化階段保存Bean的元數據信息,包括Class名稱、Scope、構造方法參數、屬性值等信息,本文將介紹一下BeanDefinition介面、重要的實現類,以及在Spring中的使用示例。 # BeanDefinition介面 用於描述了一個Bean實例, ...
  • 遠程調用百度AI開放平臺的web服務,快速完成人臉識別 ### 歡迎訪問我的GitHub > 這裡分類和彙總了欣宸的全部原創(含配套源碼):[https://github.com/zq2599/blog_demos](https://github.com/zq2599/blog_demos) ### ...
  • 個人認為百度地圖開放平臺確實很好用但就是C#的SN校驗會出現以下幾個問題 一、官方的示例代碼說的不清不楚 獲取SN函數的Uri應該使用不帶功能變數名稱的Uri 比如:最終請求地址為https://api.map.baidu.com/location/ip?ip=119.126.10.15&coor=gcj0 ...
  • ## 一:背景 ### 1. 講故事 前段時間有位朋友找到我,說他們的工業視覺軟體僵死了,讓我幫忙看下到底是什麼情況,哈哈,其實卡死的問題相對好定位,無非就是看主線程棧嘛,然後就是具體問題具體分析,當然難度大小就看運氣了。 前幾天看一篇文章說現在的 .NET程式員 不需要學習**WinDbg** , ...
  • 記憶體“泄露”是開發中常見的問題之一,它會導致應用程式占用越來越多的記憶體資源,最終可能導致系統性能下降甚至崩潰。軟體開發者需要瞭解在程式中出現記憶體泄露的情況,以避免軟體出現該的問題。 **什麼是記憶體“泄露”?** 記憶體泄露是申請了記憶體空間的變數一直在占用,無法釋放。比如申請了一塊記憶體空間,沒有回收一直 ...
  • # lvm邏輯捲 ## 前言 > raid磁碟陣列技術,提高硬碟的讀寫效率,以及數據的安全,raid的缺點在於: > 1.當你配置好了raid磁碟陣列組,容量的大小,已經是限定了,如果你存儲的業務非常多,磁碟容量不夠用的問題就會出現,你想要擴容磁碟的空間,就會非常麻煩。 > 2.不同的磁碟分區,相對 ...
  • > 本篇內容主要來源於自己學習的視頻,如有侵權,請聯繫刪除,謝謝。 ### 1、etcd讀請求概覽 etcd是典型的`讀多寫少`存儲,在我們實際業務場景中,讀一般占據2/3以上的請求。一個讀 請求從client通過`Round-robin(輪詢)`負載均衡演算法,選擇一個etcd server節點,發 ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...