Spark相關問題 Spark比MR快的原因? 1) Spark的計算結果可以放入記憶體,支持基於記憶體的迭代,MR不支持。 2) Spark有DAG有向無環圖,可以實現pipeline的計算模式。 3) 資源調度模式:Spark粗粒度資源調度,MR是細粒度資源調度。 資源復用:Spark中的task可 ...
Spark相關問題
- Spark比MR快的原因?
1) Spark的計算結果可以放入記憶體,支持基於記憶體的迭代,MR不支持。
2) Spark有DAG有向無環圖,可以實現pipeline的計算模式。
3) 資源調度模式:Spark粗粒度資源調度,MR是細粒度資源調度。
資源復用:Spark中的task可以復用同一批Executor的資源。
MR裡面每一個map task對應一個jvm,不能復用資源。
- Spark中主要進程的作用?
Driver進程:負責任務的分發和結果的回收。
Executor進程:負責具體任務的執行。
Master進程:Spark資源管理的主進程,負責資源調度。
Worker進程:Spark資源管理的從進程,woker節點主要運行Executor
- Spark調優
1. 資源調優
1) .搭建Spark集群的時候要給Spark集群足夠的資源(core,memory)
在spark安裝包的conf下spark-env.sh
SPARK_WORKER_CORES
SPARK_WORKER_MEMORY
SPARK_WORKER_INSTANCE
2) .在提交Application的時候給Application分配更多的資源。
提交命令選項:(在提交Application的時候使用選項)
--executor-cores
--executor-memory
--total-executor-cores
配置信息:(在Application的代碼中設置
在Spark-default.conf中設置)
spark.executor.cores
spark.executor.memory
spark.max.cores
- 並行度調優
原則:一個core一般分配2~3個task,每一個task一般處理1G數據(task的複雜度類似wc)
提高並行度的方式:
1) .如果讀取的數據在HDFS上,降低block塊的大小
2) .sc.textFile(path,numPartitions)
3) sc.parallelize(list,numPartitions) 一般用於測試
4) coalesce、repartition可以提高RDD的分區數。
5) 配置信息:
spark.default.parallelism not set (預設executor core的總個數)
spark.sql.shuffle.partitions 200
6) 自定義分區器
- 代碼調優
- 避免創建重覆的RDD,復用同一個RDD
- 對多次使用的RDD進行持久化
如何選擇一種最合適的持久化策略?
預設情況下,性能最高的當然是MEMORY_ONLY,但前提是你的記憶體必須足夠足夠大,可以綽綽有餘地存放下整個RDD的所有數據。因為不進行序列化與反序列化操作,就避免了這部分的性能開銷;對這個RDD的後續運算元操作,都是基於純記憶體中的數據的操作,不需要從磁碟文件中讀取數據,性能也很高;而且不需要複製一份數據副本,並遠程傳送到其他節點上。但是這裡必須要註意的是,在實際的生產環境中,恐怕能夠直接用這種策略的場景還是有限的,如果RDD中數據比較多時(比如幾十億),直接用這種持久化級別,會導致JVM的OOM記憶體溢出異常。
如果使用MEMORY_ONLY級別時發生了記憶體溢出,那麼建議嘗試使用MEMORY_ONLY_SER級別。該級別會將RDD數據序列化後再保存在記憶體中,此時每個partition僅僅是一個位元組數組而已,大大減少了對象數量,並降低了記憶體占用。這種級別比MEMORY_ONLY多出來的性能開銷,主要就是序列化與反序列化的開銷。但是後續運算元可以基於純記憶體進行操作,因此性能總體還是比較高的。此外,可能發生的問題同上,如果RDD中的數據量過多的話,還是可能會導致OOM記憶體溢出的異常。
如果純記憶體的級別都無法使用,那麼建議使用MEMORY_AND_DISK_SER策略,而不是MEMORY_AND_DISK策略。因為既然到了這一步,就說明RDD的數據量很大,記憶體無法完全放下。序列化後的數據比較少,可以節省記憶體和磁碟的空間開銷。同時該策略會優先儘量嘗試將數據緩存在記憶體中,記憶體緩存不下才會寫入磁碟。
通常不建議使用DISK_ONLY和尾碼為_2的級別:因為完全基於磁碟文件進行數據的讀寫,會導致性能急劇降低,有時還不如重新計算一次所有RDD。尾碼為_2的級別,必須將所有數據都複製一份副本,併發送到其他節點上,數據複製以及網路傳輸會導致較大的性能開銷,除非是要求作業的高可用性,否則不建議使用。
持久化運算元:
cache:
MEMORY_ONLY
persist:
MEMORY_ONLY
MEMORY_ONLY_SER
MEMORY_AND_DISK_SER
一般不要選擇帶有_2的持久化級別。
checkpoint:
① 如果一個RDD的計算時間比較長或者計算起來比較複雜,一般將這個RDD的計算結果保存到HDFS上,這樣數據會更加安全。
② 如果一個RDD的依賴關係非常長,也會使用checkpoint,會切斷依賴關係,提高容錯的效率。
- 儘量避免使用shuffle類的運算元
使用廣播變數來模擬使用join,使用情況:一個RDD比較大,一個RDD比較小。
join運算元=廣播變數+filter、廣播變數+map、廣播變數+flatMap
- 使用map-side預聚合的shuffle操作
即儘量使用有combiner的shuffle類運算元。
combiner概念:
在map端,每一個map task計算完畢後進行的局部聚合。
combiner好處:
1) 降低shuffle write寫磁碟的數據量。
2) 降低shuffle read拉取數據量的大小。
3) 降低reduce端聚合的次數。
有combiner的shuffle類運算元:
1) reduceByKey:這個運算元在map端是有combiner的,在一些場景中可以使用reduceByKey代替groupByKey。
2) aggregateByKey(fun1,func2)
- 儘量使用高性能的運算元
使用reduceByKey替代groupByKey
使用mapPartition替代map
使用foreachPartition替代foreach
filter後使用coalesce減少分區數
使用使用repartitionAndSortWithinPartitions替代repartition與sort類操作
使用repartition和coalesce運算元操作分區。
- 使用廣播變數
開發過程中,會遇到需要在運算元函數中使用外部變數的場景(尤其是大變數,比如100M以上的大集合),那麼此時就應該使用Spark的廣播(Broadcast)功能來提升性能,函數中使用到外部變數時,預設情況下,Spark會將該變數複製多個副本,通過網路傳輸到task中,此時每個task都有一個變數副本。如果變數本身比較大的話(比如100M,甚至1G),那麼大量的變數副本在網路中傳輸的性能開銷,以及在各個節點的Executor中占用過多記憶體導致的頻繁GC,都會極大地影響性能。如果使用的外部變數比較大,建議使用Spark的廣播功能,對該變數進行廣播。廣播後的變數,會保證每個Executor的記憶體中,只駐留一份變數副本,而Executor中的task執行時共用該Executor中的那份變數副本。這樣的話,可以大大減少變數副本的數量,從而減少網路傳輸的性能開銷,並減少對Executor記憶體的占用開銷,降低GC的頻率。
廣播大變數發送方式:Executor一開始並沒有廣播變數,而是task運行需要用到廣播變數,會找executor的blockManager要,bloackManager找Driver裡面的blockManagerMaster要。
使用廣播變數可以大大降低集群中變數的副本數。不使用廣播變數,變數的副本數和task數一致。使用廣播變數變數的副本和Executor數一致。
使用廣播變數可以大大的降低集群中變數的副本數。
不使用廣播變數:變數的副本數和task數一致。
使用廣播變數:變數的副本數與Executor數一致。
廣播變數最大可以是多大?
ExecutorMemory*60%*90%*80% = executorMemory *0.42
- 使用Kryo優化序列化性能
在Spark中,主要有三個地方涉及到了序列化:
1) 在運算元函數中使用到外部變數時,該變數會被序列化後進行網路傳輸。
2) 將自定義的類型作為RDD的泛型類型時(比如JavaRDD<SXT>,SXT是自定義類型),所有自定義類型對象,都會進行序列化。因此這種情況下,也要求自定義的類必須實現Serializable介面。
3) 使用可序列化的持久化策略時(比如MEMORY_ONLY_SER),Spark會將RDD中的每個partition都序列化成一個大的位元組數組。
Kryo序列化器介紹:
Spark支持使用Kryo序列化機制。Kryo序列化機制,比預設的Java序列化機制,速度要快,序列化後的數據要更小,大概是Java序列化機制的1/10。所以Kryo序列化優化以後,可以讓網路傳輸的數據變少;在集群中耗費的記憶體資源大大減少。
對於這三種出現序列化的地方,我們都可以通過使用Kryo序列化類庫,來優化序列化和反序列化的性能。Spark預設使用的是Java的序列化機制,也就是ObjectOutputStream/ObjectInputStream API來進行序列化和反序列化。但是Spark同時支持使用Kryo序列化庫,Kryo序列化類庫的性能比Java序列化類庫的性能要高很多。官方介紹,Kryo序列化機制比Java序列化機制,性能高10倍左右。Spark之所以預設沒有使用Kryo作為序列化類庫,是因為Kryo要求最好要註冊所有需要進行序列化的自定義類型,因此對於開發者來說,這種方式比較麻煩。
Spark中使用Kryo:
Sparkconf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") .registerKryoClasses(new Class[]{SpeedSortKey.class}) |
- 優化數據結構
java中有三種類型比較消耗記憶體:
1) 對象,每個Java對象都有對象頭、引用等額外的信息,因此比較占用記憶體空間。
2) 字元串,每個字元串內部都有一個字元數組以及長度等額外信息。
3) 集合類型,比如HashMap、LinkedList等,因為集合類型內部通常會使用一些內部類來封裝集合元素,比如Map.Entry。
因此Spark官方建議,在Spark編碼實現中,特別是對於運算元函數中的代碼,儘量不要使用上述三種數據結構,儘量使用字元串替代對象,使用原始類型(比如Int、Long)替代字元串,使用數組替代集合類型,這樣儘可能地減少記憶體占用,從而降低GC頻率,提升性能。
- 使用高性能的庫fastutil
fasteutil介紹:
fastutil是擴展了Java標準集合框架(Map、List、Set;HashMap、ArrayList、HashSet)的類庫,提供了特殊類型的map、set、list和queue;fastutil能夠提供更小的記憶體占用,更快的存取速度;我們使用fastutil提供的集合類,來替代自己平時使用的JDK的原生的Map、List、Set,好處在於,fastutil集合類,可以減小記憶體的占用,並且在進行集合的遍歷、根據索引(或者key)獲取元素的值和設置元素的值的時候,提供更快的存取速度。fastutil的每一種集合類型,都實現了對應的Java中的標準介面(比如fastutil的map,實現了Java的Map介面),因此可以直接放入已有系統的任何代碼中。
fastutil最新版本要求Java 7以及以上版本。
使用:
見RandomExtractCars.java類
- 數據本地化
- 數據本地化的級別:
1) PROCESS_LOCAL
task要計算的數據在本進程(Executor)的記憶體中。
2) NODE_LOCAL
① task所計算的數據在本節點所在的磁碟上。
② task所計算的數據在本節點其他Executor進程的記憶體中。
3) NO_PREF
task所計算的數據在關係型資料庫中,如mysql。
4) RACK_LOCAL
task所計算的數據在同機架的不同節點的磁碟或者Executor進程的記憶體中
5) ANY
跨機架。
- Spark數據本地化調優:
Spark中任務調度時,TaskScheduler在分發之前需要依據數據的位置來分發,最好將task分發到數據所在的節點上,如果TaskScheduler分發的task在預設3s依然無法執行的話,TaskScheduler會重新發送這個task到相同的Executor中去執行,會重試5次,如果依然無法執行,那麼TaskScheduler會降低一級數據本地化的級別再次發送task。
如上圖中,會先嘗試1,PROCESS_LOCAL數據本地化級別,如果重試5次每次等待3s,會預設這個Executor計算資源滿了,那麼會降低一級數據本地化級別到2,NODE_LOCAL,如果還是重試5次每次等待3s還是失敗,那麼還是會降低一級數據本地化級別到3,RACK_LOCAL。這樣數據就會有網路傳輸,降低了執行效率。
1) 如何提高數據本地化的級別?
可以增加每次發送task的等待時間(預設都是3s),將3s倍數調大, 結合WEBUI來調節:
• spark.locality.wait
• spark.locality.wait.process
• spark.locality.wait.node
• spark.locality.wait.rack
註意:等待時間不能調大很大,調整數據本地化的級別不要本末倒置,雖然每一個task的本地化級別是最高了,但整個Application的執行時間反而加長。
2) 如何查看數據本地化的級別?
通過日誌或者WEBUI
- Spark Shuffle調優
- SparkShuffle
spark1.x 中有 兩種類型的shuffle (hashShuffleManager 另外一個是sortShuffleManager)
到spark2.x以後 只有一種shuffle 機制 SortShuffle 管理器叫做SortShuffleManager
- SparkShuffle概念
reduceByKey會將上一個RDD中的每一個key對應的所有value聚合成一個value,然後生成一個新的RDD,元素類型是<key,value>對的形式,這樣每一個key對應一個聚合起來的value。
問題:聚合之前,每一個key對應的value不一定都是在一個partition中,也不太可能在同一個節點上,因為RDD是分散式的彈性的數據集,RDD的partition極有可能分佈在各個節點上。
如何聚合?
– Shuffle Write:上一個stage的每個map task就必須保證將自己處理的當前分區的數據相同的key寫入一個分區文件中,可能會寫入多個不同的分區文件中。
– Shuffle Read:reduce task就會從上一個stage的所有task所在的機器上尋找屬於己的那些分區文件,這樣就可以保證每一個key所對應的value都會匯聚到同一個節點上去處理和聚合。
Spark中有兩種Shuffle管理類型,HashShufflManager和SortShuffleManager,Spark1.2之前是HashShuffleManager, Spark1.2引入SortShuffleManager,在Spark 2.0+版本中已經將HashShuffleManager丟棄。
- HashShuffleManager
1) 普通機制
- 普通機制示意圖
- 執行流程
a) 每一個map task將不同結果寫到不同的buffer中,每個buffer的大小為32K。buffer起到數據緩存的作用。
b) 每個buffer文件最後對應一個磁碟小文件。
c) reduce task來拉取對應的磁碟小文件。
- 總結
① .map task的計算結果會根據分區器(預設是hashPartitioner)來決定寫入到哪一個磁碟小文件中去。ReduceTask會去Map端拉取相應的磁碟小文件。
② .產生的磁碟小文件的個數:
M(map task的個數)*R(reduce task的個數)
- 存在的問題
產生的磁碟小文件過多,會導致以下問題:
a) 在Shuffle Write過程中會產生很多寫磁碟小文件的對象。
b) 在Shuffle Read過程中會產生很多讀取磁碟小文件的對象。
c) 在JVM堆記憶體中對象過多會造成頻繁的gc,gc還無法解決運行所需要的記憶體 的話,就會OOM。
d) 在數據傳輸過程中會有頻繁的網路通信,頻繁的網路通信出現通信故障的可能性大大增加,一旦網路通信出現了故障會導致shuffle file cannot find 由於這個錯誤導致的task失敗,TaskScheduler不負責重試,由DAGScheduler負責重試Stage。
2) 合併機制(considation機制)
- 合併機制示意圖
- 總結
產生磁碟小文件的個數:C(core的個數)*R(reduce的個數)
如果核數比較多的話 那麼產生的小文件個數 是不是也很多啊?
- SortShuffleManager
1) 普通機制
- 普通機制示意圖
- 執行流程
a) map task 的計算結果會寫入到一個記憶體數據結構裡面,記憶體數據結構預設是5M
b) 在shuffle的時候會有一個定時器,不定期的去估算這個記憶體結構的大小,當記憶體結構中的數據超過5M時,比如現在記憶體結構中的數據為5.01M,那麼他會申請5.01*2-5=5.02M記憶體給記憶體數據結構。
c) 如果申請成功不會進行溢寫,如果申請不成功,這時候會發生溢寫磁碟。
d) 在溢寫之前記憶體結構中的數據會進行排序分區
e) 然後開始溢寫磁碟,寫磁碟是以batch的形式去寫,一個batch是1萬條數據,
f) map task執行完成後,會將這些磁碟小文件合併成一個大的磁碟文件,同時生成一個索引文件。
g) reduce task去map端拉取數據的時候,首先解析索引文件,根據索引文件再去拉取對應的數據。
- 總結
產生磁碟小文件的個數: 2*M(map task的個數)
2) bypass機制
- bypass機制示意圖
- 總結
① .bypass運行機制的觸發條件如下:
shuffle reduce task的數量小於spark.shuffle.sort.bypassMergeThreshold的參數值。這個值預設是200。
② .產生的磁碟小文件為:2*M(map task的個數)
- Shuffle文件定址
1) MapOutputTracker
MapOutputTracker是Spark架構中的一個模塊,是一個主從架構。管理磁碟小文件的地址。
- MapOutputTrackerMaster是主對象,存在於Driver中。
- MapOutputTrackerWorker是從對象,存在於Excutor中。
2) BlockManager
BlockManager塊管理者,是Spark架構中的一個模塊,也是一個主從架構。
- BlockManagerMaster,主對象,存在於Driver中。
BlockManagerMaster會在集群中有用到廣播變數和緩存數據或者刪除緩存數據的時候,通知BlockManagerSlave傳輸或者刪除數據。
- BlockManagerSlave,從對象,存在於Excutor中。
BlockManagerSlave會與BlockManagerSlave之間通信。
¬ 無論在Driver端的BlockManager還是在Excutor端的BlockManager都含有三個對象:
① DiskStore:負責磁碟的管理。
② MemoryStore:負責記憶體的管理。
③ BlockTransferService:負責數據的傳輸。
3) Shuffle文件定址圖
4) Shuffle文件定址流程
a) 當map task執行完成後,會將task的執行情況和磁碟小文件的地址封裝到MpStatus對象中,通過MapOutputTrackerWorker對象向Driver中的MapOutputTrackerMaster彙報。
b) 在所有的map task執行完畢後,Driver中就掌握了所有的磁碟小文件的地址。
c) 在reduce task執行之前,會通過Excutor中MapOutPutTrackerWorker向Driver端的MapOutputTrackerMaster獲取磁碟小文件的地址。
d) 獲取到磁碟小文件的地址後,會通過BlockManager連接數據所在節點,然後通過BlockTransferService進行數據的傳輸。
e) BlockTransferService預設啟動5個task去節點拉取數據。預設情況下,5個task拉取數據量不能超過48M。
- Shuffle調優
- SparkShuffle調優配置項如何使用?
1) 在代碼中,不推薦使用,硬編碼。
new SparkConf().set(“spark.shuffle.file.buffer”,”64”)
2) 在提交spark任務的時候,推薦使用。
spark-submit --conf spark.shuffle.file.buffer=64 –conf ….
3) 在conf下的spark-default.conf配置文件中,不推薦,因為是寫死後所有應用程式都要用。
- buffer大小
- shuffle read拉取數據量的大小
- shuffle聚合記憶體的比例
- 拉取數據重試次數
- 重試間隔時間60s
- Spark Shuffle的種類
- HashShuffle 合併機制
- SortShuffle bypass機制 200次
- Spark記憶體管理
Spark執行應用程式時,Spark集群會啟動Driver和Executor兩種JVM進程,Driver負責創建SparkContext上下文,提交任務,task的分發等。Executor負責task的計算任務,並將結果返回給Driver。同時需要為需要持久化的RDD提供儲存。Driver端的記憶體管理比較簡單,這裡所說的Spark記憶體管理針對Executor端的記憶體管理。
Spark記憶體管理分為靜態記憶體管理和統一記憶體管理,Spark1.6之前使用的是靜態記憶體管理,Spark1.6之後引入了統一記憶體管理。
靜態記憶體管理中存儲記憶體、執行記憶體和其他記憶體的大小在 Spark 應用程式運行期間均為固定的,但用戶可以應用程式啟動前進行配置。
統一記憶體管理與靜態記憶體管理的區別在於儲存記憶體和執行記憶體共用同一塊空間,可以互相借用對方的空間。
Spark1.6以上版本預設使用的是統一記憶體管理,可以通過參數spark.memory.useLegacyMode 設置為true(預設為false)使用靜態記憶體管理。
- 靜態記憶體管理分佈圖
- 統一記憶體管理分佈圖
- reduce 中OOM如何處理?
1) 減少每次拉取的數據量
2) 提高shuffle聚合的記憶體比例
3) 提高Excutor的總記憶體
- 記憶體調優
比如我們創建對象 先往伊甸園和s1 中放 滿了 發生minoGC 此時 會清空 伊甸園和s1 如果還有對象 那麼就往s2中放 如果s2放的下 就放在s2中 s2也滿了 會發生小型的minoGC 將對象清空
如果還有數據 將數據+1 加到15 會放入到老年代中
但是 老年代中的對象 都是常用的對象 比如資料庫連接池等 老年代如果滿了 會發生full GC 如果清空後 還不夠用 就會發生GC
我們上面討論的問題 討論的task 的記憶體夠不夠用
JVM堆記憶體分為一塊較大的Eden和兩塊較小的Survivor,每次只使用Eden和其中一塊Survivor,當回收時將Eden和Survivor中還存活著的對象一次性複製到另外一塊Survivor上,最後清理掉Eden和剛纔用過的Survivor。也就是說當task創建出來對象會首先往Eden和survivor1中存放,survivor2是空閑的,當Eden和survivor1區域放滿以後就會觸發minor gc小型垃圾回收,清理掉不再使用的對象。會將存活下來的對象放入survivor2中。
如果存活下來的對象大小大於survivor2的大小,那麼JVM就會將多餘的對象直接放入到老年代中。
如果這個時候年輕代的記憶體不是很大的話,就會經常的進行minor gc,頻繁的minor gc會導致短時間內有些存活的對象(多次垃圾回收都沒有回收掉,一直在用的又不能被釋放,這種對象每經過一次minor gc都存活下來)頻繁的倒來倒去,會導致這些短生命周期的對象(不一定長期使用)每進行一次垃圾回收就會長一歲。年齡過大,預設15歲,垃圾回收還是沒有回收回去就會跑到老年代裡面去了。
這樣會導致在老年代中存放大量的短生命周期的對象,老年代應該存放的是數量比較少並且會長期使用的對象,比如資料庫連接池對象。這樣的話,老年代就會滿溢(full gc 因為本來老年代中的對象很少,很少進行full gc 因此採取了不太複雜但是消耗性能和時間的垃圾回收演算法)。不管minor gc 還是 full gc都會導致JVM的工作線程停止。
總結-堆記憶體不足造成的影響:
1) 頻繁的minor gc。
2) 老年代中大量的短生命周期的對象會導致full gc。
3) gc 多了就會影響Spark的性能和運行的速度。
Spark JVM調優主要是降低gc時間,可以修改Executor記憶體的比例參數。
RDD緩存、task定義運行的運算元函數,可能會創建很多對象,這樣會占用大量的堆記憶體。堆記憶體滿了之後會頻繁的GC,如果GC還不能夠滿足記憶體的需要的話就會報OOM。比如一個task在運行的時候會創建N個對象,這些對象首先要放入到JVM年輕代中。比如在存數據的時候我們使用了foreach來將數據寫入到記憶體,每條數據都會封裝到一個對象中存入資料庫中,那麼有多少條數據就會在JVM中創建多少個對象。
Spark中如何記憶體調優?
Spark Executor堆記憶體中存放(以靜態記憶體管理為例):RDD的緩存數據和廣播變數(spark.storage.memoryFraction 0.6),shuffle聚合記憶體(spark.shuffle.memoryFraction 0.2),task的運行(0.2)那麼如何調優呢?
1) 提高Executor總體記憶體的大小
2) 降低儲存記憶體比例或者降低聚合記憶體比例
如何查看gc?
Spark WEBUI中job->stage->task
- 調節Executor的堆外記憶體
Spark底層shuffle的傳輸方式是使用netty傳輸,netty在進行網路傳輸的過程會申請堆外記憶體(netty是零拷貝),所以使用了堆外記憶體。預設情況下,這個堆外記憶體上限預設是每一個executor的記憶體大小的10%;真正處理大數據的時候,這裡都會出現問題,導致spark作業反覆崩潰,無法運行;此時就會去調節這個參數,到至少1G(1024M),甚至說2G、4G。
executor在進行shuffle write,優先從自己本地關聯的mapOutPutWorker中獲取某份數據,如果本地block manager沒有的話,那麼會通過TransferService,去遠程連接其他節點上executor的block manager去獲取,嘗試建立遠程的網路連接,並且去拉取數據。頻繁創建對象讓JVM堆記憶體滿溢,進行垃圾回收。正好碰到那個exeuctor的JVM在垃圾回收。處於垃圾回過程中,所有的工作線程全部停止;相當於只要一旦進行垃圾回收,spark / executor停止工作,無法提供響應,spark預設的網路連接的超時時長是60s;如果卡住60s都無法建立連接的話,那麼這個task就失敗了。task失敗了就會出現shuffle file cannot find的錯誤。
那麼如何調節等待的時長呢?
在./spark-submit提交任務的腳本裡面添加:
--conf spark.core.connection.ack.wait.timeout=300
Executor由於記憶體不足或者堆外記憶體不足了,掛掉了,對應的Executor上面的block manager也掛掉了,找不到對應的shuffle map output文件,Reducer端不能夠拉取數據。我們可以調節堆外記憶體的大小,如何調節?
在./spark-submit提交任務的腳本裡面添加
yarn下:
--conf spark.yarn.executor.memoryOverhead=2048 單位M
standalone下:
--conf spark.memory.offHeap.size=2048單位M
- 解決數據傾斜
- 提高shuffle操作的並行度
方案實現思路:
在對RDD執行shuffle運算元時,給shuffle運算元傳入一個參數,比如reduceByKey(1000),該參數就設置了這個shuffle運算元執行時shuffle read task的數量。對於Spark SQL中的shuffle類語句,比如group by、join等,需要設置一個參數,即spark.sql.shuffle.partitions,該參數代表了shuffle read task的並行度,該值預設是200,對於很多場景來說都有點過小。
方案實現原理:
增加shuffle read task的數量,可以讓原本分配給一個task的多個key分配給多個task,從而讓每個task處理比原來更少的數據。舉例來說,如果原本有5個不同的key,每個key對應10條數據,這5個key都是分配給一個task的,那麼這個task就要處理50條數據。而增加了shuffle read task以後,每個task就分配到一個key,即每個task就處理10條數據,那麼自然每個task的執行時間都會變短了。
- 雙重聚合
方案適用場景:
對RDD執行reduceByKey等聚合類shuffle運算元或者在Spark SQL中使用group by語句進行分組聚合時,比較適用這種方案。
方案實現思路:
這個方案的核心實現思路就是進行兩階段聚合。第一次是局部聚合,先給每個key都打上一個隨機數,比如10以內的隨機數,此時原先一樣的key就變成不一樣的了,比如(hello, 1) (hello, 1) (hello, 1) (hello, 1),就會變成(1_hello, 1) (1_hello, 1) (2_hello, 1) (2_hello, 1)。接著對打上隨機數後的數據,執行reduceByKey等聚合操作,進行局部聚合,那麼局部聚合結果,就會變成了(1_hello, 2) (2_hello, 2)。然後將各個key的首碼給去掉,就會變成(hello,2)(hello,2),再次進行全局聚合操作,就可以得到最終結果了,比如(hello, 4)。
方案實現原理:
將原本相同的key通過附加隨機首碼的方式,變成多個不同的key,就可以讓原本被一個task處理的數據分散到多個task上去做局部聚合,進而解決單個task處理數據量過多的問題。接著去除掉隨機首碼,再次進行全局聚合,就可以得到最終的結果。
如果一個RDD中有一個key導致數據傾斜,同時還有其他的key,那麼一般先對數據集進行抽樣,然後找出傾斜的key,再使用filter對原始的RDD進行分離為兩個RDD,一個是由傾斜的key組成的RDD1,一個是由其他的key組成的RDD2,那麼對於RDD1可以使用加隨機首碼進行多分區多task計算,對於另一個RDD2正常聚合計算,最後將結果再合併起來。
- 將reduce join轉為map join
BroadCast+filter(或者map)
方案適用場景:
在對RDD使用join類操作,或者是在Spark SQL中使用join語句時,而且join操作中的一個RDD或表的數據量比較小(比如幾百M或者一兩G),比較適用此方案。
方案實現思路:
不使用join運算元進行連接操作,而使用Broadcast變數與map類運算元實現join操作,進而完全規避掉shuffle類的操作,徹底避免數據傾斜的發生和出現。將較小RDD中的數據直接通過collect運算元拉取到Driver端的記憶體中來,然後對其創建一個Broadcast變數;接著對另外一個RDD執行map類運算元,在運算元函數內,從Broadcast變數中獲取較小RDD的全量數據,與當前RDD的每一條數據按照連接key進行比對,如果連接key相同的話,那麼就將兩個RDD的數據用你需要的方式連接起來。
方案實現原理:
普通的join是會走shuffle過程的,而一旦shuffle,就相當於會將相同key的數據拉取到一個shuffle read task中再進行join,此時就是reduce join。但是如果一個RDD是比較小的,則可以採用廣播小RDD全量數據+map運算元來實現與join同樣的效果,也就是map join,此時就不會發生shuffle操作,也就不會發生數據傾斜。
- 採樣傾斜key並分拆join操作
方案適用場景:
兩個RDD/Hive表進行join的時候,如果數據量都比較大,無法採用“解決方案五”,那麼此時可以看一下兩個RDD/Hive表中的key分佈情況。如果出現數據傾斜,是因為其中某一個RDD/Hive表中的少數幾個key的數據量過大,而另一個RDD/Hive表中的所有key都分佈比較均勻,那麼採用這個解決方案是比較合適的。
方案實現思路:
對包含少數幾個數據量過大的key的那個RDD,通過sample運算元採樣出一份樣本來,然後統計一下每個key的數量,計算出來數據量最大的是哪幾個key。然後將這幾個key對應的數據從原來的RDD中拆分出來,形成一個單獨的RDD,並給每個key都打上n以內的隨機數作為首碼,而不會導致傾斜的大部分key形成另外一個RDD。接著將需要join的另一個RDD,也過濾出來那幾個傾斜key對應的數據並形成一個單獨的RDD,將每條數據膨脹成n條數據,這n條數據都按順序附加一個0~n的首碼,不會導致傾斜的大部分key也形成另外一個RDD。再將附加了隨機首碼的獨立RDD與另一個膨脹n倍的獨立RDD進行join,此時就可以將原先相同的key打散成n份,分散到多個task中去進行join了。而另外兩個普通的RDD就照常join即可。最後將兩次join的結果使用union運算元合併起來即可,就是最終的join結果 。
- 使用隨機首碼和擴容RDD進行join
方案適用場景:
如果在進行join操作時,RDD中有大量的key導致數據傾斜,那麼進行分拆key也沒什麼意義,此時就只能使用最後一種方案來解決問題了。
方案實現思路:
該方案的實現思路基本和“解決方案六”類似,首先查看RDD/Hive表中的數據分佈情況,找到那個造成數據傾斜的RDD/Hive表,比如有多個key都對應了超過1萬條數據。然後將該RDD的每條數據都打上一個n以內的隨機首碼。同時對另外一個正常的RDD進行擴容,將每條數據都擴容成n條數據,擴容出來的每條數據都依次打上一個0~n的首碼。最後將兩個處理後的RDD進行join即可。
- Spark故障解決(troubleshooting)
- shuffle file cannot find:磁碟小文件找不到。
1) connection timeout ----shuffle file cannot find
提高建立連接的超時時間,或者降低gc,降低gc了那麼spark不能堆外提供服務的時間就少了,那麼超時的可能就會降低。
2) fetch data fail ---- shuffle file cannot find
提高拉取數據的重試次數以及間隔時間。
3) OOM/executor lost ---- shuffle file cannot find
提高堆外記憶體大小,提高堆內記憶體大小。
- reduce OOM
BlockManager拉取的數據量大,reduce task處理的數據量小
解決方法:
1) 降低每次拉取的數據量
2) 提高shuffle聚合的記憶體比例
3) 提高Executor的記憶體比例
- 序列化問題
- Null值問題
val rdd = rdd.map{x=>{
x+”~”;
}}
rdd.foreach{x=>{
System.out.println(x.getName())
}}