一、Celery異步任務架構
Celery是一個異步任務架構,并且是一個簡單、靈活可靠的,處理大量消息的分布式系統
Celery服務為其他項目服務提供異步解決任務的需求,内置socket
Celery可執行的任務:執行異步任務,執行延遲任務,執行定時任務
Celery 官方文檔英文版:http://docs.celeryproject.org/en/latest/index.html
二、Celery架構
Celery是由三部分組成的,消息中間件(message broker)、任務執行單元(worker)、任務結果存儲(task result store)組成的。
Broker(任務中間件)————>Worker(任務執行者)————>Backend(任務結果倉庫)
消息中間件:Celery是不提供消息服務的,但是可以使用第三方來提供消息服務(提供任務),列如,Redis。
任務執行單元:Worker會自動(背景異步)執行消息中間件(broker)中的任務任務。
任務結果存儲:将Worker執行的結果存儲在backend中,可以使用Redis來存儲
三、Celery任務結構
Celery有兩種任務結構,基本結構、包架構封裝,但是我們提倡使用包架構封裝,因為結構更加清晰,例如:
project
├── celery_task # celery包
│ ├── __init__.py # 封包件
│ ├── celery.py # celery連接配接和配置相關檔案,且名字必須叫celery.py
│ └── tasks.py # 所有任務函數
├── add_task.py # 添加任務
└── get_result.py # 擷取結果
包架構封裝
celery.py
from celery import Celery
broker = 'redis://127.0.0.1:6379/1' # broker 任務隊列,任務放到這裡面
backend = 'redis://127.0.0.1:6379/2' # backend 結果存儲,執行結果放在這裡面
app = Celery(__name__, broker=broker, backend=backend, include=['celery_task.tasks'])
tasks.py
from .celery import app
@app.task
def add(x, y):
return x + y
@app.task
def multip(x, y):
return x * y
異步任務執行:
add_task.py
把 tasks.py 中的任務函數添加到 broker 中
windows 首先需要安裝:pip install celery 和 pip install eventlet
需要啟動 celery, 在包項目下輸入以下指令:
C:\project> celery -A celery_task worker -P eventlet -l info
celery -A 包名 worker -P eventlet -l info
from celery_task.tasks import add
# 送出異步任務
ret = add.delay(5, 3) # 把add函數任務添加到 broker中,worker在異步實時取出執行
print(ret) # 0cc72e56-4604-4c00-bb3d-5b456f4869a7 擷取執行結果需要此ID
延遲任務執行:
還是需要先啟動celery
from celery_task.tasks import multip
# 送出延遲任務
from datetime import datetime, timedelta
# 需要UTC時間
eta = datetime.utcnow() + timedelta(seconds=10) # 目前UTC時間往後加10秒
ret = multip.apply_async(args=(9, 9), eta=eta) # 10 秒之後執行
print(ret) # 3c8cfa57-05ff-4a26-b8fa-1f7f2d8051f2 擷取執行結果需要此ID
定時任務執行:
執行定時任務需要從新配置celery.py
from celery import Celery
from datetime import timedelta
from celery.schedules import crontab
broker = 'redis://127.0.0.1:6379/1' # broker 任務隊列,任務放到這裡面
backend = 'redis://127.0.0.1:6379/2' # backend 任務隊列,執行結果放在這裡面
app = Celery(__name__, broker=broker, backend=backend, include=['celery_task.tasks'])
# 時區
app.conf.timezone = 'Asia/Shanghai'
# 是否使用UTC
app.conf.enable_utc = False
app.conf.beat_schedule = {
# add 任務
'add-task': {
'task': 'celery_task.tasks.add',
'schedule': crontab(hour=8, day_of_week=1), # 每周一早八點執行一次
'args': (300, 150),
},
# multip 任務
'multip-task': {
'task': 'celery_task.tasks.multip',
'schedule': timedelta(seconds=3), # 每三秒執行一次
'args': (300, 150),
}
}
啟動 worker 等待執行任務
celery -A celery_task beat -l info
celery -A 包名 beat -l info
啟動 beat 将任務添加 broker 中,讓worker執行
celery -A celery_task worker -P eventlet -l info
celery -A 包名 worker -P eventlet -l info
檢視任務執行結果:
get_result.py
from celery_task.celery import app
from celery.result import AsyncResult
id = '3fedc0d8-32c8-4b1a-af43-fedfac6107a2'
if __name__ == '__main__':
asyncs = AsyncResult(id=id, app=app)
if asyncs.successful():
result = asyncs.get()
print(result) # 成功則取出backend中id對應的值
elif asyncs.failed():
print('任務失敗')
elif asyncs.status == 'PENDING':
print('任務等待中被執行')
elif asyncs.status == 'RETRY':
print('任務異常後正在重試,或id不存在')
elif asyncs.status == 'STARTED':
print('任務已經開始被執行')
基本結構
from celery import Celery
import time
# backend='redis://:[email protected]:6379/1' # 有密碼123456
broker = 'redis://127.0.0.1:6379/1' # broker 任務隊列,任務放到這裡面
backend = 'redis://127.0.0.1:6379/2' # backend 任務隊列,執行結果放在這裡面
app = Celery(__name__, broker=broker, backend=backend)
@app.task
def add(x, y):
return x + y
celery -A celery_app_task worker -P eventlet -l info
from celery_app_task import add
# 送出任務
ret = add.delay(5, 3) # 往 broker 中添加一個任務
print(ret)
學習之旅