Celery是一個簡單、靈活且可靠的,處理大量消息的分布式系統,專注于實時處理的異步任務隊列,同時也支援任務排程。本系列将繼續通過源碼分析,和大家一起深入學習 Celery。本文是系列第一篇,借鑒了幾位網友的大作,按照自己的了解再重新整理,遂得此文。
目錄
[源碼解析] 并行分布式架構 Celery 之架構 (1)
0x00 摘要
0x01 Celery 簡介
1.1 什麼是 Celery
1.2 場景
1.3 特性
1.4 差別
0x02 Celery的架構
2.1 元件
2.2 任務流程
2.3 架構圖
0x03 Celery 設計推理
3.1 Celery 基本功能
3.2 Celery 輔助功能
3.3 如何劃分
0x04 對 AMQP / Kombu 的封裝
4.1 封裝
4.2 Queues
0x05 TBC
0xFF 參考
Celery是一個簡單、靈活且可靠的,處理大量消息的分布式系統,專注于實時處理的異步任務隊列,同時也支援任務排程。
前面我們用幾篇文章分析了 Kombu,為 Celery 的分析打下了基礎。
[源碼分析] 消息隊列 Kombu 之 mailbox
[源碼分析] 消息隊列 Kombu 之 Hub
[源碼分析] 消息隊列 Kombu 之 Consumer
[源碼分析] 消息隊列 Kombu 之 Producer
[源碼分析] 消息隊列 Kombu 之 啟動過程
[源碼解析] 消息隊列 Kombu 之 基本架構
本系列将繼續通過源碼分析,和大家一起深入學習 Celery。本文是系列第一篇,借鑒了幾位網友的大作,按照自己的了解再重新整理,遂得此文。
Celery是Python世界中最受歡迎的背景工作管理者之一。它是一個簡單、靈活且可靠的,處理大量消息的分布式系統,專注于實時處理的異步任務隊列,同時也支援任務排程。
利用多線程,如Eventlet,gevent等,Celery的任務能被并發地執行在單個或多個工作伺服器(worker servers)上。任務能異步執行(背景運作)或同步執行(等待任務完成)。Celery用于生産系統時候每天可以處理數以百萬計的任務。
Celery是用Python編寫的,但該協定可以在任何語言實作。它也可以與其他語言通過webhooks實作。
Celery建議的消息隊列是RabbitMQ,但也支援Redis, Beanstalk, MongoDB, CouchDB, 和資料庫(使用SQLAlchemy的或Django的 ORM) 。并且可以同時充當生産者和消費者。
使用Celery的常見場景如下:
Web應用。當使用者觸發的一個操作需要較長時間才能執行完成時,可以把它作為任務交給Celery去異步執行,執行完再傳回給使用者。這段時間使用者不需要等待,提高了網站的整體吞吐量和響應時間。
定時任務。生産環境經常會跑一些定時任務。假如你有上千台的伺服器、上千種任務,定時任務的管理很困難,Celery可以幫助我們快速在不同的機器設定不同種任務。
同步完成的附加工作都可以異步完成。比如發送短信/郵件、推送消息、清理/設定緩存等。
Celery提供了如下的特性:
友善地檢視定時任務的執行情況,比如執行是否成功、目前狀态、執行任務花費的時間等。
可以使用功能齊備的管理背景或者指令行添加、更新、删除任務。
友善把任務和配置管理相關聯。
可選多程序、Eventlet 和 Gevent 三種模式并發執行。
提供錯誤處理機制。
提供多種任務原語,友善實作任務分組、拆分和調用鍊。
支援多種消息代理和存儲後端。
消息隊列和任務隊列,最大的不同之處就在于理念的不同 -- 消息隊列傳遞的是“消息”,任務隊列傳遞的是“任務”。
消息隊列用來快速消費隊列中的消息。消息隊列更側重于消息的吞吐、處理,具有有處理海量資訊的能力。另外利用消息隊列的生長者和消費者的概念,也可以實作任務隊列的功能,但是還需要進行額外的開發。
任務隊列是用來執行一個耗時任務。任務隊列則提供了執行任務所需的功能,比如任務的重試,結果的傳回,任務狀态記錄等。雖然也有并發的處理能力,但一般不适用于高吞吐量快速消費的場景。
Celery 的基本邏輯為:分布式異步消息任務隊列。
在 Celery 中,采用的是分布式的管理方式,每個節點之間都是通過廣播/單點傳播進行通信,進而達到協同效果。實際上,隻有部分輔助管理功能才會協同,基礎業務功能反而沒有借助協同。
Celery包含如下元件:
Celery Beat:任務排程器,Beat程序會讀取配置檔案的内容,周期性地将配置中到期需要執行的任務發送給任務隊列。
Celery Worker:執行任務的消費者,通常會在多台伺服器運作多個消費者來提高執行效率。
Broker:消息代理,或者叫作消息中間件,接受任務生産者發送過來的任務消息,存進隊列再按序分發給任務消費方(通常是消息隊列或者資料庫)。
Producer:調用了Celery提供的API、函數或者裝飾器而産生任務并交給任務隊列處理的都是任務生産者。
Result Backend:任務處理完後儲存狀态資訊和結果,以供查詢。Celery預設已支援 Redis、RabbitMQ、MongoDB、Django ORM、SQLAlchemy 等方式。
再了解一下:
系統可以有多個"消息隊列"(message Queue),不同的消息可以指定發送給不同的Message Queue。
上述功能是通過Exchange來實作的,發送消息到"消息隊列"中時,可以指定 routing_key,Exchange 通過routing_key 來把消息路由(routes)到不同的"消息隊列"中去(Celery的底層依賴Kombu,裡面涉及Exchange)。
exchange 對應 一個消息隊列(queue),即:通過 "消息路由" 的機制使exchange對應queue,每個queue對應每個worker。
Celery 通過消息機制進行通信,通常使用中間人(Broker)作為用戶端和職程(Worker)調節。啟動一個任務的流程是:
用戶端向消息隊列發送一條消息;
然後中間人(Broker)将消息傳遞給一個職程(Worker),支援RabbitMQ、Redis等作為Broker。;
最後由職程(Worker)進行執行中間人(Broker)配置設定的任務;
Celery的架構圖如下所示:
目前我們得到如下資訊:
Celery 的基本邏輯為:分布式異步消息任務隊列;
Celery底層依賴 Kombu,基于 Kombu 完成基本功能;
之前我們通過若幹文章,基本了解了 Kombu 的大緻邏輯;
下面我們就需要依據 Kombu來推論 Celery 應該如何設計。
首先,我們看看為了完成基本功能,Celery 應該具備哪些元件(子產品),我們會提出一些問題,這些問題将在後續的分析中陸續得到解答。
因為Celery 的基本邏輯為:分布式異步消息任務隊列,是以Celery包含如下基礎元件:
Producer:需要有一個元件完成如下功能 :把使用者定義的代碼打包整合成任務送出給任務隊列處理。問題就在于:
對于任務,也就是task如何處理?
task的本質是什麼?
task 應該包括哪些功能?
如果task是函數,如何把task函數傳遞給服務端?如果task函數内容很大怎麼辦?
如何把task相關資訊從用戶端傳遞到服務端?
Broker:為了解耦合,需要有一個中間元件來緩存消息。這就是 消息代理,或者叫作消息中間件。其作用是接受任務生産者發送過來的任務消息,存進隊列再按序分發給任務消費方(通常是消息隊列或者資料庫)。問題在于:
如何區分不同的消息來源,即如何路由?
是否有容錯機制?
Worker:需要有一個元件來執行任務,這就是 Worker:
Worker 需要從 broker 接受任務。這就需要一個consumer,問題就是:Consumer 如何從 broker 擷取消息。
接受任務之後,Worker 需要了解任務,知道怎麼執行任務,執行任務。是以有一個問題:Worker 怎麼知道 client 端的任務?
通常會在多台伺服器運作多個 worker 來提高執行效率。這就涉及到一個問題:多個 worker 之間如何協調?如何在多個 Worker 之間配置設定任務?
Result Backend:任務處理完後儲存狀态資訊和結果,以供查詢。Celery預設已支援Redis、RabbitMQ、MongoDB、Django ORM、SQLAlchemy等方式。
以上為基礎功能,但是作為分布式異步消息任務隊列,我們還需要輔助功能(以及相關問題),比如。
用于執行定時任務的timer;
需要處理監控事件;
如何通過遠端指令管理;
worker 出現問題,如何處理;
如何提高并發?
如何封裝amqp?
如何進行消息循環引擎?
以上功能哪些屬于帶有分布式特點的?
進一步問題是:這些輔助功能是作為基礎功能子產品的一部分?還是獨立出來成為一個功能子產品?
這其實是一個哲學問題,每種實作都有其道理,或者說,很多決定其實就是作者靈光一現(臨時拍腦袋)的産物。
比如我們後面提到的 Consumer 元件,表面上看,就是一個從broker擷取消息的功能子產品,直接使用 kombu 的 consumer 就可以做到。
但是實際上,celery Consumer 元件的概念遠遠要大于Kombu的Consumer,不隻是利用了Kombu的Consumer從broker取得消息。也包括消息的消費,分發,監控,心跳等一系列功能。可以說,除了消息循環引擎 被 hub 承擔,多程序被 Pool,Autoscaler 承擔,定時任務被 timer,beat 承擔之外,其他主要功能都被 Consumer 承擔。
是以,我們需要看看:
哪些元件可以利用 Kombu直接完成,哪些需要Celery自己重新設計。
若重新設計,哪些可以基于Kombu設計,如何調用相應Kombu子產品。
若使用Kombu子產品作為Celery子產品的變量,這些Kombu子產品分别屬于哪些Celery子產品。
Celery如果想成為消息處理系統,首先需要解決消息協定和消息傳輸問題。
消息協定由 AMQP(Advanced Message Queuing Protocol:進階消息隊列協定)解決。Celery 支援所有AMQP路由機制,可以通過配置的方式,執行相關的消息路由。
消息實作和傳輸由 Kombu 解決。由之前對 Kombu 的分析我們知道,Kombu 的定位是一個相容 AMQP 協定的消息隊列抽象,是一個把消息傳遞封裝成統一接口的庫。
是以我們首先看看如何封裝 AMQP / Kombu。
具體封裝是在 celery/app/amqp.py 檔案中,其中主要有兩個類:AMQP 和 Queues。
AMQP類的功能是 發送/接受消息,是對amqp協定實作的再一次封裝,在這裡其實就是對 kombu 類的再一次封裝。
我們可以看到,其内部成員變量都是來自于 Kombu。比如 Connection, Consumer, Exchange, Producer, Queue, pools。
為了更好的了解,我們列印出amqp類的具體内容來看看。
具體邏輯如下:
Queues 則是一個擴充,一個邏輯概念,可以認為是 Broker 概念的進一步縮減版。
Producer 把任務發送給 Queues,Worker 從 Queues 擷取任務,進行消費。
app.amqp.queues 就是 Queues 的一個執行個體,在其中存儲了本 Worker 可以讀取的所有 kombu.Queue。
對于一個 Consumer,可以配置其 queue,一個 Consumer 可以有多個queue,比如:
add_consumer 名字個人認為有一定誤導,其實是添加 queue,但是名字看起來像添加 Consumer。
而在 Consumer 之中,會對 queues 進行具體配置。
通過以上的分析,大家應該對 Celery 的架構有了初步的了解。在下篇文章中,我們将從幾個方面做進一步思考,敬請期待。
Nginx資料之Master與Worker基礎概念
1: Worker 啟動流程概述
2: Worker 的執行引擎
3: Task 對象的實作
4: 定時任務的實作
5: 遠端控制管理
6: Events 的實作
7: Worker 之間的互動
8: State 和 Result
Spark分布式計算引擎的應用
mfc 消息消息隊列概念_消息隊列和任務隊列到底有什麼不同?