1.在tornado 中 使用 Celery
2.Celery的进阶
2.1回调
官方文档中把回调称为linking tasks。和回调差不多一个意思。这个回调怎么用呢,请看下面例子。
这个link就相当于tornado中的callback了
add.apply_async((,),link = add.s())
这个函数返回的值为20,这个20怎么来呢?
其实就是先执行第一个任务add(2,2) ,第一个任返回的结果传递给回调函数,然后add(4,16).最后就的好20了。这个例子很简单,
不过这个例子有个疑问—add.s(16),这个s是怎么回事?突然出现这个s,而且以前没有见过啊。
官网canvas guide里有说明的,这个咱们简单的学习一下。
>>>add.subtask((, ), countdown=)
tasks.add(, )
>>>add.s(, )
tasks.add(, )
add.s()就相当于subtask()方法,等于它的简写。
特殊变量
EAT(estimated time of arrival) 预计到达时间
result = add.apply_async((, ), countdown=)
#我们通过countdown变量设置它
result.get()
那么这个变量的作用是什么呢?我们可以给这个参数设定一个特殊的日期(date)或者时间(time),那么在这个时间内这个函数是不会返回结果的。上面那个例子代表我们至少需要等待3秒才能得到返回结果。
Message Sending Retry
celey将会在事件连接的失败时候自动重新连接,通过设置retry变量可以控制是否重新连接。比如retry=False表示连接失败后不重连。
不过还有更高级的配置
add.apply_async((, ), retry=True, retry_policy={
'max_retries': ,
'interval_start': ,
'interval_step': .,
'interval_max': .,
})
- max_retries :最大重连次数,设为None表示无限次数的重连。
- interval_start:第一次两次重连的等待时间,单位秒,默认为0
- interval_max:重连登台时间 的最大值默认0.2
-
nterval_step:重连时间增量的步长,默认0.2
刚才的例子里重连的间隔从0开始,每次增长0.2s,当间隔到带0.8的时候就不增长了。
Serializers
worker 返回来内容的格式。默认为pickle python有专门的包。
不过我们可是设置 json,pickle,ymal,msgpack。
add.apply_async((, ), serializer='json')
不过每调用一次都要设置它不是很麻烦么,我们可以定义celery设在它。这样就不必每次都设置了。
通过CELERY_TASK_PUBLISH_RETRY 就可以了。
所有的设这内容都在这里了
3.celery的配置(Configuration)
不过一般情况下我们不必对Celery进行配置哈,如果你有啥特殊的要求的话就需要了解这一方面了。
怎么配置
配置的格式(等号连接)
## Broker settings.
BROKER_URL = 'amqp://guest:[email protected]:5672//'
# List of modules to import when celery starts.
CELERY_IMPORTS = ('myapp.tasks', )
## Using the database to store task state and results.
CELERY_RESULT_BACKEND = 'db+sqlite:///results.db'
CELERY_ANNOTATIONS = {'tasks.add': {'rate_limit': '10/s'}}
配置单个条目
要配置多个
app.conf.update(
CELERY_TASK_SERIALIZER='json',
CELERY_ACCEPT_CONTENT=['json'], # Ignore other content
CELERY_RESULT_SERIALIZER='json',
CELERY_TIMEZONE='Europe/Oslo',
CELERY_ENABLE_UTC=True,
)
常用的配置选项
CELERY_RESULT_BACKEND
