Celery是一個簡單、靈活且可靠的,處理大量消息的分布式系統,專注于實時處理的異步任務隊列,同時也支援任務排程。本系列将通過源碼分析,和大家一起深入學習 Celery。本文是系列第二篇,繼續探究 Celery 架構。
目錄
[源碼解析] 并行分布式架構 Celery 之架構 (2)
0x00 摘要
0x01 上文回顧
0x02 worker的思考
2.1 worker的模式
2.1.1 Nginx模式
2.1.2 Celery 模式
2.1.2.1 模式
2.1.2.2 互動
2.2 worker 組成
2.2.1 task_pool
2.2.2 consumer
2.2.3 Scheduler
2.2.3.1 Beat
2.2.3.2 Timer
2.3 初始化過程
0x03 Consumer的思考
3.1 元件
3.2 作用
0x04 高性能的思考
4.1 多程序
4.2 事件驅動
4.3 Task的實作
4.3.1 分發代碼
4.3.2 Celery 模式
4.4 Prefetch
4.5 Celery函數
0x05 分布式的思考
5.1 負載均衡
5.2 failover 容災恢複
5.2.1 錯誤種類&失敗次元
5.2.2 處理方法
5.2.2.1 重試
5.2.2.2 自動重試
5.2.2.3 fallback
5.3 Worker之間的互動
0x06 總結
0xFF 參考
Celery是一個簡單、靈活且可靠的,處理大量消息的分布式系統,專注于實時處理的異步任務隊列,同時也支援任務排程。
本系列将通過源碼分析,和大家一起深入學習 Celery。本文是系列第二篇,繼續探究 Celery 架構。
前面我們用幾篇文章分析了 Kombu,為 Celery 的分析打下了基礎。
[源碼分析] 消息隊列 Kombu 之 mailbox
[源碼分析] 消息隊列 Kombu 之 Hub
[源碼分析] 消息隊列 Kombu 之 Consumer
[源碼分析] 消息隊列 Kombu 之 Producer
[源碼分析] 消息隊列 Kombu 之 啟動過程
[源碼解析] 消息隊列 Kombu 之 基本架構
上文 [源碼解析] 并行分布式架構 Celery 之架構 (1) 中,我們大緻介紹了 Celery 的概念,用途和架構,現在回憶 Celery 的架構圖如下:
下面我們從幾個方面繼續分析。
當啟動一個worker的時候,這個worker會與broker建立連結(tcp長連結),然後如果有資料傳輸,則會建立相應的channel, 這個連接配接可以有多個channel。然後,worker就會去borker的隊列裡面取相應的task來進行消費了,這也是典型的消費者生産者模式。
首先,我們思考下worker 的工作模式,即,這些并發的 worker 彼此之間是什麼關系?是否需要一個master 來統一調控?為了更好的對比,我們先看看nginx的實作。
nginx 背景程序包含一個master程序和多個worker程序。
master程序主要用來管理worker程序,包含:接收來自外界的信号,向各worker程序發送信号,監控worker程序的運作狀态,當worker程序退出後(異常情況下),會自動重新啟動新的worker程序。
worker程序則處理基本的網絡事件。多個worker程序之間是對等的,他們同等競争來自用戶端的請求,各程序互相之間是獨立的。一個請求 隻可能在一個worker程序中處理,一個worker程序,不可能處理其它程序的請求。
worker程序之間是平等的,每個程序處理請求的機會也是一樣的。一個連接配接請求過來,每個程序都有可能處理這個連接配接,怎麼做到的呢?
首先,每個worker程序都是從master程序fork過來,在master程序裡面,先建立好需要listen的socket(listenfd)之後,然後再fork出多個worker程序。
其次,所有worker程序的listenfd會在新連接配接到來時變得可讀,為保證隻有一個程序處理該連接配接,所有worker程序在注冊listenfd讀事件前搶accept_mutex,搶到互斥鎖的那個程序注冊listenfd讀事件,在讀事件裡調用accept接受該連接配接。
最後,當一個worker程序在accept這個連接配接之後,就開始讀取請求,解析請求,處理請求,産生資料後,再傳回給用戶端,最後才斷開連接配接,這樣一個完整的請求就是這樣的了。
我們可以看到,一個請求完全由worker程序來處理,而且隻在一個worker程序中處理。
與 Nginx不同,在 Celery 之中,沒有 master 程序。所有的都是worker 程序。大家都在 redis 之上等待新任務。
但是,每一個worker内部,父程序和子程序内部,卻又是 master - slave 模式,也就是我們常說的主從。
master(就是父程序)負責任務的擷取,分發,slaves 的管理(建立,增加,關閉,重新開機,丢棄等等),其他輔助子產品的維護等等。
slave(就是子程序)負責消費從排程器傳遞過來的任務。
worker内部 具體流程如下:
排程器首先預生成(prefork)一些工作程序,做為一個程序池(mutiprocessing-pool),之後通過事件驅動(select/poll/epoll)的方式,監聽核心的事件(讀、寫、異常等等)。
如果master監聽到就執行對應的回調,源源不斷的從 中間人(broker)那裡提取任務,并通過 管道(pipe)作為程序間通訊的方式,運用一系列的路由政策(round-robin、weight 等等)交給slave。
slave工作程序 消費(ack)任務,再通過管道向排程器進行狀态同步(sync),程序間通訊等等行為。
這個 workloop 其實很明顯,就是監聽讀管道的資料(主程序從這個管道的另一端寫),然後執行對應的回調,期間會調用 put 方法,往寫管道同步狀态(主程序可以從管道的另一端讀這個資料)。
具體如下圖:
在 Celery 中,采用的是分布式的管理方式,每個節點之間都是通過廣播/單點傳播進行通信,進而達到協同效果。
在處理具體控制管理工作時候,worker 程序之間有交流,具體分為兩種:
啟動時候使用 Mingle 子產品來互相交換資訊。
運作狀态下,通過 gossip 協定進行狀态的共享。但是這個狀态共享對于任務的配置設定和worker 的排程沒有必然的聯系,隻是用來監控和響應控制台消息。因為假如有若幹 worker,面對一個控制台消息,應該隻有一個 worker 來響應其消息,是以就利用 gossip 協定選舉出一個 leader,這個 leader 進行響應。
在處理具體業務工作時候,worker 之間沒有交流。
當啟動一個worker的時候,這個worker會與broker建立連結(tcp長連結),然後如果有資料傳輸,則會建立相應的channel,一個連接配接可以有多個channel。然後,worker就會去borker的隊列裡面取相應的task來進行消費了,這也是典型的消費者生産者模式。
以 redis 為例,底層 Kombu 事實上是使用 redis 的 BRPOP 功能來完成對具體 queue 中消息的讀取。
如果多個 worker 同時去使用 brpop 擷取 broker 消息,那麼具體哪一個能夠讀取到消息,其實這就是有一個 競争機制,因為redis 的單程序處理,是以隻能有一個 worker 才能讀到。
在 worker 文檔中提到:worker主要由四部分組成的:task_pool, consumer, scheduler, mediator。
這四部分依賴下面兩組資料結構工作。
就緒隊列:那些 立刻就需要運作的task, 這些task到達worker的時候會被放到這個就緒隊列中等待consumer執行。
ETA:是那些有ETA參數,或是rate_limit參數的 task。這些 task 被放入 timer 隊列中,timer 負責在條件合适的情況下,把這些 task 放入執行pool。
但是實際上,mediator 在代碼中沒有發現。也許是 mediator 成了預設功能而非元件。
task_pool主要是用來存放的是一些worker。當啟動了一個worker,并且提供并發參數的時候,會将一些worker放在這裡面。
celery預設的并發方式是prefork,也就是多程序的方式,這裡隻是celery對<code>multiprocessing.Pool</code>進行了輕量的改造,然後給了一個新的名字叫做prefork。
這個pool與多程序的程序池的差別就是這個task_pool隻是存放一些運作的worker。
consumer也就是消費者, 主要是從broker那裡接受一些message。然後将message轉化為<code>celery.worker.request.Request</code> 的一個執行個體。并且在适當的時候,會把這個請求包裝進Task中。
Task就是用裝飾器 <code>app_celery.task()</code> 裝飾的函數所生成的類,是以可以在自定義的任務函數中使用這個請求參數,擷取一些關鍵的資訊。
對于 Scheduler,可以從 Beat 和 Timer 兩個方面講述。
Celery Beat:任務排程器,Beat程序會讀取配置檔案的内容,周期性地将配置中到期需要執行的任務發送給任務隊列。
其中樞部分就是 Scheduler,Service 是驅動部分,最後的承載實體就是 SchedulerEntry。
其内部主要資料結構是一個最小堆,它的作用就是承載了所有我們設定得定時任務,而最小堆的特性就是堆頂的元素是最小的,排序的依據是時間內插補點。celery 會先計算每個定時任務下一次執行的時間戳 - 目前時間戳,然後根據這個時間內插補點進行排序,毫無疑問,內插補點最小的就是下一次需要執行的任務。
在 Service 的 start 函數中,會調用 scheduler.tick(),進而在内部最小堆中擷取下次一需要執行的任務。将 <code>SchedulerEntry</code> 轉換為 <code>Task</code>,發送到 redis 的隊列中。
具體定義如下:
持久化
在 Celery 中,定時任務的執行并不會因為我們重新開機了 Celery 而失效,反而在重新開機 Celery 之後,Celery 會根據上一次關閉之前的執行狀态,重新計算新的執行周期,而這裡計算的前提就是能夠擷取舊的執行資訊,而在 Scheduler 中,這些資訊都是預設儲存在檔案中的。
Celery 預設的存儲是通過 Python 預設的 shelve 庫實作的,shelve 是一個類似于字典對象的資料庫,我們可以通過調用 <code>sync</code> 指令在磁盤和記憶體中同步資料。
文檔中對于 Timer 的描述如下:
The timer schedules internal functions, like cleanup and internal monitoring, but also it schedules ETA tasks and rate limited tasks. If the scheduled tasks ETA has passed it is moved to the execution pool.
可以看到,其功能就是排程内部的函數,比如清理和監控,也排程ETA tasks and rate limited tasks。
對于清理,有比如 backend.process_cleanup 和 loader.on_process_cleanup。
worker初始化過程中,各個子產品的執行順序是由一個BluePrint類定義,并且根據各個子產品之間的依賴進行排序執行。
Worker 的 start 方法中,其實就是執行了一個 self.blueprint 的 start 方法,這裡面的 blueprint,是 celery 自己實作的一個 有向無環圖(DAG)的資料結構,其功能簡單描述下就是:根據指令行傳入的不同參數,初始化不同的元件(step),并執行這些元件的初始化方法。其實就是一個對流程控制的面向對象的封裝。
每個 Step 的具體的功能如下:
Timer:用于執行定時任務的 Timer;
Hub:Eventloop 的封裝對象;
Pool:構造各種執行池(線程/程序/協程);
Autoscaler:用于自動增長或者 pool 中工作單元;
StateDB:持久化 worker 重新開機區間的資料(隻是重新開機);
Autoreloader:用于自動加載修改過的代碼;
Beat:建立 Beat 程序,不過是以子程序的形式運作(不同于指令行中以 beat 參數運作);
Celery 使用 Consumer 來從 broker 擷取消息。
Consumer 的元件如下:
【1】Connection:管理和 broker 的 Connection 連接配接
【3】Events:用于發送監控事件
【2】Agent:<code>cell</code> actor
【2】Mingle:不同 worker 之間同步狀态用的
【1】Tasks:啟動消息 Consumer
【3】Gossip:消費來自其他 worker 的事件
【1】Heart:發送心跳事件(consumer 的心跳)
【3】Control:遠端指令管理服務
在參考文章 1: Worker 啟動流程概述 中提到:
這裡我對所有的 Bootstep 都做了标号處理,标号的大小說明了這些服務對于我們代碼閱讀的重要程式,1 最重要,3 最不緊要。對于 Consumer 來說, 1 是基本功能,這些功能組成了一個簡單的非強壯的消息隊列架構; 2 一般重要,可以實作一個進階一點的功能; 3 屬于附加功能,同時也屬于一點分布式的功能。
是以,我們可以看到,celery Consumer 元件的概念遠遠要大于Kombu的Consumer,不隻是從broker取得消息,也包括消息的消費,分發,監控,心跳等一系列功能。
可以說,除了消息循環引擎 被 hub 承擔,多程序被 Pool,Autoscaler 承擔,定時任務被 timer,beat 承擔之外,其他主要功能都被 Consumer 承擔。
celery 的高性能主要靠兩個方面來保證,一個是多程序,一個是事件驅動。此外在一些具體功能實作方面也確定了高性能的實作。
多程序可以良好的發揮每個核的計算能力。可以在一定程度上提升程式的并發能力,緩解 IO 的壓力。
Celery 的方案叫做 prefork,也就是預生成。預生成指的是,主程序在執行具體的業務邏輯之前,先提前 fork 出來一堆子程序,并把他們存起來集中管理,形成一個程序池。平常的時候這些子程序都是 休眠(asleep) 狀态,隻有當主程序派發任務的時候,會喚醒(awake)其中的一個子程序,并通過程序間通訊的手段,向子程序傳輸相應的任務資料。
如前所述,每一個worker内部,父程序和子程序内部,是 master - slave 模式。
排程器首先預生成(prefork)一些工作程序(slave),做為一個程序池(mutiprocessing-pool),之後通過事件驅動(select/poll/epoll)的方式,監聽核心的事件(讀、寫、異常等等)。
slave(就是子程序)負責消費從排程器傳遞過來的任務。再通過管道向排程器進行狀态同步(sync),程序間通訊等等行為。
Kombu内部使用了事件驅動。
Master 排程器是一個事件驅動模型,什麼是事件驅動,其實就是它消滅了阻塞。
正常的單線程模型,一次隻能拿一條消息,每一次都要走一條來和回的鍊路,并且需要一個 while True 的循環不斷的去檢測,這樣無疑是非常低效且開銷大的。
而事件驅動則不這樣,他可以同時發送多個檢測的信号,然後就直接挂起,等待核心進行提示,有提示再去執行對應的回調。這樣既優雅的化解了單線程每次都要檢測的 while True,又通過多次請求并發降低了重複鍊路。
以 epoll 為例:
epoll可以同時支援水準觸發和邊緣觸發(Edge Triggered,隻告訴程序哪些檔案描述符剛剛變為就緒狀态,它隻說一遍,如果我們沒有采取行動,那麼它将不會再次告知,這種方式稱為邊緣觸發),理論上邊緣觸發的性能要更高一些。
epoll同樣隻告知那些就緒的檔案描述符,而且當我們調用epoll_wait()獲得就緒檔案描述符時,傳回的不是實際的描述符,而是一個代表 就緒描述符數量的值,你隻需要去epoll指定的一個數組中依次取得相應數量的檔案描述符即可,這裡也使用了記憶體映射(mmap)技術,這樣便徹底省掉了 這些檔案描述符在系統調用時複制的開銷。
另一個本質的改進在于epoll采用基于事件的就緒通知方式。在select/poll中,程序隻有在調用一定的方法後,核心才對所有監視的檔案描 述符進行掃描,而epoll事先通過epoll_ctl()來注冊一個檔案描述符,一旦基于某個檔案描述符就緒時,核心會采用類似callback的回調 機制,迅速激活這個檔案描述符,當程序調用epoll_wait()時便得到通知。
Task 承載的功能就是在 Celery 應用中,啟動對應的消息消費者。
關于 Task 的實作,這就涉及問題:究竟是分發代碼還是分發資料?
因為 Celery 是一個通用性功能,不是特定面對大資料,是以分發資料是不可避免的。
剩下問題就是是否需要分發代碼?
Task 任務最基本的形式就是函數,任務釋出最直接的想法就是client将要執行的相關函數代碼打包,釋出到broker。分布式計算架構spark就是使用這種方式。
業界分發代碼的代表是 Spark。Spark的思想比較簡單:挪計算不挪資料。那怎麼去描述這個計算呢?Spark 通過RDD封裝一個針對資料對應關系記錄,在這個封裝之上來記錄計算。這就涉及到兩個最重要的問題:
如何拆分計算邏輯;
如何分發計算邏輯;
于是 Spark 把所有的計算邏輯劃分為這兩種類型:
能夠分發到各個節點上并行執行的;
需要經過一定量的結果合并之後才能繼續執行的;
然後把一個巨大的問題拆分成相對獨立的子問題分發到各個機器上求解。
在實際送出時候,Spark把計算代碼送出到每個工作節點上然後進行計算。
2.0之前的celery也支援這種任務釋出的方式。這種方式顯而易見的一個壞處是傳遞給broker的資料量可能會比較大。解決的辦法也很容易想到,就是把要釋出的任務相關的代碼,提前告訴worker。
這就是 全局集合 和 注解注冊的作用。
其中的app是worker中的application,通過裝飾器的方式,對任務函數注冊。
app會維護一個字典,key是任務的名字,也就是這裡的<code>hello_task</code>,value是這個函數的記憶體位址。任務名必須唯一,但是任務名這個參數不是必須的,如果沒有給這個參數,celery會自動根據包的路徑和函數名生成一個任務名。
通過上面這種方式,client釋出任務隻需要提供任務名以及相關參數,不必提供任務相關代碼:
這裡需要注意:client釋出任務後,任務會以一個消息的形式寫入broker隊列,帶有任務名稱等相關參數,等待worker擷取。這裡任務的釋出,是完全獨立于worker端的,即使worker沒有啟動,消息也會被寫入隊列。
這種方式也有顯而易見的壞處,所有要執行的任務代碼都需要提前在worker端注冊好,client端和worker端的耦合變強了。
目前 Kombu QoS 隻是支援 prefetch_count。
設定 prefetch_count 的目的是:
Prefetch指的是一個Celery Worker節點,能夠提前擷取一些還還未被其他節點執行的任務,這樣可以提高Worker節點的運作效率。
同時也可以通過設定Qos的prefetch count來控制consumer的流量,防止消費者從隊列中一下拉取所有消息,進而導緻擊穿服務,導緻服務崩潰或異常。
Kombu qos prefetch_count 是一個整數值N,表示的意思就是一個消費者最多隻能一次拉取N條消息,一旦N條消息沒有處理完,就不會從隊列中擷取新的消息,直到有消息被ack。
Kombu 中,會記錄 prefetch_count的值,同時記錄的還有該channel dirty (acked/rejected) 的消息個數。
Celery 還提供了一些工作流功能,其中某些功能可以讓我們提高性能。比如 Chunks 功能。
任務塊函數能夠讓你将需要處理的大量對象分為分成若幹個任務塊,如果你有一百萬個對象,那麼你可以建立 10 個任務塊,每個任務塊處理十萬個對象。有些人可能會擔心,分塊處理會導緻并行性能下降,實際上,由于避免了消息傳遞的開銷,是以反而會大大的提高性能。
我們從負載均衡,容災恢複,worke之間互動這三個角度來看看 Celery 如何實作分布式。
Celery 的負載均衡其實可以分為三個層次,而且是與 Kombu 高度耦合(本文 broker 以 Redis 為例)。
在 worker 決定 與 哪幾個 queue 互動,有一個負載均衡;
在 worker 決定與 broker 互動,使用 brpop 擷取消息時候有一個負載均衡;
在 worker 獲得 broker 消息之後,内部 具體 調用 task 時候,worker 内部進行多程序配置設定時候,有一個負載均衡。
另外,Celery 還有一個 AutoScaler 元件,其作用 實際就是線上調節程序池大小。這也和緩解負載相關。
其主要邏輯大緻如下圖所示(後續文章中會有詳細講解):
Celery 之中,錯誤主要有3種:
使用者代碼錯誤:錯誤可以直接傳回應用,因為Celery無法知道如何處理;
Broker錯誤:Celery可以根據負載平衡政策嘗試下一個節點;
網絡逾時錯誤:Celery可以重試該請求;
從系統角度出發,幾個最可能的失敗次元如下:
Broker失敗;
Worker ---> Broker 這個鍊路會失敗;
Worker 節點會失敗;
Worker 中的多程序中,某一個程序本身失效;
Worker 的某一個程序中,内部處理任務失敗;
從實際處理看,broker可以使用 RabbitMQ,可以做 叢集和故障轉移;但這是涉及到整體系統設計的次元,是以本系列不做分析。
依據錯誤級别,錯誤處理 分别有 重試 與 fallback選擇 兩種。
我們以 Worker ---> Broker 次元為例來進行分析。此次元上主要關心的是:
Broker 某一個節點失效;
worker 與 Broker 之間網絡失效;
在這個次元上,無論是 Celery 還是 Kombu 都做了努力,但是從根本來說,還是 Kombu 的努力。
在 Celery 中,對于重試,有 broker_connection_max_retries 配置,就是最大重試次數。
當出現網絡故障時候,Celery 會根據 broker_connection_max_retries 配置來進行重試。
在 Komub 中,同樣做了 各種 重試 處理,比如 在 Connection.py 中有如下重試參數:
max_retries:最大重試次數;
errback (Callable):失敗回調政策;
callback (Callable):每次重試間隔的回調函數;
自動重試是 kombu 的另外一種重試途徑,比如在 kombu\connection.py 就有 autoretry,其基本套路是:
在調用fun時候,可以使用 autoretry 這個mapper 做包裝。并且可以傳入上次調用成功的 channel。
如果調用fun過程中失敗,kombu 會自動進行try。
如果重試不解決問題,則會使用 fallback。比如 broker_failover_strategy 是 Celery 針對 broker Connection 來設定的政策。會自動映射到 <code>kombu.connection.failover_strategies</code>。
Kombu 在配置 Connection的時候,可以設定多個 broker url,在連接配接 broker 的時候,kombu 自動會選取最健康的 broker 節點進行連接配接。
前面提到,在處理具體控制管理工作時候,在運作狀态下,worker 程序之間通過 gossip 協定進行狀态的共享。
但是這個狀态共享對于任務的配置設定和worker 的排程沒有必然的聯系,隻是用來監控和響應控制台消息。因為假如有若幹 worker,面對一個控制台消息,應該隻有一個 worker 來響應其消息,是以就利用 gossip 協定選舉出一個 leader,這個 leader 進行響應。
Gossip 協定跟其他協定一樣,也有一些不可避免的缺陷,主要是兩個:
1)消息的延遲
由于 Gossip 協定中,節點隻會随機向少數幾個節點發送消息,消息最終是通過多個輪次的散播而到達全網的,是以使用 Gossip 協定會造成不可避免的消息延遲。不适合用在對實時性要求較高的場景下。
2)消息備援
Gossip 協定規定,節點會定期随機選擇周圍節點發送消息,而收到消息的節點也會重複該步驟,是以就不可避免的存在消息重複發送給同一節點的情況,造成了消息的備援,同時也增加了收到消息的節點的處理壓力。而且,由于是定期發送,是以,即使收到了消息的節點還會反複收到重複消息,加重了消息的備援。
為什麼用 gossip?可能因為是用 gossip 來處理管理功能,就是在 workers 之中選出一個 leader 來響應控制台的消息。這樣就不需要對消息即時性有要求。
通過以上的分析,大家應該對 Celery 的架構有了初步的了解。從下文開始,我們逐一分析 Celery 的幾個方面,敬請期待。
Nginx資料之Master與Worker基礎概念
1: Worker 啟動流程概述
2: Worker 的執行引擎
3: Task 對象的實作
4: 定時任務的實作
5: 遠端控制管理
6: Events 的實作
7: Worker 之間的互動
8: State 和 Result
Spark分布式計算引擎的應用