Celery是一个简单、灵活且可靠的,处理大量事件的分布式系统,专注于实时处理的异步任务队列,同时也支持任务调度。本文讲解 EventDispatcher 和 Event 组件 如何实现。
目录
[源码解析] 并行分布式任务队列 Celery 之 EventDispatcher & Event 组件
0x00 摘要
0x01 思路
0x02 定义
0x03 Producer
3.1 Connection
3.2 Exchange
3.3 建立
0x04 分发事件
4.1 Send 发送
4.2 publish 与 broker 交互
0x05 Events 组件
5.1 Event 有什么用
5.2 调试
5.3 入口
5.4 事件循环
5.5 EventReceiver
5.6 ConsumerMixin5.6.1 Consumer
5.7 接收
5.8 处理
5.9 state处理函数
0xEE 个人信息
0xFF 参考
Celery是一个简单、灵活且可靠的,处理大量事件的分布式系统,专注于实时处理的异步任务队列,同时也支持任务调度。
本文讲解 EventDispatcher 和 Event 组件 如何实现。
EventDispatcher 和 Event 组件负责 Celery 内部事件(Event)的处理。
从字面上可以知道,EventDispatcher 组件的功能是事件(Event)分发,所以我们可以有如下已知信息:
事件分发 势必有生产者,消费者,EventDispatcher 就是作为 事件生产者;
涉及到生产消费,那么需要有一个 broker 存储中间事件;
因为 Celery 底层依赖于 Kombu,而 Kombu 本身就有生产者,消费者概念,所以这里可以直接利用这两个概念;
Kombu 也提供了 Mailbox 的实现,它的作用就是通过 Mailbox 我们可以实现不同实例之间的事件发送和处理,具体可以是单播 和 广播;
所以我们可以大致推论:EventDispatcher 可以利用 kombu 的 producer, consumer 或者 Mailbox。
而 Events 是负责事件(Event)的接受,所以我们也可以推论:
Events 利用 Kombu 的消费者来处理 事件;
具体如何处理事件,则会依据 Celery 的当前状态决定,这就涉及到了 State 功能;
我们下面就看看具体是怎么实现的。
为了让大家更好理解,我们先给出一个逻辑图如下:

EventDispatcher 代码位于:<code>celery\events\dispatcher.py</code>。
可以看到一个事件分发者需要拥有哪些成员变量以实现自己的功能:
connection (kombu.Connection) :就是用来和 Broker 交互的连接功能;
channel (kombu.Channel) : Channel 可以理解成共享一个Connection的多个轻量化连接。就是真正的连接。
Connection 是 AMQP 对 连接的封装;
Channel 是 AMQP 对 MQ 的操作的封装;
具体以 "针对redis的轻量化连接" 来说,Channel 可以认为是 redis 操作和连接的封装。每个 Channel 都可以与 redis 建立一个连接,在此连接之上对 redis 进行操作,每个连接都有一个 socket,每个 socket 都有一个 file,从这个 file 可以进行 poll。
producer :事件生产者,使用 kombu producer 概念;
exchange :生产者发布事件时,先将事件发送到Exchange,通过Exchange与队列的绑定规则将事件发送到队列。
hostname : 用来标示自己,这样 EventDispatcher 的使用者可以知道并且使用;
groups :事件组功能;
_outbound_buffer :事件缓存;
clock :Lamport 逻辑时钟,在分布式系统中用于区分事件的发生顺序的时间机制;
具体类的定义是:
我们先给出此时变量内容,大家可以先有所了解。
我们发现,EventDispatcher 确实使用了 Kombu 的 Producer,当然 Celery 这里使用 ampq 对 Kombu 做了封装。所以我们重点就需要看如何配置 Producer。
具体需要配置的是:
Connection,需要以此来知道联系哪一个 Redis;
Exchange,需要知道读取哪一个 Queue;
下面我们就逐一分析。
由代码可以看到,Connection 是直接使用 Celery 的 connection_for_write
此时变量为:
Exchange 概念如下:
Exchange:交换机 或者 路由。事件发送者将事件发至Exchange,Exchange负责将事件分发至队列;
Queue:事件队列,存储着即将被应用消费掉的事件,Exchange负责将事件分发Queue,消费者从Queue接收事件;
具体来说,Exchange 用于路由事件(事件发给exchange,exchange发给对应的queue)。
交换机通过匹配事件的 routing_key 和 binding_key来转发事件,binding_key 是consumer 声明队列时与交换机的绑定关系。
路由就是比较routing-key(这个 message 提供)和 binding-key(这个queue 注册到 exchange 的时候提供)。
使用时,需要指定exchange的名称和类型(direct,topic和fanout)。可以发现,和RabbitMQ中的exchange概念是一样的。事件发送给exchages。交换机可以被命名,可以通过路由算法进行配置。
具体回到代码上。
所以我们知道,这里默认的 Exchange 就是一个 <code>celeryev(fanout)</code> 类型。
于是,我们具体就看到了 Producer。
既然建立了 Producer,我们就可以进行发送。
发送事件就是直接是否需要成组发送。
如果需要分组发送,就内部有一个缓存,然后成组发送;
否则就直接调用 Producer publish API 发送。
关于如何区分分组是依靠如下代码:
相关变量为:
发送具体代码如下:
send 会调用到这里。
这里构建了 routing_key :
于是得倒了routing_key 为 'worker.online'。
也构建了 Event;
publish 代码如下:
因为是 pubsub,所以此时在 redis 之中看不到事件内容。
此时redis内容如下(看不到事件):
现在,EventDispatcher 组件已经把事件发送出去。
这个事件将如何处理?我们需要看看 Events 组件。
前面说了,Celery 在 Task/Worker 的状态发生变化的时候就会发出 Event,所以,一个很明显的应用就是监控 Event 的状态,例如 Celery 大家所熟知的基于 WebUI 的管理工具 flower 就用到了 Event,但是,这也是一个比较明显的应用,除此之外,我们还可以利用 Event 来给 Task 做快照,甚至实时对 Task 的状态转变做出响应,例如任务失败之后触发报警,任务成功之后执行被依赖的任务等等,总结一下,其实就是:
对 Task 的状态做快照;
对 Task 的状态做实时处理;
监控 Celery(Worker/Task) 的执行状态;
Celery Events 可以用来开启快照相机,或者将事件dump到标准输出。
比如:
为了调试,我们需要采用如下方式:
具体命令实现是:
Events入口为:
接着跟踪看看。
我们来到了事件循环。
这里建立了一个 app.events.Receiver。
注意,这里给 Receiver 传入的 handlers={'*': state.event},是后续处理事件时候的处理函数。
结果发现是循环调用 recv.capture()。
具体如下:
EventReceiver 就是用来接收Event,并且处理的。而且需要留意,EventReceiver 是继承 ConsumerMixin。
其代码如下:
对应变量如下:
可以看到利用了 ConsumerMixin 来处理事件。其实从文章开始时候我们就知道,既然有 kombu . producer ,就必然有 kombu . consumer。
这里其实是有多个 EventReceiver 绑定了这个 Connection,然后 <code>ConsumerMixin</code> 帮助协调这些 Receiver,每个 Receiver 都可以收到这些 Event,但是能不能处理就看他们的 <code>routing_key</code> 设置得好不好了。
所以如下:
ConsumerMixin 是 Kombu 提供的 组合模式类,可以用来方便的实现 Consumer Programs。
文件在 :kombu\mixins.py
ConsumerMixin 内部建立 Consumer如下:
在 具体建立时候,把self._receive设置为 Consumer callback。
堆栈为:
此时为:
当有事件时候,就调用 _receive 进行接收。
接受之后,就可以进行处理。
此时如下:
这里的 Receiver . handlers 是建立 Receiver时候 传入的 handlers={'*': state.event},是后续处理事件时候的处理函数。
概括起来是这样的:
先找 <code>group</code> 的 handler,有的话就用这个了,否则看下面;这个默认是没东西的,所以可以先pass
如果是 <code>worker</code> 的 Event,就执行 worker 对应的处理
如果是 <code>task</code> 的 Event,就执行 task 的对应处理
最终,逻辑如下:
手机如下:

至此,Celery 内部的事件发送,接受处理 的两个组件就讲解完毕。
★★★★★★关于生活和技术的思考★★★★★★
微信公众账号:罗西的思考
如果您想及时得到个人撰写文章的消息推送,或者想看看个人推荐的技术资料,可以扫描下面二维码(或者长按识别二维码)关注个人公众号)。
6: Events 的实现
Celery用户指引------监控与管理