天天看點

Flink TaskManager 記憶體管理機制介紹與調優總結概要TaskManager 記憶體分區總覽TaskManager 各記憶體區域詳解參考閱讀

概要

Flink 的新版記憶體管理機制,要追溯到 2020 年初釋出的 Flink 1.10 版本。當時 Flink 社群為了實作三大目标:

  1. 流和批模式下記憶體管理的統一,即同一套記憶體配置既可用于流作業也可用于批作業
  2. 管控好 RocksDB 等外部元件的記憶體,避免在容器環境下用量不受控導緻被 KILL
  3. 消除不同部署模式下配置參數的歧義,消除 cut-off 等參數語義模糊的問題

提出了兩個設計提案 FLIP-49: Unified Memory Configuration for TaskExecutors 1 和 FLIP-116: Unified Memory Configuration for Job Managers 2,以對之前 Flink 記憶體模型的各項缺陷進行了針對性的重構,為後續的流批一體演進奠定了基礎。

由于這個版本距今已有兩年多的曆史,網上對其記憶體模型的解讀文章也不勝枚舉,他們有的對提案進行了中文化的翻譯,有的則是對每個參數進行逐一講解,幫助大家了解 Flink 的記憶體配置方法 3。

本文則是上述簡介文章的進一步延展:在新版記憶體管理模型的基礎上,介紹每個區域的技術原理、相關技術資料,以及線上的調優經驗,幫助大家在實際應用場景下,更好地規劃 Flink 的記憶體空間,”知其然,也知其是以然“,提前識别和消除隐患。

TaskManager 記憶體分區總覽

我們從 Flink 官網文檔的 記憶體分區圖 5 開始介紹 ,并加以批注:圖的左邊标注了每個區域的配置參數名,右邊則是一個調優後的、使用 HashMapStateBackend 的作業記憶體各區域的容量限制:它和預設配置的差別在于 Managed Memory 部分被主動調整為 0,後面我們會講解何時需要調整各區域的大小,以最大化利用記憶體空間。

Flink TaskManager 記憶體管理機制介紹與調優總結概要TaskManager 記憶體分區總覽TaskManager 各記憶體區域詳解參考閱讀

Flink 新版記憶體模型

TaskManager 各記憶體區域詳解

接下來,我們詳細來看一下各個記憶體區域的含義、技術原理,以及 Flink 對它的預設值在什麼場景下需要調整。

JVM 程序總記憶體(Total Process Memory)

該區域表示在容器環境下,TaskManager 所在 JVM 的最大可用的記憶體配額,包含了本文後續介紹的所有記憶體區域,超用時可能被強制結束程序。我們可以通過

taskmanager.memory.process.size

參數控制它的大小。

例如我們設定 JVM 程序總記憶體為 4G,TaskManager 運作在 Kubernetes 平台,則 Pod 配置的 spec -> resources -> limits -> memory 項會被設定為 4Gi,源碼見

org.apache.flink.kubernetes.kubeclient.decorators.InitTaskManagerDecorator#decorateMainContainer

,運作時的 YAML 配置如下圖:

Flink TaskManager 記憶體管理機制介紹與調優總結概要TaskManager 記憶體分區總覽TaskManager 各記憶體區域詳解參考閱讀

Kubernetes Pod 記憶體設定

而對于 YARN,如果

yarn.nodemanager.pmem-check-enabled

設為

true

, 則也會在運作時定期檢查容器内的程序是否超用記憶體。

如果程序總記憶體用量超出配額,容器平台通常會直接發送最嚴格的 SIGKILL 信号(相當于

kill -9

)來中止 TaskManager,此時不會有任何延期退出的機會,可能會造成作業崩潰重新開機、外部系統資源無法釋放等嚴重後果。

是以,在 有硬性資源配額檢查 的容器環境下,請務必妥善設定該參數,對作業充分壓測後,盡可能預留一部分安全餘量,避免 TaskManager 頻繁被 KILL 而導緻的作業頻繁重新開機。

Flink 總記憶體(Total Flink Memory)

該記憶體區域指的是 Flink 可以控制的記憶體區域,即上述提到的 JVM 程序總記憶體 減去 Flink 無法控制的 Metaspace(元空間)和 Overhead(運作時開銷)區域。Flink 随後又把這部分記憶體區域劃分為堆内、堆外(Direct)、堆外(Managed)等不同子區域,後面我們會逐一講解他們的配置指南。

對于沒有硬性資源限制的環境,我們建議使用

taskmanager.memory.flink.size

參數來配置 Flink 總記憶體的大小,然後 Flink 自己也會會自動根據參數,計算得到各個子區域的配額。如果作業運作正常,則無需單獨調整。

例如 4G 的 程序總記憶體 配置下,JVM 運作時開銷(Overhead)占 程序總記憶體 的 10% 但最多 1G(下圖是 409.6M),元空間(Metaspace)占 256M;堆外直接(Direct)記憶體網絡緩存占 Flink 總記憶體 的 10% 但最多 1G(下圖是 343M),架構堆和架構堆外各占 128M,堆外管控(Managed)記憶體占 Flink 總記憶體 的 40%(下圖是 1372M 即 1.34G),其他空間留給任務堆,即使用者程式代碼可以使用的記憶體空間(1459M 即 1.42G),我們接下來會講到它。

Flink TaskManager 記憶體管理機制介紹與調優總結概要TaskManager 記憶體分區總覽TaskManager 各記憶體區域詳解參考閱讀

Flink TaskManager 作業記憶體區域用量

JVM 堆記憶體(JVM Heap Memory)

堆記憶體大家想必都不陌生,它是由 JVM 提供給使用者程式運作的記憶體區域,JVM 會按需運作 GC(垃圾回收器),協助清理失效對象。

當任務啟動時,

ProcessMemoryUtils#generateJvmParametersStr

方法會通過

-Xmx

-Xms

參數設定堆記憶體的最大容量。

Flink 将堆記憶體從邏輯上劃分為 ”架構堆“、”任務堆“ 兩個子區域,分别通過

taskmanager.memory.framework.heap.size

taskmanager.memory.task.heap.size

來指定其大小:架構堆預設是 128m,任務堆如果未顯式設定其大小,則會通過扣減其他區域配額來計算得到。例如對于 4G 的程序總記憶體,扣除了其他區域後,任務堆可用的隻有不到 1.5G。

但需要注意的是,Flink 自身并不能精确控制架構自身及任務會用多少堆記憶體,是以上述配置項隻提供理論上的計算依據。如果實際用量超出配額,且 JVM 難以回收對象釋放空間,則會抛出 OutOfMemoryError,此時 Flink TaskManager 會退出,導緻作業崩潰重新開機。是以對于堆記憶體的監控是必須要配置的,當堆記憶體用量超過一定比率,或者 Full GC 時長和次數明顯增長時,需要盡快介入并考慮擴容。

進階内容:對于使用 HashMapStateBackend(舊版本稱之為 FileSystem StateBackend)的流作業使用者,如果在程序總記憶體固定的前提下,希望盡可能提升任務堆的空間,則可以減少 托管記憶體(Managed Memory)的比例。我們接下來也會講到它。

JVM 堆外記憶體(JVM Off-Heap Memory)

廣義上的 堆外記憶體 指的是 JVM 堆之外的記憶體空間,而我們這裡特指 JVM 程序總記憶體除了元空間(Metaspace)和運作時開銷(Overhead)以外的記憶體區域。因為上述兩個區域是 JVM 自行管理,Flink 無法介入,我們後面單獨劃分和講解。

托管記憶體(Managed Memory)

文章開頭的總覽圖中,把托管記憶體區域設為 0,此時任務堆空間約 3G;而使用 Flink 預設配置時,任務堆隻有 1.5G。這是因為預設情況下,托管記憶體占了 40% 的 Flink 總記憶體,導緻堆記憶體可用的量變的相當少。是以我們非常有必要了解什麼是托管記憶體。

從官方文檔和 Flink 源碼上來看,托管記憶體主要有三大使用場景:

  1. 批處理算法,例如排序、HashJoin 等。他們會從 Flink 的 MemoryManager 請求記憶體片段(MemorySegment),而 MemoryManager 則會調用

    UNSAFE.allocateMemory

    配置設定堆外記憶體。
  2. RocksDB StateBackend,Flink 隻會預留一部分空間并扣除預算,但是不介入實際記憶體配置設定。是以該類型的記憶體資源被稱為

    OpaqueMemoryResource

    . 實際的記憶體配置設定還是由 JNI 調用的 RocksDB 自己通過 malloc 函數申請。
  3. PyFlink。與 JNI 類似,在與 Python 程序互動的過程中,也會用到一部分托管記憶體。

顯然,對于普通的流式 SQL 作業,如果啟用了 RocksDB 狀态後端時,才會大量使用托管記憶體。是以如果您的業務場景并未用到 RocksDB,那麼可以調小托管記憶體的相對比例(

taskmanager.memory.managed.fraction

)或絕對大小(

taskmanager.memory.managed.size

),以增大任務堆的空間。

對于 RocksDB 作業,之是以配置設定了 40% Flink 總記憶體,是因為 RocksDB 的記憶體用量實在是一個很頭疼的問題。早在 2017 年,就有 FLINK-7289: Memory allocation of RocksDB can be problematic in container environments 6 這個問題單,随後社群對此做了大量的工作(通過 LRUCache 參數、增強 WriteBufferManager 的 Slot 内空間複用等),來盡可能地限制 RocksDB 的總記憶體用量。在我之前的 Flink on RocksDB 參數調優指南 7 文章中,也有提到 RocksDB 記憶體調優的各項參數,其中 MemTable、Block Cache 都是托管記憶體空間的用量大戶。

為了避免手動調優的繁雜,Flink 新版記憶體管理預設将

state.backend.rocksdb.memory.managed

參數設為

true

,這樣就由 Flink 來計算 RocksDB 各部分需要用多少記憶體 8,這也是 ”托管“ 的含義所在。如果仍然希望精細化手動調整 RocksDB 參數,則需要将上述參數設為

false

.

直接記憶體(Direct Memory)

直接記憶體是 JVM 堆外的一類記憶體,它提供了相對安全可控但又不受 GC 影響的空間,JVM 參數是

-XX:MaxDirectMemorySize

. 它主要用于

  1. 架構自身(

    taskmanager.memory.framework.off-heap.size

    參數,預設 128M,例如 Sort-Merge Shuffle 算法所需的記憶體)
  2. 使用者任務(

    taskmanager.memory.task.off-heap.size

    參數,預設設為 0)
  3. Netty 對 Network Buffer 的網絡傳輸(

    taskmanager.memory.network.fraction

    等參數,預設 0.1 即 10% 的 Flink 總記憶體)。

在生産環境中,如果作業并行度非常大(例如大于 500 甚至 1000),則需要調大

taskmanager.network.memory.floating-buffers-per-gate

taskmanager.network.memory.max-buffers-per-channel

(例如從 8 調整到 1000)和

taskmanager.network.memory.buffers-per-channel

(例如從 2 調整到 500),避免 Network Buffer 不足導緻作業報錯。相關原理說明可以參見 這篇文章.

JVM 元空間(JVM Metaspace)

JVM Metaspace 主要儲存了加載的類和方法的中繼資料,Flink 配置的參數是

taskmanager.memory.jvm-metaspace.size

,預設大小為 256M,JVM 參數是

-XX:MaxMetaspaceSize

.

如果使用者編寫的 Flink 程式中,有大量的動态類加載的需求,例如我們之前遇到過一個使用者作業,動态編譯并加載了 44 萬個類,此時就容易出現元空間用量遠超預期,發生 OOM 報錯。此時就需要适當調大元空間的大小,或者優化使用者程式,及時解除安裝無用的 Classloader。

JVM 運作時開銷(JVM Overhead)

除了上述描述的記憶體區域外,JVM 自己還有一小塊 ”自留地“,用來存放線程棧、編譯的代碼緩存、JNI 調用的庫所配置設定的記憶體等等,Flink 配置參數是

taskmanager.memory.jvm-overhead.fraction

,預設是 JVM 總記憶體的 10%。

對于舊版本(1.9 及之前)的 Flink,RocksDB 通過 malloc 配置設定的記憶體也屬于 Overhead 部分,而新版 Flink 把這部分歸類到托管記憶體(Managed),但由于 FLINK-15532 Enable strict capacity limit for memory usage for RocksDB 9 問題仍未解決,RocksDB 仍然會少量超用一部分記憶體。

是以在生産環境下,如果 RocksDB 頻繁造成記憶體超用,除了調大 Managed 托管記憶體外,也可以考慮調大 Overhead 區空間,以留出更多的安全餘量。

參考閱讀

1(https://cwiki.apache.org/confluence/display/FLINK/FLIP-49%253A+Unified+Memory+Configuration+for+TaskExecutors)

2(https://cwiki.apache.org/confluence/display/FLINK/FLIP-116%253A+Unified+Memory+Configuration+for+Job+Managers)

3(https://www.jianshu.com/p/96364463c831)

4(https://zhuanlan.zhihu.com/p/141120042)

5(https://nightlies.apache.org/flink/flink-docs-master/zh/docs/deployment/memory/mem_setup_tm/#%25e5%2586%2585%25e5%25ad%2598%25e6%25a8%25a1%25e5%259e%258b%25e8%25af%25a6%25e8%25a7%25a3)

6(https://issues.apache.org/jira/browse/FLINK-7289)

7(https://cloud.tencent.com/developer/article/1592441)

8(https://www.jianshu.com/p/47a40259a450)

9(https://issues.apache.org/jira/browse/FLINK-15532)