設置進程記憶體(Process Memory) Apache Flink通過嚴格控制其各種組件的記憶體使用,在JVM之上提供高效的工作負載。 配置總記憶體(Total Memory) Flink JVM進程的總進程記憶體(total process memory)由Flink應用程式消耗的記憶體(總Flink ...
設置進程記憶體(Process Memory)
Apache Flink通過嚴格控制其各種組件的記憶體使用,在JVM之上提供高效的工作負載。
配置總記憶體(Total Memory)
Flink JVM進程的總進程記憶體(total process memory)由Flink應用程式消耗的記憶體(總Flink記憶體(total Flink memory))和JVM運行進程所消耗的記憶體組成。總Flink記憶體消耗包括JVM堆記憶體( JVM Heap)和堆外(Off-heap,直接(direct)或本地(native)記憶體的使用量
在Flink中設置記憶體的最簡單方法是配置以下兩個選項之一:
組件 | TaskManager配置選項 | JobManager配置選項 |
---|---|---|
Total Flink memory | taskmanager.memory.flink.size |
jobmanager.memory.flink.size |
Total process memory | taskmanager.memory.process.size |
jobmanager.memory.process.size |
其餘記憶體組件將根據預設值或額外配置的選項自動調整。
配置 total Flink memory 更適合standalone部署,其中要聲明給Flink本身多少記憶體。total Flink memory分為JVM Heap和 Off-heap記憶體。另請參閱如何為standalone部署配置記憶體。
如果配置了total process memory,那就聲明瞭總共應該為Flink JVM進程分配多少記憶體。對於容器化部署,它對應於請求的容器的記憶體大小,另請參閱如何為容器配置記憶體(Kubernetes 或者 Yarn)
另一種設置記憶體的方法是配置特定於具體Flink進程的total Flink memory所需的內部組件,比如TaskManager、JobManager。
必須使用上述三種方法之一來配置Flink的記憶體(本地執行除外),否則Flink啟動將失敗。這意味著必須顯式配置以下沒有預設值的選項子集之一:
不建議同時顯示配置 total process memory 和 total Flink memory。因為這樣可能因為潛在的記憶體配置衝突,導致部署失敗。配置其它記憶體組件時同樣需要註意,因為也可能產生配置衝突。
JVM參數
Flink在啟動進程時,會根據配置或派生的記憶體組件大小,顯式添加以下與記憶體相關的JVM參數:
JVM 參數 | Value for TaskManager | Value for JobManager |
---|---|---|
-Xmx 和 -Xms | Framework + Task Heap Memory | JVM Heap Memory (*) |
-XX:MaxDirectMemorySize | Framework + Task Off-heap (**) + Network Memory | Off-heap Memory (**),(***) |
-XX:MaxMetaspaceSize | JVM Metaspace | JVM Metaspace |
(*) 請記住,根據使用的GC演算法,你可能無法使用全部堆記憶體。一些GC演算法會為自己分配一定數量的堆記憶體。這將導致Heap metrics返回不同的最大值(https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/ops/metrics/#memory)。
(**) 請註意,用戶代碼中本地非直接使用記憶體也可以作為堆外記憶體的一部分。
(***) 僅當設置了對應的jobmanager.memory.enable-jvm-direct-memory-limit
選項時,才會為JobManager添加JVM Direct記憶體限制
根據比例限制的組件(Capped Fractionated Components)
本節描述了選項的配置細節,這些選項(的大小)可以設置為其它記憶體大小的占比,同時受到min-max範圍的限制:
- JVM Overhead(JVM 開銷)可以設置為 total process memory的占比
- Network memory 可以設置為 total Flink memory 的占比(僅針對TaskManager)
相關記憶體部分的配置方法,請同時參考 TaskManager 和 JobManager 的詳細記憶體模型。
這些組件的記憶體大小必須在相應的最大值、最小值範圍內,否則 Flink 將無法啟動。 最大值、最小值具有預設值,也可以通過相應的配置選項顯示設置。 例如,如果僅配置以下記憶體選項:
- total Process memory = 1000MB,
- JVM Overhead min = 64MB,
- JVM Overhead max = 128MB,
- JVM Overhead fraction = 0.1
那麼 JVM Overhead將會是 1000MB x 0.1 = 100MB,在 64-128MB 的範圍內。
註意,如果將最大值、最小值設置成相同大小,那相當於明確指定了該組件記憶體的大小。
如果沒有明確指定組件記憶體的大小,Flink 會根據總記憶體和占比(fraction)計算出該組件記憶體的大小。 計算得到的記憶體大小將受限於相應的最小值/最大值選項。 例如,如果僅配置下列選項:
- total Process memory = 1000MB,
- JVM Overhead min = 128MB,
- JVM Overhead max = 256MB,
- JVM Overhead fraction = 0.1
那麼 JVM Overhead將會是 128MB,因為根據總記憶體和占比計算得到的記憶體大小 100MB 小於最小值128MB。
如果配置了總記憶體和其他組件記憶體的大小,那麼 Flink 也有可能會忽略給定的占比。 這種情況下,JVM Overhead被設置為總記憶體減去其他所有組件記憶體後的剩餘部分。 這樣推導得出的記憶體大小必須符合最大值、最小值範圍,否則配置失敗。 例如,假設僅配置下列選項:
- total Process memory = 1000MB,
- task heap = 100MB, (類似的例子可以是JobManager中的JVM Heap)
- JVM Overhead min = 64MB,
- JVM Overhead max = 256MB,
- JVM Overhead fraction = 0.1
total Process memory中所有其他組件記憶體均有預設大小,包括 TaskManager 的預設Managed Memory占比或 JobManager 的預設Off-heap 記憶體。 因此,**JVM Overhead的實際大小不是根據占比算出的大小(1000MB x 0.1 = 100MB),而是total Process memory的剩餘部分,該值的大小必須在 64-256MB 的範圍內,否則將會啟動失敗。
設置任務管理器記憶體(TaskManager Memory)
TaskManager在Flink中運行用戶代碼。根據需要配置記憶體使用情況可以大大減少Flink的資源占用,並提高作業穩定性。
下述記憶體配置描述適用版本1.10及往後版本。
配置總記憶體
Flink JVM進程的total process memory由Flink應用程式消耗的記憶體(總Flink記憶體)和JVM運行進程所消耗的記憶體組成。總Flink記憶體消耗包括JVM堆、托管記憶體(由Flink管理)和其他直接(或本機)記憶體的使用。
如果您在本地(例如從IDE)運行Flink而沒有創建集群,那麼只有記憶體配置選項的一個子集是相關的,請參閱本地運行 以瞭解更多詳細信息。
否則,為TaskManager設置記憶體的最簡單方法是配置總記憶體(參見上文)。這裡更詳細地描述了一種更細粒度的方法。
其餘記憶體組件將根據預設值或額外配置的選項自動調整。
配置堆和托管記憶體(Heap and Managed Memory)
如前所述 ,在Flink中設置記憶體的另一種方法是顯式指定兩者task堆記憶體 和 托管記憶體)。它為Flink的任務可用堆記憶體及其托管記憶體提供了更多控制。
其餘記憶體組件將根據預設值或額外配置的選項自動調整。
如果已顯式配置任務堆和托管記憶體,建議既不設置total process memory,也不設置 total Flink memory,否則,很容易導致記憶體配置衝突
Task (Operator)堆記憶體
如果想保證一定數量的JVM堆記憶體可用於的用戶代碼,可以顯式地設置任務堆記憶體(taskmanager.memory.task.heap.size
)。它將被添加到JVM堆大小中,並將專用於運行用戶代碼的Flink operator。
托管記憶體
托管記憶體由Flink管理,並作為本地記憶體(堆外記憶體)進行分配。以下工作負載使用托管記憶體:
- 流作業(Streaming jobs)可以將其用於RocksDB state backend.
- 流作業和批處理作業都可以使用它進行排序、哈希表和中間結果的緩存。
- 流作業和批處理作業都可以使用它來執行Python進程中用戶定義的函數.
托管記憶體的大小可以:
- 通過
taskmanager.memory.managed.size
顯示配置 - 或者通過
taskmanager.memory.managed.fraction
計算為total Flink memory的占比值。
如果兩者都已設置,則Size將覆蓋fraction。如果沒有顯式配置size和fraction,則使用預設fraction
查看如何為state backends 和batch jobs配置記憶體。
使用者權重(Consumer Weights)
如果作業包含多種類型的托管記憶體使用者,還可以控制如何在這些類型之間共用托管記憶體。配置選項taskmanager.memory.managed.consumer-weights
允許你為每種類型設置一個權重,Flink將按比例保留托管記憶體。有效的消費者類型包括:
OPERATOR
: 用於內置演算法。STATE_BACKEND
: 用於流作業中的RocksDB State後端PYTHON
: 用於PYTHON進程
例如,如果流作業同時使用RocksDB State後端和Python UDFs,並且使用者權重配置為 STATE_BACKEND:70,PYTHON:30
,則Flink將為RocksDB State後端保留總托管記憶體的70%
,為Python進程保留 30%
。
對於每種類型,只有當作業包含該類型的托管記憶體使用者時,Flink才會保留托管記憶體。
Flink不會為未包含在使用者權重中的使用者類型保留托管記憶體。如果作業實際需要缺少的類型,則可能導致記憶體分配失敗。預設情況下,包括所有使用者類型。只有當顯式配置/覆蓋權重時,才會發生這種情況。
配置堆外記憶體(直接記憶體或者本地記憶體)
用戶代碼分配的堆外記憶體應計入任務堆外記憶體(taskmanager.memory.task.off-heap.size
)。
還可以調整框架堆外記憶體(framework off-heap memory)。僅當你確信Flink框架需要更多記憶體時,才應該更改此值。
Flink將框架堆外記憶體和任務堆外記憶體包含在JVM的直接記憶體(direct memory)限制中,另請參閱JVM參數。
註意:儘管本地非直接記憶體使用可以算作框架堆外記憶體或任務堆外記憶體的一部分,但這也將導致更高的JVM直接記憶體限制。
註意:網路記憶體(network memory)也是JVM直接記憶體的一部分,但它由Flink管理,並保證永遠不會超過其配置的大小。因此,在這種情況下,調整網路記憶體的大小將沒有幫助。
詳細記憶體模型
註意:用戶代碼的本地非直接記憶體使用也算作任務堆外記憶體(task off-heap memory
)的一部分
下表列出了上面描述的所有記憶體組件,及影響各個組件大小的Flink配置選項:
組件 | 配置 | 描述 |
---|---|---|
Framework Heap Memory | taskmanager.memory.framework.heap.size |
專用於Flink框架的JVM堆記憶體(高級選項)預設128 mb |
Task Heap Memory | taskmanager.memory.task.heap.size |
專用於Flink應用程式以運行Operator和用戶代碼的JVM堆記憶體,無預設大小 |
Managed memory | taskmanager.memory.managed.size taskmanager.memory.managed.fraction |
由Flink管理的本地記憶體,保留用於排序、哈希表、緩存中間結果和RocksDB state後端。size無預設大小,fraction預設0.4 |
Framework Off-heap Memory | taskmanager.memory.framework.off-heap.size |
專用於Flink框架的堆外直接(或本地)記憶體(高級選項)預設 128 mb |
Task Off-heap Memory | taskmanager.memory.task.off-heap.size |
專供Flink應用運行operator的堆外直接(或本地)記憶體。預設 0 bytes |
Network Memory | taskmanager.memory.network.min taskmanager.memory.network.max taskmanager.memory.network.fraction |
為任務之間交換數據記錄而保留的直接記憶體(例如,為網路傳輸進行緩衝)是total Flink memory的一個 capped fractionated component 。 該記憶體用於分配網路緩衝(network buffers) min 預設64 mb max 預設 infinite fraction 0.1 |
JVM metaspace | taskmanager.memory.jvm-metaspace.size |
Flink JVM 進程的元空間大小(Metaspace size) 預設 256mb |
JVM Overhead | taskmanager.memory.jvm-overhead.min taskmanager.memory.jvm-overhead.max taskmanager.memory.jvm-overhead.fraction |
為其他JVM開銷保留的本地記憶體:例如線程堆棧、代碼緩存、垃圾收集空間等,它是total process memory的一個capped fractionated component min 預設 192 mb max 預設 1 gb fraction 預設 0.1 |
框架記憶體(Framework Memory)
不應該在沒有充分理由的情況下更改框架堆記憶體(framework heap memory)和框架堆外記憶體(framework off-heap memory)。僅在你確信Flink需要更多記憶體用於某些內部數據結構或operator時,才調整它們。它可能與特定的部署環境或作業結構有關,例如高並行性。此外,在某些設置中,Flink依賴項(如Hadoop)可能會消耗更多的直接記憶體或本地記憶體。
註意 Flink目前沒有隔離框架堆或堆外記憶體和任務記憶體的版本。
本地執行(Local Execution)
如果將Flink作為一個單獨的java程式在機器上本地啟動(例如,從IDE),而不創建集群,則除以下組件外,所有組件都將被忽略:
記憶體組件 | 相關選項 | 用於本地執行的預設值 |
---|---|---|
Task heap | taskmanager.memory.task.heap.size |
infinite |
Task off-heap | taskmanager.memory.task.off-heap.size |
infinite |
Managed memory | taskmanager.memory.managed.size |
128MB |
Network memory | taskmanager.memory.network.min taskmanager.memory.network.max |
64MB |
上面列出的所有組件都可以但不必為本地執行顯示的配置。如果未對其進行配置,則會將其設置為預設值
註意 本地執行的情況下,任務堆大小與實際堆大小沒有任何關係。啟動的本地進程的實際JVM堆大小不受Flink控制,取決於進程的啟動方式。如果要控制JVM堆大小,則必須顯式傳遞相應的JVM參數,例如-Xmx、-Xms。
設置Job管理器(JobManager)記憶體
JobManager是Flink集群的控制元素。它由三個不同的組件組成:Resource Manager、Dispatcher和JobMaster(每個運行Flink Job各一個)。
以下描述的記憶體配置從1.11*版本開始適用。
配置總記憶體(Total Memory)
設置記憶體配置的最簡單方法是為進程配置總記憶體。如果使用本地執行模式運行JobManager進程,則不需要配置記憶體選項,不起任何作用。
詳細配置
下表列出了上面描述的所有記憶體組件,及影響各個組件大小的Flink配置選項:
Component | Configuration options | Description |
---|---|---|
JVM Heap | jobmanager.memory.heap.size |
job管理器的 JVM堆記憶體大小,無預設大小 |
Off-heap Memory | jobmanager.memory.off-heap.size |
job管理器的堆外記憶體大小,包括直接記憶體和本地記憶體,預設 128 mb |
JVM metaspace | jobmanager.memory.jvm-metaspace.size |
Flink JVM進程的元空間大小。 預設 256 mb |
JVM Overhead | jobmanager.memory.jvm-overhead.min jobmanager.memory.jvm-overhead.max jobmanager.memory.jvm-overhead.fraction |
為其他JVM開銷保留的本地記憶體:例如線程堆棧、代碼緩存、垃圾收集空間等,它是total process memory的一個capped fractionated component min 預設 192 mb max 預設 1 gb fraction 預設 0.1 |
配置JVM堆(Heap)
如前所述,為JobManager設置記憶體的另一種方法是顯式指定JVM Heap大小 (jobmanager.memory.heap.size
)。它提供了對可用的JVM堆的更多控制,該堆由以下用戶使用:
- Flink框架
- 在作業提交期間(例如,對於某些批處理源)或檢查點完成回調中執行的用戶代碼
所需的JVM堆大小主要由正在運行的作業的數量、作業的結構以及對所提到的用戶代碼的要求決定。
註意 如果已顯式配置了JVM堆,則建議既不設置總進程記憶體(total process memory),也不設置總Flink記憶體(total Flink memory)。否則,很容易導致記憶體配置衝突。
Flink腳本和CLI在啟動JobManager進程時通過JVM參數-Xms和-Xmx設置JVM堆大小
配置堆外記憶體(Off-heap Memory)
堆外記憶體組件可用於任何類型的JVM直接記憶體和本地記憶體使用。因此,還可以通過設置 jobmanager.memory.enable-jvm-direct-memory-limit
來啟用JVM直接記憶體(JVM Direct Memory)限制 。如果配置了此選項,Flink將通過相應的JVM參數:-XX:MaxDirectMemorySize 將限制設置為堆外記憶體大小。
此組件的大小可以由jobmanager.memory.off-heap.size
配置。可以調整此選項,例如,如果JobManager進程拋出“OutOfMemoryError:Direct buffer memory”
堆外記憶體消耗可能來源以下:
- Flink框架依賴關係(例如Akka網路通信)
- 在作業提交期間(例如,對於某些批處理源)或檢查點完成回調中執行的用戶代碼
註意 如果已顯示配置 Total Flink Memory和JVM Heap,但尚未配置堆外(Off-heap)記憶體,則堆外記憶體的大小將派生為Total Flink memory - JVM Heap。堆外記憶體選項的預設值將被忽略
本地執行
如果在本地(例如從IDE)運行Flink而沒有創建集群,那麼JobManager記憶體配置選項將被忽略。
參考鏈接
https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/deployment/memory/mem_setup/
https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/deployment/memory/mem_setup_tm/
https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/deployment/config
作者:授客
微信/QQ:1033553122
全國軟體測試QQ交流群:7156436
Git地址:https://gitee.com/ishouke
友情提示:限於時間倉促,文中可能存在錯誤,歡迎指正、評論!
作者五行缺錢,如果覺得文章對您有幫助,請掃描下邊的二維碼打賞作者,金額隨意,您的支持將是我繼續創作的源動力,打賞後如有任何疑問,請聯繫我!!!
微信打賞
支付寶打賞 全國軟體測試交流QQ群