天天看点

Spark技术内幕:Executor分配详解1. SparkContext创建TaskScheduler和DAG Scheduler 2. TaskScheduler通过SchedulerBackend创建AppClient 3. AppClient向Master提交Application4. Master根据AppClient的提交选择Worker5. Worker根据Master的资源分配结果来创建Executor

当用户应用new sparkcontext后,集群就会为在worker上分配executor,那么这个过程是什么呢?本文以standalone的cluster为例,详细的阐述这个过程。序列图如下:

Spark技术内幕:Executor分配详解1. SparkContext创建TaskScheduler和DAG Scheduler 2. TaskScheduler通过SchedulerBackend创建AppClient 3. AppClient向Master提交Application4. Master根据AppClient的提交选择Worker5. Worker根据Master的资源分配结果来创建Executor

sparkcontext是用户应用和spark集群的交换的主要接口,用户应用一般首先要创建它。如果你使用sparkshell,你不必自己显式去创建它,系统会自动创建一个名字为sc的sparkcontext的实例。创建sparkcontext的实例,主要的工作除了设置一些conf,比如executor使用到的memory的大小。如果系统的配置文件有,那么就读取该配置。否则则读取环境变量。如果都没有设置,那么取默认值为512m。当然了这个数值还是很保守的,特别是在内存已经那么昂贵的今天。

除了加载这些集群的参数,它完成了taskscheduler和dagscheduler的创建:

taskscheduler是通过不同的schedulerbackend来调度和管理任务。它包含资源分配和任务调度。它实现了fifo调度和fair调度,基于此来决定不同jobs之间的调度顺序。并且管理任务,包括任务的提交和终止,为饥饿任务启动备份任务。

不同的cluster,包括local模式,都是通过不同的schedulerbackend的实现其不同的功能。这个模块的类图如下:

Spark技术内幕:Executor分配详解1. SparkContext创建TaskScheduler和DAG Scheduler 2. TaskScheduler通过SchedulerBackend创建AppClient 3. AppClient向Master提交Application4. Master根据AppClient的提交选择Worker5. Worker根据Master的资源分配结果来创建Executor

sparkdeployschedulerbackend是standalone模式的schedulerbackend。通过创建appclient,可以向standalone的master注册application,然后master会通过application的信息为它分配worker,包括每个worker上使用cpu core的数目等。

org.apache.spark.deploy.client.appclientlistener是一个trait,主要为了schedulerbackend和appclient之间的函数回调,在以下四种情况下,appclient会回调相关函数以通知schedulerbackend:

向master成功注册application,即成功链接到集群;

断开连接,如果当前sparkdeployschedulerbackend::stop == false,那么可能原来的master实效了,待新的master ready后,会重新恢复原来的连接;

application由于不可恢复的错误停止了,这个时候需要重新提交出错的taskset;

添加一个executor,在这里的实现仅仅是打印了log,并没有额外的逻辑;

删除一个executor,可能有两个原因,一个是executor退出了,这里可以得到executor的退出码,或者由于worker的退出导致了运行其上的executor退出,这两种情况需要不同的逻辑来处理。

小结:sparkdeployschedulerbackend装备好启动executor的必要参数后,创建appclient,并通过一些回调函数来得到executor和连接等信息;通过org.apache.spark.scheduler.cluster.coarsegrainedschedulerbackend.driveractor与executorbackend来进行通信。

appclient是application和master交互的接口。它的包含一个类型为org.apache.spark.deploy.client.appclient.clientactor的成员变量actor。它负责了所有的与master的交互。actor首先向master注册application。如果超过20s没有接收到注册成功的消息,那么会重新注册;如果重试超过3次仍未成功,那么本次提交就以失败结束了。

主要的消息如下:

registeredapplication(appid_, masterurl) => //注:来自master的注册application成功的消息

applicationremoved(message) => //注:来自master的删除application的消息。application执行成功或者失败最终都会被删除。

executoradded(id: int, workerid: string, hostport: string, cores: int, memory: int) => //注:来自master

executorupdated(id, state, message, exitstatus) =>  //注:来自master的executor状态更新的消息,如果是executor是完成的状态,那么回调schedulerbackend的executorremoved的函数。

masterchanged(masterurl, masterwebuiurl) =>  //注:来自新竞选成功的master。master可以选择zk实现ha,并且使用zk来持久化集群的元数据信息。因此在master变成leader后,会恢复持久化的application,driver和worker的信息。

stopappclient => //注:来自appclient::stop()

master接收到appclient的registerapplication的请求后,处理逻辑如下:

schedule() 为处于待分配资源的application分配资源。在每次有新的application加入或者新的资源加入时都会调用schedule进行调度。为application分配资源选择worker(executor),现在有两种策略:

尽量的打散,即一个application尽可能多的分配到不同的节点。这个可以通过设置spark.deploy.spreadout来实现。默认值为true,即尽量的打散。

尽量的集中,即一个application尽量分配到尽可能少的节点。

对于同一个application,它在一个worker上只能拥有一个executor;当然了,这个executor可能拥有多于1个core。对于策略1,任务的部署会慢于策略2,但是gc的时间会更快。

其主要逻辑如下:

在选择了worker和确定了worker上得executor需要的cpu core数后,master会调用 launchexecutor(worker: workerinfo, exec: executorinfo)向worker发送请求,向appclient发送executor已经添加的消息。同时会更新master保存的worker的信息,包括增加executor,减少可用的cpu core数和memory数。master不会等到真正在worker上成功启动executor后再更新worker的信息。如果worker启动executor失败,那么它会发送failed的消息给master,master收到该消息时再次更新worker的信息即可。这样是简化了逻辑。

小结:现在的分配方式还是比较粗糙的。比如并没有考虑节点的当前总体负载。可能会导致节点上executor的分配是比较均匀的,单纯静态的从executor分配到得cpu core数和内存数来看,负载是比较均衡的。但是从实际情况来看,可能有的executor的资源消耗比较大,因此会导致集群负载不均衡。这个需要从生产环境的数据得到反馈来进一步的修正和细化分配策略,以达到更好的资源利用率。

worker接收到来自master的launchexecutor的消息后,会创建org.apache.spark.deploy.worker.executorrunner。worker本身会记录本身资源的使用情况,包括已经使用的cpu core数,memory数等;但是这个统计只是为了web ui的展现。master本身会记录worker的资源使用情况,无需worker自身汇报。worker与master之间的心跳的目的仅仅是为了报活,不会携带其他的信息。

executorrunner会将在org.apache.spark.scheduler.cluster.sparkdeployschedulerbackend中准备好的org.apache.spark.deploy.applicationdescription以进程的形式启动起来。当时以下几个参数还是未知的:

val args = seq(driverurl, "{{executor_id}}", "{{hostname}}", "{{cores}}", "{{worker_url}}")。executorrunner需要将他们替换成已经分配好的实际值:

接下来就启动org.apache.spark.deploy.applicationdescription中携带的org.apache.spark.executor.coarsegrainedexecutorbackend:

coarsegrainedexecutorbackend启动后,会首先通过传入的driverurl这个参数向在org.apache.spark.scheduler.cluster.coarsegrainedschedulerbackend::driveractor发送registerexecutor(executorid, hostport, cores),driveractor会回复registeredexecutor,此时coarsegrainedexecutorbackend会创建一个org.apache.spark.executor.executor。至此,executor创建完毕。executor在mesos, yarn, and the standalone scheduler中,都是相同的。不同的只是资源的分配管理方式。

继续阅读