本文尝试从源码层面梳理Spark在任务调度与资源分配上的做法。
先从Executor和SchedulerBackend说起。Executor是真正执行任务的进程,本身拥有若干cpu和内存,可以执行以线程为单位的计算任务,它是资源管理系统能够给予的最小单位。SchedulerBackend是spark提供的接口,定义了许多与Executor事件相关的处理,包括:新的executor注册进来的时候记录executor的信息,增加全局的资源量(核数),进行一次makeOffer;executor更新状态,若任务完成的话,回收core,进行一次makeOffer;其他停止executor、remove executor等事件。下面由makeOffer展开。
资源更新的情况下,通过调用scheduler的resourceOffers方法来触发它对现有的任务进行一次分配,最终launch新的tasks。这里的全局scheduler就是TaskScheduler,实现是TaskSchedulerImpl,它可以对接各种SchedulerBackend的实现,包括standalone的,yarn的,mesos的。SchedulerBackend在做makeOffer的时候,会把现有的executor资源以WorkerOfffer列表的方式传给scheduler,即以worker为单位,将worker信息及其内的资源交给scheduler。scheduler拿到这一些集群的资源后,去遍历已提交的tasks并根据locality决定如何launch tasks。
优先级排序,这个排序算法目前是两种:FIFO或FAIR。得到这一份待运行的tasks后,接下里就是要把schedulerBackend交过来的worker资源信息合理分配给这些tasks。分配前,为了避免每次都是前几个worker被分到tasks,所以先对WorkerOffer列表进行一次随机洗牌。接下来就是遍历tasks,看workers的资源
“够不够”,
“符不符合”task,ok的话task就被正式launch起来。注意,这里资源"够不够"是很好判断的,在TaskScheduler里设置了每个task启动需要的cpu个数,默认是1,所以只需要做核数的大小判断和减1操作就可以遍历分配下去。而"符不符合"这件事情,取决于每个tasks的
locality设置。
delay scheduling。
到这里,对于任务的分配,资源的使用大致有个了解。实际上,TaskScheduler的resourceOffer里还触发了TaskSetManager的resourceOffer方法,TaskSetManager的resourceOffer是会检查task的locality并最终调用DAGScheduler去launch这个task。这些类的名字以及他们彼此的调用关系,看起来是比较乱的。我简单梳理下。
剥离的。我们上面提到的TaskSetManager的resourceOffer方法,是task与底下资源的交互,这个资源交互的协调人是TaskScheduler,也是全局的,TaskScheduler对接的是不同的SchedulerBackend的实现(比如mesos,yarn,standalone),如此来对接不同的资源管理系统。同时,对资源管理系统来说,他们要负责的是进程,是worker上起几个进程,每个进程分配多少资源。所以这两层很清楚,
spark本身计算框架内管理线程级别的task,每个stage都有一个TaskSet,本身是个小DAG,可以丢到全局可用的资源池里跑;spark下半身的
双层资源管理部分掌控的是进程级别的executor,不关心task怎么摆放,也不关心task运行状态,这是TaskSetManager管理的事情,两者的协调者就是TaskScheduler及其内的SchedulerBackend实现。
SchedulerBackend的实现,除去local模式的不说,分为细粒度和粗粒度两种。细粒度只有Mesos(mesos有粗细两种粒度的使用方式)实现了,粗粒度的实现者有yarn,mesos,standalone。拿standalone模式来说粗粒度,每台物理机器是一个worker,worker一共可以使用多少cpu和内存,启动时候可以指定每个worker起几个executor,即进程,每个executor的cpu和内存是多少。在我看来,粗粒度与细粒度的主要区别,就是粗粒度是进程long-running的,计算线程可以调到executor上跑,但executor的cpu和内存更容易浪费。细粒度的话,可以存在复用,可以实现抢占等等更加苛刻但促进资源利用率的事情。这俩概念还是AMPLab论文里最先提出来并在Mesos里实现的。AMPLab在资源使用粒度甚至任务分配最优的这块领域有不少论文,包括Mesos的DRF算法、Sparrow调度器等。所以standalone模式下,根据RDD的partition数,以及每个task需要的cpu数,可以很容易计算每台物理机器的负载量、资源的消耗情况、甚至知道TaskSet要分几批才能跑完一个stage。