天天看點

[源碼解析] 并行分布式任務隊列 Celery 之 EventDispatcher & Event 元件

Celery是一個簡單、靈活且可靠的,處理大量事件的分布式系統,專注于實時處理的異步任務隊列,同時也支援任務排程。本文講解 EventDispatcher 和 Event 元件 如何實作。

目錄

[源碼解析] 并行分布式任務隊列 Celery 之 EventDispatcher & Event 元件

0x00 摘要

0x01 思路

0x02 定義

0x03 Producer

3.1 Connection

3.2 Exchange

3.3 建立

0x04 分發事件

4.1 Send 發送

4.2 publish 與 broker 互動

0x05 Events 元件

5.1 Event 有什麼用

5.2 調試

5.3 入口

5.4 事件循環

5.5 EventReceiver

5.6 ConsumerMixin5.6.1 Consumer

5.7 接收

5.8 處理

5.9 state處理函數

0xEE 個人資訊

0xFF 參考

Celery是一個簡單、靈活且可靠的,處理大量事件的分布式系統,專注于實時處理的異步任務隊列,同時也支援任務排程。

本文講解 EventDispatcher 和 Event 元件 如何實作。

EventDispatcher 和 Event 元件負責 Celery 内部事件(Event)的處理。

從字面上可以知道,EventDispatcher 元件的功能是事件(Event)分發,是以我們可以有如下已知資訊:

事件分發 勢必有生産者,消費者,EventDispatcher 就是作為 事件生産者;

涉及到生産消費,那麼需要有一個 broker 存儲中間事件;

因為 Celery 底層依賴于 Kombu,而 Kombu 本身就有生産者,消費者概念,是以這裡可以直接利用這兩個概念;

Kombu 也提供了 Mailbox 的實作,它的作用就是通過 Mailbox 我們可以實作不同執行個體之間的事件發送和處理,具體可以是單點傳播 和 廣播;

是以我們可以大緻推論:EventDispatcher 可以利用 kombu 的 producer, consumer 或者 Mailbox。

而 Events 是負責事件(Event)的接受,是以我們也可以推論:

Events 利用 Kombu 的消費者來處理 事件;

具體如何處理事件,則會依據 Celery 的目前狀态決定,這就涉及到了 State 功能;

我們下面就看看具體是怎麼實作的。

為了讓大家更好了解,我們先給出一個邏輯圖如下:

[源碼解析] 并行分布式任務隊列 Celery 之 EventDispatcher & Event 元件

EventDispatcher 代碼位于:<code>celery\events\dispatcher.py</code>。

可以看到一個事件分發者需要擁有哪些成員變量以實作自己的功能:

connection (kombu.Connection) :就是用來和 Broker 互動的連接配接功能;

channel (kombu.Channel) : Channel 可以了解成共享一個Connection的多個輕量化連接配接。就是真正的連接配接。

Connection 是 AMQP 對 連接配接的封裝;

Channel 是 AMQP 對 MQ 的操作的封裝;

具體以 "針對redis的輕量化連接配接" 來說,Channel 可以認為是 redis 操作和連接配接的封裝。每個 Channel 都可以與 redis 建立一個連接配接,在此連接配接之上對 redis 進行操作,每個連接配接都有一個 socket,每個 socket 都有一個 file,從這個 file 可以進行 poll。

producer :事件生産者,使用 kombu producer 概念;

exchange :生産者釋出事件時,先将事件發送到Exchange,通過Exchange與隊列的綁定規則将事件發送到隊列。

hostname : 用來标示自己,這樣 EventDispatcher 的使用者可以知道并且使用;

groups :事件組功能;

_outbound_buffer :事件緩存;

clock :Lamport 邏輯時鐘,在分布式系統中用于區分事件的發生順序的時間機制;

具體類的定義是:

我們先給出此時變量内容,大家可以先有所了解。

我們發現,EventDispatcher 确實使用了 Kombu 的 Producer,當然 Celery 這裡使用 ampq 對 Kombu 做了封裝。是以我們重點就需要看如何配置 Producer。

具體需要配置的是:

Connection,需要以此來知道聯系哪一個 Redis;

Exchange,需要知道讀取哪一個 Queue;

下面我們就逐一分析。

由代碼可以看到,Connection 是直接使用 Celery 的 connection_for_write

此時變量為:

Exchange 概念如下:

Exchange:交換機 或者 路由。事件發送者将事件發至Exchange,Exchange負責将事件分發至隊列;

Queue:事件隊列,存儲着即将被應用消費掉的事件,Exchange負責将事件分發Queue,消費者從Queue接收事件;

具體來說,Exchange 用于路由事件(事件發給exchange,exchange發給對應的queue)。

交換機通過比對事件的 routing_key 和 binding_key來轉發事件,binding_key 是consumer 聲明隊列時與交換機的綁定關系。

路由就是比較routing-key(這個 message 提供)和 binding-key(這個queue 注冊到 exchange 的時候提供)。

使用時,需要指定exchange的名稱和類型(direct,topic和fanout)。可以發現,和RabbitMQ中的exchange概念是一樣的。事件發送給exchages。交換機可以被命名,可以通過路由算法進行配置。

具體回到代碼上。

是以我們知道,這裡預設的 Exchange 就是一個 <code>celeryev(fanout)</code> 類型。

于是,我們具體就看到了 Producer。

既然建立了 Producer,我們就可以進行發送。

發送事件就是直接是否需要成組發送。

如果需要分組發送,就内部有一個緩存,然後成組發送;

否則就直接調用 Producer publish API 發送。

關于如何區分分組是依靠如下代碼:

相關變量為:

發送具體代碼如下:

send 會調用到這裡。

這裡建構了 routing_key :

于是得倒了routing_key 為 'worker.online'。

也建構了 Event;

publish 代碼如下:

因為是 pubsub,是以此時在 redis 之中看不到事件内容。

此時redis内容如下(看不到事件):

現在,EventDispatcher 元件已經把事件發送出去。

這個事件将如何處理?我們需要看看 Events 元件。

前面說了,Celery 在 Task/Worker 的狀态發生變化的時候就會發出 Event,是以,一個很明顯的應用就是監控 Event 的狀态,例如 Celery 大家所熟知的基于 WebUI 的管理工具 flower 就用到了 Event,但是,這也是一個比較明顯的應用,除此之外,我們還可以利用 Event 來給 Task 做快照,甚至實時對 Task 的狀态轉變做出響應,例如任務失敗之後觸發報警,任務成功之後執行被依賴的任務等等,總結一下,其實就是:

對 Task 的狀态做快照;

對 Task 的狀态做實時處理;

監控 Celery(Worker/Task) 的執行狀态;

Celery Events 可以用來開啟快照相機,或者将事件dump到标準輸出。

比如:

為了調試,我們需要采用如下方式:

具體指令實作是:

Events入口為:

接着跟蹤看看。

我們來到了事件循環。

這裡建立了一個 app.events.Receiver。

注意,這裡給 Receiver 傳入的 handlers={'*': state.event},是後續處理事件時候的處理函數。

結果發現是循環調用 recv.capture()。

具體如下:

EventReceiver 就是用來接收Event,并且處理的。而且需要留意,EventReceiver 是繼承 ConsumerMixin。

其代碼如下:

對應變量如下:

可以看到利用了 ConsumerMixin 來處理事件。其實從文章開始時候我們就知道,既然有 kombu . producer ,就必然有 kombu . consumer。

這裡其實是有多個 EventReceiver 綁定了這個 Connection,然後 <code>ConsumerMixin</code> 幫助協調這些 Receiver,每個 Receiver 都可以收到這些 Event,但是能不能處理就看他們的 <code>routing_key</code> 設定得好不好了。

是以如下:

ConsumerMixin 是 Kombu 提供的 組合模式類,可以用來友善的實作 Consumer Programs。

檔案在 :kombu\mixins.py

ConsumerMixin 内部建立 Consumer如下:

在 具體建立時候,把self._receive設定為 Consumer callback。

堆棧為:

此時為:

當有事件時候,就調用 _receive 進行接收。

接受之後,就可以進行處理。

此時如下:

這裡的 Receiver . handlers 是建立 Receiver時候 傳入的 handlers={'*': state.event},是後續處理事件時候的處理函數。

概括起來是這樣的:

先找 <code>group</code> 的 handler,有的話就用這個了,否則看下面;這個預設是沒東西的,是以可以先pass

如果是 <code>worker</code> 的 Event,就執行 worker 對應的處理

如果是 <code>task</code> 的 Event,就執行 task 的對應處理

最終,邏輯如下:

手機如下:

[源碼解析] 并行分布式任務隊列 Celery 之 EventDispatcher &amp; Event 元件

至此,Celery 内部的事件發送,接受處理 的兩個元件就講解完畢。

★★★★★★關于生活和技術的思考★★★★★★

微信公衆賬号:羅西的思考

如果您想及時得到個人撰寫文章的消息推送,或者想看看個人推薦的技術資料,可以掃描下面二維碼(或者長按識别二維碼)關注個人公衆号)。

[源碼解析] 并行分布式任務隊列 Celery 之 EventDispatcher &amp; Event 元件

6: Events 的實作

Celery使用者指引------監控與管理