天天看點

celery--調用異步任務的三種方法和task參數

調用異步任務的三種方法

第一種

調用異步任務有三種方法,前面我們使用的是task.delay(),這是apply_async方法的别名,但接受的參數較為簡單

第二種

我們常用的是task.apply_async(args=[arg1,args],kwargs={key:value}):可以接受複雜的參數

這種可以接收的參數有:

  • task_id:為任務配置設定唯一id,預設是uuid
  • countdown:設定該任務等待一段時間在執行,機關為秒
  • eta:定義任務的開始時間,eta=time.time()+5,機關為秒,是UTC時間,設定成國内時間也沒有用
  • expires:設定任務過期時間,任務在過期時間後還沒有執行則被丢棄,機關為秒
  • retry:如果任務失敗後,是否重試,預設為True
  • shadow:重新指定任務的名字,覆寫其在日志中使用的任務名稱
  • retry_policy:{} 重試政策,max_retries:最大重試次數,預設為3次。interval_start:重試等待的時間間隔,預設為0。interval_step:每次重試讓重試間隔增加的秒數,預設為0.2秒。interval_max:重試間隔最大的秒數,既通過interval_step增大到多少秒之後,就不在增加了,預設為0.2秒。
  • routing_key:自定義路由鍵
  • queue:指定發送到哪個隊列
  • exchange:指定發送到哪個交換機
  • priority:任務隊列的優先級,0到255之間,對于rabbitmq來說0是最高優先級
  • headers:為任務添加額外的消息

還是使用前面的例子,使用task1和task2兩個任務,demo.py調用

在demo.py裡更改調用方式

from apps.task1 import add
from apps.task2 import subs

if __name__ == '__main__':
    add.delay(3,5)
    subs.apply_async(args=[55,22],
                     task_id='aaaaa2222',
                     countdown=5,
                     shadow = 'zouzou'
                     )      

執行結果

celery--調用異步任務的三種方法和task參數

 第三種

app.send_task(task1.add,args=[1,2])

不建議用,因為不會校驗是否存在這個方法,直接就發送成功裡,celery執行就會報錯

task參數

task常用參數
  • name:可以顯示指定任務的名字,預設是本函數的名字,也就是上面的 shadow
  • bind:一個bool值,設定是否綁定一個task的執行個體,如果綁定,task執行個體會作為參數傳遞到任務方法中(第一個參數為self),可以通路task執行個體的所有屬性。
  • base:定義任務的基類,可以以此來定義回調函數,預設是Task類,我們也可以定義自己的Task類
  • default_retry_delay:設定該任務重試的延遲時間,當任務執行失敗後,會自動重試,機關是秒,預設為3分鐘
task不常用參數
  • serializer:指定本任務的序列化的方法
  • autoretry_for:設定在特定異常時重試任務,預設False不重試。
  • retry_backoff:預設Flase,設定重試時的延遲時間間隔政策
  • retry_backoff_max:設定最大延遲重試時間,預設10分鐘,如果失敗則不在重試
  • retry_jitter:預設為True,既引入抖動,避免重試任務集中執行
Task的一般屬性
  • Task.name:任務名稱
  • Task.request:目前任務的資訊
  • Task.max_retries:設定重試的最大次數
  • Task.rhrows:預期錯誤類的可選元組
  • Task.rate_limit:設定任務類型的速度限制
  • Task.time_limit:此任務的硬限時,機關為秒
  • Task.serializer:辨別要使用的預設序列化方法的字元串

修改task1.py的内容如下

from  apps import app
import celery

celery.Task   # Task的屬性在這裡面

class BaseTask(celery.Task):
    def on_failure(self, exc, task_id, args, kwargs, einfo):
        print('執行task失敗')

    def on_success(self, retval, task_id, args, kwargs):
        print(f'執行task成功,task id為:{task_id}')

@app.task(name='wahaha',bind=True,base=BaseTask)
def add(self,x,y):
    print(self.request.id)  # task_id
    return      

啟動celery worker

celery--調用異步任務的三種方法和task參數

執行demo.py

from apps.task1 import add
from apps.task2 import subs

if __name__ == '__main__':
    add.delay(3,5)
    subs.apply_async(args=[55,33])