Celery是一个简单、灵活且可靠的,处理大量消息的分布式系统,专注于实时处理的异步任务队列,同时也支持任务调度。本文介绍 Celery 的负载均衡机制。
目录
[源码解析] 并行分布式任务队列 Celery 之 负载均衡
0x00 摘要
0x01 负载均衡
1.1 哪几个 queue
1.1.1 _brpop_start 选择下次读取的queue
1.1.2 round_robin_cycle 设置下次读取的 queue
1.2 哪一个worker
1.3 哪一个进程
1.3.1 策略
1.3.2 公平调度
1.3.3 公平调度 in Celery
0x02 Autoscaler
2.1 调用时机
2.2 具体实现
2.2.1 bgThread
2.2.2 定义
0xEE 个人信息
0xFF 参考
Autoscaler 的作用 实际就是在线调节进程池大小。这也和缓解负载相关,所以放在这里一起论述。
Celery 的负载均衡其实可以分为三个层次,而且是与 Kombu 高度耦合(本文 broker 以 Redis 为例)。
在 worker 决定 与 哪几个 queue 交互,有一个负载均衡(对于 queues );
在 worker 决定与 broker 交互,使用 brpop 获取消息时候有一个负载均衡(决定哪一个 worker 来处理任务);
在 worker 获得 broker 消息之后,内部 具体 调用 task 时候,worker 内部进行多进程分配时候,有一个负载均衡(决定 worker 内部哪几个进程)。
注意,这个顺序是从 worker 读取任务处理任务的角度 出发,而不是从系统架构角度出发。
因为从系统架构角度说,应该是 <code>which worker ----> which queue in the worker ----> which subprocess in the worker</code> 这个角度。
我们下面按照 "worker 读取任务处理任务角度" 的顺序进行分析。
Kombu 事实上是使用 redis 的 BRPOP 功能来完成对具体 queue 中消息的读取。
Kombu 是循环调用,每次调用会制定读取哪些内部queues的消息;
queue 这个逻辑概念,其实就是对应了 redis 中的一个 物理key,从 queue 读取,就代表 BRPOP 需要指定 监听的 key。
Kombu 是在每一次监听时候,根据这些 queues 得到 其在 redis 之中对应的物理keys,即都指定监听哪些 redis keys;
brpop是个多key命令,当给定多个 key 参数时,按参数 key 的先后顺序依次检查各个列表,弹出第一个非空列表的头元素。这样就得到了这些 逻辑queue 对应的消息。
因为 task 可能会 用到多个 queue,所以具体从哪几个queue 读取?这时候就用到了策略。
Kombu 在每次监听时候,调用 _brpop_start 完成监听。其作用就是 选择下一次读取的queues。
_brpop_start 如下:
此时变量如下:
所以<code>_brpop_start</code> 就是从 self._queue_cycle 获得几个需要读取的queue。
具体如下图:
从上面代码中,我们可以知道 consume 就是返回 round_robin_cycle 中前几个 queue,即 return self.items[:n]。
而 self.items 的维护,是通过 rotate 完成的,就是把 最近用的 那个 queue 放到队列最后,这样给其他 queue 机会,就是 round robin 的概念了。
比如在如下代码中,当读取到消息之后,就会调用 <code>self._queue_cycle.rotate(dest)</code> 进行调整。
如果多个 worker 同时去使用 brpop 获取 broker 消息,那么具体哪一个能够读取到消息,其实这就是有一个 竞争机制,因为redis 的单进程处理,所以只能有一个 worker 才能读到。
这本身就是一个负载均衡。这个和 spring quartz 的负载均衡实现非常类似。
spring quartz 是 多个节点读取 同一个数据库记录决定谁能开始下一次处理,哪一个得到了数据库锁 就是哪个。
Kombu 是通过 多个 worker 读取 redis "同一个或者一组key" 的 实际结果 来决定 "哪一个 worker 能开始下一次处理"。
进程池中,使用了策略来决定具体使用哪一个进程来处理任务。
先讲解 strategy。在 AsynPool 启动有如下,配置了策略:
于是我们看看 strategy 定义如下,基本由名字可以知道其策略意义:
我们讲讲公平调度的概念。
不同系统对于公平调度的理解大同小异,我们举几个例子看看。
Linux 中,调度器必须在各个进程之间尽可能公平地共享CPU时间,而同时又要考虑不同的任务优先级。一般原理是:按所需分配的计算能力,向系统中每个进程提供最大的公正性,或者从另外一个角度上说, 试图确保没有进程被亏待。
Hadoop 中,公平调度是一种赋予作业(job)资源的方法,它的目的是让所有的作业随着时间的推移,都能平均的获取等同的共享资源。当单独一个作业在运行时,它将使用整个集群。当有其它作业被提交上来时,系统会将任务(task)空闲时间片(slot)赋给这些新的作业,以使得每一个作业都大概获取到等量的CPU时间。
Yarn 之中,Fair Share指的都是Yarn根据每个队列的权重、最大,最小可运行资源计算的得到的可以分配给这个队列的最大可用资源。
在 asynpool之中,有设置,看看"是否为 fair 调度":
基于 is_fair_strategy 这个变量,Celery 的公平调度有几处体现。
在开始 poll 时候,如果是 fair,则需要 存在 idle worker 才调度,这样就给了 idler worker 一个调度机会。
在具体发布 写操作 时候,也会看看是否 worker 已经正在忙于执行某一个 task,如果正在执行,就不调度,这样就给了其他 不忙worker 一个调度的机会。
具体逻辑如下:
在 WorkerComponent 中可以看到,为 AutoScaler 注册了两个调用途径:
注册在 consumer 消息响应方法中,这样消费时候如果有需要,就会调整;
利用 Hub 的 call_repeatedly 方法注册了周期任务,即周期看看是否需要调整。
这样就会最大程度的加大调用频率。
Autoscaler 是Background thread,这样 AutoScaler就可以在后台运行:
Autoscaler 的定义如下,可以看到其逻辑就是定期判断是否需要调整:
如果当前并发已经到了最大,则下调;
如果到了最小并发,则上调;
则具体上调下调的,都是通过具体线程池函数做到的,这就是要根据具体操作系统来进行分析,此处略过。
★★★★★★关于生活和技术的思考★★★★★★
微信公众账号:罗西的思考
如果您想及时得到个人撰写文章的消息推送,或者想看看个人推荐的技术资料,敬请关注。

Hadoop公平调度器指南
浅析Linux中完全公平调度——CFS
yarn公平调度详细分析(一)