天天看點

Apache Spark 記憶體管理詳解

點選上方藍色字型,選擇“設為星标”

回複”資源“擷取更多資源

Apache Spark 記憶體管理詳解
Apache Spark 記憶體管理詳解

大資料技術與架構

點選右側關注,大資料開發領域最強公衆号!

Apache Spark 記憶體管理詳解
Apache Spark 記憶體管理詳解

暴走大資料

點選右側關注,暴走大資料!

Apache Spark 記憶體管理詳解

Spark 作為一個基于記憶體的分布式計算引擎,其記憶體管理子產品在整個系統中扮演着非常重要的角色。了解 Spark 記憶體管理的基本原理,有助于更好地開發 Spark 應用程式和進行性能調優。本文旨在梳理出 Spark 記憶體管理的脈絡,抛磚引玉,引出讀者對這個話題的深入探讨。本文中闡述的原理基于 Spark 2.1 版本,閱讀本文需要讀者有一定的 Spark 和 Java 基礎,了解 RDD、Shuffle、JVM 等相關概念。

在執行 Spark 的應用程式時,Spark 叢集會啟動 Driver 和 Executor 兩種 JVM 程序,前者為主要程序,負責建立 Spark 上下文,送出 Spark 作業(Job),并将作業轉化為計算任務(Task),在各個 Executor 程序間協調任務的排程,後者負責在工作節點上執行具體的計算任務,并将結果傳回給 Driver,同時為需要持久化的 RDD 提供存儲功能[1]。由于 Driver 的記憶體管理相對來說較為簡單,本文主要對 Executor 的記憶體管理進行分析,下文中的 Spark 記憶體均特指 Executor 的記憶體。

作為一個 JVM 程序,Executor 的記憶體管理建立在 JVM 的記憶體管理之上,Spark 對 JVM 的堆内(On-heap)空間進行了更為詳細的配置設定,以充分利用記憶體。同時,Spark 引入了堆外(Off-heap)記憶體,使之可以直接在工作節點的系統記憶體中開辟空間,進一步優化了記憶體的使用。

Apache Spark 記憶體管理詳解

堆内記憶體的大小,由 Spark 應用程式啟動時的 –executor-memory 或 spark.executor.memory 參數配置。Executor 内運作的并發任務共享 JVM 堆内記憶體,這些任務在緩存 RDD 資料和廣播(Broadcast)資料時占用的記憶體被規劃為存儲(Storage)記憶體,而這些任務在執行 Shuffle 時占用的記憶體被規劃為執行(Execution)記憶體,剩餘的部分不做特殊規劃,那些 Spark 内部的對象執行個體,或者使用者定義的 Spark 應用程式中的對象執行個體,均占用剩餘的空間。不同的管理模式下,這三部分占用的空間大小各不相同(下面第 2 小節會進行介紹)。

Spark 對堆内記憶體的管理是一種邏輯上的"規劃式"的管理,因為對象執行個體占用記憶體的申請和釋放都由 JVM 完成,Spark 隻能在申請後和釋放前記錄這些記憶體,我們來看其具體流程:

申請記憶體:

Spark 在代碼中 new 一個對象執行個體

JVM 從堆内記憶體配置設定空間,建立對象并傳回對象引用

Spark 儲存該對象的引用,記錄該對象占用的記憶體

釋放記憶體:

Spark 記錄該對象釋放的記憶體,删除該對象的引用

等待 JVM 的垃圾回收機制釋放該對象占用的堆内記憶體

我們知道,JVM 的對象可以以序列化的方式存儲,序列化的過程是将對象轉換為二進制位元組流,本質上可以了解為将非連續空間的鍊式存儲轉化為連續空間或塊存儲,在通路時則需要進行序列化的逆過程——反序列化,将位元組流轉化為對象,序列化的方式可以節省存儲空間,但增加了存儲和讀取時候的計算開銷。

對于 Spark 中序列化的對象,由于是位元組流的形式,其占用的記憶體大小可直接計算,而對于非序列化的對象,其占用的記憶體是通過周期性地采樣近似估算而得,即并不是每次新增的資料項都會計算一次占用的記憶體大小,這種方法降低了時間開銷但是有可能誤差較大,導緻某一時刻的實際記憶體有可能遠遠超出預期[2]。此外,在被 Spark 标記為釋放的對象執行個體,很有可能在實際上并沒有被 JVM 回收,導緻實際可用的記憶體小于 Spark 記錄的可用記憶體。是以 Spark 并不能準确記錄實際可用的堆内記憶體,進而也就無法完全避免記憶體溢出(OOM, Out of Memory)的異常。

雖然不能精準控制堆内記憶體的申請和釋放,但 Spark 通過對存儲記憶體和執行記憶體各自獨立的規劃管理,可以決定是否要在存儲記憶體裡緩存新的 RDD,以及是否為新的任務配置設定執行記憶體,在一定程度上可以提升記憶體的使用率,減少異常的出現。

為了進一步優化記憶體的使用以及提高 Shuffle 時排序的效率,Spark 引入了堆外(Off-heap)記憶體,使之可以直接在工作節點的系統記憶體中開辟空間,存儲經過序列化的二進制資料。利用 JDK Unsafe API(從 Spark 2.0 開始,在管理堆外的存儲記憶體時不再基于 Tachyon,而是與堆外的執行記憶體一樣,基于 JDK Unsafe API 實作[3]),Spark 可以直接作業系統堆外記憶體,減少了不必要的記憶體開銷,以及頻繁的 GC 掃描和回收,提升了處理性能。堆外記憶體可以被精确地申請和釋放,而且序列化的資料占用的空間可以被精确計算,是以相比堆内記憶體來說降低了管理的難度,也降低了誤差。

在預設情況下堆外記憶體并不啟用,可通過配置 spark.memory.offHeap.enabled 參數啟用,并由 spark.memory.offHeap.size 參數設定堆外空間的大小。除了沒有 other 空間,堆外記憶體與堆内記憶體的劃分方式相同,所有運作中的并發任務共享存儲記憶體和執行記憶體。

Spark 為存儲記憶體和執行記憶體的管理提供了統一的接口——MemoryManager,同一個 Executor 内的任務都調用這個接口的方法來申請或釋放記憶體:

我們看到,在調用這些方法時都需要指定其記憶體模式(MemoryMode),這個參數決定了是在堆内還是堆外完成這次操作。

MemoryManager 的具體實作上,Spark 1.6 之後預設為統一管理(Unified Memory Manager)方式,1.6 之前采用的靜态管理(Static Memory Manager)方式仍被保留,可通過配置 spark.memory.useLegacyMode 參數啟用。兩種方式的差別在于對空間配置設定的方式,下面的第 2 小節會分别對這兩種方式進行介紹。

2 . 記憶體空間配置設定

2.1 靜态記憶體管理

在 Spark 最初采用的靜态記憶體管理機制下,存儲記憶體、執行記憶體和其他記憶體的大小在 Spark 應用程式運作期間均為固定的,但使用者可以應用程式啟動前進行配置,堆内記憶體的配置設定如圖 2 所示:

Apache Spark 記憶體管理詳解

可以看到,可用的堆内記憶體的大小需要按照下面的方式計算:

其中 systemMaxMemory 取決于目前 JVM 堆内記憶體的大小,最後可用的執行記憶體或者存儲記憶體要在此基礎上與各自的 memoryFraction 參數和 safetyFraction 參數相乘得出。上述計算公式中的兩個 safetyFraction 參數,其意義在于在邏輯上預留出 1-safetyFraction 這麼一塊保險區域,降低因實際記憶體超出目前預設範圍而導緻 OOM 的風險(上文提到,對于非序列化對象的記憶體采樣估算會産生誤差)。值得注意的是,這個預留的保險區域僅僅是一種邏輯上的規劃,在具體使用時 Spark 并沒有差別對待,和"其它記憶體"一樣交給了 JVM 去管理。

堆外的空間配置設定較為簡單,隻有存儲記憶體和執行記憶體,如圖 3 所示。可用的執行記憶體和存儲記憶體占用的空間大小直接由參數 spark.memory.storageFraction 決定,由于堆外記憶體占用的空間可以被精确計算,是以無需再設定保險區域。

Apache Spark 記憶體管理詳解

靜态記憶體管理機制實作起來較為簡單,但如果使用者不熟悉 Spark 的存儲機制,或沒有根據具體的資料規模和計算任務或做相應的配置,很容易造成"一半海水,一半火焰"的局面,即存儲記憶體和執行記憶體中的一方剩餘大量的空間,而另一方卻早早被占滿,不得不淘汰或移出舊的内容以存儲新的内容。由于新的記憶體管理機制的出現,這種方式目前已經很少有開發者使用,出于相容舊版本的應用程式的目的,Spark 仍然保留了它的實作。

Spark 1.6 之後引入的統一記憶體管理機制,與靜态記憶體管理的差別在于存儲記憶體和執行記憶體共享同一塊空間,可以動态占用對方的空閑區域,如圖 4 和圖 5 所示

Apache Spark 記憶體管理詳解
Apache Spark 記憶體管理詳解

其中最重要的優化在于動态占用機制,其規則如下:

設定基本的存儲記憶體和執行記憶體區域(spark.storage.storageFraction 參數),該設定确定了雙方各自擁有的空間的範圍

雙方的空間都不足時,則存儲到硬碟;若己方空間不足而對方空餘時,可借用對方的空間;(存儲空間不足是指不足以放下一個完整的 Block)

執行記憶體的空間被對方占用後,可讓對方将占用的部分轉存到硬碟,然後"歸還"借用的空間

存儲記憶體的空間被對方占用後,無法讓對方"歸還",因為需要考慮 Shuffle 過程中的很多因素,實作起來較為複雜[4]

Apache Spark 記憶體管理詳解

憑借統一記憶體管理機制,Spark 在一定程度上提高了堆内和堆外記憶體資源的使用率,降低了開發者維護 Spark 記憶體的難度,但并不意味着開發者可以高枕無憂。譬如,是以如果存儲記憶體的空間太大或者說緩存的資料過多,反而會導緻頻繁的全量垃圾回收,降低任務執行時的性能,因為緩存的 RDD 資料通常都是長期駐留記憶體的 [5] 。是以要想充分發揮 Spark 的性能,需要開發者進一步了解存儲記憶體和執行記憶體各自的管理方式和實作原理。

彈性分布式資料集(RDD)作為 Spark 最根本的資料抽象,是隻讀的分區記錄(Partition)的集合,隻能基于在穩定實體存儲中的資料集上建立,或者在其他已有的 RDD 上執行轉換(Transformation)操作産生一個新的 RDD。轉換後的 RDD 與原始的 RDD 之間産生的依賴關系,構成了血統(Lineage)。憑借血統,Spark 保證了每一個 RDD 都可以被重新恢複。但 RDD 的所有轉換都是惰性的,即隻有當一個傳回結果給 Driver 的行動(Action)發生時,Spark 才會建立任務讀取 RDD,然後真正觸發轉換的執行。

Task 在啟動之初讀取一個分區時,會先判斷這個分區是否已經被持久化,如果沒有則需要檢查 Checkpoint 或按照血統重新計算。是以如果一個 RDD 上要執行多次行動,可以在第一次行動中使用 persist 或 cache 方法,在記憶體或磁盤中持久化或緩存這個 RDD,進而在後面的行動時提升計算速度。事實上,cache 方法是使用預設的 MEMORY_ONLY 的存儲級别将 RDD 持久化到記憶體,故緩存是一種特殊的持久化。 堆内和堆外存儲記憶體的設計,便可以對緩存 RDD 時使用的記憶體做統一的規劃和管 理 (存儲記憶體的其他應用場景,如緩存 broadcast 資料,暫時不在本文的讨論範圍之内)。

RDD 的持久化由 Spark 的 Storage 子產品 [7] 負責,實作了 RDD 與實體存儲的解耦合。Storage 子產品負責管理 Spark 在計算過程中産生的資料,将那些在記憶體或磁盤、在本地或遠端存取資料的功能封裝了起來。在具體實作時 Driver 端和 Executor 端的 Storage 子產品構成了主從式的架構,即 Driver 端的 BlockManager 為 Master,Executor 端的 BlockManager 為 Slave。Storage 子產品在邏輯上以 Block 為基本存儲機關,RDD 的每個 Partition 經過處理後唯一對應一個 Block(BlockId 的格式為 rdd_RDD-ID_PARTITION-ID )。Master 負責整個 Spark 應用程式的 Block 的中繼資料資訊的管理和維護,而 Slave 需要将 Block 的更新等狀态上報到 Master,同時接收 Master 的指令,例如新增或删除一個 RDD。

Apache Spark 記憶體管理詳解

在對 RDD 持久化時,Spark 規定了 MEMORY_ONLY、MEMORY_AND_DISK 等 7 種不同的 存儲級别 ,而存儲級别是以下 5 個變量的組合:

通過對資料結構的分析,可以看出存儲級别從三個次元定義了 RDD 的 Partition(同時也就是 Block)的存儲方式:

存儲位置:磁盤/堆内記憶體/堆外記憶體。如 MEMORY_AND_DISK 是同時在磁盤和堆内記憶體上存儲,實作了備援備份。OFF_HEAP 則是隻在堆外記憶體存儲,目前選擇堆外記憶體時不能同時存儲到其他位置。

存儲形式:Block 緩存到存儲記憶體後,是否為非序列化的形式。如 MEMORY_ONLY 是非序列化方式存儲,OFF_HEAP 是序列化方式存儲。

副本數量:大于 1 時需要遠端備援備份到其他節點。如 DISK_ONLY_2 需要遠端備份 1 個副本。

RDD 在緩存到存儲記憶體之前,Partition 中的資料一般以疊代器(Iterator)的資料結構來通路,這是 Scala 語言中一種周遊資料集合的方法。通過 Iterator 可以擷取分區中每一條序列化或者非序列化的資料項(Record),這些 Record 的對象執行個體在邏輯上占用了 JVM 堆内記憶體的 other 部分的空間,同一 Partition 的不同 Record 的空間并不連續。

RDD 在緩存到存儲記憶體之後,Partition 被轉換成 Block,Record 在堆内或堆外存儲記憶體中占用一塊連續的空間。将Partition由不連續的存儲空間轉換為連續存儲空間的過程,Spark稱之為"展開"(Unroll)。Block 有序列化和非序列化兩種存儲格式,具體以哪種方式取決于該 RDD 的存儲級别。非序列化的 Block 以一種 DeserializedMemoryEntry 的資料結構定義,用一個數組存儲所有的對象執行個體,序列化的 Block 則以 SerializedMemoryEntry的資料結構定義,用位元組緩沖區(ByteBuffer)來存儲二進制資料。每個 Executor 的 Storage 子產品用一個鍊式 Map 結構(LinkedHashMap)來管理堆内和堆外存儲記憶體中所有的 Block 對象的執行個體[6],對這個 LinkedHashMap 新增和删除間接記錄了記憶體的申請和釋放。

因為不能保證存儲空間可以一次容納 Iterator 中的所有資料,目前的計算任務在 Unroll 時要向 MemoryManager 申請足夠的 Unroll 空間來臨時占位,空間不足則 Unroll 失敗,空間足夠時可以繼續進行。對于序列化的 Partition,其所需的 Unroll 空間可以直接累加計算,一次申請。而非序列化的 Partition 則要在周遊 Record 的過程中依次申請,即每讀取一條 Record,采樣估算其所需的 Unroll 空間并進行申請,空間不足時可以中斷,釋放已占用的 Unroll 空間。如果最終 Unroll 成功,目前 Partition 所占用的 Unroll 空間被轉換為正常的緩存 RDD 的存儲空間,如下圖 8 所示。

Apache Spark 記憶體管理詳解

在圖 3 和圖 5 中可以看到,在靜态記憶體管理時,Spark 在存儲記憶體中專門劃分了一塊 Unroll 空間,其大小是固定的,統一記憶體管理時則沒有對 Unroll 空間進行特别區分,當存儲空間不足時會根據動态占用機制進行處理。

由于同一個 Executor 的所有的計算任務共享有限的存儲記憶體空間,當有新的 Block 需要緩存但是剩餘空間不足且無法動态占用時,就要對 LinkedHashMap 中的舊 Block 進行淘汰(Eviction),而被淘汰的 Block 如果其存儲級别中同時包含存儲到磁盤的要求,則要對其進行落盤(Drop),否則直接删除該 Block。

存儲記憶體的淘汰規則為:

被淘汰的舊 Block 要與新 Block 的 MemoryMode 相同,即同屬于堆外或堆内記憶體

新舊 Block 不能屬于同一個 RDD,避免循環淘汰

舊 Block 所屬 RDD 不能處于被讀狀态,避免引發一緻性問題

周遊 LinkedHashMap 中 Block,按照最近最少使用(LRU)的順序淘汰,直到滿足新 Block 所需的空間。其中 LRU 是 LinkedHashMap 的特性。

落盤的流程則比較簡單,如果其存儲級别符合_useDisk 為 true 的條件,再根據其_deserialized 判斷是否是非序列化的形式,若是則對其進行序列化,最後将資料存儲到磁盤,在 Storage 子產品中更新其資訊。

Executor 内運作的任務同樣共享執行記憶體,Spark 用一個 HashMap 結構儲存了任務到記憶體耗費的映射。每個任務可占用的執行記憶體大小的範圍為 1/2N ~ 1/N,其中 N 為目前 Executor 内正在運作的任務的個數。每個任務在啟動之時,要向 MemoryManager 請求申請最少為 1/2N 的執行記憶體,如果不能被滿足要求則該任務被阻塞,直到有其他任務釋放了足夠的執行記憶體,該任務才可以被喚醒。

執行記憶體主要用來存儲任務在執行 Shuffle 時占用的記憶體,Shuffle 是按照一定規則對 RDD 資料重新分區的過程,我們來看 Shuffle 的 Write 和 Read 兩階段對執行記憶體的使用:

Shuffle Write

若在 map 端選擇普通的排序方式,會采用 ExternalSorter 進行外排,在記憶體中存儲資料時主要占用堆内執行空間。

若在 map 端選擇 Tungsten 的排序方式,則采用 ShuffleExternalSorter 直接對以序列化形式存儲的資料排序,在記憶體中存儲資料時可以占用堆外或堆内執行空間,取決于使用者是否開啟了堆外記憶體以及堆外執行記憶體是否足夠。

Shuffle Read

在對 reduce 端的資料進行聚合時,要将資料交給 Aggregator 處理,在記憶體中存儲資料時占用堆内執行空間。

如果需要進行最終結果排序,則要将再次将資料交給 ExternalSorter 處理,占用堆内執行空間。

在 ExternalSorter 和 Aggregator 中,Spark 會使用一種叫 AppendOnlyMap 的哈希表在堆内執行記憶體中存儲資料,但在 Shuffle 過程中所有資料并不能都儲存到該哈希表中,當這個哈希表占用的記憶體會進行周期性地采樣估算,當其大到一定程度,無法再從 MemoryManager 申請到新的執行記憶體時,Spark 就會将其全部内容存儲到磁盤檔案中,這個過程被稱為溢存(Spill),溢存到磁盤的檔案最後會被歸并(Merge)。

Shuffle Write 階段中用到的 Tungsten 是 Databricks 公司提出的對 Spark 優化記憶體和 CPU 使用的計劃[9],解決了一些 JVM 在性能上的限制和弊端。Spark 會根據 Shuffle 的情況來自動選擇是否采用 Tungsten 排序。Tungsten 采用的頁式記憶體管理機制建立在 MemoryManager 之上,即 Tungsten 對執行記憶體的使用進行了一步的抽象,這樣在 Shuffle 過程中無需關心資料具體存儲在堆内還是堆外。每個記憶體頁用一個 MemoryBlock 來定義,并用 Object obj 和 long offset 這兩個變量統一辨別一個記憶體頁在系統記憶體中的位址。堆内的 MemoryBlock 是以 long 型數組的形式配置設定的記憶體,其 obj 的值為是這個數組的對象引用,offset 是 long 型數組的在 JVM 中的初始偏移位址,兩者配合使用可以定位這個數組在堆内的絕對位址;堆外的 MemoryBlock 是直接申請到的記憶體塊,其 obj 為 null,offset 是這個記憶體塊在系統記憶體中的 64 位絕對位址。Spark 用 MemoryBlock 巧妙地将堆内和堆外記憶體頁統一抽象封裝,并用頁表(pageTable)管理每個 Task 申請到的記憶體頁。

Tungsten 頁式管理下的所有記憶體用 64 位的邏輯位址表示,由頁号和頁内偏移量組成:

頁号:占 13 位,唯一辨別一個記憶體頁,Spark 在申請記憶體頁之前要先申請空閑頁号。

頁内偏移量:占 51 位,是在使用記憶體頁存儲資料時,資料在頁内的偏移位址。

有了統一的尋址方式,Spark 可以用 64 位邏輯位址的指針定位到堆内或堆外的記憶體,整個 Shuffle Write 排序的過程隻需要對指針進行排序,并且無需反序列化,整個過程非常高效,對于記憶體通路效率和 CPU 使用效率帶來了明顯的提升[10]。

Spark 的存儲記憶體和執行記憶體有着截然不同的管理方式:對于存儲記憶體來說,Spark 用一個 LinkedHashMap 來集中管理所有的 Block,Block 由需要緩存的 RDD 的 Partition 轉化而成;而對于執行記憶體,Spark 用 AppendOnlyMap 來存儲 Shuffle 過程中的資料,在 Tungsten 排序中甚至抽象成為頁式記憶體管理,開辟了全新的 JVM 記憶體管理機制。

歡迎點贊+收藏+轉發朋友圈素質三連

Apache Spark 記憶體管理詳解

文章不錯?點個【在看】吧! ????

繼續閱讀