天天看点

Celery基本使用简介安装目录结构基本使用broker启动workerbackend监控重试定时任务周期任务:celery beat子任务

简介

Celery 是一款非常简单、灵活、可靠的分布式系统,可用于处理大量消息,Celery架构如下图,由消息队列、任务执行单元、结果存储三部分组成。

Celery基本使用简介安装目录结构基本使用broker启动workerbackend监控重试定时任务周期任务:celery beat子任务

user:任务的生产者,可以是用户(触发任务)或者celery beat(产生周期任务)

broker:消息中间件,Redis或RabbitMQ,生产者产生的任务会先存放到broker

worker:任务执行单元,执行broker中的任务

store(backend):存储任务结果

安装

pip install -U Celery           

目录结构

使用下面简单的目录结构做演示,正式项目中的使用一般为多目录结构,不同的任务放到不同的task文件。

proj
  - task.py
  - app.py
  - result.py           

基本使用

创建任务

# task.py

import celery

broker = 'redis://127.0.0.1:6379/1'  # redis://:[email protected]:6379/1
backend = 'redis://127.0.0.1:6379/2'
app = celery.Celery('test', backend=backend, broker=broker)

# app = celery.Celery('test')  # 效果同上
# app.conf.broker_url = broker
# app.conf.result_backend = backend

@app.task
def add(a, b):
    print(f'{a}+{b} = {a + b}')
    return a + b           

• 实例化Celery对象

• 定义函数

• Celery对象task方法作为装饰器

如果要使用RabbitMQ作为消息中间件,只需修改broker,无需关心如何操作Redis或RabbitMQ。

调用任务

# app.py

from task import add

result = add.delay(1, 2)
print(result.id)  # 2a3d96fe-c3e9-4846-8237-24fc28d9ad2b           

task装饰器返回一个celery Task对象,赋予了原函数Task的方法,delay方法用于调用异步任务,异步任务返回celery.result.AsyncResult对象,在执行任务的时候最主要的就是获取ID,以后可以用ID去查任务执行状态、结果等。

任务状态

# result.py

from celery.result import AsyncResult
from task import app

async_result = AsyncResult(
    id='2a3d96fe-c3e9-4846-8237-24fc28d9ad2b', app=app
)
print(async_result.status)
print(async_result.result)
print(async_result.get())           

• status:任务执行状态(PENDING、STARTED、RETRY、FAILURE、SUCCESS)

• result:任务的返回值或错误信息

• get():同步的方式查结果

以上所有操作基于broker能连接上

执行add.delay后去查任务状态,一直处于PENDING,因为worker没启动,任务就存放在broker中一直没有被执行。

broker

broker选择的是Redis的数据库1,前面触发的任务存储在key为celery的list中。

Celery基本使用简介安装目录结构基本使用broker启动workerbackend监控重试定时任务周期任务:celery beat子任务

启动worker

celery worker -A task -l info -P eventlet           
  • -A:指定Celery对象的位置
  • -l:日志级别
  • -P:默认使用prefork管理并发,windows不支持prefork

worker启动后,可以看到部分配置信息、队列、任务,然后就会执行broker中堆积的任务,并将结果保存到backend

- ** ---------- [config]
- ** ---------- .> app:         test:0x1ade6ea95f8
- ** ---------- .> transport:   redis://127.0.0.1:6379/1
- ** ---------- .> results:     redis://127.0.0.1:6379/2
- *** --- * --- .> concurrency: 4 (eventlet)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
 -------------- [queues]
                .> celery           exchange=celery(direct) key=celery

[tasks]
  . task.add

...

[2020-08-11 01:01:02,616: INFO/MainProcess] Received task: task.add[2a3d96fe-c3e9-4846-8237-24fc28d9ad2b]
[2020-08-11 01:01:08,796: WARNING/MainProcess] 1+2 = 3
[2020-08-11 01:01:08,801: INFO/MainProcess] Task task.add[2a3d96fe-c3e9-4846-8237-24fc28d9ad2b] succeeded in 6.172000000002299: 3           

值得注意的是tasks,列表展示所有的celery任务,后面celery-beat还会用到。

[tasks]
  . task.add           

backend

到Redis查看执行结果

Celery基本使用简介安装目录结构基本使用broker启动workerbackend监控重试定时任务周期任务:celery beat子任务

监控

监控使用flower,自带可视化web界面,使用pip安装

pip install -U flower           

启动命令:

celery -A task flower (--port=5555) 
或
celery flower --broker=redis://127.0.0.1:6379/1           
Celery基本使用简介安装目录结构基本使用broker启动workerbackend监控重试定时任务周期任务:celery beat子任务
Celery基本使用简介安装目录结构基本使用broker启动workerbackend监控重试定时任务周期任务:celery beat子任务

重试

可以设置任务执行失败后是否进行重试,对哪些错误进行重试,重试次数、时间间隔等。

# task

@app.task(
    autoretry_for=(Exception,),  # 指定错误码,Exception表示对所有错误进行重试
    max_retries=2,  # 重试次数
    retry_backoff=4,  # 重试时间间隔,retry_jitter为True时,时间间隔为1-retry_backoff之间随机数
    # retry_jitter=False,  # 默认为True,retry_jitter=False时,第n次重试时间为上一次重试时间retry_backoff**n秒后
)
def send_msg(msg):
    return msg[5]           
# app

res1 = send_msg.delay('abcdef')
res2 = send_msg.delay('abc')           

res1正常执行,res2 IndexError重试

Celery基本使用简介安装目录结构基本使用broker启动workerbackend监控重试定时任务周期任务:celery beat子任务

定时任务

普通函数使用task装饰器后被封装成celery任务,可以z作为异步任务调用,也可以作为定时任务调用,具体看调用方式。

调用定时任务的两种方式:

countdown

add.apply_async(args=(1, 2), countdown=3, expires=5)           

• args:函数的参数

• countdown:几秒后执行

• expires:过期时间

eta

eta:datetime、utc时间,可以使用timedelta做时间运算,设置时间上更为灵活。

add.apply_async(
    args=(1, 2), 
    eta=datetime.datetime.utcnow() + datetime.timedelta(seconds=10),
    expires=20
)           

周期任务:celery beat

如果定义了beat_schedule,在启动celery-beat后就会周期性的产生任务放到broker。

# task.py
app.conf.beat_schedule = {
    'test_cycle_task': {  # 任务name
        # 执行task下的add函数
        'task': 'task.add',  # 启动worker时监控到的任务 -> [tasks]
        # 'schedule': 5.0,  # 几秒执行一次
        'schedule': timedelta(seconds=6),  # 多久执行一次
        # 'schedule': crontab(hour=0, minute=55),  # 每天定时执行
        'args': (2, 3)  # 传递参数
    },
    'task2': {}
}           

celery-beat启动命令:

celery beat -A task           

子任务

任务执行成功或失败后执行一个回调函数。

task1.apply_async((1, 2), link=task2.s(3), link_error=task3.s())
task1执行成功,返回值传递给task2并且作为task2的第一个参数
task1出错,ID传递给task3并且作为task3的第一个参数           

执行:add.apply_async((1, 2), link=add.s(3))

[2020-08-11 01:42:18,592: INFO/MainProcess] Received task: task.add[a7be35fd-c932-46e1-970d-0c520caecc32]

[2020-08-11 01:42:18,598: WARNING/MainProcess] 1+2 = 3

[2020-08-11 01:42:18,609: INFO/MainProcess] Received task: task.add[2e72a107-e833-4da4-a532-90e6039cc32e]

[2020-08-11 01:42:18,614: INFO/MainProcess] Task task.add[a7be35fd-c932-46e1-970d-0c520caecc32] succeeded in 0.01499999

9999417923s: 3

[2020-08-11 01:42:18,618: WARNING/MainProcess] 3+3 = 6

[2020-08-11 01:42:18,625: INFO/MainProcess] Task task.add[2e72a107-e833-4da4-a532-90e6039cc32e] succeeded in 0.0s: 6