# Shuffle的深入理解 什麼是Shuffle,本意為洗牌,在數據處理領域裡面,意為將數打散。 問題:shuffle一定有網路傳輸嗎?有網路傳輸的一定是Shuffle嗎? ## Shuffle的概念 通過網路將數據傳輸到多台機器,數據被打散,但是有網路傳輸,不一定就有shuffle,Shuffl ...
Shuffle的深入理解
什麼是Shuffle,本意為洗牌,在數據處理領域裡面,意為將數打散。
問題:shuffle一定有網路傳輸嗎?有網路傳輸的一定是Shuffle嗎?
Shuffle的概念
通過網路將數據傳輸到多台機器,數據被打散,但是有網路傳輸,不一定就有shuffle,Shuffle的功能是將具有相同規律的數據按照指定的分區器的分區規則,通過網路,傳輸到指定的機器的一個分區中,需要註意的是,不是上游的Task發送給下游的Task,而是下游的Task到上游拉取數據。
reduceByKey一定會Shuffle嗎
不一定,如果一個RDD事先使用了HashPartitioner分區先進行分區,然後再調用reduceByKey方法,使用的也是HashPartitioner,並且沒有改變分區數量,調用redcueByKey就不shuffle
如果自定義分區器,多次使用自定義的分區器,並且沒有改變分區的數量,為了減少shuffle的次數,提高計算效率,需要重新自定義分區器的equals方法
例如:
//創建RDD,並沒有立即讀取數據,而是觸發Action才會讀取數據
val lines = sc.textFile("hdfs://node-1.51doit.cn:9000/words")
val wordAndOne = lines.flatMap(_.split(" ")).map((_, 1))
//先使用HashPartitioner進行partitionBy
val partitioner = new HashPartitioner(wordAndOne.partitions.length)
val partitioned = wordAndOne.partitionBy(partitioner)
//然後再調用reduceByKey
val reduced: RDD[(String, Int)] = partitioned.reduceByKey(_ + _)
reduced.saveAsTextFile("hdfs://node-1.51doit.cn:9000/out-36-82")
join一定會Shuffle嗎
不一定,join一般情況會shuffle,但是如果兩個要join的rdd實現都使用相同的分區去進行分區了,並且join時,依然使用相同類型的分區器,並且沒有改變分區數據,那麼不shuffle
//通過並行化的方式創建一個RDD
val rdd1 = sc.parallelize(List(("tom", 1), ("tom", 2), ("jerry", 3), ("kitty", 2)), 2)
//通過並行化的方式再創建一個RDD
val rdd2 = sc.parallelize(List(("jerry", 2), ("tom", 1), ("shuke", 2), ("jerry", 4)), 2)
//該join一定有shuffle,並且是3個Stage
val rdd3: RDD[(String, (Int, Int))] = rdd1.join(rdd2)
val rdd11 = rdd1.groupByKey()
val rdd22 = rdd2.groupByKey()
//下麵的join,沒有shuffle
val rdd33 = rdd11.join(rdd22)
rdd33.saveAsTextFile("hdfs://node-1.51doit.cn:9000/out-36-86")
shuffle數據的復用
spark在shuffle時,會應用分區器,當讀取達到一定大小或整個分區的數據被處理完,會將數據溢寫磁碟磁碟(數據文件和索引文件),溢寫持磁碟的數據,會保存在Executor所在機器的本地磁碟(預設是保存在/temp目錄,也可以配置到其他目錄),只要application一直運行,shuffle的中間結果數據就會被保存。如果以後再次觸發Action,使用到了以前shuffle的中間結果,那麼就不會從源頭重新計算而是,而是復用shuffle中間結果,所有說,shuffle是一種特殊的persist,以後再次觸發Action,就會跳過前面的Stage,直接讀取shuffle的數據,這樣可以提高程式的執行效率。
廣播變數
廣播變數的使用場景
在很多計算場景,經常會遇到兩個RDD進行JOIN,如果一個RDD對應的數據比較大,一個RDD對應的數據比較小,如果使用JOIN,那麼會shuffle,導致效率變低。廣播變數就是將相對較小的數據,先收集到Driver,然後再通過網路廣播到屬於該Application對應的每個Executor中,以後處理大量數據對應的RDD關聯數據,就不用shuffle了,而是直接在記憶體中關聯已經廣播好的數據,即通實現mapside join,可以將Driver端的數據廣播到屬於該application的Executor,然後通過Driver廣播變數返回的引用,獲取實現廣播到Executor的數據
廣播變數的特點:廣播出去的數據就無法在改變了,在沒有Executor中是只讀的操作,在每個Executor中,多個Task使用一份廣播變數
廣播變數的實現原理
廣播變數是通過BT的方式廣播的(TorrentBroadcast),多個Executor可以相互傳遞數據,可以提高效率
sc.broadcast這個方法是阻塞的(同步的)
廣播變數一但廣播出去就不能改變,為了以後可以定期的改變要關聯的數據,可以定義一個object[單例對象],在函數內使用,並且加一個定時器,然後定期更新數據
廣播到Executor的數據,可以在Driver獲取到引用,然後這個引用會伴隨著每一個Task發送到Executor,然後通過這個引用,獲取到事先廣播好的數據
序列化問題
序列化問題的場景
spark任務在執行過程中,由於編寫的程式不當,任務在執行時,會出序列化問題,通常有以下兩種情況,
• 封裝數據的Bean沒有實現序列化介面(Task已經生成了),在ShuffleWirte之前要將數據溢寫磁碟,會拋出異常
• 函數閉包問題,即函數的內部,使用到了外部沒有實現序列化的引用(Task沒有生成)
數據Bean未實現序列化介面
spark在運算過程中,由於很多場景必須要shuffle,即向數據溢寫磁碟並且在網路間進行傳輸,但是由於封裝數據的Bean沒有實現序列化介面,就會導致出現序列化的錯誤!
object C02_CustomSort {
def main(args: Array[String]): Unit = {
val sc = SparkUtil.getContext(this.getClass.getSimpleName, true)
//使用並行化的方式創建RDD
val lines = sc.parallelize(
List(
"laoduan,38,99.99",
"nianhang,33,99.99",
"laozhao,18,9999.99"
)
)
val tfBoy: RDD[Boy] = lines.map(line => {
val fields = line.split(",")
val name = fields(0)
val age = fields(1).toInt
val fv = fields(2).toDouble
new Boy(name, age, fv) //將數據封裝到一個普通的class中
})
implicit val ord = new Ordering[Boy] {
override def compare(x: Boy, y: Boy): Int = {
if (x.fv == y.fv) {
x.age - y.age
} else {
java.lang.Double.compare(y.fv, x.fv)
}
}
}
//sortBy會產生shuffle,如果Boy沒有實現序列化介面,Shuffle時會報錯
val sorted: RDD[Boy] = tfBoy.sortBy(bean => bean)
val res = sorted.collect()
println(res.toBuffer)
}
}
//如果以後定義bean,建議使用case class
class Boy(val name: String, var age: Int, var fv: Double) //extends Serializable
{
override def toString = s"Boy($name, $age, $fv)"
}
函數閉包問題
閉包的現象
在調用RDD的Transformation和Action時,可能會傳入自定義的函數,如果函數內部使用到了外部未被序列化的引用,就會報Task無法序列化的錯誤。原因是spark的Task是在Driver端生成的,並且需要通過網路傳輸到Executor中,Task本身實現了序列化介面,函數也實現了序列化介面,但是函數內部使用到的外部引用不支持序列化,就會函數導致無法序列化,從而導致Task沒法序列化,就無法發送到Executor中了
在調用RDD的Transformation或Action是傳入函數,第一步就進行檢測,即調用sc的clean方法
為了避免錯誤,在Driver初始化的object或class必須實現序列化介面,不然會報錯誤
def map[U: ClassTag](f: T => U): RDD[U] = withScope {
val cleanF = sc.clean(f) //檢測函數是否可以序列化,如果可以直接將函數返回,如果不可以,拋出異常
new MapPartitionsRDD[U, T](this, (_, _, iter) => iter.map(cleanF))
}
private def ensureSerializable(func: AnyRef): Unit = {
try {
if (SparkEnv.get != null) {
//獲取spark執行換的的序列化器,如果函數無法序列化,直接拋出異常,程式退出,根本就沒有生成Task
SparkEnv.get.closureSerializer.newInstance().serialize(func)
}
} catch {
case ex: Exception => throw new SparkException("Task not serializable", ex)
}
}
在Driver端初始化實現序列化的object
在一個Executor中,多個Task使用同一個object對象,因為在scala中,object就是單例對象,一個Executor中只有一個實例,Task會反序列化多次,但是引用的單例對象只反序列化一次
//從HDFS中讀取數據,創建RDD
//HDFS指定的目錄中有4個小文件,內容如下:
//1,ln
val lines = sc.textFile(args(1))
//函數外部定義的一個引用類型(變數)
//RuleObjectSer是一個靜態對象,實在第一次使用的時候被初始化了(實在Driver被初始化的)
val rulesObj = RuleObjectSer
//函數實在Driver定義的
val func = (line: String) => {
val fields = line.split(",")
val id = fields(0).toInt
val code = fields(1)
val name = rulesObj.rulesMap.getOrElse(code, "未知") //閉包
//獲取當前線程ID
val treadId = Thread.currentThread().getId
//獲取當前Task對應的分區編號
val partitiondId = TaskContext.getPartitionId()
//獲取當前Task運行時的所在機器的主機名
val host = InetAddress.getLocalHost.getHostName
(id, code, name, treadId, partitiondId, host, rulesObj.toString)
}
//處理數據,關聯維度
val res = lines.map(func)
res.saveAsTextFile(args(2))
在Driver端初始化實現序列化的class
在一個Executor中,每個Task都會使用自己獨享的class實例,因為在scala中,class就是多例,Task會反序列化多次,每個Task引用的class實例也會被序列化
//從HDFS中讀取數據,創建RDD
//HDFS指定的目錄中有4個小文件,內容如下:
//1,ln
val lines = sc.textFile(args(1))
//函數外部定義的一個引用類型(變數)
//RuleClassNotSer是一個類,需要new才能實現(實在Driver被初始化的)
val rulesClass = new RuleClassSer
//處理數據,關聯維度
val res = lines.map(e => {
val fields = e.split(",")
val id = fields(0).toInt
val code = fields(1)
val name = rulesClass.rulesMap.getOrElse(code, "未知") //閉包
//獲取當前線程ID
val treadId = Thread.currentThread().getId
//獲取當前Task對應的分區編號
val partitiondId = TaskContext.getPartitionId()
//獲取當前Task運行時的所在機器的主機名
val host = InetAddress.getLocalHost.getHostName
(id, code, name, treadId, partitiondId, host, rulesClass.toString)
})
res.saveAsTextFile(args(2))
在函數內部初始化未序列化的object
object沒有實現序列化介面,不會出現問題,因為該object實現函數內部被初始化的,而不是在Driver初始化的
//從HDFS中讀取數據,創建RDD
//HDFS指定的目錄中有4個小文件,內容如下:
//1,ln
val lines = sc.textFile(args(1))
//不再Driver端初始化RuleObjectSer或RuleClassSer
//函數實在Driver定義的
val func = (line: String) => {
val fields = line.split(",")
val id = fields(0).toInt
val code = fields(1)
//在函數內部初始化沒有實現序列化介面的RuleObjectNotSer
val name = RuleObjectNotSer.rulesMap.getOrElse(code, "未知")
//獲取當前線程ID
val treadId = Thread.currentThread().getId
//獲取當前Task對應的分區編號
val partitiondId = TaskContext.getPartitionId()
//獲取當前Task運行時的所在機器的主機名
val host = InetAddress.getLocalHost.getHostName
(id, code, name, treadId, partitiondId, host, RuleObjectNotSer.toString)
}
//處理數據,關聯維度
val res = lines.map(func)
res.saveAsTextFile(args(2))
sc.stop()
在函數內部初始化未序列化的class
這種方式非常不好,因為每來一條數據,new一個class的實例,會導致消耗更多資源,jvm會頻繁GC
//從HDFS中讀取數據,創建RDD
//HDFS指定的目錄中有4個小文件,內容如下:
//1,ln
val lines = sc.textFile(args(1))
//處理數據,關聯維度
val res = lines.map(e => {
val fields = e.split(",")
val id = fields(0).toInt
val code = fields(1)
//RuleClassNotSer是在Executor中被初始化的
val rulesClass = new RuleClassNotSer
//但是如果每來一條數據new一個RuleClassNotSer,不好,效率低,浪費資源,頻繁GC
val name = rulesClass.rulesMap.getOrElse(code, "未知")
//獲取當前線程ID
val treadId = Thread.currentThread().getId
//獲取當前Task對應的分區編號
val partitiondId = TaskContext.getPartitionId()
//獲取當前Task運行時的所在機器的主機名
val host = InetAddress.getLocalHost.getHostName
(id, code, name, treadId, partitiondId, host, rulesClass.toString)
})
res.saveAsTextFile(args(2))
調用mapPartitions在函數內部初始化未序列化的class
一個分區使用一個class的實例,即每個Task都是自己的class實例
//從HDFS中讀取數據,創建RDD
//HDFS指定的目錄中有4個小文件,內容如下:
//1,ln
val lines = sc.textFile(args(1))
//處理數據,關聯維度
val res = lines.mapPartitions(it => {
//RuleClassNotSer是在Executor中被初始化的
//一個分區的多條數據,使用同一個RuleClassNotSer實例
val rulesClass = new RuleClassNotSer
it.map(e => {
val fields = e.split(",")
val id = fields(0).toInt
val code = fields(1)
val name = rulesClass.rulesMap.getOrElse(code, "未知")
//獲取當前線程ID
val treadId = Thread.currentThread().getId
//獲取當前Task對應的分區編號
val partitiondId = TaskContext.getPartitionId()
//獲取當前Task運行時的所在機器的主機名
val host = InetAddress.getLocalHost.getHostName
(id, code, name, treadId, partitiondId, host, rulesClass.toString)
})
})
res.saveAsTextFile(args(2))
sc.stop()
Task線程安全問題
在一個Executor可以同時運行多個Task,如果多個Task使用同一個共用的單例對象,如果對共用的數據同時進行讀寫操作,會導致線程不安全的問題,為了避免這個問題,可以加鎖,但效率變低了,因為在一個Executor中同一個時間點只能有一個Task使用共用的數據,這樣就變成了串列了,效率低!
定義一個工具類object,格式化日期,因為SimpleDateFormat線程不安全,會出現異常
val conf = new SparkConf()
.setAppName("WordCount")
.setMaster("local[*]") //本地模式,開多個線程
//1.創建SparkContext
val sc = new SparkContext(conf)
val lines = sc.textFile("data/date.txt")
val timeRDD: RDD[Long] = lines.map(e => {
//將字元串轉成long類型時間戳
//使用自定義的object工具類
val time: Long = DateUtilObj.parse(e)
time
})
val res = timeRDD.collect()
println(res.toBuffer)
object DateUtilObj {
//多個Task使用了一個共用的SimpleDateFormat,SimpleDateFormat是線程不安全
val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
//線程安全的
//val sdf: FastDateFormat = FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss")
def parse(str: String): Long = {
//2022-05-23 11:39:30
sdf.parse(str).getTime
}
}
上面的程式會出現錯誤,因為多個Task同時使用一個單例對象格式化日期,報錯,如果加鎖,程式會變慢,改進後的代碼:
val conf = new SparkConf()
.setAppName("WordCount")
.setMaster("local[*]") //本地模式,開多個線程
//1.創建SparkContext
val sc = new SparkContext(conf)
val lines = sc.textFile("data/date.txt")
val timeRDD = lines.mapPartitions(it => {
//一個Task使用自己單獨的DateUtilClass實例,缺點是浪費記憶體資源
val dataUtil = new DateUtilClass
it.map(e => {
dataUtil.parse(e)
})
})
val res = timeRDD.collect()
println(res.toBuffer)
class DateUtilClass {
val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
def parse(str: String): Long = {
//2022-05-23 11:39:30
sdf.parse(str).getTime
}
}
改進後,一個Task使用一個DateUtilClass實例,不會出現線程安全的問題。
累加器
累加器是Spark中用來做計數功能的,在程式運行過程當中,可以做一些額外的數據指標統計
觸發一次Action,並且將附帶的統計指標計算出來,可以使用Accumulator進行處理,Accumulator的本質數一個實現序列化介面class,每個Task都有自己的累加器,避免累加的數據發送衝突
object C14_AccumulatorDemo3 {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
.setAppName("WordCount")
.setMaster("local[*]") //本地模式,開多個線程
//1.創建SparkContext
val sc = new SparkContext(conf)
val rdd1 = sc.parallelize(List(1,2,3,4,5,6,7,8,9), 2)
//在Driver定義一個特殊的變數,即累加器
//Accumulator可以將每個分區的計數結果,通過網路傳輸到Driver,然後進行全局求和
val accumulator: LongAccumulator = sc.longAccumulator("even-acc")
val rdd2 = rdd1.map(e => {
if (e % 2 == 0) {
accumulator.add(1) //閉包,在Executor中累計的
}
e * 10
})
//就觸發一次Action
rdd2.saveAsTextFile("out/113")
//每個Task中累計的數據會返回到Driver嗎?
println(accumulator.count)
}
}