PySpark 限制記憶體

来源:https://www.cnblogs.com/antelx/archive/2022/04/06/16107238.html
-Advertisement-
Play Games

一、概述 Hive是基於Hadoop的一個數據倉庫(Data Aarehouse,簡稱數倉、DW),可以將結構化的數據文件映射為一張資料庫表,並提供類SQL查詢功能。是用於存儲、分析、報告的數據系統。 在Hadoop生態系統中,HDFS用於存儲數據,Yarn用於資源管理,MapReduce用於數據處 ...


根據業務需求,需要對pyspark記憶體資源進行限制

本文使用的環境為pyspark 3.1.2,standalone模式

不足之處還請指出

pyspark進程說明

首先我們需要知道對pyspark進行記憶體限制,是限制哪部分的記憶體。

先看一下執行pyspark任務需要啟動哪些進程

pyspark與原版基於scala的spark啟動的進程大體相似但略有不同。

當啟動一個pyspark任務時,可以看到產生了2個系列的進程,分別是負責driver和executor

driver:

image

編號 說明 記憶體
d1 spark的driver端,spark-submit進程,運行在jvm,啟動sparkContext,構建dag等 spark運算元在driver端用到的記憶體,包括collect等
d2 spark的driver端啟動的pyspark的driver端,執行python部分代碼,通過py4j與d1通信 python代碼中所用到的記憶體,包括創建一些變數等

executor:

image

編號 說明 記憶體
e1 spark的worker節點 不關註
e2 設備上其他spark任務的executor backend,與本文無關 不關註
e3 本任務對應的spark的executor backend,運行jvm中 spark在executor端使用的記憶體,包括collect等運算元、shuffle等
e4 本任務對應的pyspark的executor backend,管理具體執行task的worker 占用少量記憶體
e5 具體執行pyspark中的python task的任務的worker,由e4 fork得到,執行運算元中的自定義Python函數等 pyspark在executor端使用的記憶體,通過python執行,包括map中的func等

可以看到,pyspark任務中,主要需要對4處進行記憶體限制

  • spark的driver
  • spark的executor
  • pyspark的driver
  • pyspark的executor

後兩個是pyspark比spark多出來的。

官方配置

關於spark中的記憶體,可以關註官方配置文檔

其中,重點關註3條配置信息

Property Name Default Meaning Since Version
spark.driver.memory 1g Amount of memory to use for the driver process, i.e. where SparkContext is initialized, in the same format as JVM memory strings with a size unit suffix ("k", "m", "g" or "t") (e.g. 512m, 2g). Note: In client mode, this config must not be set through the SparkConf directly in your application, because the driver JVM has already started at that point. Instead, please set this through the --driver-memory command line option or in your default properties file. 1.1.1
spark.executor.memory 1g Amount of memory to use per executor process, in the same format as JVM memory strings with a size unit suffix ("k", "m", "g" or "t") (e.g. 512m, 2g). 0.7.0
spark.executor.pyspark.memory Not set The amount of memory to be allocated to PySpark in each executor, in MiB unless otherwise specified. If set, PySpark memory for an executor will be limited to this amount. If not set, Spark will not limit Python's memory use and it is up to the application to avoid exceeding the overhead memory space shared with other non-JVM processes. When PySpark is run in YARN or Kubernetes, this memory is added to executor resource requests. Note: This feature is dependent on Python's resource module; therefore, the behaviors and limitations are inherited. For instance, Windows does not support resource limiting and actual resource is not limited on MacOS. 2.4.0

spark.driver.memoryspark.executor.memory這兩個參數限制就是spark端driver和executor的記憶體,

對需要在jvm中執行的任務進行了很好的限制,

但如上文所述,還需要對pyspark端的記憶體進行限制。

pyspark的executor記憶體限制

spark.executor.pyspark.memory這個參數是對pyspark的executor記憶體進行了限制

根據pyspark中worker.py

# set up memory limits
memory_limit_mb = int(os.environ.get('PYSPARK_EXECUTOR_MEMORY_MB', "-1"))
if memory_limit_mb > 0 and has_resource_module:
    total_memory = resource.RLIMIT_AS
    try:
        (soft_limit, hard_limit) = resource.getrlimit(total_memory)
        msg = "Current mem limits: {0} of max {1}\n".format(soft_limit, hard_limit)
        print(msg, file=sys.stderr)

        # convert to bytes
        new_limit = memory_limit_mb * 1024 * 1024

        if soft_limit == resource.RLIM_INFINITY or new_limit < soft_limit:
            msg = "Setting mem limits to {0} of max {1}\n".format(new_limit, new_limit)
            print(msg, file=sys.stderr)
            resource.setrlimit(total_memory, (new_limit, new_limit))

    except (resource.error, OSError, ValueError) as e:
        # not all systems support resource limits, so warn instead of failing
        print("WARN: Failed to set memory limit: {0}\n".format(e), file=sys.stderr)

看到,其實這個參數主要是使用了Python的resource模塊進行了記憶體限制

然而,這裡面設置的resource.RLIMIT_AS是對虛擬記憶體進行限制

我們通常想限制的是駐留記憶體。

例如一個小測試

import resource
resource.setrlimit(resource.RLIMIT_AS, (1*1024**3, -1))

def fun():
    tmp = []
    for i in range(1024**3):
        try:
            tmp.append('a'*1024)
        except MemoryError:
            break
    return tmp
    
x = fun(), fun(), fun(), fun()

通過resource.setrlimit限制了1g記憶體。resource.RLIMIT_AS為虛擬記憶體的flag,RLIMIT_RSS為駐留記憶體,但只在老linux內核中生效,現在無法對內核態操作

運行後資源如下

image

virt達到了限制的1g,但res只有900m。在其他情況下,通常virt遠遠大於res,這樣virt達到了我們限制好的數值,但是res很小,記憶體遠遠沒得到充分利用,造成資源浪費。

另註:

在standalone模式下,每個worker(e5)限制的virt記憶體是在application啟動時計算好的。通過spark.executor.pyspark.memory 除以 --executor-cores 得到。

\(workerMemoryMb =memoryMb / execCores\)

減少每個stage的task個數並不能增加每個worker的virt記憶體限制大小

pyspark的driver記憶體限制

pyspark的driver負責執行python流程代碼,記憶體包含Python中創建的各種變數等

spark官方似乎沒有參數對這部分記憶體進行限制

可以自行使用resource模塊,對virt記憶體進行限制

報錯信息參考

spark的driver和executor出現oom後,會產生java.lang.OutOfMemoryError: Java heap space報錯信息

pyspark的driver和executor出現oom後,產生MemoryError,附有對應python代碼

cgroup管理記憶體

Control groups,是一種Linux內核特性,對進程進行分級分組管理,不同組用不同資源限制並監控。

可以對pyspark的駐留記憶體進行管理

安裝

以centos為例

yum install -y libcgroup libcgroup-tools

分組配置

這裡先設置了一個組,用作pyspark的總體控制

再在這個組中設置兩個組,分別對driver端的進程和executor的進程進行了限制

/sys/fs/cgroup/memory這個路徑是cgroup對memory進行控制的配置,在這裡建立對應文件夾來建立具體分組

首先是整體分組

mkdir /sys/fs/cgroup/memory/pyspark

再driver和executor分別建組控制

mkdir /sys/fs/cgroup/memory/pyspark/driver
mkdir /sys/fs/cgroup/memory/pyspark/executor

建組後,cgroup會自動生成一些配置文件,如下圖

image

關於每一項的說明可以參考大佬的文檔

在這裡主要關註memory.limit_in_bytes和cgroup.procs

memory.limit_in_bytes為當前限制的記憶體額度。超過額度的話會對相應進程進行kill

可以使用echo重定向對這個進行限制

echo 10g > /sys/fs/cgroup/memory/ai_pyspark/driver/memory.limit_in_bytes
echo 50g > /sys/fs/cgroup/memory/ai_pyspark/executor/memory.limit_in_bytes

則將這個分組的記憶體限製為10g和50g

cgroup.procs中包含這個分組中的pid

可將spark-submit和worker的pid追加進這個文件,cgroup便將這個進行拉進這個分組進行管理

image

echo 160224 >> /sys/fs/cgroup/memory/ai_pyspark/driver/cgroup.procs
echo 167910 >> /sys/fs/cgroup/memory/ai_pyspark/executor/cgroup.procs

cgroup會將進程中新產生的子進程自動加入cgroup.procs

例如當產生新的pyspark.daemon時,cgroup會將對應的pid添加到executor分組中

linux系統中每一個進程都有自己的分組,我們沒配置分組的進程會在/sys/fs/cgroup/memory分組中,如果想將某個分組中的某個pid移除這個分組,只需將他移動到另一個分組,例如

echo 167910 >> /sys/fs/cgroup/memory/cgroup.procs

另註:

如果executor發生oom,當前spark executor backend進程掛掉,spark會啟動一個新的executor backend,不要忘記將新的executor pid再加入cgroup.procs

參考

Spark Configuration

cgroups(7) — Linux manual page

Linux Cgroup系列(04):限制cgroup的記憶體使用(subsystem之memory)

如何限制python進程的記憶體使用量 - 酷python的文章 - 知乎


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 《零基礎學Java》 流概述 在程式開發過程中,將輸入與輸出設備之間的數據傳遞抽象為流(例如鍵盤可以輸入數據,顯示器可以顯示鍵盤輸入的數據等),按照不同的分類方式,可以將流分為不同的類型:根據操作流的數據單元,可以將流分為位元組流(操作的數據單元是一個位元組)和字元流(操作的數據單元是兩個位元組或個字元, ...
  • AWT中常用組件 基本組件 組件名 功能 Button Button Canvas 用於繪圖的畫布 Checkbox 覆選框組件(也可當做單選框組件使用) CheckboxGroup 用於將多個Checkbox 組件組合成一組, 一組 Checkbox 組件將只有一個可以 被選中 , 即全部變成單選 ...
  • Runnable介面實現多線程 package Day23; //多線稱實現方式2() Runnable【介面】 public class RunnableDemo implements Runnable { //重寫此介面的run方法 @Override public void run() { f ...
  • 一. 歸併排序演算法簡介 歸併排序演算法是一種採用了分治策略的排序演算法。它通過遞歸地先使每個子序列有序,再將兩個有序的序列進行合併成一個有序的序列(也可以採用非遞歸的方式實現,效率更高一些)。歸併演算法是穩定和高效的排序演算法(適用於複雜對象(結構體)數列的穩定排序) 二. 演算法複雜度 最理想情況:O(nl ...
  • 話不多說 先上效果圖:功能:1.每一個更新按鈕對應著所更新的書籍定價數量,並將結果輸出至消費總金額處2.提交對應著所有書籍對應價格數量的總和,同樣輸出至總金額處3.重置既刷新頁面實列:1.整體是居中的,效果圖存在位置偏差2.數量和消費總金額的預設值是0,如果將0刪除會出現報錯,所以請小心OK 上代碼 ...
  • 鏡像下載、功能變數名稱解析、時間同步請點擊 阿裡雲開源鏡像站 在本文中,我們將瞭解如何在 Ubuntu 20.04 上安裝 Kubernetes。在過去的幾年裡,容器化為開發人員提供了很大的靈活性。最常用的容器化應用程式之一是 Docker。 運行小型應用程式並不難,但如果你想擴展它們怎麼辦?當您擁有成百上 ...
  • 背景:WindowsServer2019安裝.NET3.5報錯0x800f0950,嘗試網上的方法,發現以下方法有效,進行重新整理解決方法,進行記錄。 一、以下為報錯信息: 二、嘗試過的方法有: 直接從伺服器控制面板安裝。 使用dism命令安裝,存在如下報錯: C:\Windows>dism /on ...
  • 鏡像下載、功能變數名稱解析、時間同步請點擊 阿裡雲開源鏡像站 一、 安裝依賴包 # yum –y install gcc gcc-c++ openssl openssl-devel pcre pcre-devel zlib zlib-devel 如yum安裝依賴包時報錯: 解決辦法: #~ wget -O ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...