天天看点

大众点评调度系统Kepler的设计与实现

.

5.2.1.1调度系统Kepler整体设计

该企业调度系统的整体实现思路:

  1. 用SpringQuartz来负责定时任务这一块,用定时任务来周期性更新实例和容器的状态,即用定时任务做调度系统的引擎。
  2. 用状态管理器管理实例,资源管理器控制资源,容器管理器管理容器。它们都用单例模式实现,并持有实例,容器,资源在内存中的队列。
  3. 在内存和数据库同时保存实例信息,内存中采用多级状态队列的形式。在内存中有数据是为了调度的速度和及时性,在数据库中存副本是为了容灾恢复现场。
  4. 执行机上容器的管理用远程DockerClient API进行。定时任务也会使用这些API中,来定期更新内存容器队列。
  5. Docker容器和实例关系的映射存储到数据库中来建立两者的联系。
  6. 针对不同任务,制作不同Docker镜像,把镜像放在本地仓库中供执行机拉取。

5.2.1.2调度系统Kepler具体设计与实现

首先分析Init定时任务和Ready定时任务。下面(图5.1)是两者的流程设计。

大众点评调度系统Kepler的设计与实现

图ETL数据传输平台设计与实现.19 Init和Ready定时任务流程设计

在Quartz的使用方式上[20],我们把Quartz和Spring集成在一起。在Spring配置文件中配置InitExecutor以及ReadyExecutor的jobDetail(就是定时任务运行的类和方法)和Trigger(就是Cron表达式)。在容器启动过程中,Quartz会在配置文件中查找配置为jobDetail的类,找到它对应的Trigger,新建定时任务并按Cron表达式对应的时间触发[21]。这样spring就会帮我们把这两个定时任务每十分钟调度起来。

InitExecutor的定时方法是doExecutor()方法,它会调用InitInstances()方法,首先通过TaskService类获取所有生效任务,再调用InitInstance(Datebegin,Dateend)逐一检测在这个时间段内触发的任务,并生成这些任务的实例。如果这些实例的任务存在工作流上的前驱,则设置实例的前驱关系。然后调用状态管理器(StatusManager)的createInitInstance(InstanceDOinstanceDO)方法,把实例状态为Init的任务加入状态管理器的InitQueue中,并用InstanceService类的saveInstance(InstanceDOinstance)方法插入数据库的实例表中。

ReadyExecutor配置的定时方法同样是doExecutor()方法,这是因为他们都继承自同一个抽象类BaseExecutor,实现了它的抽象方法doExecutor(),在Spring中进行了统一配置。doExecutor()方法中又进一步调用了InitReady()方法,首先遍历InitQueue中的所有实例,判断实例的前驱实例是否准备好,如果准备好,再判断当前实例的并发数是否超过允许值(一般实例不允许并发所以并发数默认为1),如果这两项都满足,则把实例先暂存到一个List局部变量中。剩下则是用StatusManager的acquireResource(InstanceDOinstanceDO)方法先尝试获取任务的一些公共资源,如hive,Mysql的并发数,如果有一项获取不到,则放弃分配(这样做是为了防止死锁),否则一次性获取所有资源,相应的,在StatusManager中增加资源的用量。如果这一切都没问题的话,就把实例的状态改为Ready,用InstanceService的updateInstance(InstanceDOinstanceDO)更新数据库中任务状态,并把这些实例从StatusManager的InitQueue移动到ReadyQueue。

上面有涉及到两个数据库服务,InstanceService和TaskService,这分别是任务类的数据库服务和实例类的数据库服务,在接下来的介绍中,我们还会涉及。这两个服务并不是调度系统本身提供的服务,而是通过服务调用中间件集成到本地spring中的远程RPC调用。

其次,我们介绍比较复杂的Running进程(时序图为图5.2)。定时任务方法execute()方法调用doExecute()方法,后者调用ContainerExecuter类的doExecuteHeartBeat()方法从Zookeeper上获取所有存活的执行机机器,然后更新ContainerExecuter类中的存活的主机列表。然后调用ContainerService的getContainerByHost(Stringhost)方法获取此主机上所有的容器。然后调用DockerClient的inspect命令检查Docker容器的状态,然后用InstanceService的updateInstance(InstanceDOinstanceDO)方法更新容器对应的任务实例的状态,如果此容器是运行状态,则把它加入到RunningContainer列表。如果不在运行,则检查运行结果是成功还是失败。并相应更新RunningContainer列表。最后还要还要调用DockerClient的remove命令删除掉这个容器。同时还要更新对应的实例中的数据库状态,此外还要把运行完成或异常退出的实例从StatusManager的RunningQueue中去掉。这样,执行机状态和容器状态就得以实时更新。

大众点评调度系统Kepler的设计与实现

图ETL数据传输平台设计与实现.20 Running定时任务流程设计一

接下来讨论RunningExecutor对容器的管理(见图5.3)。同样是继承自父类的execute()方法调用RunningExecutor里的doExecute()方法,在这个方法里会具体调用getReadyInstances()方法,返回StatusManager里的readyQueue,这是所有状态为Ready的实例。然后遍历所有实例,为每个实例调用acquireContainerForTask(InstanceDOinstanceDO)方法分配执行机,具体的分配的流程是,若某执行机没有运行的容器,则优先分配到此执行机上,否则,选择还有槽位(对应一定的cpu、内存和带宽资源,就是一个容器所需标准资源)且运行容器最少的机器作为目标执行机。然后再遍历执行机上所有用于Docker容器的端口,去掉其中被占用的端口选择序号最小的那一个端口。最后把这些参数都包

装起来成ContainerDO返回。然后调用ContainerExecutor的runInstance(…)方法,

后者调用Dockerservice(这个类是为了管理Docker容器的持久化工作)的runInstance(…)方法,这个方法中会调用ImageService方法查询任务对应的image(这个在建立任务的时候就已经插入了相关信息)。查询完之后,调用DockerInstanceService(这个是封装了DockerClientAPI本地服务)的createContainer(…)的方法在执行host上创建容器,然后再调用startContainer(…)

大众点评调度系统Kepler的设计与实现

图ETL数据传输平台设计与实现.21 Running定时任务流程设计二

方法启动容器,同时还要调用ContainerService的saveContainer()方法将容器保存

在数据库中,同时还要把这个已启动的container加入内存的RunningContainers队列中。这样之后,返回到RunningExecutor中,再调用StatusManager的createRunningInstance(InstanceDOinstanceDO)方法,用InstanceService更新实例在数据库中的状态,并且把实例加入内存中的RunningQueue中,这样调度系统的整个架构系统就已经介绍完毕。

下面介绍调度系统的三个管理类:StatusManager(状态管理器),ResourceManager(资源管理器),ContainerScheduler(容器调度器)。

大众点评调度系统Kepler的设计与实现

图ETL数据传输平台设计与实现.22 StatusMananger类的设计

StatusManager管理调度系统所有的实例,所有内存中的实例都是数据库中数据的副本(如图5.4)。StatusManager成员变量如下:

  1. ContainnerService类实例containnerService。这个成员变量主要是把容器相关信息,特别是运行的容器和实例的映射关系。
  2. InstanceService类实例instanceService。这个成员变量主要是负责实例信息数据持久化。
  3. ResourceManager类实例resourceManager。这个成员变量是调度系统的资源管理器的单例实例,这个类下文会详细介绍,主要负责内部共享资源和外部共享资源的管理。
  4. Map<String,instanceDO>类的实例initQueue。这个成员变量主要是负责存储到达触发时间的任务被调度系统初始化的实例。Map类的键值就是instanceDO实例的id。
  5. Map<String,instanceDO>类的实例calcReadyQueue。这个成员变量主要是存储状态为Ready的计算任务实例,各个状态队列之间经常出现实例的转移。Map类的键值就是instanceDO实例的id。
  6. Map<String,instanceDO>类的实例transferReadyQueue。这个成员变量主要是存储状态为Ready的传输任务实例,之所以区分计算任务实例和传输任务实例主要是因为两者各运行在专用的两组执行机上,内部运行机制也有不同,对应的Docker容器的image也有所不同。所以申请资源等操作有所不同,必须区别对待。
  7. Map<String,instanceDO>类的实例runningQueue。这个成员变量主要是存储状态为Running的计算任务实例。与上面相同,Map类的键值就是instanceDO实例的id。两种不同类型的readyQueue实例会转移到同一个runningQueue中。
  8. Map<String,instanceDO>类的实例timeoutQueue。这个成员变量主要是存储状态为Timeout的计算任务实例。与上面相同,Map类的键值就是instanceDO实例的ID。实例之所以会出现在Timeout队列中是因为它的运行时间超过了它对应的任务设定的超时时间从而从Running队列中移出的实例,运行时间大大超过正常时间的实例往往是一种异常现象,需要任务的开发人员或ETL平台的管理员查看日志进行排错。

StatusMananger的成员变量已经讲述完毕。下面深入分析StatusManger的成员方法。

  1. loadInstancesInfo():void方法。该方法前有@PostConstruct注释,这说明本方法是statusManger对象的生命周期方法,在statusManager被容器创建后,将调用此方法,主要是为了恢复现场,即从数据库中读取状态为Init,Ready,Running和Timeout的实例,分别加入到各状态对应的队列中。这方法一般是在调度系统新的版本上线后,系统重启中,利用数据库中的数据快速构建内存中的四级队列。
  2. addReadyQueue(InstanceDOinstanceDO):void方法。该方法是把参数中的实例从InitQueue中移出并根据实例对应的任务类型插入到计算任务Ready队列或者传输任务Ready队列中。本方法是个工具方法,主要是内部调用,内部调用它的方法是loadInstancesInfo方法和createReadyInstance方法。
  3. calcResourceUsage(InstanceDOinstanceDO):void方法。本方法主要是把当前实例依赖的资源加入到ResourceManager的资源池中进行管控。本方法也是一个内部工具方法,调用时机是loadInstancesInfo方法重建内存实例状态四级队列时。
  4. isInstanceInited(StringinstanceId):boolean方法。本方法功能比较简单,就是判断指定的instanceId的实例是否在InitQueue中。本方法也是一个内部工具方法,调用它的方法是createInitInstance方法。
  5. createReadyInstance(InstanceDOinstanceDO):boolean方法。此方法就是调用addReadyQueue方法完成实例在Init队列和Ready队列之间的移动并更新数据库中实例的状态。在调用addReadyQueue方法之前,必须检查这个实例是否在已经在ReadyQueue中了,在则返回false,无需往下进行。不在才调用addReadyQueue方法。
  6. createRunningInstance(InstanceDOinstanceDO):boolean方法。此方法主要是调用addRunningQueue方法完成实例在readyQueue和runningQueue之间的移动。在此之前判断此实例是否已在ReadyQueue中以免重复移动,最后持久化实例的状态为Running。
  7. addRunningQueue(InstanceDOinstanceDO): void方法。此方法功能在上一个方法中已经介绍。
  8. createInitInstance(InstanceDOinstanceDO):void方法。此方法把实例加入Init队列,并把新建实例的插入数据库中。

ResourceManger是调度系统的内部和外部资源管理器,这里所有资源都以Slot为单位。对于不同资源来说,一个Slot代表的意义并不同。对于一台执行机来说,一个Slot可能代表2g内存,2个Vcpu,100Mbps的带宽。对于Hive公共资源来说,一个Slot代表可以并发访问的一个并发数。内部资源就是任务执行机资源,外部资源就是Hive,Mysql等公共共享资源。

ResourceManager类(如图5.5)比较简单,它的成员变量有:

  1. KeplerLionInfo类的keplerLionInfo实例。该成员变量是为了持有存在一个叫Lion的,搭建在Zookeeper集群之上的配置中心的各项有关调度系统的配置。显而易见,它存储一定信息,在static代码中加载Lion中关于Kepler调度系统的配置。
大众点评调度系统Kepler的设计与实现

图ETL数据传输平台设计与实现.23 ResourceMananger类的设计

  1. ConcurrentHashMap<String,Integer>类的currentResourceUsage实例。这个是用来存储各种内外部资源的已使用的Slot个数的,每种资源的最大资源使用量放在Lion资源配置中心。这个Map类的键是资源的名字,而值是资源的Slot个数。而之所以用ConcurrentHashMap是因为资源种类较多,可以用此类的分段锁来提高并发性能。

下面介绍ResourceManager的成员函数。

  1. addResourceUsage(int,String):boolean方法。这个方法的用途是添加指定资源名字的Slot数到currentResourceUsage中。
  2. acquireResource(InstanceDOinstanceDO):boolean方法。这个方法主要内容是调用tryAcquireResource和doAcquireResource来获取资源,前者是为了避免死锁查看所有资源是否能够一次性获取到,后者是真正的获取资源。
  3. tryAcquireResource(InstanceDOinstanceDO):boolean方法。这个方法是检查是否一次性获取实例的资源会死锁。首先获取实例依赖的资源列表。然后逐一检查当前已经使用的资源数加上欲分配的资源数是否超过资源的最大限度。如果超过,则返回false,提示可能造成死锁。
  4. doAcquireResource(InstanceDOinstanceDO):boolean方法。这个方法与tryAcquireResource方法有类似的流程,但不同的是,此方法会真正的分配资源,并调用addResourceUsage方法把资源加入到concurrentUsageMap中。
  5. releaseResource(InstanceDOinstanceDO):boolean方法。这方法和前面的acquireResource方法相反。是逐个释放实例所拥有的资源,即减少concurrentUsageMap该资源的用量。
大众点评调度系统Kepler的设计与实现

图ETL数据传输平台设计与实现.24 ContainerScheduler类的设计

ContainerScheduler是执行机上Docker容器的管理类(如图5.6),里面很多方法都可远程改变容器状态。是Docker容器和实例的桥梁。ContainerScheduler的成员变量有:

  1. ConcurrentHashMap<String,Set<ContainerDO>>类实例runningContainers。这个成员变量的键值是执行机的host,值是这个执行机host上运行的容器集合。
  2. Set<String>类实例aliveTransferHost。这个成员变量就是存储存活的传输任务执行机的集合。
  3. Set<String>类实例aliveCalcHost。这个成员变量就是存储存活的计算任务执行机的集合。
  4. Set<String>类实例deadHost。这个成员变量就是存储下线或宕机的执行机。
  5. ConcurrentHashMap<String,Set<ContainerDO>>类的实例heartbeats。这个成员变量就是存储定时心跳获取的各个执行机的心跳情况,比如上一次正常心跳的时间等,调度系统通过距上一次正常心跳的时间差来判断此执行机是否仍然正常执行。
  6. KeplerLionConfs类的实例keplerLionConfs。正如上面介绍,这个实例只是保存存在Lion配置中心的配置。
  7. DockerInstanceService类的实例dockerInstanceService。这个实例是封装Docker远程API。封装的命令包括create、start、stop、remove,inspect等。
  8. ContainerService类的实例containerService。这个是用来保存Docker容器相关信息到数据库中的持久化操作。最主要是存储容器和实例的映射。
  9. InstanceService类的实例instanceService。这个成员变量主要是管理实例持久化操作。

ContainerSheduler类的成员方法很多,这里我们只介绍主要的几个方法:

  1. LoadContainerInfo():void方法。此方法主要是为了恢复内存中的runningContainers队列,这里是从数据库中载入数据,具体容器的状态还需要调用checkInstanceStatus方法来检查容器状态。
  2. checkContainerStatus(Containercontainer):void方法。此方法是为了检查每一个容器的状态,方法的核心是Docker的inspect命令(通过DockerInstanceService封装的方法)去检查ContainerId对应的容器状态。
  3. executeContainerHeartbeat():void方法。此方法另外启动一个线程去zookeeper集群里获取所有执行机存活的信息,并把获取的每一个执行机的心跳信息保存在HeartBeatDO中,更新aliveTransferHost、aliveCalcHost和deadHost队列,维护内存中存活机器的列表。
  4. acquireContainerForTask(InstanceDOinstanceDO):Container方法。此方法主要是为实例的DockerContainer选择执行机,选择的逻辑的是遍历所有的执行机,获取指定执行机上的所有运行的Container列表,如果该Host上没有Container运行,则优先选择此Host。否则把所有还有空的槽位(一个槽位是一个容器运行所需资源)的执行机加入一个队列中,选择运行Container最少的机器。此外,还要调用getContainerPort方法来获取执行机上一个未占用的端口用来映射Docker容器的SSH服务。最后把所有的容器信息封装成一个ContainerDO类返回。
  5. runInstance(ContainerDOcontainerDO,InstanceDOinstanceDO)方法,此方法调用containerService的同名方法,最后是调用DockerInstanceService的createContainer和startContainer方法(这两个方法封装的就是DockerClient的create和start方法)。当然在创建之前要用imageService的queryImage方法来获取此实例对应的容器镜像。
  6. getContainerPort(Set<Container>containers)方法。此方法是为了要启动的容器获取一个要映射到主机的端口,这就要检查该主机上所有容器已经映射的端口,再从容器专用端口中把它们去掉,选择其中一个端口最小的返回。