Spark從1.6.0版本開始,記憶體管理模塊就發生了改變,舊版本的記憶體管理模塊是實現了StaticMemoryManager 類,現在被稱為"legacy"。"Legacy"模式預設被置為不可用,這就意味著當你用Spark1.5.x和Spark1.6.x運行相同的代碼會有不同的結果,應當多加註意。考 ...
Spark從1.6.0版本開始,記憶體管理模塊就發生了改變,舊版本的記憶體管理模塊是實現了StaticMemoryManager 類,現在被稱為"legacy"。"Legacy"模式預設被置為不可用,這就意味著當你用Spark1.5.x和Spark1.6.x運行相同的代碼會有不同的結果,應當多加註意。考慮的相容性,可以通過設置spark.memory.useLegacyMode為可用,預設是false.
這篇文章介紹自spark1.6.0版本後的新的記憶體管理模型,它實現的是UnifiedMemoryManager類。
在這張圖中你可以看到三個主要記憶體區域。
1.Reserved Memory.這部分記憶體是被系統預留的,它的大小也是被硬編碼的。在Spark1.6.0版本,它的大小是300MB,這就意味著這部分記憶體不能計入Spark記憶體計算,除非重新編譯源碼或設置spark.testing.reservedMemory,它的大小是不可改變的,因為park.testing.reservedMemory只是一個測試參數所以在生產中不推薦使用。註意,這部分記憶體只是被稱為“Reserved",實際上它不會被spark用來乾任何事情 ,但是它限制了你在spark中可分配的記憶體大小。即使你想將全部JVM堆記憶體用於spark緩存數據,也不能使用這部分空閑記憶體(不是真的就浪費了,其實它存儲了Spark的一些內部對象)。供參考,如果你不能為executor至少1.5 * Reserved Memory = 450MB的堆記憶體,任務將會失敗並提示”please use larger heap size“的錯誤信息。
2.User Memory.這部分記憶體是分配Spark Memory記憶體之後的部分,而且這部分用來乾什麼完全取決於你。你可以用來存儲RDD transformations過程使用的數據結構。例如,你可以通過mapPartitions transformation 重寫Spark aggregation,mapPartitions transformations 保存hash表保證aggregation運行。這部分數據就保存在User Memory。再次強調,這是User Memory它完全由你決定存什麼、如何使用,Spark完全不會管你拿這塊區域用來做什麼,怎麼用,也不會考慮你的代碼在這塊區域是否會導致記憶體溢出。
3.Spark Memory.這部分記憶體就是由Spark管理了。這部分記憶體大小的計算:(“Java Heap” – “Reserved Memory”) * spark.memory.fraction,而且在spark1.6.0版本預設大小為: (“Java Heap” – 300MB) * 0.75。例如:如果堆記憶體大小有4G,將有2847MB的Spark Memory,Spark Memory=(4*1024MB-300)*0.75=2847MB。這部分記憶體會被分成兩部分:Storage Memory和Execution Memory,而且這兩部分的邊界由spark.memory.storageFraction參數設定,預設是0.5即50%。新的記憶體管理模型中的優點是,這個邊界不是固定的,在記憶體壓力下這個邊界是可以移動的。如一個區域記憶體不夠用時可以從另一區域借用記憶體。下邊來討論如何移動及使用的:
1.Storage Memory.這部分記憶體即可以用來緩存spark數據也可以用來做unroll序列化數據的臨時空間。廣播變數以block的形式也存儲在這裡。你奇怪的是unroll,因為你可能會說,並不需要那麼多空間去unroll block使其可用——在沒有足夠記憶體去unroll bolock的情況下,如果得到持久化級別的允許,將直接在這部分記憶體unroll block。至於廣播變數,當它的持久化級別為MEMORY_AND_DISK時,就會緩存到此。
2.Execution Memory.這部分記憶體用於存儲執行task過程中的一些對象。例如,它可以用來shuflle map端的中間緩存,也可以用來存儲hash aggregation過程的hash table.在沒有足夠記憶體的時候,這部分記憶體支持溢室到磁碟,但是這部分記憶體的blocks不會被其它線程的task擠出去。
下邊我們來說一下Storage Memory 和Execution Memory之間的邊界移動。從Execution Memory的本質來看,你不能將這部分記憶體空間的數據擠出去,因為這部分記憶體的數據是用來計算的中間結果,如果計算過程找不到原來存到這的block數據任務就會失敗。但是對於Storage Memory記憶體就不會這樣,它只是用來緩存記憶體中數據,如果將裡邊的block數據驅逐出去,就會更新block 元數據映射信息使用到時告知該block被移除了,要想再拿到這些數據從HDD中讀取即可(或者如果緩存級別沒有溢寫就重新計算)。
所以,我們只能Execution Memory可以向Storage Memory擠用空間,反之不可。那麼當什麼時候會發生Execution Memory 向Storage Memory擠用空間呢?有兩種可能:
- 只要Storage Memory有可用空間,就可以增大Execution Memory 大小,減少Storage Memory 大小。
- Storage Memory的空間大小已經超出了初始設定的大小,並且將這部分空間全部占用,在這種情況下就可以強制將從Storage Memory中移出Blocks,減少它的空間到初始大小。
反過來,在只有當Execution Memory空間有空餘時,Storage Memory才可以向Execution Memory借用空間,也就是說Execution Memory只要不夠用了就可以向Storage Memory擠占空間不管Storage Memory有沒有空餘,而Storage Memory只能當Execution Memory有空餘時才要以借用不能搶占。
初始Storage Memory 大小:“Spark Memory” * spark.memory.storageFraction = (“Java Heap” – “Reserved Memory”) * spark.memory.fraction * spark.memory.storageFraction。根據預設值,即(“Java Heap” – 300MB) * 0.75 * 0.5 = (“Java Heap” – 300MB) * 0.375. 如果Java Heap=4G,那麼就有1423.5MB大小的Storage Memory空間。
這就意味著當我們使用Spark cacheu並載入全部數據到executor中時,至少要將Storage Memory大小等於預設初始值大小。因為當Storage Memory區域還沒滿時,Execution Memory區域已經膨脹大於其初始設定大小時,我們不能強制將Execution Memory搶占的空間數據驅逐,所以最終Storage Memory會變小。
希望這篇文章可以幫你更好的理解spark新的記憶體管理機制,並以此來應用。
譯自:https://0x0fff.com/spark-memory-management/ 如有不當之處或有寶貴意見,請私信聯繫我,及時更正。