天天看點

Apache Spark 将支援 Stage 級别的資源控制和排程

背景

熟悉 Spark 的同學都知道,Spark 作業啟動的時候我們需要指定 Exectuor 的個數以及記憶體、CPU 等資訊。但是在 Spark 作業運作的時候,裡面可能包含很多個 Stages,這些不同的 Stage 需要的資源可能不一樣,由于目前 Spark 的設計,我們無法對每個 Stage 進行細粒度的資源設定。而且即使是一個資深的工程師也很難準确的預估一個比較合适的配置,使得作業啟動時設定的參數适合 Spark 每個 Stage。

我們來考慮這個這樣的場景:我們有個 Spark 作業,它總共有兩個 Stages。第一個 Stage 主要對我們輸入的資料進行基本的 ETL 操作。這個階段一般會啟動大量的 Task,但是每個 Task 僅僅需要少量的記憶體以及少數的核(比如1個core)。第一個 Stage 處理完之後,我們将 ETL 處理的結果作為 ML 算法的輸入,這個 Stage 隻需要少數的 Task,但是每個 Task 需要大量的記憶體、GPUs 以及 CPU。

像上面這種業務場景大家應該經常遇到過,我們需要對不同 Stage 設定不同的資源。但是目前的 Spark 不支援這種細粒度的資源配置,導緻我們不得不在作業啟動的時候設定大量的資源,進而導緻資源可能浪費,特别是在機器學習的場景下。

不過值得高興的是,來自英偉達的首席系統軟體工程師 Thomas Graves 給社群提了個 ISSUE,也就是 SPIP: Support Stage level resource configuration and scheduling,旨在讓 Spark 支援 Stage 級别的資源配置和排程。大家從名字還可以看出,這是個 SPIP(Spark Project Improvement Proposals 的簡稱),SPIP 主要是标記重大的面向使用者或跨領域的更改,而不是小的增量改進。是以可以看出,這個功能對 Spark 的修改很大,會對使用者産生比較大的影響。

作者提完這個 SPIP 之後給社群發了一份郵件,說明這個 SPIP 的目的,解決的問題等,然後讓大家進行投票決定這個 SPIP 要不要開發下去。值得高興的是,已經社群的一輪投票,得到6票贊成1票反對的結果。那就說明這個 SPIP 通過了,将進入開發狀态。

Apache Spark 将支援 Stage 級别的資源控制和排程

設計

前面扯了一堆,下面讓我們來看看這個方案是如何設計的。為了實作這個功能,需要在現有的 RDD 類裡面加上一些新的 API,用于指定這個 RDD 計算需要用到的資源,比如添加以下兩個方法:

def withResources(resources: ResourceProfile): this.type
def getResourceProfile(): Option[ResourceProfile]           

上面的 withResources 方法主要用于設定目前 RDD 的 resourceProfile,并傳回目前 RDD 執行個體。ResourceProfile 裡面指定的資源包括 cpu、記憶體和額外的資源(GPU/FPGA/等)。我們還可以利用它實作其他功能,比如限制每個 stage 的 task 數量,為 shuffle 指定一些參數。不過為了設計實作的簡單,目前隻考慮支援 Spark 目前支援的資源,針對 Task 可以設定 cpu 和額外的資源(GPU/FPGA/等);針對 Executor 可以設定 cpu、記憶體和額外的資源(GPU/FPGA/等) 。執行器資源包括cpu、記憶體和額外資源(GPU、FPGA等)。通過給現有 RDD 類添加上面的方法,這使得所有繼承自 RDD 的演變 RDD 都支援設定資源,當然包括了輸入檔案生成的 RDD。

使用者在程式設計的時候,可以通過 withResources 方法來設定 ResourceProfile 的,當然肯定不可以設定無限的資源。可以通過 ResourceProfile.require 同時設定 Executor 和 task 需要的資源。具體的接口如下所示:

def require(request: TaskResourceRequest): this.type
def require(request: ExecutorResourceRequest): this.type

class ExecutorResourceRequest(
 val resourceName: String,
 val amount: Int, // potentially make this handle fractional resources
 val units: String, // units required for memory resources
 val discoveryScript: Option[String] = None,
 val vendor: Option[String] = None)

class TaskResourceRequest(
 val resourceName: String,
 val amount: Int) // potentially make this handle fractional resources           

之是以用 ResourceProfile 包裝 ExecutorResourceRequest 或 TaskResourceRequest 是因為後面如果我們需要添加新功能可以很容易的實作。比如我們可以在 ResourceProfile 裡面添加 ResourceProfile.prefer 方法,來實作程式申請到足夠就運作這個作業,如果沒有申請到足夠資源就使作業失敗。

當然,這個功能的實作需要依賴 Spark 的 Dynamic Allocation 機制。如果使用者沒有啟用 Dynamic Allocation (spark.dynamicAllocation.enabled=false)或者使用者沒有為 RDD 設定 ResourceProfile,那麼就按照現有的資源申請那套機制運作,否則就使用這個新機制。

因為每個 RDD 都可以指定 ResourceProfile,而 DAGScheduler 是可以把多個 RDD 的轉換放到一個 stage 中計算的,是以 Spark 需要解決同一個 stage 中多個 RDD 的資源申請沖突。當然,一些 RDD 也會出現跨 Stages 的情況,比如 reduceByKey,是以針對這種情況 Spark 需要将 ResourceProfile 的設定應用到這兩個 Stage 中。

如何使用

那麼如果 RDD 添加了上面的方法,我們就可以想下面一樣設定每個 Task 的資源使用情況:

val rp = new ResourceProfile()
rp.require(new ExecutorResourceRequest("memory", 2048))
rp.require(new ExecutorResourceRequest("cores", 2))
rp.require(new ExecutorResourceRequest("gpu", 1, Some("/opt/gpuScripts/getGpus")))
rp.require(new TaskResourceRequest("gpu", 1))
​
val rdd = sc.makeRDD(1 to 10, 5).mapPartitions { it =>
  val context = TaskContext.get()
  context.resources().get("gpu").get.addresses.iterator
}.withResources(rp)
​
val gpus = rdd.collect()           

上面 ResourceProfile 指定 Executor 需要 2GB 記憶體、2個 cores 以及一個 GPU;Task 需要一個 GPU。

總結

本文隻是介紹了這個功能的簡單實作,在真實的設計開發中會有很多需要考慮的問題,具體可以參見 SPARK-27495,對應的設計檔案參見 Stage Level Scheduling SPIP Appendices API/Design。因為這是個比較大的功能,是以可能需要花費數個月的時間去實作。相信有了這個功能之後,我們會更合理的使用叢集的資源。

原文連結

本文轉載自公衆号:過往記憶大資料

阿裡巴巴開源大資料技術團隊成立Apache Spark中國技術社群,定期推送精彩案例,技術專家直播,問答區近萬人Spark技術同學線上提問答疑,隻為營造純粹的Spark氛圍,歡迎釘釘掃碼加入!

Apache Spark 将支援 Stage 級别的資源控制和排程

對開源大資料和感興趣的同學可以加小編微信(下圖二維碼,備注“進群”)進入技術交流微信群。

Apache Spark 将支援 Stage 級别的資源控制和排程