天天看點

Spark技術内幕:Executor配置設定詳解1. SparkContext建立TaskScheduler和DAG Scheduler 2. TaskScheduler通過SchedulerBackend建立AppClient 3. AppClient向Master送出Application4. Master根據AppClient的送出選擇Worker5. Worker根據Master的資源配置設定結果來建立Executor

當使用者應用new sparkcontext後,叢集就會為在worker上配置設定executor,那麼這個過程是什麼呢?本文以standalone的cluster為例,詳細的闡述這個過程。序列圖如下:

Spark技術内幕:Executor配置設定詳解1. SparkContext建立TaskScheduler和DAG Scheduler 2. TaskScheduler通過SchedulerBackend建立AppClient 3. AppClient向Master送出Application4. Master根據AppClient的送出選擇Worker5. Worker根據Master的資源配置設定結果來建立Executor

sparkcontext是使用者應用和spark叢集的交換的主要接口,使用者應用一般首先要建立它。如果你使用sparkshell,你不必自己顯式去建立它,系統會自動建立一個名字為sc的sparkcontext的執行個體。建立sparkcontext的執行個體,主要的工作除了設定一些conf,比如executor使用到的memory的大小。如果系統的配置檔案有,那麼就讀取該配置。否則則讀取環境變量。如果都沒有設定,那麼取預設值為512m。當然了這個數值還是很保守的,特别是在記憶體已經那麼昂貴的今天。

除了加載這些叢集的參數,它完成了taskscheduler和dagscheduler的建立:

taskscheduler是通過不同的schedulerbackend來排程和管理任務。它包含資源配置設定和任務排程。它實作了fifo排程和fair排程,基于此來決定不同jobs之間的排程順序。并且管理任務,包括任務的送出和終止,為饑餓任務啟動備份任務。

不同的cluster,包括local模式,都是通過不同的schedulerbackend的實作其不同的功能。這個子產品的類圖如下:

Spark技術内幕:Executor配置設定詳解1. SparkContext建立TaskScheduler和DAG Scheduler 2. TaskScheduler通過SchedulerBackend建立AppClient 3. AppClient向Master送出Application4. Master根據AppClient的送出選擇Worker5. Worker根據Master的資源配置設定結果來建立Executor

sparkdeployschedulerbackend是standalone模式的schedulerbackend。通過建立appclient,可以向standalone的master注冊application,然後master會通過application的資訊為它配置設定worker,包括每個worker上使用cpu core的數目等。

org.apache.spark.deploy.client.appclientlistener是一個trait,主要為了schedulerbackend和appclient之間的函數回調,在以下四種情況下,appclient會回調相關函數以通知schedulerbackend:

向master成功注冊application,即成功連結到叢集;

斷開連接配接,如果目前sparkdeployschedulerbackend::stop == false,那麼可能原來的master實效了,待新的master ready後,會重新恢複原來的連接配接;

application由于不可恢複的錯誤停止了,這個時候需要重新送出出錯的taskset;

添加一個executor,在這裡的實作僅僅是列印了log,并沒有額外的邏輯;

删除一個executor,可能有兩個原因,一個是executor退出了,這裡可以得到executor的退出碼,或者由于worker的退出導緻了運作其上的executor退出,這兩種情況需要不同的邏輯來處理。

小結:sparkdeployschedulerbackend裝備好啟動executor的必要參數後,建立appclient,并通過一些回調函數來得到executor和連接配接等資訊;通過org.apache.spark.scheduler.cluster.coarsegrainedschedulerbackend.driveractor與executorbackend來進行通信。

appclient是application和master互動的接口。它的包含一個類型為org.apache.spark.deploy.client.appclient.clientactor的成員變量actor。它負責了所有的與master的互動。actor首先向master注冊application。如果超過20s沒有接收到注冊成功的消息,那麼會重新注冊;如果重試超過3次仍未成功,那麼本次送出就以失敗結束了。

主要的消息如下:

registeredapplication(appid_, masterurl) => //注:來自master的注冊application成功的消息

applicationremoved(message) => //注:來自master的删除application的消息。application執行成功或者失敗最終都會被删除。

executoradded(id: int, workerid: string, hostport: string, cores: int, memory: int) => //注:來自master

executorupdated(id, state, message, exitstatus) =>  //注:來自master的executor狀态更新的消息,如果是executor是完成的狀态,那麼回調schedulerbackend的executorremoved的函數。

masterchanged(masterurl, masterwebuiurl) =>  //注:來自新競選成功的master。master可以選擇zk實作ha,并且使用zk來持久化叢集的中繼資料資訊。是以在master變成leader後,會恢複持久化的application,driver和worker的資訊。

stopappclient => //注:來自appclient::stop()

master接收到appclient的registerapplication的請求後,處理邏輯如下:

schedule() 為處于待配置設定資源的application配置設定資源。在每次有新的application加入或者新的資源加入時都會調用schedule進行排程。為application配置設定資源選擇worker(executor),現在有兩種政策:

盡量的打散,即一個application盡可能多的配置設定到不同的節點。這個可以通過設定spark.deploy.spreadout來實作。預設值為true,即盡量的打散。

盡量的集中,即一個application盡量配置設定到盡可能少的節點。

對于同一個application,它在一個worker上隻能擁有一個executor;當然了,這個executor可能擁有多于1個core。對于政策1,任務的部署會慢于政策2,但是gc的時間會更快。

其主要邏輯如下:

在選擇了worker和确定了worker上得executor需要的cpu core數後,master會調用 launchexecutor(worker: workerinfo, exec: executorinfo)向worker發送請求,向appclient發送executor已經添加的消息。同時會更新master儲存的worker的資訊,包括增加executor,減少可用的cpu core數和memory數。master不會等到真正在worker上成功啟動executor後再更新worker的資訊。如果worker啟動executor失敗,那麼它會發送failed的消息給master,master收到該消息時再次更新worker的資訊即可。這樣是簡化了邏輯。

小結:現在的配置設定方式還是比較粗糙的。比如并沒有考慮節點的目前總體負載。可能會導緻節點上executor的配置設定是比較均勻的,單純靜态的從executor配置設定到得cpu core數和記憶體數來看,負載是比較均衡的。但是從實際情況來看,可能有的executor的資源消耗比較大,是以會導緻叢集負載不均衡。這個需要從生産環境的資料得到回報來進一步的修正和細化配置設定政策,以達到更好的資源使用率。

worker接收到來自master的launchexecutor的消息後,會建立org.apache.spark.deploy.worker.executorrunner。worker本身會記錄本身資源的使用情況,包括已經使用的cpu core數,memory數等;但是這個統計隻是為了web ui的展現。master本身會記錄worker的資源使用情況,無需worker自身彙報。worker與master之間的心跳的目的僅僅是為了報活,不會攜帶其他的資訊。

executorrunner會将在org.apache.spark.scheduler.cluster.sparkdeployschedulerbackend中準備好的org.apache.spark.deploy.applicationdescription以程序的形式啟動起來。當時以下幾個參數還是未知的:

val args = seq(driverurl, "{{executor_id}}", "{{hostname}}", "{{cores}}", "{{worker_url}}")。executorrunner需要将他們替換成已經配置設定好的實際值:

接下來就啟動org.apache.spark.deploy.applicationdescription中攜帶的org.apache.spark.executor.coarsegrainedexecutorbackend:

coarsegrainedexecutorbackend啟動後,會首先通過傳入的driverurl這個參數向在org.apache.spark.scheduler.cluster.coarsegrainedschedulerbackend::driveractor發送registerexecutor(executorid, hostport, cores),driveractor會回複registeredexecutor,此時coarsegrainedexecutorbackend會建立一個org.apache.spark.executor.executor。至此,executor建立完畢。executor在mesos, yarn, and the standalone scheduler中,都是相同的。不同的隻是資源的配置設定管理方式。

繼續閱讀