天天看點

Celery-4.1 使用者指南: Optimizing

簡介

預設的配置做了很多折中考慮。它不是針對某個情況優化的,但是大多數情況下都工作的非常好。

基于一個特殊的使用場景,有很多優化可以做。

優化可以應用到運作環境的不同屬性,可以是任務執行的時間,使用的總記憶體數,或者是高負載時的響應時間。

Ensuring操作

Programming Pearl

這本書中,Jon Bentley 通過

一天有多少水從密西西比河流出?

這個問題提出了

back-of-the-envelope

的概念。

這個練習的重點是要說明一個系統能及時處理的資料量有一個極限。

Back of the envelope

計算能夠被用來預先做這個計劃。

在 Celery 中;如果一個任務需要10分鐘完成處理,并且有10個任務每分鐘進來一個,那麼隊列将永遠不會空。這就是為什麼監控隊列長度非常重要的原因!

有一種方案是使用

Munin

。你應該設定告警,使得一旦任意隊列達到不可接受的長度你會收到告警。此時,你可以采取合理的措施,添加新的工作節點或者取消不必要的任務。

通用設定

librabbitmq

如果你使用 RabbitMQ(AMQP) 作為消息中間件,那麼你可以安裝

librabbitmq

子產品這個 C 實作的優化過的用戶端。

$ pip install librabbitmq
           

如果

librabbitmq

子產品已經安裝,

amqp

傳輸将自動使用它,或者你也可以直接指定你想要的傳輸子產品,使用

pyamqp://

或者

librabbitmq://

字首。

消息中間件連接配接池

從2.5版本開始,消息中間件連接配接池被自動啟用。

你可調整

broker_pool_limit

設定來減少競争,并且這個值應該基于使用消息中間件的激活狀态的

thread/green-thread

數量。

使用臨時隊列

Celery 建立的隊列預設是持久化的。這意味着即使消息中間會将消息寫到硬碟使得即使重新開機任務也會被執行。

但是,一些情況下,消息丢失也沒關系,是以并非所有的任務都需要持久化。你可以為這類任務消息建立一個臨時隊列來提高性能:

from kombu import Exchange, Queue

task_queues = (
    Queue('celery', routing_key='celery'),
    Queue('transient', Exchange('transient', delivery_mode=),
          routing_key='transient', durable=False),
)
           

或者可以配置

task_routes

:

task_routes = {
    'proj.tasks.add': {'queue': 'celery', 'delivery_mode': 'transient'}
}
           

delivery_mode

修改發送到隊列的消息的遞送方式。

one

值代表消息不會寫到硬碟,而

two

值(預設)代表消息可被寫到硬碟。

将你的任務導向新的臨時隊列,你可以通過聲明

queue

參數(或者使用

task_routes設定

):

task.apply_async(args, queue='transient')
           

擷取更多的資訊,請檢視

routing guide

工作單元設定

Prefetch 限制

Prefetch

是一個繼承自 AMQP 的術語,它經常被使用者錯誤了解。

prefetch

限制是一個工作單元可以預留的任務的數量。如果他為0,工作單元将繼續消費消息,并不是說可能存儲其他可用節點能更快的處理任務,或者消息可能不适合保留在内容中。

工作單元的預設

prefetch

值是

worker_prefetch_multiplier

值乘以并行的數量(程序/線程/green-threads)。

如果你有許多長時間運作的任務,你可能會想将乘數值設定為1:意思是每個工作單元程序每次隻預留一個任務。

但是 - 如果你有許多短時間運作任務,并且吞吐量/往返延遲對你又很重要,這個值應該大。如果消息已經被預先擷取,且在記憶體中可用,工作單元每秒可以處理更多的任務。你可以通過實驗來找到針對你場景的最佳值。值 50 或者 150 可能在這些環境中有意義。

如果你既有長時間任務又有短時間任務,最佳的方式是使用兩個單獨配置的工作單元節點,并且根據運作時間路由任務到相應的隊列(檢視路由任務這一節)。

每次預留一個任務

任務消息隻有在被确認之後才會從隊列中删除,是以如果工作單元在确認任務消息之前崩潰了,任務消息會重新遞送到另一個工作單元(或者等目前工作單元恢複後又發送到這)

當使用預設的早确認機制,

prefetch

乘數設定為 1,意味着每個工作單元将為每個程序最多預留一個額外的任務消息:或者,換而言之,如果工作單元使用

-c 10

參數啟動,工作單元任意時刻最多預留20個任務(10個未确認的正在執行的任務,10個未确認的預留的任務)。

通常使用者問禁用

prefetching of tasks

是否可能,但是他們實際意思是一個工作單元隻預留工作單元程序數量的任務(對

-c 10

來說,10個未确認的任務)

這是能實作的,但是必須啟用延遲确認。使用這個選項而不是預設行為意味着已經開始的任務在由于電源失敗或者工作單元執行個體被意外殺死而失敗時會被重試,是以這也要求任務的幂等的。

(感覺這裡延遲确認和早确認說反了)

另見:

Should I use retry or acks_late?

你可以通過如下配置使能這個行為:

task_acks_late = True
worker_prefetch_multiplier = 
           

Prefork 池的 prefetch 設定

prefork

池會異步發送盡可能多的任務給工作程序,從效果上,這意味着程序在預先擷取任務。

這可以提高性能,不過它也意味着任務可能阻塞在等待長時間任務運作完成:

-> send task T1 to process A
# A executes T1
-> send task T2 to process B
# B executes T2
<- T2 complete sent by process B

-> send task T3 to process A
# A still executing T1, T3 stuck in local buffer and won't start until
# T1 returns, and other queued tasks won't be sent to idle processes
<- T1 complete sent by process A
# A executes T3
           

隻要管道緩沖可寫,工作單元将發送任務給工作程序。管道緩沖的大小與作業系統相關:一些可能隻有 64K 大小,但是在近期的一些Linux發行版中這個緩沖大小是1MB(隻能在系統層面上修改)。

你可以通過使用

-0fair

工作單元選項禁用這個預擷取行為:

使用這個選項,工作單元将隻給目前可用的程序發送任務,禁用預擷取行為:

-> send task T1 to process A
# A executes T1
-> send task T2 to process B
# B executes T2
<- T2 complete sent by process B

-> send T3 to process B
# B executes T3

<- T3 complete sent by process B
<- T1 complete sent by process A
           

Footnotes

[*] 可以免費讀這裡: The back of the envelope. 這本書很經典,建議閱讀。

[†] RabbitMQ 以及其他消息中間件輪詢的方式發送消息,是以這對于一個激活的系統沒有作用。如果沒有

prefetch

限制并且你想重新開機叢集,節點啟動之間可能會有延遲。如果有3個離線節點和一個線上節點,所有的消息都會被遞送到這個線上節點。

[‡] 這是一個并行設定;

worker_concurrency

設定或者 celery 工作單元的

-c

選項。