一個Spark應用運作的過程如下所示:

-
Driver
使用者的主程式送出到Driver中執行,在Driver中建立SparkContext,SparkContext初始化DAGScheduler和TaskScheduler,作為coordinator負責從AppMaster申請資源,并将作業的Task排程到Executor上面執行。
在yarn-cluster模式下,AppMaster中包含了Driver,在YARN中啟動,spark-submit用戶端kill掉不影響程式的運作;
在yarn-client模式下,Driver在spark-submit的用戶端啟動(不在YARN中),跟AppMaster是分離的,spark-submit用戶端kill掉會導緻Spark程式挂掉(如spark-sql/spark-shell等都是以yarn-client的方式送出)
Executor上面運作的每個MapTask結束後都會有MapStatus彙報給Driver, 當MapTask數量非常多的時候可能會導緻Driver出現OOM,此時需要調整Driver的記憶體大小,通過
--conf spark.driver.memory=4G
或者
--driver-memory 4G
來進行設定。
-
Executor
實際執行Task的節點,Executor的個數由
--conf spark.executor.instances=4
來設定;每個Executor裡面并發跑的Task個數由--num-executors 4
--conf spark.executor.cores=2
指定。--executor-cores
Executor的記憶體由
--conf spark.executor.memory=4G
--executor-memory 4G
設定。
Spark記憶體管理
上面介紹了Spark中兩個角色(Driver/Executor),其中Executor是實際運作Task的節點,Spark記憶體管理主要在Executor上面。
Executor記憶體使用結構
如上圖所示, Spark on YARN模式下一個Executor的記憶體使用情況:
整個Executor是YARN的一個container,是以它的總記憶體受
yarn.scheduler.maximum-allocation-mb
的參數控制;
當使用者送出作業的時候通過
spark.executor.memory
參數設定了executor的堆記憶體(heapsize),這部分記憶體的使用情況如上圖所示:
-
系統預留(固定300MB)
詳見
SPARK-12081 -
spark.memory.fraction
該參數控制executor内使用者計算(execution)和存儲(storage)總占用多少記憶體,即
大小的記憶體; 剩餘的(M-R)*(1-spark.memory.fraction)用于Spark内部的metadata以及使用者資料結構等使用(M-R)*spark.memory.fraction
對于
spark.executor.memroyOverhead
,它是executor可額外使用的堆外(off-heap)記憶體,比如spark的shuffle過程使用的netty就會使用到堆外記憶體,如果程式有遇到相關的oom錯誤,可以嘗試調大該參數。該記憶體不屬于上面
spark.executor.memory
(on-heap),但是它們的總和不能超過
yarn.scheduler.maximum-allocation-mb
.
execution/storage記憶體管理
上圖中execution/storage的記憶體(
(M-R)*spark.memroy.fraction
)是Task在executor中運作需要用到的記憶體,它們通過
UnifiedMemoryManager
這個統一記憶體管理器來管理。
UnifiedMemoryManager
中的execution和storage的管理沒有硬性的邊界控制(比如execution固定占比多少),它們之間是一個軟邊界,初始的邊界由
spark.memory.storageFraction
來設定(預設0.5),但這個并不是一個固定的邊界:
a) 當execution不夠的時候,可以從storage側借記憶體,如storage基本沒使用(如沒有cache資料等),execution可以從storage借記憶體甚至全部都借完,即使後續有storage需要用記憶體也不能強制從execution拿回,除非execution後續自己釋放了部分記憶體,storage才能拿來使用;
b) 當storage不夠的時候,如果execution有空閑多餘的記憶體,則也可以借,但是後續如果execution又需要更多記憶體了則可以強制從storage拿回記憶體(如可以将storge的資料寫到磁盤,然後釋放對應的記憶體),直到storage使用的記憶體減少到
spark.memory.storageFraction
的比例。
Task記憶體管理
一個Executor可以同時并發執行多個Task(通過
spark.executor.cores
控制),而每個Task在運作的過程中都需要從Executor申請記憶體來使用,那Executor如何将記憶體配置設定給并發運作的多個Task呢? 這塊留到下一篇文章來介紹。