Spark 作為一個以擅長記憶體計算為優勢的計算引擎,記憶體管理方案是其非常重要的模塊; Spark的記憶體可以大體歸為兩類:execution和storage,前者包括shuffles、joins、sorts和aggregations所需記憶體,後者包括cache和節點間數據傳輸所需記憶體;在Spark 1 ...
Spark 作為一個以擅長記憶體計算為優勢的計算引擎,記憶體管理方案是其非常重要的模塊; Spark的記憶體可以大體歸為兩類:execution和storage,前者包括shuffles、joins、sorts和aggregations所需記憶體,後者包括cache和節點間數據傳輸所需記憶體;在Spark 1.5和之前版本里,兩者是靜態配置的,不支持借用,spark1.6 對記憶體管理模塊進行了優化,通過記憶體空間的融合,消除以上限制,提供更好的性能。官方網站只是要求記憶體在8GB之上即可(Impala推薦要求機器配置在128GB), 但spark job運行效率主要取決於:數據量大小,記憶體消耗,內核數(確定併發運行的task數量)
目錄:
- 基礎知識
- spark1.5- 記憶體管理
- spark1.6 記憶體管理
基本知識:
- on-heap memory:Java中分配的非空對象都是由Java虛擬機的垃圾收集器管理的,也稱為堆內記憶體。虛擬機會定期對垃圾記憶體進行回收,在某些特定的時間點,它會進行一次徹底的回收(full gc)。徹底回收時,垃圾收集器會對所有分配的堆內記憶體進行完整的掃描,這意味著一個重要的事實——這樣一次垃圾收集對Java應用造成的影響,跟堆的大小是成正比的。過大的堆會影響Java應用的性能
- off-heap memory:堆外記憶體意味著把記憶體對象分配在Java虛擬機的堆以外的記憶體,這些記憶體直接受操作系統管理(而不是虛擬機)。這樣做的結果就是能保持一個較小的堆,以減少垃圾收集對應用的影響
- LRU Cache(Least Recently Used):LRU可以說是一種演算法,也可以算是一種原則,用來判斷如何從Cache中清除對象,而LRU就是“近期最少使用”原則,當Cache溢出時,最近最少使用的對象將被從Cache中清除
- spark 源碼: https://github.com/apache/spark/releases
- scale ide for Intellij : http://plugins.jetbrains.com/plugin/?id=1347
Spark1.5- 記憶體管理:
- 1.6 版本引入了新的記憶體管理方案,配置參數: spark.memory.useLegacyMode 預設 false 表示使用新方案,true 表示使用舊方案, SparkEnv.scala 源碼 如下圖:
- 在staticMemoryManager.scala 類中查看構造類及記憶體獲取定義
- 通過代碼推斷,若設置了 spark.testing.memory 則以該配置的值作為 systemMaxMemory,否則使用 JVM 最大記憶體作為 systemMaxMemory。
- spark.testing.memory 僅用於測試,一般不設置,所以這裡我們認為 systemMaxMemory 的值就是 executor 的最大可用記憶體
- Execution:用於緩存shuffle、join、sort和aggregation的臨時數據,通過spark.shuffle.memoryFraction配置
- spark.shuffle.memoryFraction:shuffle 期間占 executor 運行時記憶體的百分比,用小數表示。在任何時候,用於 shuffle 的記憶體總 size 不得超過這個限制,超出部分會 spill 到磁碟。如果經常 spill,考慮調大參數值
- spark.shuffle.safetyFraction:為防止 OOM,不能把 systemMaxMemory * spark.shuffle.memoryFraction 全用了,需要有個安全百分比
- 最終用於 execution 的記憶體量為:executor 最大可用記憶體* spark.shuffle.memoryFraction*spark.shuffle.safetyFraction,預設為 executor 最大可用記憶體 * 0.16
- execution記憶體被分配給JVM里的多個task線程。
- task間的execution記憶體分配是動態的,如果沒有其他tasks存在,Spark允許一個task占用所有可用execution記憶體
- storage記憶體分配分析過程與 Execution 一致,由上面的代碼得出,用於storage 的記憶體量為: executor 最大可用記憶體 * spark.storage.memoryFraction * spark.storage.safetyFraction,預設為 executor 最大可用記憶體 * 0.54
- 在 storage 中,有一部分記憶體是給 unroll 使用的,unroll 即反序列化 block,該部分占比由 spark.storage.unrollFraction 控制,預設為0.2
- 通過代碼分析,storage 和 execution 總共使用了 80% 的記憶體,剩餘 20% 記憶體被系統保留了,用來存儲運行中產生的對象,該類型記憶體不可控.
小結:
- 這種記憶體管理方式的缺陷,即 execution 和 storage 記憶體表態分配,即使在一方記憶體不夠用而另一方記憶體空閑的情況下也不能共用,造成記憶體浪費,為解決這一問題,spark1.6 啟用新的記憶體管理方案UnifiedMemoryManager
- staticMemoryManager- jvm 堆記憶體分配圖如下
Spark1.6 記憶體管理:
-
從spark1.6開始,引入了新的記憶體管理方式-----統一記憶體管理(UnifiedMemoryManager),在統一記憶體管理下,spark一個executor中的jvm heap記憶體被劃分成如下圖:
- Reserved Memory,這一部分的記憶體是我們無法使用的部分,spark內部保留記憶體,會存儲一些spark的內部對象等內容。
- spark1.6預設的Reserved Memory大小是300MB。這部分大小是不允許我們使用者改變的。簡單點說就是我們在為executor申請記憶體後,有300MB是我們無法使用的。並且如果我們申請的executor的大小小於1.5 * Reserved Memory 即 < 450MB,spark會報錯:
- User Memory:用戶在程式中創建的對象存儲等一系列非spark管理的記憶體開銷都占用這一部分記憶體
- Spark Memory:該部分大小為 (JVM Heap Size - Reserved Memory) * spark.memory.fraction,其中的spark.memory.fraction可以是我們配置的(預設0.75),如下圖:
- 如果spark.memory.fraction配小了,我們的spark task在執行時產生數據時,包括我們在做cache時就很可能出現經常因為這部分記憶體不足的情況而產生spill到disk的情況,影響效率。採用官方推薦預設配置
- Spark Memory這一塊有被分成了兩個部分,Execution Memory 和 Storage Memory,這通過spark.memory.storageFraction來配置兩塊各占的大小(預設0.5,一邊一半),如圖:
- Storage Memory主要用來存儲我們cache的數據和臨時空間序列化時unroll的數據,以及broadcast變數cache級別存儲的內容
- Execution Memory則是spark Task執行時使用的記憶體(比如shuffle時排序就需要大量的記憶體)
- 為了提高記憶體利用率,spark針對Storage Memory 和 Execution Memory有如下策略:
- 一方空閑,一方記憶體不足情況下,記憶體不足一方可以向空閑一方借用記憶體
- 只有Execution Memory可以強制拿回Storage Memory在Execution Memory空閑時,借用的Execution Memory的部分記憶體(如果因強制取回,而Storage Memory數據丟失,重新計算即可)
- 如果Storage Memory只能等待Execution Memory主動釋放占用的Storage Memory空閑時的記憶體。(這裡不強制取回,因為如果task執行,數據丟失就會導致task 失敗)