天天看點

Celery基本使用簡介安裝目錄結構基本使用broker啟動workerbackend監控重試定時任務周期任務:celery beat子任務

簡介

Celery 是一款非常簡單、靈活、可靠的分布式系統,可用于處理大量消息,Celery架構如下圖,由消息隊列、任務執行單元、結果存儲三部分組成。

Celery基本使用簡介安裝目錄結構基本使用broker啟動workerbackend監控重試定時任務周期任務:celery beat子任務

user:任務的生産者,可以是使用者(觸發任務)或者celery beat(産生周期任務)

broker:消息中間件,Redis或RabbitMQ,生産者産生的任務會先存放到broker

worker:任務執行單元,執行broker中的任務

store(backend):存儲任務結果

安裝

pip install -U Celery           

目錄結構

使用下面簡單的目錄結構做示範,正式項目中的使用一般為多目錄結構,不同的任務放到不同的task檔案。

proj
  - task.py
  - app.py
  - result.py           

基本使用

建立任務

# task.py

import celery

broker = 'redis://127.0.0.1:6379/1'  # redis://:[email protected]:6379/1
backend = 'redis://127.0.0.1:6379/2'
app = celery.Celery('test', backend=backend, broker=broker)

# app = celery.Celery('test')  # 效果同上
# app.conf.broker_url = broker
# app.conf.result_backend = backend

@app.task
def add(a, b):
    print(f'{a}+{b} = {a + b}')
    return a + b           

• 執行個體化Celery對象

• 定義函數

• Celery對象task方法作為裝飾器

如果要使用RabbitMQ作為消息中間件,隻需修改broker,無需關心如何操作Redis或RabbitMQ。

調用任務

# app.py

from task import add

result = add.delay(1, 2)
print(result.id)  # 2a3d96fe-c3e9-4846-8237-24fc28d9ad2b           

task裝飾器傳回一個celery Task對象,賦予了原函數Task的方法,delay方法用于調用異步任務,異步任務傳回celery.result.AsyncResult對象,在執行任務的時候最主要的就是擷取ID,以後可以用ID去查任務執行狀态、結果等。

任務狀态

# result.py

from celery.result import AsyncResult
from task import app

async_result = AsyncResult(
    id='2a3d96fe-c3e9-4846-8237-24fc28d9ad2b', app=app
)
print(async_result.status)
print(async_result.result)
print(async_result.get())           

• status:任務執行狀态(PENDING、STARTED、RETRY、FAILURE、SUCCESS)

• result:任務的傳回值或錯誤資訊

• get():同步的方式查結果

以上所有操作基于broker能連接配接上

執行add.delay後去查任務狀态,一直處于PENDING,因為worker沒啟動,任務就存放在broker中一直沒有被執行。

broker

broker選擇的是Redis的資料庫1,前面觸發的任務存儲在key為celery的list中。

Celery基本使用簡介安裝目錄結構基本使用broker啟動workerbackend監控重試定時任務周期任務:celery beat子任務

啟動worker

celery worker -A task -l info -P eventlet           
  • -A:指定Celery對象的位置
  • -l:日志級别
  • -P:預設使用prefork管理并發,windows不支援prefork

worker啟動後,可以看到部配置設定置資訊、隊列、任務,然後就會執行broker中堆積的任務,并将結果儲存到backend

- ** ---------- [config]
- ** ---------- .> app:         test:0x1ade6ea95f8
- ** ---------- .> transport:   redis://127.0.0.1:6379/1
- ** ---------- .> results:     redis://127.0.0.1:6379/2
- *** --- * --- .> concurrency: 4 (eventlet)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
 -------------- [queues]
                .> celery           exchange=celery(direct) key=celery

[tasks]
  . task.add

...

[2020-08-11 01:01:02,616: INFO/MainProcess] Received task: task.add[2a3d96fe-c3e9-4846-8237-24fc28d9ad2b]
[2020-08-11 01:01:08,796: WARNING/MainProcess] 1+2 = 3
[2020-08-11 01:01:08,801: INFO/MainProcess] Task task.add[2a3d96fe-c3e9-4846-8237-24fc28d9ad2b] succeeded in 6.172000000002299: 3           

值得注意的是tasks,清單展示所有的celery任務,後面celery-beat還會用到。

[tasks]
  . task.add           

backend

到Redis檢視執行結果

Celery基本使用簡介安裝目錄結構基本使用broker啟動workerbackend監控重試定時任務周期任務:celery beat子任務

監控

監控使用flower,自帶可視化web界面,使用pip安裝

pip install -U flower           

啟動指令:

celery -A task flower (--port=5555) 
或
celery flower --broker=redis://127.0.0.1:6379/1           
Celery基本使用簡介安裝目錄結構基本使用broker啟動workerbackend監控重試定時任務周期任務:celery beat子任務
Celery基本使用簡介安裝目錄結構基本使用broker啟動workerbackend監控重試定時任務周期任務:celery beat子任務

重試

可以設定任務執行失敗後是否進行重試,對哪些錯誤進行重試,重試次數、時間間隔等。

# task

@app.task(
    autoretry_for=(Exception,),  # 指定錯誤碼,Exception表示對所有錯誤進行重試
    max_retries=2,  # 重試次數
    retry_backoff=4,  # 重試時間間隔,retry_jitter為True時,時間間隔為1-retry_backoff之間随機數
    # retry_jitter=False,  # 預設為True,retry_jitter=False時,第n次重試時間為上一次重試時間retry_backoff**n秒後
)
def send_msg(msg):
    return msg[5]           
# app

res1 = send_msg.delay('abcdef')
res2 = send_msg.delay('abc')           

res1正常執行,res2 IndexError重試

Celery基本使用簡介安裝目錄結構基本使用broker啟動workerbackend監控重試定時任務周期任務:celery beat子任務

定時任務

普通函數使用task裝飾器後被封裝成celery任務,可以z作為異步任務調用,也可以作為定時任務調用,具體看調用方式。

調用定時任務的兩種方式:

countdown

add.apply_async(args=(1, 2), countdown=3, expires=5)           

• args:函數的參數

• countdown:幾秒後執行

• expires:過期時間

eta

eta:datetime、utc時間,可以使用timedelta做時間運算,設定時間上更為靈活。

add.apply_async(
    args=(1, 2), 
    eta=datetime.datetime.utcnow() + datetime.timedelta(seconds=10),
    expires=20
)           

周期任務:celery beat

如果定義了beat_schedule,在啟動celery-beat後就會周期性的産生任務放到broker。

# task.py
app.conf.beat_schedule = {
    'test_cycle_task': {  # 任務name
        # 執行task下的add函數
        'task': 'task.add',  # 啟動worker時監控到的任務 -> [tasks]
        # 'schedule': 5.0,  # 幾秒執行一次
        'schedule': timedelta(seconds=6),  # 多久執行一次
        # 'schedule': crontab(hour=0, minute=55),  # 每天定時執行
        'args': (2, 3)  # 傳遞參數
    },
    'task2': {}
}           

celery-beat啟動指令:

celery beat -A task           

子任務

任務執行成功或失敗後執行一個回調函數。

task1.apply_async((1, 2), link=task2.s(3), link_error=task3.s())
task1執行成功,傳回值傳遞給task2并且作為task2的第一個參數
task1出錯,ID傳遞給task3并且作為task3的第一個參數           

執行:add.apply_async((1, 2), link=add.s(3))

[2020-08-11 01:42:18,592: INFO/MainProcess] Received task: task.add[a7be35fd-c932-46e1-970d-0c520caecc32]

[2020-08-11 01:42:18,598: WARNING/MainProcess] 1+2 = 3

[2020-08-11 01:42:18,609: INFO/MainProcess] Received task: task.add[2e72a107-e833-4da4-a532-90e6039cc32e]

[2020-08-11 01:42:18,614: INFO/MainProcess] Task task.add[a7be35fd-c932-46e1-970d-0c520caecc32] succeeded in 0.01499999

9999417923s: 3

[2020-08-11 01:42:18,618: WARNING/MainProcess] 3+3 = 6

[2020-08-11 01:42:18,625: INFO/MainProcess] Task task.add[2e72a107-e833-4da4-a532-90e6039cc32e] succeeded in 0.0s: 6