1.在tornado 中 使用 Celery
2.Celery的進階
2.1回調
官方文檔中把回調稱為linking tasks。和回調差不多一個意思。這個回調怎麼用呢,請看下面例子。
這個link就相當于tornado中的callback了
add.apply_async((,),link = add.s())
這個函數傳回的值為20,這個20怎麼來呢?
其實就是先執行第一個任務add(2,2) ,第一個任傳回的結果傳遞給回調函數,然後add(4,16).最後就的好20了。這個例子很簡單,
不過這個例子有個疑問—add.s(16),這個s是怎麼回事?突然出現這個s,而且以前沒有見過啊。
官網canvas guide裡有說明的,這個咱們簡單的學習一下。
>>>add.subtask((, ), countdown=)
tasks.add(, )
>>>add.s(, )
tasks.add(, )
add.s()就相當于subtask()方法,等于它的簡寫。
特殊變量
EAT(estimated time of arrival) 預計到達時間
result = add.apply_async((, ), countdown=)
#我們通過countdown變量設定它
result.get()
那麼這個變量的作用是什麼呢?我們可以給這個參數設定一個特殊的日期(date)或者時間(time),那麼在這個時間内這個函數是不會傳回結果的。上面那個例子代表我們至少需要等待3秒才能得到傳回結果。
Message Sending Retry
celey将會在事件連接配接的失敗時候自動重新連接配接,通過設定retry變量可以控制是否重新連接配接。比如retry=False表示連接配接失敗後不重連。
不過還有更進階的配置
add.apply_async((, ), retry=True, retry_policy={
'max_retries': ,
'interval_start': ,
'interval_step': .,
'interval_max': .,
})
- max_retries :最大重連次數,設為None表示無限次數的重連。
- interval_start:第一次兩次重連的等待時間,機關秒,預設為0
- interval_max:重連登台時間 的最大值預設0.2
-
nterval_step:重連時間增量的步長,預設0.2
剛才的例子裡重連的間隔從0開始,每次增長0.2s,當間隔到帶0.8的時候就不增長了。
Serializers
worker 傳回來内容的格式。預設為pickle python有專門的包。
不過我們可是設定 json,pickle,ymal,msgpack。
add.apply_async((, ), serializer='json')
不過每調用一次都要設定它不是很麻煩麼,我們可以定義celery設在它。這樣就不必每次都設定了。
通過CELERY_TASK_PUBLISH_RETRY 就可以了。
所有的設這内容都在這裡了
3.celery的配置(Configuration)
不過一般情況下我們不必對Celery進行配置哈,如果你有啥特殊的要求的話就需要了解這一方面了。
怎麼配置
配置的格式(等号連接配接)
## Broker settings.
BROKER_URL = 'amqp://guest:[email protected]:5672//'
# List of modules to import when celery starts.
CELERY_IMPORTS = ('myapp.tasks', )
## Using the database to store task state and results.
CELERY_RESULT_BACKEND = 'db+sqlite:///results.db'
CELERY_ANNOTATIONS = {'tasks.add': {'rate_limit': '10/s'}}
配置單個條目
要配置多個
app.conf.update(
CELERY_TASK_SERIALIZER='json',
CELERY_ACCEPT_CONTENT=['json'], # Ignore other content
CELERY_RESULT_SERIALIZER='json',
CELERY_TIMEZONE='Europe/Oslo',
CELERY_ENABLE_UTC=True,
)
常用的配置選項
CELERY_RESULT_BACKEND
