天天看点

[源码解析] 并行分布式任务队列 Celery 之 EventDispatcher & Event 组件

Celery是一个简单、灵活且可靠的,处理大量事件的分布式系统,专注于实时处理的异步任务队列,同时也支持任务调度。本文讲解 EventDispatcher 和 Event 组件 如何实现。

目录

[源码解析] 并行分布式任务队列 Celery 之 EventDispatcher & Event 组件

0x00 摘要

0x01 思路

0x02 定义

0x03 Producer

3.1 Connection

3.2 Exchange

3.3 建立

0x04 分发事件

4.1 Send 发送

4.2 publish 与 broker 交互

0x05 Events 组件

5.1 Event 有什么用

5.2 调试

5.3 入口

5.4 事件循环

5.5 EventReceiver

5.6 ConsumerMixin5.6.1 Consumer

5.7 接收

5.8 处理

5.9 state处理函数

0xEE 个人信息

0xFF 参考

Celery是一个简单、灵活且可靠的,处理大量事件的分布式系统,专注于实时处理的异步任务队列,同时也支持任务调度。

本文讲解 EventDispatcher 和 Event 组件 如何实现。

EventDispatcher 和 Event 组件负责 Celery 内部事件(Event)的处理。

从字面上可以知道,EventDispatcher 组件的功能是事件(Event)分发,所以我们可以有如下已知信息:

事件分发 势必有生产者,消费者,EventDispatcher 就是作为 事件生产者;

涉及到生产消费,那么需要有一个 broker 存储中间事件;

因为 Celery 底层依赖于 Kombu,而 Kombu 本身就有生产者,消费者概念,所以这里可以直接利用这两个概念;

Kombu 也提供了 Mailbox 的实现,它的作用就是通过 Mailbox 我们可以实现不同实例之间的事件发送和处理,具体可以是单播 和 广播;

所以我们可以大致推论:EventDispatcher 可以利用 kombu 的 producer, consumer 或者 Mailbox。

而 Events 是负责事件(Event)的接受,所以我们也可以推论:

Events 利用 Kombu 的消费者来处理 事件;

具体如何处理事件,则会依据 Celery 的当前状态决定,这就涉及到了 State 功能;

我们下面就看看具体是怎么实现的。

为了让大家更好理解,我们先给出一个逻辑图如下:

[源码解析] 并行分布式任务队列 Celery 之 EventDispatcher & Event 组件

EventDispatcher 代码位于:<code>celery\events\dispatcher.py</code>。

可以看到一个事件分发者需要拥有哪些成员变量以实现自己的功能:

connection (kombu.Connection) :就是用来和 Broker 交互的连接功能;

channel (kombu.Channel) : Channel 可以理解成共享一个Connection的多个轻量化连接。就是真正的连接。

Connection 是 AMQP 对 连接的封装;

Channel 是 AMQP 对 MQ 的操作的封装;

具体以 "针对redis的轻量化连接" 来说,Channel 可以认为是 redis 操作和连接的封装。每个 Channel 都可以与 redis 建立一个连接,在此连接之上对 redis 进行操作,每个连接都有一个 socket,每个 socket 都有一个 file,从这个 file 可以进行 poll。

producer :事件生产者,使用 kombu producer 概念;

exchange :生产者发布事件时,先将事件发送到Exchange,通过Exchange与队列的绑定规则将事件发送到队列。

hostname : 用来标示自己,这样 EventDispatcher 的使用者可以知道并且使用;

groups :事件组功能;

_outbound_buffer :事件缓存;

clock :Lamport 逻辑时钟,在分布式系统中用于区分事件的发生顺序的时间机制;

具体类的定义是:

我们先给出此时变量内容,大家可以先有所了解。

我们发现,EventDispatcher 确实使用了 Kombu 的 Producer,当然 Celery 这里使用 ampq 对 Kombu 做了封装。所以我们重点就需要看如何配置 Producer。

具体需要配置的是:

Connection,需要以此来知道联系哪一个 Redis;

Exchange,需要知道读取哪一个 Queue;

下面我们就逐一分析。

由代码可以看到,Connection 是直接使用 Celery 的 connection_for_write

此时变量为:

Exchange 概念如下:

Exchange:交换机 或者 路由。事件发送者将事件发至Exchange,Exchange负责将事件分发至队列;

Queue:事件队列,存储着即将被应用消费掉的事件,Exchange负责将事件分发Queue,消费者从Queue接收事件;

具体来说,Exchange 用于路由事件(事件发给exchange,exchange发给对应的queue)。

交换机通过匹配事件的 routing_key 和 binding_key来转发事件,binding_key 是consumer 声明队列时与交换机的绑定关系。

路由就是比较routing-key(这个 message 提供)和 binding-key(这个queue 注册到 exchange 的时候提供)。

使用时,需要指定exchange的名称和类型(direct,topic和fanout)。可以发现,和RabbitMQ中的exchange概念是一样的。事件发送给exchages。交换机可以被命名,可以通过路由算法进行配置。

具体回到代码上。

所以我们知道,这里默认的 Exchange 就是一个 <code>celeryev(fanout)</code> 类型。

于是,我们具体就看到了 Producer。

既然建立了 Producer,我们就可以进行发送。

发送事件就是直接是否需要成组发送。

如果需要分组发送,就内部有一个缓存,然后成组发送;

否则就直接调用 Producer publish API 发送。

关于如何区分分组是依靠如下代码:

相关变量为:

发送具体代码如下:

send 会调用到这里。

这里构建了 routing_key :

于是得倒了routing_key 为 'worker.online'。

也构建了 Event;

publish 代码如下:

因为是 pubsub,所以此时在 redis 之中看不到事件内容。

此时redis内容如下(看不到事件):

现在,EventDispatcher 组件已经把事件发送出去。

这个事件将如何处理?我们需要看看 Events 组件。

前面说了,Celery 在 Task/Worker 的状态发生变化的时候就会发出 Event,所以,一个很明显的应用就是监控 Event 的状态,例如 Celery 大家所熟知的基于 WebUI 的管理工具 flower 就用到了 Event,但是,这也是一个比较明显的应用,除此之外,我们还可以利用 Event 来给 Task 做快照,甚至实时对 Task 的状态转变做出响应,例如任务失败之后触发报警,任务成功之后执行被依赖的任务等等,总结一下,其实就是:

对 Task 的状态做快照;

对 Task 的状态做实时处理;

监控 Celery(Worker/Task) 的执行状态;

Celery Events 可以用来开启快照相机,或者将事件dump到标准输出。

比如:

为了调试,我们需要采用如下方式:

具体命令实现是:

Events入口为:

接着跟踪看看。

我们来到了事件循环。

这里建立了一个 app.events.Receiver。

注意,这里给 Receiver 传入的 handlers={'*': state.event},是后续处理事件时候的处理函数。

结果发现是循环调用 recv.capture()。

具体如下:

EventReceiver 就是用来接收Event,并且处理的。而且需要留意,EventReceiver 是继承 ConsumerMixin。

其代码如下:

对应变量如下:

可以看到利用了 ConsumerMixin 来处理事件。其实从文章开始时候我们就知道,既然有 kombu . producer ,就必然有 kombu . consumer。

这里其实是有多个 EventReceiver 绑定了这个 Connection,然后 <code>ConsumerMixin</code> 帮助协调这些 Receiver,每个 Receiver 都可以收到这些 Event,但是能不能处理就看他们的 <code>routing_key</code> 设置得好不好了。

所以如下:

ConsumerMixin 是 Kombu 提供的 组合模式类,可以用来方便的实现 Consumer Programs。

文件在 :kombu\mixins.py

ConsumerMixin 内部建立 Consumer如下:

在 具体建立时候,把self._receive设置为 Consumer callback。

堆栈为:

此时为:

当有事件时候,就调用 _receive 进行接收。

接受之后,就可以进行处理。

此时如下:

这里的 Receiver . handlers 是建立 Receiver时候 传入的 handlers={'*': state.event},是后续处理事件时候的处理函数。

概括起来是这样的:

先找 <code>group</code> 的 handler,有的话就用这个了,否则看下面;这个默认是没东西的,所以可以先pass

如果是 <code>worker</code> 的 Event,就执行 worker 对应的处理

如果是 <code>task</code> 的 Event,就执行 task 的对应处理

最终,逻辑如下:

手机如下:

[源码解析] 并行分布式任务队列 Celery 之 EventDispatcher &amp; Event 组件

至此,Celery 内部的事件发送,接受处理 的两个组件就讲解完毕。

★★★★★★关于生活和技术的思考★★★★★★

微信公众账号:罗西的思考

如果您想及时得到个人撰写文章的消息推送,或者想看看个人推荐的技术资料,可以扫描下面二维码(或者长按识别二维码)关注个人公众号)。

[源码解析] 并行分布式任务队列 Celery 之 EventDispatcher &amp; Event 组件

6: Events 的实现

Celery用户指引------监控与管理