Celery是一個簡單、靈活且可靠的,處理大量事件的分布式系統,專注于實時處理的異步任務隊列,同時也支援任務排程。本文講解 EventDispatcher 和 Event 元件 如何實作。
目錄
[源碼解析] 并行分布式任務隊列 Celery 之 EventDispatcher & Event 元件
0x00 摘要
0x01 思路
0x02 定義
0x03 Producer
3.1 Connection
3.2 Exchange
3.3 建立
0x04 分發事件
4.1 Send 發送
4.2 publish 與 broker 互動
0x05 Events 元件
5.1 Event 有什麼用
5.2 調試
5.3 入口
5.4 事件循環
5.5 EventReceiver
5.6 ConsumerMixin5.6.1 Consumer
5.7 接收
5.8 處理
5.9 state處理函數
0xEE 個人資訊
0xFF 參考
Celery是一個簡單、靈活且可靠的,處理大量事件的分布式系統,專注于實時處理的異步任務隊列,同時也支援任務排程。
本文講解 EventDispatcher 和 Event 元件 如何實作。
EventDispatcher 和 Event 元件負責 Celery 内部事件(Event)的處理。
從字面上可以知道,EventDispatcher 元件的功能是事件(Event)分發,是以我們可以有如下已知資訊:
事件分發 勢必有生産者,消費者,EventDispatcher 就是作為 事件生産者;
涉及到生産消費,那麼需要有一個 broker 存儲中間事件;
因為 Celery 底層依賴于 Kombu,而 Kombu 本身就有生産者,消費者概念,是以這裡可以直接利用這兩個概念;
Kombu 也提供了 Mailbox 的實作,它的作用就是通過 Mailbox 我們可以實作不同執行個體之間的事件發送和處理,具體可以是單點傳播 和 廣播;
是以我們可以大緻推論:EventDispatcher 可以利用 kombu 的 producer, consumer 或者 Mailbox。
而 Events 是負責事件(Event)的接受,是以我們也可以推論:
Events 利用 Kombu 的消費者來處理 事件;
具體如何處理事件,則會依據 Celery 的目前狀态決定,這就涉及到了 State 功能;
我們下面就看看具體是怎麼實作的。
為了讓大家更好了解,我們先給出一個邏輯圖如下:

EventDispatcher 代碼位于:<code>celery\events\dispatcher.py</code>。
可以看到一個事件分發者需要擁有哪些成員變量以實作自己的功能:
connection (kombu.Connection) :就是用來和 Broker 互動的連接配接功能;
channel (kombu.Channel) : Channel 可以了解成共享一個Connection的多個輕量化連接配接。就是真正的連接配接。
Connection 是 AMQP 對 連接配接的封裝;
Channel 是 AMQP 對 MQ 的操作的封裝;
具體以 "針對redis的輕量化連接配接" 來說,Channel 可以認為是 redis 操作和連接配接的封裝。每個 Channel 都可以與 redis 建立一個連接配接,在此連接配接之上對 redis 進行操作,每個連接配接都有一個 socket,每個 socket 都有一個 file,從這個 file 可以進行 poll。
producer :事件生産者,使用 kombu producer 概念;
exchange :生産者釋出事件時,先将事件發送到Exchange,通過Exchange與隊列的綁定規則将事件發送到隊列。
hostname : 用來标示自己,這樣 EventDispatcher 的使用者可以知道并且使用;
groups :事件組功能;
_outbound_buffer :事件緩存;
clock :Lamport 邏輯時鐘,在分布式系統中用于區分事件的發生順序的時間機制;
具體類的定義是:
我們先給出此時變量内容,大家可以先有所了解。
我們發現,EventDispatcher 确實使用了 Kombu 的 Producer,當然 Celery 這裡使用 ampq 對 Kombu 做了封裝。是以我們重點就需要看如何配置 Producer。
具體需要配置的是:
Connection,需要以此來知道聯系哪一個 Redis;
Exchange,需要知道讀取哪一個 Queue;
下面我們就逐一分析。
由代碼可以看到,Connection 是直接使用 Celery 的 connection_for_write
此時變量為:
Exchange 概念如下:
Exchange:交換機 或者 路由。事件發送者将事件發至Exchange,Exchange負責将事件分發至隊列;
Queue:事件隊列,存儲着即将被應用消費掉的事件,Exchange負責将事件分發Queue,消費者從Queue接收事件;
具體來說,Exchange 用于路由事件(事件發給exchange,exchange發給對應的queue)。
交換機通過比對事件的 routing_key 和 binding_key來轉發事件,binding_key 是consumer 聲明隊列時與交換機的綁定關系。
路由就是比較routing-key(這個 message 提供)和 binding-key(這個queue 注冊到 exchange 的時候提供)。
使用時,需要指定exchange的名稱和類型(direct,topic和fanout)。可以發現,和RabbitMQ中的exchange概念是一樣的。事件發送給exchages。交換機可以被命名,可以通過路由算法進行配置。
具體回到代碼上。
是以我們知道,這裡預設的 Exchange 就是一個 <code>celeryev(fanout)</code> 類型。
于是,我們具體就看到了 Producer。
既然建立了 Producer,我們就可以進行發送。
發送事件就是直接是否需要成組發送。
如果需要分組發送,就内部有一個緩存,然後成組發送;
否則就直接調用 Producer publish API 發送。
關于如何區分分組是依靠如下代碼:
相關變量為:
發送具體代碼如下:
send 會調用到這裡。
這裡建構了 routing_key :
于是得倒了routing_key 為 'worker.online'。
也建構了 Event;
publish 代碼如下:
因為是 pubsub,是以此時在 redis 之中看不到事件内容。
此時redis内容如下(看不到事件):
現在,EventDispatcher 元件已經把事件發送出去。
這個事件将如何處理?我們需要看看 Events 元件。
前面說了,Celery 在 Task/Worker 的狀态發生變化的時候就會發出 Event,是以,一個很明顯的應用就是監控 Event 的狀态,例如 Celery 大家所熟知的基于 WebUI 的管理工具 flower 就用到了 Event,但是,這也是一個比較明顯的應用,除此之外,我們還可以利用 Event 來給 Task 做快照,甚至實時對 Task 的狀态轉變做出響應,例如任務失敗之後觸發報警,任務成功之後執行被依賴的任務等等,總結一下,其實就是:
對 Task 的狀态做快照;
對 Task 的狀态做實時處理;
監控 Celery(Worker/Task) 的執行狀态;
Celery Events 可以用來開啟快照相機,或者将事件dump到标準輸出。
比如:
為了調試,我們需要采用如下方式:
具體指令實作是:
Events入口為:
接着跟蹤看看。
我們來到了事件循環。
這裡建立了一個 app.events.Receiver。
注意,這裡給 Receiver 傳入的 handlers={'*': state.event},是後續處理事件時候的處理函數。
結果發現是循環調用 recv.capture()。
具體如下:
EventReceiver 就是用來接收Event,并且處理的。而且需要留意,EventReceiver 是繼承 ConsumerMixin。
其代碼如下:
對應變量如下:
可以看到利用了 ConsumerMixin 來處理事件。其實從文章開始時候我們就知道,既然有 kombu . producer ,就必然有 kombu . consumer。
這裡其實是有多個 EventReceiver 綁定了這個 Connection,然後 <code>ConsumerMixin</code> 幫助協調這些 Receiver,每個 Receiver 都可以收到這些 Event,但是能不能處理就看他們的 <code>routing_key</code> 設定得好不好了。
是以如下:
ConsumerMixin 是 Kombu 提供的 組合模式類,可以用來友善的實作 Consumer Programs。
檔案在 :kombu\mixins.py
ConsumerMixin 内部建立 Consumer如下:
在 具體建立時候,把self._receive設定為 Consumer callback。
堆棧為:
此時為:
當有事件時候,就調用 _receive 進行接收。
接受之後,就可以進行處理。
此時如下:
這裡的 Receiver . handlers 是建立 Receiver時候 傳入的 handlers={'*': state.event},是後續處理事件時候的處理函數。
概括起來是這樣的:
先找 <code>group</code> 的 handler,有的話就用這個了,否則看下面;這個預設是沒東西的,是以可以先pass
如果是 <code>worker</code> 的 Event,就執行 worker 對應的處理
如果是 <code>task</code> 的 Event,就執行 task 的對應處理
最終,邏輯如下:
手機如下:

至此,Celery 内部的事件發送,接受處理 的兩個元件就講解完畢。
★★★★★★關于生活和技術的思考★★★★★★
微信公衆賬号:羅西的思考
如果您想及時得到個人撰寫文章的消息推送,或者想看看個人推薦的技術資料,可以掃描下面二維碼(或者長按識别二維碼)關注個人公衆号)。
6: Events 的實作
Celery使用者指引------監控與管理