目錄
- 前文清單
- 前言
-
- Task 的執行個體化
- 任務的名字
- 任務的綁定
- 任務的重試
- 任務的請求上下文
- 任務的繼承
-
前文清單
分布式任務隊列 Celery
分布式任務隊列 Celery —— 詳解工作流
分布式任務隊列 Celery —— 應用基礎
前言
緊接前文,繼續深入了解 Celery Tasks。示例代碼依舊在前文的基礎上進行修改。
Tasks 是 Celery 的基石,原型類為 celery.app.task:Task,它提供了兩個核心功能:
- 将任務消息發送到隊列
- 聲明 Worker 接收到消息後需要執行的具體函數
使用裝飾器 app.task 來裝飾一個普通函數,就可以輕松建立出一個任務函數。
from proj.celery import app
@app.task
def add(x, y):
return x + y
值得注意的是,任務函數本質上已經不再是一個普通函數,而是一個 celery.app.task:Task 執行個體對象。
>>> from proj.task.tasks import add
>>> dir(add)
['AsyncResult', 'MaxRetriesExceededError', ..., 'apply_async', 'delay', u'name', 'on_bound', 'on_failure', 'on_retry', 'on_success', 'request', 'retry', 'subtask', ...]
是以任務函數才可以調用 delay/apply_async 等屬于 Task 的執行個體屬性和方法。
>>> add.apply_async
<bound method add.apply_async of <@task: proj.task.tasks.add of proj at 0x7fedb363a790>>
>>> add.delay
<bound method add.delay of <@task: proj.task.tasks.add of proj at 0x7fedb363a790>>
這是一個非常重要的認識「app.task 的 “裝飾” 動作,其實是 Task 的執行個體化過程」,而裝飾器的參數就是 Task 初始化的參數。
官方文檔(http://docs.celeryproject.org/en/latest/userguide/tasks.html#list-of-options)提供了完整的 app.task 裝飾器參數清單。
除此之外,還需要注意「多裝飾器順序」的坑,app.task 應該始終放到最後使用,才能保證其穩定有效。
app.task
@decorator2
@decorator1
def add(x, y):
return x + y
每個任務函數都具有唯一的名字,這個名字被包含在任務消息中,Worker 通過該名字來找到具體執行的任務函數。預設的,Task 會啟用自動命名,将函數的全路徑名作為任務名。
>>> add.name
u'proj.task.tasks.add'
當然了,也可以通過指定裝飾器參數 name 來指定任務名。
@app.task(name='new_name')
def add(x, y):
return x + y
>>> from proj.task.tasks import add
>>> add.name
'new_name'
但為了避免命名沖突的問題,一般不建議這麼做,除非你很清楚自己的做什麼。
既然任務函數本質是一個 Task 執行個體對象,那麼當然也可以應用 self 綁定特性。
# 啟用綁定:
@app.task(bind=True)
def add(self, x, y):
print("self: ", self)
return x + y
>>> add.delay(1, 2)
<AsyncResult: 1982dc85-694b-4ceb-849b-5f69e40b4fe9>

綁定對象 self 十分重要,Task 的很多進階功能都是依靠它作為載體來調用的。例如:任務重試功能,請求上下文功能。
任務重試功能的實作為 Task.retry,它将任務消息重新發送到同一個的隊列中,以此來重新開機任務。
@app.task(bind=True, max_retries=3)
def send_twitter_status(self, oauth, tweet):
try:
twitter = Twitter(oauth)
twitter.update_status(tweet)
except (Twitter.FailWhaleError, Twitter.LoginError) as exc:
raise self.retry(exc=exc)
- max_retries 指定了最大的重試次數
- exc 指定将異常資訊輸出到日志,需要開啟 result backend。
如果你僅希望觸發特定異常時才進行重試,可以應用 Task 的「Automatic retry for known exceptions」特性。
# 隻有在觸發 FailWhaleError 異常時,才會重試任務,且最多重試 5 次。
@app.task(autoretry_for=(FailWhaleError,), retry_kwargs={'max_retries': 5})
def refresh_timeline(user):
return twitter.refresh_timeline(user)
在 Celery 請求 Worker 執行任務函數時,提供了請求的上下文,這是為了讓任務函數在執行過程中能夠通路上下文所包含的任務狀态和資訊。
@app.task(bind=True)
def dump_context(self, x, y):
print('Executing task id {0.id}, args: {0.args!r} kwargs: {0.kwargs!r}'.format(
self.request))
>>> from proj.task.tasks import dump_context
>>> dump_context.delay(1, 2)
<AsyncResult: 00bc9f96-98df-4bca-a4a3-4774c535a44c>
- 重新指定适用于的所有任務的預設基類
def make_app(context):
app = Celery('proj')
app.config_from_object('proj.celeryconfig')
default_exchange = Exchange('default', type='direct')
web_exchange = Exchange('task', type='direct')
app.conf.task_default_queue = 'default'
app.conf.task_default_exchange = 'default'
app.conf.task_default_routing_key = 'default'
app.conf.task_queues = (
Queue('default', default_exchange, routing_key='default'),
Queue('high_queue', web_exchange, routing_key='hign_task'),
Queue('low_queue', web_exchange, routing_key='low_task'),
)
app.conf.timezone = 'Asia/Shanghai'
app.conf.beat_schedule = {
'periodic_task_add': {
'task': 'proj.task.tasks.add',
'schedule': crontab(minute='*/1'),
'args': (2, 2)
},
}
TaskBase = app.Task
class ContextTask(TaskBase):
abstract = True
context = ctx
def __call__(self, *args, **kwargs):
"""Will be execute when create the instance object of ContextTesk.
"""
LOG.info(_LI("Invoked celery task starting: %(name)s[%(id)s]"),
{'name': self.name, 'id': self.request.id})
return super(ContextTask, self).__call__(*args, **kwargs)
# 任務執行成功前做什麼
def on_success(self, retval, task_id, args, kwargs):
"""Invoked after the task is successfully execute.
"""
LOG.info(_LI("Task %(id)s success: [%(ret)s]."),
{'id': task_id, 'ret': retval})
return super(ContextTask, self).on_success(retval, task_id,
args, kwargs)
# 任務執行失敗後做什麼
def on_failure(self, exc, task_id, args, kwargs, einfo):
"""Invoked after the task failed to execute.
"""
msg = _LE("Task [%(id)s] failed:\n"
"args : %(args)s\n"
"kwargs : %(kw)s\n"
"detail :%(err)s") % {
'id': task_id, 'args': args,
'kw': kwargs, 'err': six.text_type(exc)}
LOG.exception(msg)
return super(ContextTask, self).on_failure(exc, task_id, args,
kwargs, einfo)
# 重新賦予預設基類
app.Task = ContextTask
return app
- 為不同類型的任務繼承出具有偏向屬性的基類
Import celery
class JSONTask(celery.Task):
serializer = 'json'
def on_failure(self, exc, task_id, args, kwargs, einfo):
print('{0!r} failed: {1!r}'.format(task_id, exc))
class XMLTask(celery.Task):
serializer = 'xml'
def on_failure(self, exc, task_id, args, kwargs, einfo):
print('{0!r} failed: {1!r}'.format(task_id, exc))
# 指定不同的基類
@task(base=JSONTask)
def add_json(x, y):
raise KeyError()
# 指定不同的基類
@task(base=XMLTask)
def add_xml(x, y):
raise KeyError()