天天看点

Spark技术内幕:Stage划分及提交源码分析handleJobSubmitted

当触发一个rdd的action后,以count为例,调用关系如下:

org.apache.spark.rdd.rdd#count

org.apache.spark.sparkcontext#runjob

org.apache.spark.scheduler.dagscheduler#runjob

org.apache.spark.scheduler.dagscheduler#submitjob

org.apache.spark.scheduler.dagschedulereventprocessactor#receive(jobsubmitted)

org.apache.spark.scheduler.dagscheduler#handlejobsubmitted

其中步骤五的dagschedulereventprocessactor是dagscheduler 的与外部交互的接口代理,dagscheduler在创建时会创建名字为eventprocessactor的actor。这个actor的作用看它的实现就一目了然了:

总结一下org.apache.spark.scheduler.dagschedulereventprocessactor的作用:可以把他理解成dagscheduler的对外的功能接口。它对外隐藏了自己内部实现的细节,也更易于理解其逻辑;也降低了维护成本,将dagscheduler的比较复杂功能接口化。

org.apache.spark.scheduler.dagscheduler#handlejobsubmitted首先会根据rdd创建finalstage。finalstage,顾名思义,就是最后的那个stage。然后创建job,最后提交。提交的job如果满足一下条件,那么它将以本地模式运行:

1)spark.localexecution.enabled设置为true  并且 2)用户程序显式指定可以本地运行 并且 3)finalstage的没有父stage 并且 4)仅有一个partition

3)和 4)的话主要为了任务可以快速执行;如果有多个stage或者多个partition的话,本地运行可能会因为本机的计算资源的问题而影响任务的计算速度。

要理解什么是stage,首先要搞明白什么是task。task是在集群上运行的基本单位。一个task负责处理rdd的一个partition。rdd的多个patition会分别由不同的task去处理。当然了这些task的处理逻辑完全是一致的。这一组task就组成了一个stage。有两种task:

 org.apache.spark.scheduler.shufflemaptask

 org.apache.spark.scheduler.resulttask

shufflemaptask根据task的partitioner将计算结果放到不同的bucket中。而resulttask将计算结果发送回driver application。一个job包含了多个stage,而stage是由一组完全相同的task组成的。最后的stage包含了一组resulttask。

在用户触发了一个action后,比如count,collect,sparkcontext会通过runjob的函数开始进行任务提交。最后会通过dag的event processor 传递到dagscheduler本身的handlejobsubmitted,它首先会划分stage,提交stage,提交task。至此,task就开始在运行在集群上了。

一个stage的开始就是从外部存储或者shuffle结果中读取数据;一个stage的结束就是由于发生shuffle或者生成结果时。

handlejobsubmitted 通过调用newstage来创建finalstage:

创建一个result stage,或者说finalstage,是通过调用org.apache.spark.scheduler.dagscheduler#newstage完成的;而创建一个shuffle stage,需要通过调用org.apache.spark.scheduler.dagscheduler#neworusedstage。 

对于result 的final stage来说,传入的shuffledep是none。

我们知道,rdd通过org.apache.spark.rdd.rdd#getdependencies可以获得它依赖的parent rdd。而stage也可能会有parent stage。看一个rdd论文的stage划分吧:

Spark技术内幕:Stage划分及提交源码分析handleJobSubmitted

一个stage的边界,输入是外部的存储或者一个stage shuffle的结果;输入则是job的结果(result task对应的stage)或者shuffle的结果。

上图的话stage3的输入则是rdd a和rdd f shuffle的结果。而a和f由于到b和g需要shuffle,因此需要划分到不同的stage。

从源码实现的角度来看,通过触发action也就是最后一个rdd创建final stage(上图的stage 3),我们注意到new stage的第五个参数就是该stage的parent stage:通过rdd和job id获取:

生成了finalstage后,就需要提交stage了。

dagscheduler将stage划分完成后,提交实际上是通过把stage转换为taskset,然后通过taskscheduler将计算任务最终提交到集群。其所在的位置如下图所示。

Spark技术内幕:Stage划分及提交源码分析handleJobSubmitted

接下来,将分析stage是如何转换为taskset,并最终提交到executor去运行的。

btw,最近工作太忙了,基本上到家洗漱完都要10点多。也再没有精力去进行源码解析了。幸运的是周末不用加班。因此以后的博文更新都要集中在周末了。加油。

继续阅读