天天看點

Celery異步任務架構

一、Celery異步任務架構

Celery是一個異步任務架構,并且是一個簡單、靈活可靠的,處理大量消息的分布式系統

Celery服務為其他項目服務提供異步解決任務的需求,内置socket

Celery可執行的任務:執行異步任務,執行延遲任務,執行定時任務

Celery 官方文檔英文版:http://docs.celeryproject.org/en/latest/index.html

二、Celery架構

Celery是由三部分組成的,消息中間件(message broker)、任務執行單元(worker)、任務結果存儲(task result store)組成的。

Broker(任務中間件)————>Worker(任務執行者)————>Backend(任務結果倉庫)

消息中間件:Celery是不提供消息服務的,但是可以使用第三方來提供消息服務(提供任務),列如,Redis。

任務執行單元:Worker會自動(背景異步)執行消息中間件(broker)中的任務任務。

任務結果存儲:将Worker執行的結果存儲在backend中,可以使用Redis來存儲

三、Celery任務結構

Celery有兩種任務結構,基本結構、包架構封裝,但是我們提倡使用包架構封裝,因為結構更加清晰,例如:

project
    ├── celery_task  	# celery包
    │   ├── __init__.py # 封包件
    │   ├── celery.py   # celery連接配接和配置相關檔案,且名字必須叫celery.py
    │   └── tasks.py    # 所有任務函數
    ├── add_task.py  	# 添加任務
    └── get_result.py   # 擷取結果
           

包架構封裝

celery.py

from celery import Celery

broker = 'redis://127.0.0.1:6379/1'     # broker 任務隊列,任務放到這裡面
backend = 'redis://127.0.0.1:6379/2'    # backend 結果存儲,執行結果放在這裡面

app = Celery(__name__, broker=broker, backend=backend, include=['celery_task.tasks'])
           

tasks.py

from .celery import app

@app.task
def add(x, y):
    return x + y

@app.task
def multip(x, y):
    return x * y
           

異步任務執行:

add_task.py

把 tasks.py 中的任務函數添加到 broker 中

windows 首先需要安裝:pip install celery 和 pip install eventlet

需要啟動 celery, 在包項目下輸入以下指令:

C:\project> celery -A celery_task worker -P eventlet -l info

celery -A 包名 worker -P eventlet -l info

from celery_task.tasks import add

# 送出異步任務
ret = add.delay(5, 3)	# 把add函數任務添加到 broker中,worker在異步實時取出執行

print(ret)	# 0cc72e56-4604-4c00-bb3d-5b456f4869a7	擷取執行結果需要此ID
           

延遲任務執行:

還是需要先啟動celery

from celery_task.tasks import multip
# 送出延遲任務
from datetime import datetime, timedelta

# 需要UTC時間
eta = datetime.utcnow() + timedelta(seconds=10)		# 目前UTC時間往後加10秒
ret = multip.apply_async(args=(9, 9), eta=eta)		# 10 秒之後執行

print(ret)	# 3c8cfa57-05ff-4a26-b8fa-1f7f2d8051f2	擷取執行結果需要此ID
           

定時任務執行:

執行定時任務需要從新配置celery.py

from celery import Celery
from datetime import timedelta
from celery.schedules import crontab

broker = 'redis://127.0.0.1:6379/1'  # broker 任務隊列,任務放到這裡面
backend = 'redis://127.0.0.1:6379/2'  # backend 任務隊列,執行結果放在這裡面

app = Celery(__name__, broker=broker, backend=backend, include=['celery_task.tasks'])

# 時區
app.conf.timezone = 'Asia/Shanghai'
# 是否使用UTC
app.conf.enable_utc = False

app.conf.beat_schedule = {
    # add 任務
    'add-task': {
        'task': 'celery_task.tasks.add',
        'schedule': crontab(hour=8, day_of_week=1),  # 每周一早八點執行一次
        'args': (300, 150),
    },
    # multip 任務
    'multip-task': {
        'task': 'celery_task.tasks.multip',
        'schedule': timedelta(seconds=3),           # 每三秒執行一次
        'args': (300, 150),
    }
}
           

啟動 worker 等待執行任務

celery -A celery_task beat -l info

celery -A 包名 beat -l info
           

啟動 beat 将任務添加 broker 中,讓worker執行

celery -A celery_task worker -P eventlet -l info

celery -A 包名 worker -P eventlet -l info
           

檢視任務執行結果:

get_result.py

from celery_task.celery import app
from celery.result import AsyncResult

id = '3fedc0d8-32c8-4b1a-af43-fedfac6107a2'

if __name__ == '__main__':
    asyncs = AsyncResult(id=id, app=app)

    if asyncs.successful():
        result = asyncs.get()
        print(result)	# 成功則取出backend中id對應的值
        
    elif asyncs.failed():
        print('任務失敗')
    elif asyncs.status == 'PENDING':
        print('任務等待中被執行')
    elif asyncs.status == 'RETRY':
        print('任務異常後正在重試,或id不存在')
    elif asyncs.status == 'STARTED':
        print('任務已經開始被執行')
           

基本結構

from celery import Celery
import time

# backend='redis://:[email protected]:6379/1'	# 有密碼123456

broker = 'redis://127.0.0.1:6379/1'  	# broker 任務隊列,任務放到這裡面
backend = 'redis://127.0.0.1:6379/2'  	# backend 任務隊列,執行結果放在這裡面

app = Celery(__name__, broker=broker, backend=backend)

@app.task
def add(x, y):
    return x + y
           
celery -A celery_app_task worker -P eventlet -l info
           
from celery_app_task import add

# 送出任務
ret = add.delay(5, 3)	# 往 broker 中添加一個任務
print(ret)
           

學習之旅

繼續閱讀