一、概述 Hive是基於Hadoop的一個數據倉庫(Data Aarehouse,簡稱數倉、DW),可以將結構化的數據文件映射為一張資料庫表,並提供類SQL查詢功能。是用於存儲、分析、報告的數據系統。 在Hadoop生態系統中,HDFS用於存儲數據,Yarn用於資源管理,MapReduce用於數據處 ...
根據業務需求,需要對pyspark記憶體資源進行限制
本文使用的環境為pyspark 3.1.2,standalone模式
不足之處還請指出
pyspark進程說明
首先我們需要知道對pyspark進行記憶體限制,是限制哪部分的記憶體。
先看一下執行pyspark任務需要啟動哪些進程
pyspark與原版基於scala的spark啟動的進程大體相似但略有不同。
當啟動一個pyspark任務時,可以看到產生了2個系列的進程,分別是負責driver和executor
driver:
編號 | 說明 | 記憶體 |
---|---|---|
d1 | spark的driver端,spark-submit進程,運行在jvm,啟動sparkContext,構建dag等 | spark運算元在driver端用到的記憶體,包括collect等 |
d2 | spark的driver端啟動的pyspark的driver端,執行python部分代碼,通過py4j與d1通信 | python代碼中所用到的記憶體,包括創建一些變數等 |
executor:
編號 | 說明 | 記憶體 |
---|---|---|
e1 | spark的worker節點 | 不關註 |
e2 | 設備上其他spark任務的executor backend,與本文無關 | 不關註 |
e3 | 本任務對應的spark的executor backend,運行jvm中 | spark在executor端使用的記憶體,包括collect等運算元、shuffle等 |
e4 | 本任務對應的pyspark的executor backend,管理具體執行task的worker | 占用少量記憶體 |
e5 | 具體執行pyspark中的python task的任務的worker,由e4 fork得到,執行運算元中的自定義Python函數等 | pyspark在executor端使用的記憶體,通過python執行,包括map中的func等 |
可以看到,pyspark任務中,主要需要對4處進行記憶體限制
- spark的driver
- spark的executor
- pyspark的driver
- pyspark的executor
後兩個是pyspark比spark多出來的。
官方配置
關於spark中的記憶體,可以關註官方配置文檔
其中,重點關註3條配置信息
Property Name | Default | Meaning | Since Version |
---|---|---|---|
spark.driver.memory | 1g | Amount of memory to use for the driver process, i.e. where SparkContext is initialized, in the same format as JVM memory strings with a size unit suffix ("k", "m", "g" or "t") (e.g. 512m , 2g ). Note: In client mode, this config must not be set through the SparkConf directly in your application, because the driver JVM has already started at that point. Instead, please set this through the --driver-memory command line option or in your default properties file. |
1.1.1 |
spark.executor.memory | 1g | Amount of memory to use per executor process, in the same format as JVM memory strings with a size unit suffix ("k", "m", "g" or "t") (e.g. 512m , 2g ). |
0.7.0 |
spark.executor.pyspark.memory | Not set | The amount of memory to be allocated to PySpark in each executor, in MiB unless otherwise specified. If set, PySpark memory for an executor will be limited to this amount. If not set, Spark will not limit Python's memory use and it is up to the application to avoid exceeding the overhead memory space shared with other non-JVM processes. When PySpark is run in YARN or Kubernetes, this memory is added to executor resource requests. Note: This feature is dependent on Python's resource module; therefore, the behaviors and limitations are inherited. For instance, Windows does not support resource limiting and actual resource is not limited on MacOS. |
2.4.0 |
spark.driver.memory和spark.executor.memory這兩個參數限制就是spark端driver和executor的記憶體,
對需要在jvm中執行的任務進行了很好的限制,
但如上文所述,還需要對pyspark端的記憶體進行限制。
pyspark的executor記憶體限制
spark.executor.pyspark.memory這個參數是對pyspark的executor記憶體進行了限制
根據pyspark中worker.py
# set up memory limits
memory_limit_mb = int(os.environ.get('PYSPARK_EXECUTOR_MEMORY_MB', "-1"))
if memory_limit_mb > 0 and has_resource_module:
total_memory = resource.RLIMIT_AS
try:
(soft_limit, hard_limit) = resource.getrlimit(total_memory)
msg = "Current mem limits: {0} of max {1}\n".format(soft_limit, hard_limit)
print(msg, file=sys.stderr)
# convert to bytes
new_limit = memory_limit_mb * 1024 * 1024
if soft_limit == resource.RLIM_INFINITY or new_limit < soft_limit:
msg = "Setting mem limits to {0} of max {1}\n".format(new_limit, new_limit)
print(msg, file=sys.stderr)
resource.setrlimit(total_memory, (new_limit, new_limit))
except (resource.error, OSError, ValueError) as e:
# not all systems support resource limits, so warn instead of failing
print("WARN: Failed to set memory limit: {0}\n".format(e), file=sys.stderr)
看到,其實這個參數主要是使用了Python的resource模塊進行了記憶體限制
然而,這裡面設置的resource.RLIMIT_AS是對虛擬記憶體進行限制
我們通常想限制的是駐留記憶體。
例如一個小測試
import resource
resource.setrlimit(resource.RLIMIT_AS, (1*1024**3, -1))
def fun():
tmp = []
for i in range(1024**3):
try:
tmp.append('a'*1024)
except MemoryError:
break
return tmp
x = fun(), fun(), fun(), fun()
通過resource.setrlimit限制了1g記憶體。resource.RLIMIT_AS為虛擬記憶體的flag,RLIMIT_RSS為駐留記憶體,但只在老linux內核中生效,現在無法對內核態操作
運行後資源如下
virt達到了限制的1g,但res只有900m。在其他情況下,通常virt遠遠大於res,這樣virt達到了我們限制好的數值,但是res很小,記憶體遠遠沒得到充分利用,造成資源浪費。
另註:
在standalone模式下,每個worker(e5)限制的virt記憶體是在application啟動時計算好的。通過spark.executor.pyspark.memory 除以 --executor-cores 得到。
\(workerMemoryMb =memoryMb / execCores\)
減少每個stage的task個數並不能增加每個worker的virt記憶體限制大小
pyspark的driver記憶體限制
pyspark的driver負責執行python流程代碼,記憶體包含Python中創建的各種變數等
spark官方似乎沒有參數對這部分記憶體進行限制
可以自行使用resource模塊,對virt記憶體進行限制
報錯信息參考
spark的driver和executor出現oom後,會產生java.lang.OutOfMemoryError: Java heap space報錯信息
pyspark的driver和executor出現oom後,產生MemoryError,附有對應python代碼
cgroup管理記憶體
Control groups,是一種Linux內核特性,對進程進行分級分組管理,不同組用不同資源限制並監控。
可以對pyspark的駐留記憶體進行管理
安裝
以centos為例
yum install -y libcgroup libcgroup-tools
分組配置
這裡先設置了一個組,用作pyspark的總體控制
再在這個組中設置兩個組,分別對driver端的進程和executor的進程進行了限制
/sys/fs/cgroup/memory這個路徑是cgroup對memory進行控制的配置,在這裡建立對應文件夾來建立具體分組
首先是整體分組
mkdir /sys/fs/cgroup/memory/pyspark
再driver和executor分別建組控制
mkdir /sys/fs/cgroup/memory/pyspark/driver
mkdir /sys/fs/cgroup/memory/pyspark/executor
建組後,cgroup會自動生成一些配置文件,如下圖
關於每一項的說明可以參考大佬的文檔
在這裡主要關註memory.limit_in_bytes和cgroup.procs
memory.limit_in_bytes為當前限制的記憶體額度。超過額度的話會對相應進程進行kill
可以使用echo重定向對這個進行限制
echo 10g > /sys/fs/cgroup/memory/ai_pyspark/driver/memory.limit_in_bytes
echo 50g > /sys/fs/cgroup/memory/ai_pyspark/executor/memory.limit_in_bytes
則將這個分組的記憶體限製為10g和50g
cgroup.procs中包含這個分組中的pid
可將spark-submit和worker的pid追加進這個文件,cgroup便將這個進行拉進這個分組進行管理
echo 160224 >> /sys/fs/cgroup/memory/ai_pyspark/driver/cgroup.procs
echo 167910 >> /sys/fs/cgroup/memory/ai_pyspark/executor/cgroup.procs
cgroup會將進程中新產生的子進程自動加入cgroup.procs
例如當產生新的pyspark.daemon時,cgroup會將對應的pid添加到executor分組中
linux系統中每一個進程都有自己的分組,我們沒配置分組的進程會在/sys/fs/cgroup/memory分組中,如果想將某個分組中的某個pid移除這個分組,只需將他移動到另一個分組,例如
echo 167910 >> /sys/fs/cgroup/memory/cgroup.procs
另註:
如果executor發生oom,當前spark executor backend進程掛掉,spark會啟動一個新的executor backend,不要忘記將新的executor pid再加入cgroup.procs
參考
cgroups(7) — Linux manual page
Linux Cgroup系列(04):限制cgroup的記憶體使用(subsystem之memory)
如何限制python進程的記憶體使用量 - 酷python的文章 - 知乎