1 JVM
在大資料領域中,有很多開源架構(Hadoop、Spark、Storm)等都是基于 JVM 運作,可見 JVM 在大資料領域扮演的重要角色,是以在了解 Flink 記憶體時,我們需要先了解一下 JVM 。
JVM 是可運作 Java 代碼的假想計算機 ,包括程式計數器、Java 虛拟機棧、本地方法棧、Java 堆 和方法區。JVM 是運作在作業系統之上的,它與硬體沒有直接的互動。
1.1 JVM 資料運作區
Java 虛拟機在執行 Java 程式的過程中會把它在主存中管理的記憶體部分劃分成多個區域,每個區域存放不同類型的資料。如下圖所示:
1、程式計數器:是一個資料結構,用于儲存目前正常執行的程式的記憶體位址。
Java 虛拟機的多線程就是通過線程輪流切換并配置設定處理器時間來實作的,為了線程切換後能恢複到正确的位置,每條線程都需要一個獨立的程式計數器,互不影響,該區域線程私有。
2、Java 虛拟機棧:與線程生命周期相同,用于存儲局部變量表,操作棧,方法傳回值。局部變量表放着基本資料類型,還有對象的引用,該區域線程私有。
3、本地方法棧:跟虛拟機棧很像,不過它是為虛拟機使用到的 Native 方法服務,該區域線程私有。
4、方法區:儲存虛拟機加載的類資訊,常量,靜态變量,編譯後的代碼,該區域線程共享。
5、Java 堆:存放所有對象的執行個體。這一塊區域在 Java 虛拟機啟動的時候被建立,該區域被所有線程所共享,同時也是垃圾收集器的主要工作區域,是以這一部分區域除了被叫 堆内記憶體以外,也被叫做 GC 堆(Garbage Collected Heap)。
1.2 堆内記憶體(on-heap memory)
堆内記憶體是 Java 垃圾收集器的主要工作區域,為了提高垃圾回收的效率,在堆内記憶體的内部又劃分出了新生代、老年代、永久代。在新生代記憶體中又按照 8:1:1 的比例劃分出了 Eden、Survivor1、Survivor2 三個區域。
- 新生代:新生代有一個 Eden 區和兩個 Survivor 區,首先将對象放入 Eden 區,如果空間不足就向 Survivor1 區上放,觸發一次 minor GC ,如果仍然放不下就将存活的對象放入 Survivor2 區中,然後清空 Eden 和 Survivor1 區的記憶體。在某次 GC 過程中,如果發現仍然又放不下的對象,就将這些對象放入老年代記憶體裡去。
- 老年代:大對象以及長期存活的對象直接進入老年代。
- 永久代:永久存儲區是一個常駐記憶體區域,用于存放 JDK 自身所攜帶的 Class、Interface 的中繼資料,也就是說它存儲的是運作環境必須的類資訊,被裝載進此區域的資料是不會被垃圾回收器回收掉的,關閉 JVM 才會釋放此區域所占用的記憶體。
如果出現 java.lang.OutOfMemoryError: PermGen space,說明是 Java 虛拟機對永久代 Perm 記憶體設定不夠。
1.3 GC 算法
由于堆内記憶體處理是容易出現問題的地方,忘記或者錯誤的記憶體回收會導緻程式或系統的不穩定甚至崩潰,Java 就提供 GC 功能自動監測對象是否超過作用域進而達到自動回收記憶體的目的。
關于堆記憶體和永久區的垃圾回收,Java 提供的 GC 算法包含:引用計數法,标記-清除算法,複制算法,标記-壓縮算法,分代收集算法
- 引用計數法:引用計數器的實作很簡單,對于一個對象 A,隻要有任何一個對象引用了 A,則 A 的引用計數器就加 1,當引用失效時,引用計數器就減 1。隻要對象 A 的引用計數器的值為 0,則對象 A 就不可能再被使用。
- 缺點:1、無法處理循環引用情況,會造成記憶體洩漏。 2、對系統性能産生影響。
- 标記-清除算法:将垃圾回收分為兩個階段:标記階段和清除階段,首先标記出所有需要回收的對象,在标記完成後統一回收所有被标記的對象。
- 缺點:1、效率問題,2、空間問題。标記清除之後會産生大量不連續的記憶體碎片,空間碎片太多會導緻以後程式在運作過程中需要配置設定較大對象時,無法找到足夠的連續記憶體而提前觸發另一次垃圾收集動作。
- 複制算法:将可用記憶體按容量劃分為大小相等的兩塊,每次隻試用其中的一塊,當這一塊記憶體用完時,将存活的對象複制到另外一塊記憶體上面,然後清除使用記憶體中的所有對象。 适用于初生代。
- 标記壓縮算法:首先标記出所有需要回收的對象,然後讓所有存活的對象都向一端移動,然後清理掉端邊界以外的記憶體。适用于老年代
- 分代收集算法:初生代使用複制算法,老年代使用标記壓縮算法。
1.4 堆外記憶體(off-heap memory)
雖然 Java 提供了多種算法進行垃圾回收,但仍然無法徹底解決堆内記憶體過大帶來的長時間的 GC 停頓的問題,以及作業系統對堆内記憶體不可知的問題。
基于上述問題,Java 虛拟機開辟出了堆外記憶體(off-heap memory)。堆外記憶體意味着把一些對象的執行個體配置設定在 Java 虛拟機堆内記憶體以外的記憶體區域,這些記憶體直接受作業系統(而不是虛拟機)管理。這樣做的結果就是能保持一個較小的堆,以減少垃圾收集對應用的影響。同時因為這部分區域直接受作業系統的管理,别的程序和裝置(例如 GPU )可以直接通過作業系統對其進行通路,減少了從虛拟機中複制記憶體資料的過程。
import java.nio.ByteBuffer;
import sun.nio.ch.DirectBuffer;
public class TestDirectByBuffer {
public static void main(String[] args) throws Exception {
while (true) {
ByteBuffer buffer = ByteBuffer.allocateDirect(128 * 1024 * 1024)
}
}
}
1234567891011
優點:
- 可以很友善的自主開辟很大的記憶體空間,對大記憶體的伸縮性很好;
- 減少垃圾回收帶來的系統停頓時間;
- 直接受作業系統控制,可以直接被其他程序和裝置通路,減少了原本從虛拟機複制的過程;
- 特别适合那些配置設定次數少,讀寫操作很頻繁的場景。
缺點:
- 容易出現記憶體洩漏,并且很難排查;
- 堆外記憶體的資料結構不直覺,當存儲結構複雜的對象時,會浪費大量的時間對其進行串行化。
1.5 堆外記憶體與堆内記憶體聯系
雖然堆外記憶體本身不受垃圾回收算法的管轄,但是因為其是由 ByteBuffer 所創造出來的,是以這個 buffer 自身作為一個執行個體化的對象,其自身的資訊(例如堆外記憶體在主存中的起始位址等資訊)必須存儲在堆内記憶體中,具體情況如下圖所示。
1.6 JVM 記憶體管理缺陷
由于在 JVM 記憶體中存儲大量的資料 (包括緩存和高效處理)時,JVM 記憶體會面臨很多問題,包括如下:
- Java 對象存儲密度低。Java 的對象在記憶體中存儲包含 3 個主要部分:對象頭、執行個體資料、對齊填充部分。
- 例如,一個隻包含 boolean 屬性的對象占 16byte:對象頭占 8byte, boolean 屬性占 1byte,為了對齊達到 8 的倍數額外占 7byte。而實際上隻需要一個 bit(1/8 位元組)就夠了。
- Full GC 會極大地影響性能。尤其是為了處理更大資料而開了很大記憶體空間的 JVM 來說,GC 會達到秒級甚至分鐘級。
- OOM 問題影響穩定性。OutOfMemoryError 是分布式計算架構經常會遇到的問題, 當 JVM 中所有對象大小超過配置設定給 JVM 的記憶體大小時,就會發生 OutOfMemoryError 錯誤, 導緻 JVM 崩潰,分布式架構的健壯性和性能都會受到影響。
- 緩存未命中問題。CPU 進行計算的時候,是從 CPU 緩存中擷取資料。現代體系的 CPU 會有多級緩存,而加載的時候是以 Cache Line 為機關加載。如果能夠将對象連續存儲, 這樣就會大大降低 Cache Miss。使得 CPU 集中處理業務,而不是空轉。
2 Flink 記憶體管理
基于 JVM 記憶體存在一些問題,并且在大資料場景下,無法在記憶體中存儲海量資料,計算效率無法提高。Flink 社群采用自主記憶體管理設計。
Flink 并不是将大量對象存在堆記憶體上,而是将對象都序列化到一個預配置設定的記憶體塊上, 這個記憶體塊叫做 MemorySegment,它代表了一段固定長度的記憶體(預設大小為 32KB),也是 Flink 中最小的記憶體配置設定單元,并且提供了非常高效的讀寫方法,很多運算可以直接操作 二進制資料,不需要反序列化即可執行。每條記錄都會以序列化的形式存儲在一個或多個 MemorySegment 中。如果需要處理的資料多于可以儲存在記憶體中的資料,Flink 的運算符會将部分資料溢出到磁盤。
2.1 Flink 記憶體模型
Flink 總體記憶體類圖如下:
主要包含 JobManager 記憶體模型和 TaskManager 記憶體模型。
2.2 JobManager 記憶體模型
Flink JobManager記憶體類圖如虛線部分:
在 1.11 中,Flink 對 JM 端的記憶體配置進行了修改,使它的選項和配置方式與 TM 端的配置方式保持一緻。
配置JobManager的總程序記憶體
jobmanager.heap.size:1024m
jobmanager.memory.process.size:4096m
jobmanager.memory.heap.size:2048m
jobmanager.memory.off-heap.size:1536m
12345678910111213141516
2.3 TaskManager 記憶體模型
TaskManager 記憶體模型如下圖所示:
TaskManager 記憶體模型一共包含 3大部分,分别為總體記憶體、JVM Heap 堆上記憶體、Off-Heap 堆外記憶體等。
2.3.1 總體記憶體
1、Total Process Memory:Flink Java 應用程式(包括使用者代碼)和 JVM 運作整個程序所消耗的總記憶體。
總程序記憶體(Total Process Memory) = Flink 總記憶體 + JVM 元空間 + JVM 執行開銷
# 配置項:預設size: 1728mb
taskmanager.memory.process.size: 1728m
12
2、Total Flink Memory:僅 Flink Java 應用程式消耗的記憶體,包括使用者代碼,但不包括 JVM 為其運作而配置設定的記憶體。
Flink 總記憶體 = Framework堆内外 + task 堆内外 + network + managed Memory
配置項:預設size: 1280mb
taskmanager.memory.flink.size: 1280mb
12
2.3.2 JVM Heap (JVM 堆上記憶體)
1、Framework Heap :架構堆記憶體
Flink架構本身使用的記憶體,即TaskManager本身所占用的堆上記憶體,不計入Slot的資源中。
配置參數:預設128MB
taskmanager.memory.framework.heap.size=128MB
12
2、Task Heap : 任務堆記憶體
如果記憶體大小沒有指定,它将被推導出為總 Flink 記憶體減去架構堆記憶體、架構堆外記憶體、任務堆外記憶體、托管記憶體和網絡記憶體。
Task執行使用者代碼時所使用的堆上記憶體。
配置參數:
taskmanager.memory.task.heap.size
12
2.3.3 Off-Heap Mempry(JVM 堆外記憶體)
1、Managed memory: 托管記憶體
由 Flink 管理的原生托管記憶體,保留用于排序、哈希表、中間結果緩存和 RocksDB 狀态後端。
托管記憶體由 Flink 管理并配置設定為原生記憶體(堆外)。以下工作負載使用托管記憶體:
流式作業可以将其用于 RocksDB 狀态後端。流和批處理作業都可以使用它進行排序、哈希表、中間結果的緩存。流作業和批處理作業都可以使用它在 Python 程序中執行使用者定義的函數。
托管記憶體配置時如果兩者都設定,則大小将覆寫分數。如果大小和分數均未明确配置,則将使用預設分數。
配置參數:預設size:128mb,fraction:0.4
taskmanager.memory.managed.size=128mb
taskmanager.memory.managed.fraction=0.4
123
2、DirectMemory:JVM 直接記憶體
- Framework Off-Heap Memory:Flink 架構堆外記憶體。
- 即 TaskManager 本身所占用的對外記憶體,不計入 Slot 資源。
- 配置參數:預設128MB taskmanager.memory.framework.off-heap.size=128MB
- 1
- 2
- Task Off-Heap :Task 堆外記憶體。
- 專用于Flink 架構的堆外直接(或本機)記憶體。
- 配置參數:預設0 taskmanager.memory.task.off-heap.size=0
- 1
- 2
- Network Memory:網絡記憶體。
- 網絡資料交換所使用的堆外記憶體大小,如網絡資料交換 緩沖區。
- 配置參數:預設min:64mb,max:1gb,fraction:0.1 taskmanager.memory.network.min: 64mb taskmanager.memory.network.max: 1gb taskmanager.memory.network.fraction: 0.1
- 1
- 2
- 3
- 4
3、JVM metaspace:JVM 元空間。
Flink JVM 程序的元空間大小,預設為256MB。
配置參數:預設size:256mb
taskmanage.memory.jvm-metaspace.size:256mb
12
4、JVM Overhead:JVM執行開銷。
JVM 執行時自身所需要的内容,包括線程堆棧、IO、 編譯緩存等所使用的記憶體,這是一個上限分級成分的的總程序記憶體。
配置參數:
taskmanager.memory.jvm-overhead.min=192mb
taskmanager.memory.jvm-overhead.max=1gb
taskmanager.memory.jvm-overhead.fraction=0.1
1234
3 Flink 記憶體資料結構
Flink 的記憶體管理和作業系統管理記憶體一樣,将記憶體劃分為記憶體段、記憶體頁等結構。
3.1 Flink 記憶體段
記憶體段在 Flink 内部叫 MemorySegment,是 Flink 中最小的記憶體配置設定單元,預設大小 32KB。它既可以是堆上記憶體(Java 的 byte 數組),也可以是堆外記憶體(基于 Netty 的 DirectByteBuffer),同時提供了對二進制資料進行讀取和寫入的方法。
HeapMemorySegment:用來配置設定堆上記憶體 ;
HybridMemorySegment:用來配置設定堆外記憶體和堆上記憶體; 2017 年以後的版本實 際上隻使用了 HybridMemorySegment。
通過一個案例介紹Flink在序列化和反序列化過程中如何使用 MemorySegment:
如上圖所示,當建立一個Tuple 3 對象時,包含三個層面,一是 int 類型,一是 double 類型,還有一個是 Person。Person對象包含兩個字段,一是 int 型的 ID,另一個是 String 類型的 name,
(1)在序列化操作時,會委托相應具體序列化的序列化器進行相應的序列化操作。從圖中可以看到 Tuple 3 會把 int 類型通過 IntSerializer 進行序列化操作,此時 int 隻需要占用四個位元組。
(2)Person 類會被當成一個 Pojo 對象來進行處理,PojoSerializer 序列化器會把一些屬性資訊使用一個位元組存儲起來。同樣,其字段則采取相對應的序列化器進行相應序列化,在序列化完的結果中,可以看到所有的資料都是由 MemorySegment 去支援。
3.2 Flink 記憶體頁
記憶體頁是 MemorySegment 之上的資料通路視圖,資料讀取抽象為 DataInputView,資料寫入抽象為 DataOutputView。使用時就無需關心 MemorySegment 的細節,該層會自動處理跨 MemorySegment 的讀取和寫入。
3.2.1 DataInputView
DataInputView 是從 MemorySegment 資料讀取抽象視圖,繼承自 java.io.DataInput。InputView 中持有多個 MemorySegment 的引用(MemorySegment[]),這一組 MemorySegment 被視為一個記憶體頁(Page),可以順序讀取 MemorySegment 中的資料。如下圖為繼承關系圖:
3.2.2 DataInputView
DataOutputView 是從 MemorySegment 資料讀取抽象視圖,繼承自java.io.DataOutput。OutputView 中持有多個 MemorySegment 的引用(MemorySegment[]),這一組 MemorySegment 被視為一個記憶體頁(Page),可以順序地向 MemorySegment 中寫入資料。如下圖為繼承關系圖:
3.2.3 Buffer
Buffer 是具有引用計數的 MemorySegment 執行個體的包裝器。用來将上遊 Task 算子處理完畢的結果交給下遊時定義的一個抽象或者記憶體對象。
Buffer 的接口是網絡層面上傳輸資料和事件的統一抽象,其實作類是 NetworkBuffer。Flink 在各個 TaskManger 之間傳遞資料時,使用的是這一層的抽象。1個 NetworkBuffer 中包裝了一個 MemorySegment 。Buffer接口類圖如下:
Buffer 的底層是 MemorySegment,Buffer 申請和釋放由 Flink 自行管理,Flink 引入了引用數的概念。當有新的 Buffer 消費者時,引用數加 1,當消費者消費完 Buffer 時,引用數減 1,最終當引用數變為 0 時,就可以将 Buffer 釋放重用了。
3.2.4 Buffer 資源池
Buffer 資源池在 Flink 中叫作 BufferPool。BufferPool 用來管理 Buffer,包含 Buffer 的申請、釋放、銷毀、可用 Buffer 的通知等,其實作類是 LocalBufferPool ,每個 Task 擁有自己的 LocalBufferPool 。
BufferPool 的類體系如下:
4 網絡緩沖器(NetworkBuffer)
網絡緩沖器 (NetworkBuffer)是網絡交換資料的包裝,其對應于 MemorySegment 記憶體段。
Network Buffer,顧名思義,就是在網絡傳輸中使用到的 Buffer(實際非網絡傳輸也會用到)。Flink 經過網絡傳輸的上下遊 Task 的設計會比較類似生産者 - 消費者模型。
如果沒有這個緩沖區,那麼生産者或消費者會消耗大量時間在等待下遊拿資料和上遊發資料的環節上。加上這個緩沖區,生産者和消費者解耦開,任何一方短時間内的抖動理論上對另一方的資料處理都不會産生太大影響。如下圖所示:
這是對于單程序内生産者-消費者模型的一個圖示,事實上,如果兩個 Task 在同一個 TaskManager 内,那麼使用的就是上述模型,
對于不同 TM 内、或者需要跨網絡傳輸的 TM 之間,利用生産者-消費者模型來進行資料傳輸的原理圖如下:
可以看到,在 Netty Server 端,buffer 隻存在 LocalBufferPool 中,subpartition 自己并沒有緩存 buffer 或者獨享一部分 buffer,而在接收端,channel 有自己獨享的一部分 buffer(Exclusive Buffers),也有一部分共享的 buffer(Floating Buffers),是以,Network Buffer 的使用同時存在于發送端和接收端。
由此可見,TaskManager 内需要的 buffers 數量等于這個 TaskManager 内的所有 Task 中的發送端和接收端使用到的 network buffer 總和。明确了 Network Buffer 使用的位置,我們可以結合一些參數計算出作業實際所需的 NetworkBuffer 數量。
5 Flink 記憶體調優
了解了 Flink JobManager Memory 和 TaskManager Memory的記憶體模型和資料結構之後,應該針對不同的部署情況,配置不同的記憶體,下面我們針對不同的部署方式介紹記憶體如何調優。
5.1 為 Standalone 配置記憶體
建議為 Standalone 配置 Flink 總記憶體,設定 JobManager 和 TaskManager 的 flink.size 大小,聲明為 Flink 本身提供了多少記憶體。配置方式如下:
參數配置:
taskmanager.memory.flink.size:
jobmanager.memory.flink.size:
123
5.2 為 Containers(容器) 配置記憶體
建議為容器化部署(Kubernetes或Yarn)配置總程序記憶體,設定 process.size 大小,它聲明了總共應該配置設定多少記憶體給 Flink JVM 程序,并對應于請求容器的大小。配置方式如下:
參數配置:
taskmanager.memory.process.size:
jobmanager.memory.process.size:
123
注意:如果你配置了總 Flink 記憶體, Flink 會隐式添加 JVM 記憶體元件來推導總程序記憶體,并請求一個具有該推導大小的記憶體的容器。
警告:如果 Flink 或使用者代碼配置設定超出容器大小的非托管堆外(本機)記憶體,作業可能會失敗,因為部署環境可能會殺死有問題的容器。
5.3 為 state backends(狀态後端)配置記憶體
為 state backends(狀态後端)配置記憶體時,這僅與 TaskManager 相關。
在部署 Flink 流應用程式時,所使用的狀态後端類型将決定叢集的最佳記憶體配置。
5.3.1 HashMap 狀态後端
運作無狀态作業或使用 HashMapStateBackend 時,将托管記憶體設定為零。這将確定為 JVM 上的使用者代碼配置設定最大數量的堆記憶體。配置如下:
配置參數:設定size:0
taskmanager.memory.managed.size:0
12
5.3.2 RocksDB 狀态後端
該 EmbeddedRocksDBStateBackend 使用本機記憶體。預設情況下,RocksDB 設定為将本機記憶體配置設定限制為托管記憶體的大小。是以,為你的狀态保留足夠的托管記憶體非常重要。如果禁用預設的 RocksDB 記憶體控制,RocksDB 配置設定的記憶體超過請求的容器大小(總程序記憶體)的限制,則可以在容器化部署中終止 TaskManager 。
5.4 為 batch Job(批處理作業)配置記憶體
為 batch Job(批處理作業)配置記憶體時,這僅與 TaskManager 相關。
Flink 的批處理操作符利用托管記憶體來更高效地運作。這樣做時,可以直接對原始資料執行某些操作,而無需反序列化為 Java 對象。這意味着托管記憶體配置對應用程式的性能有實際影響。Flink 将嘗試配置設定和使用 為批處理作業配置的盡可能多的托管記憶體,但不會超出其限制。這可以防止 OutOfMemoryError’s,因為 Flink 準确地知道它必須利用多少記憶體。如果托管記憶體不足,Flink 會優雅地溢出到磁盤。
6 故障排除
6.1 非法配置異常
如果看到從 TaskExecutorProcessUtils 或 JobManagerProcessUtils抛出的 IllegalConfigurationException,通常表明存在無效的配置值(例如負記憶體大小、大于 1 的分數等)或配置沖突。請重新配置記憶體參數。
6.2 Java 堆空間異常
如果報 OutOfMemoryError: Java heap space 異常,通常表示 JVM Heap 太小。可以嘗試通過增加總記憶體來增加 JVM 堆大小。也可以直接為 TaskManager 增加任務堆記憶體或為 JobManager 增加 JVM 堆記憶體。
還可以為 TaskManagers 增加架構堆記憶體,但隻有在确定 Flink 架構本身需要更多記憶體時才應該更改此選項。
6.3 直接緩沖存儲器異常
如果報 OutOfMemoryError: Direct buffer memory 異常,通常表示 JVM直接記憶體限制太小或存在直接記憶體洩漏。檢查使用者代碼或其他外部依賴項是否使用了 JVM 直接記憶體,以及它是否被正确考慮。可以嘗試通過調整直接堆外記憶體來增加其限制。可以參考如何為 TaskManagers、 JobManagers 和 Flink 設定的JVM 參數配置堆外記憶體。
6.4 元空間異常
如果報 OutOfMemoryError: Metaspace 異常,通常表示 JVM 元空間限制配置得太小。可以嘗試加大 JVM 元空間 TaskManagers 或JobManagers 選項。
6.5 網絡緩沖區數量不足
如果報 IOException: Insufficient number of network buffers 異常,這僅與 TaskManager 相關。通常表示配置的網絡記憶體大小不夠大。可以嘗試通過調整以下選項來增加網絡記憶體:
參數配置:
taskmanager.memory.network.min
taskmanager.memory.network.max
taskmanager.memory.network.fraction
1234
6.6 超出容器記憶體異常
這個對應 5.2 節為容器配置記憶體。如果 Flink 容器嘗試配置設定超出其請求大小(Yarn 或 Kubernetes)的記憶體,這通常表明 Flink 沒有預留足夠的本機記憶體。當容器被部署環境殺死時,可以通過使用外部監控系統或從錯誤消息中觀察到這一點。
如果在 JobManager 程序中遇到這個問題,還可以通過設定排除可能的 JVM Direct Memory 洩漏的選項來開啟 JVM Direct Memory 的限制 使用以下指令進行配置
參數配置:
jobmanager.memory.enable-jvm-direct-memory-limit:
12
如果使用 RocksDBStateBackend,并且記憶體控制被禁用:可以嘗試增加 TaskManager 的托管記憶體。在儲存點或完整檢查點期間啟用記憶體控制和非堆記憶體增加,這可能是由于glibc記憶體配置設定器而發生的。
可以嘗試為 TaskManagers 添加環境變量 MALLOC_ARENA_MAX=1,或者增加 JVM 開銷。