調用異步任務的三種方法
第一種
調用異步任務有三種方法,前面我們使用的是task.delay(),這是apply_async方法的别名,但接受的參數較為簡單
第二種
我們常用的是task.apply_async(args=[arg1,args],kwargs={key:value}):可以接受複雜的參數
這種可以接收的參數有:
- task_id:為任務配置設定唯一id,預設是uuid
- countdown:設定該任務等待一段時間在執行,機關為秒
- eta:定義任務的開始時間,eta=time.time()+5,機關為秒,是UTC時間,設定成國内時間也沒有用
- expires:設定任務過期時間,任務在過期時間後還沒有執行則被丢棄,機關為秒
- retry:如果任務失敗後,是否重試,預設為True
- shadow:重新指定任務的名字,覆寫其在日志中使用的任務名稱
- retry_policy:{} 重試政策,max_retries:最大重試次數,預設為3次。interval_start:重試等待的時間間隔,預設為0。interval_step:每次重試讓重試間隔增加的秒數,預設為0.2秒。interval_max:重試間隔最大的秒數,既通過interval_step增大到多少秒之後,就不在增加了,預設為0.2秒。
- routing_key:自定義路由鍵
- queue:指定發送到哪個隊列
- exchange:指定發送到哪個交換機
- priority:任務隊列的優先級,0到255之間,對于rabbitmq來說0是最高優先級
- headers:為任務添加額外的消息
還是使用前面的例子,使用task1和task2兩個任務,demo.py調用
在demo.py裡更改調用方式
from apps.task1 import add
from apps.task2 import subs
if __name__ == '__main__':
add.delay(3,5)
subs.apply_async(args=[55,22],
task_id='aaaaa2222',
countdown=5,
shadow = 'zouzou'
)
執行結果
![](https://img.laitimes.com/img/_0nNw4CM6IyYiwiM6ICdiwiI0gTMx81dsQWZ4lmZf1GLlpXazVmcvwFciV2dsQXYtJ3bm9CX9s2RkBnVHFmb1clWvB3MaVnRtp1XlBXe0xCMy81dvRWYoNHLwEzX5xCMx8FesU2cfdGLwMzX0xiRGZkRGZ0Xy9GbvNGLpZTY1EmMZVDUSFTU4VFRR9Fd4VGdsYTMfVmepNHLrJXYtJXZ0F2dvwVZnFWbp1zczV2YvJHctM3cv1Ce-cmbw5yMzITM2ITZ5UjYjF2N1UmYyYzXyUDMxATMxAzLcdDMyIDMy8CXn9Gbi9CXzV2Zh1WavwVbvNmLvR3YxUjLyM3Lc9CX6MHc0RHaiojIsJye.png)
第三種
app.send_task(task1.add,args=[1,2])
不建議用,因為不會校驗是否存在這個方法,直接就發送成功裡,celery執行就會報錯
task參數
task常用參數
- name:可以顯示指定任務的名字,預設是本函數的名字,也就是上面的 shadow
- bind:一個bool值,設定是否綁定一個task的執行個體,如果綁定,task執行個體會作為參數傳遞到任務方法中(第一個參數為self),可以通路task執行個體的所有屬性。
- base:定義任務的基類,可以以此來定義回調函數,預設是Task類,我們也可以定義自己的Task類
- default_retry_delay:設定該任務重試的延遲時間,當任務執行失敗後,會自動重試,機關是秒,預設為3分鐘
task不常用參數
- serializer:指定本任務的序列化的方法
- autoretry_for:設定在特定異常時重試任務,預設False不重試。
- retry_backoff:預設Flase,設定重試時的延遲時間間隔政策
- retry_backoff_max:設定最大延遲重試時間,預設10分鐘,如果失敗則不在重試
- retry_jitter:預設為True,既引入抖動,避免重試任務集中執行
Task的一般屬性
- Task.name:任務名稱
- Task.request:目前任務的資訊
- Task.max_retries:設定重試的最大次數
- Task.rhrows:預期錯誤類的可選元組
- Task.rate_limit:設定任務類型的速度限制
- Task.time_limit:此任務的硬限時,機關為秒
- Task.serializer:辨別要使用的預設序列化方法的字元串
修改task1.py的内容如下
from apps import app
import celery
celery.Task # Task的屬性在這裡面
class BaseTask(celery.Task):
def on_failure(self, exc, task_id, args, kwargs, einfo):
print('執行task失敗')
def on_success(self, retval, task_id, args, kwargs):
print(f'執行task成功,task id為:{task_id}')
@app.task(name='wahaha',bind=True,base=BaseTask)
def add(self,x,y):
print(self.request.id) # task_id
return
啟動celery worker
執行demo.py
from apps.task1 import add
from apps.task2 import subs
if __name__ == '__main__':
add.delay(3,5)
subs.apply_async(args=[55,33])