目錄
一、Celery 對象解析
二、建立異步任務的方法 task
三、調用異步任務的三種方法
四、擷取任務結果和狀态
五、Celery 使用案例
我們先來看一下 Celery 的初始化方法:
class Celery(object):
def __init__(self, main=None, loader=None, backend=None,
amqp=None, events=None, log=None, control=None,
set_as_current=True, accept_magic_kwargs=False,
tasks=None, broker=None, include=None, changes=None,
config_source=None, fixups=None, task_cls=None,
autofinalize=True, **kwargs):
常用的需要配置的參數:
這些參數都是 celery 執行個體化的配置,我們也可以不寫,可以使用config_from_object方法加載配置;
main : 如果作為__main__運作,則為主子產品的名稱。用作自動生成的任務名稱的字首
loader : 目前加載器執行個體。
backend : 任務結果url;
amqp : AMQP對象或類名,一般不管;
log : 日志對象或類名;
set_as_current : 将本執行個體設為全局目前應用
tasks : 任務系統資料庫。
broker : 使用的預設代理的URL,任務隊列;
include : 每個worker應該導入的子產品清單,以執行個體建立的子產品的目錄作為起始路徑;
二、建立異步任務的方法 task
任何被 task 修飾的方法都會被建立一個 Task 對象,變成一個可序列化并發送到遠端伺服器的任務;它有多種修飾方式:
方式一:使用預設的參數
@celery.task
def function_name():
pass
方式二:指定相關參數
@celery.task(bind=True, name='name')
def function_name():
pass
# task方法參數
name : 可以顯式指定任務的名字;預設是子產品的命名空間中本函數的名字。
serializer : 指定本任務的序列化的方法;
bind : 一個bool值,設定是否綁定一個task的執行個體,如果綁定,task執行個體會作為參數傳遞到任務方法中,可以通路task執行個體的所有的屬性,即前面反序列化中那些屬性
base : 定義任務的基類,可以以此來定義回調函數,預設是Task類,我們也可以定義自己的Task類
default_retry_delay : 設定該任務重試的延遲時間,當任務執行失敗後,會自動重試,機關是秒,預設3分鐘;
autoretry_for : 設定在特定異常時重試任務,預設False即不重試;
retry_backoff : 預設False,設定重試時的延遲時間間隔政策;
retry_backoff_max : 設定最大延遲重試時間,預設10分鐘,如果失敗則不再重試;
retry_jitter : 預設True,即引入抖動,避免重試任務集中執行;
# 當bind=True時,add函數第一個參數是self,指的是task執行個體
@task(bind=True) # 第一個參數是self,使用self.request通路相關的屬性
def add(self, x, y):
try:
logger.info(self.request.id)
except:
self.retry() # 當任務失敗則進行重試,也可以通過max_retries屬性來指定最大重試次數
方式三:自定義Task基類
import celery
class MyTask(celery.Task):
# 任務失敗時執行
def on_failure(self, exc, task_id, args, kwargs, einfo):
print('{0!r} failed: {1!r}'.format(task_id, exc))
# 任務成功時執行
def on_success(self, retval, task_id, args, kwargs):
pass
# 任務重試時執行
def on_retry(self, exc, task_id, args, kwargs, einfo):
pass
@task(base=MyTask)
def add(x, y):
raise KeyError()
# 方法相關的參數
exc : 失敗時的錯誤的類型;
task_id : 任務的id;
args : 任務函數的參數;
kwargs : 鍵值對參數;
einfo : 失敗或重試時的異常詳細資訊;
retval : 任務成功執行的傳回值;
Task的常用屬性
Task.name : 任務名稱;
Task.request : 目前任務的資訊;
Task.max_retries : 設定重試的最大次數
Task.throws : 預期錯誤類的可選元組,不應被視為實際錯誤,而是結果失敗;
Task.rate_limit : 設定此任務類型的速率限制
Task.time_limit : 此任務的硬限時(以秒為機關)。
Task.ignore_result : 不存儲任務狀态。預設False;
Task.store_errors_even_if_ignored : 如果True,即使任務配置為忽略結果,也會存儲錯誤。
Task.serializer : 辨別要使用的預設序列化方法的字元串。
Task.compression : 辨別要使用的預設壓縮方案的字元串。預設為task_compression設定。
Task.backend : 指定該任務的結果存儲後端用于此任務。
Task.acks_late : 如果設定True為此任務的消息将在任務執行後确認 ,而不是在執行任務之前(預設行為),即預設任務執行之前就會發送确認;
Task.track_started : 如果True任務在從業人員執行任務時将其狀态報告為“已啟動”。預設是False;
三、調用異步任務的三種方法
調用異步任務的三個方法分别是:
# 方法一:這是apply_async方法的别名,但接受的參數較為簡單;
task.delay()
# 方法二:可以接受複雜的參數
task.apply_async(args=[arg1, arg2], kwargs={key:value, key:value})
# 方法三:可以發送未被注冊的異步任務,即沒有被celery.task裝飾的任務;
send_task()
方法一:app.send_task
注意: send_task 在發送的時候是不會檢查 tasks.add 函數是否存在的,即使為空也會發送成功,是以 celery 執行是可能找不到該函數報錯;
# File_name:tasks.py
from celery import Celery
app = Celery()
def add(x, y):
return x+y
app.send_task('tasks.add',args=[3,4]) # 參數基本和apply_async函數一樣
方法二:Task.delay
delay 方法是 apply_async 方法的簡化版,不支援執行選項,隻能傳遞任務的參數。
from celery import Celery
app = Celery()
@app.task
def add(x, y, z=0):
return x + y
add.delay(30, 40, z=5) # 包括位置參數和關鍵字參數
方法三:Task.apply_async
apply_async 支援執行選項,它會覆寫全局的預設參數和定義該任務時指定的執行選項,本質上還是調用了 send_task 方法;
from celery import Celery
app = Celery()
@app.task
def add(x, y, z=0):
return x + y
add.apply_async(args=[30,40], kwargs={'z':5})
# 其他參數
task_id : 為任務配置設定唯一id,預設是uuid;
countdown : 設定該任務等待一段時間再執行,機關為s;
eta : 定義任務的開始時間;eta=time.time()+10;
expires : 設定任務時間,任務在過期時間後還沒有執行則被丢棄;
retry : 如果任務失敗後, 是否重試;使用true或false,預設為true
shadow : 重新指定任務的名字str,覆寫其在日志中使用的任務名稱;
retry_policy : {},重試政策.如下:
----max_retries : 最大重試次數, 預設為 3 次.
----interval_start : 重試等待的時間間隔秒數, 預設為 0 , 表示直接重試不等待.
----interval_step : 每次重試讓重試間隔增加的秒數, 可以是數字或浮點數, 預設為 0.2
----interval_max : 重試間隔最大的秒數, 即 通過 interval_step 增大到多少秒之後, 就不在增加了, 可以是數字或者浮點數, 預設為 0.2 .
routing_key : 自定義路由鍵;
queue : 指定發送到哪個隊列;
exchange : 指定發送到哪個交換機;
priority : 任務隊列的優先級,0到255之間,對于rabbitmq來說0是最高優先級;
serializer :任務序列化方法;通常不設定;
compression : 壓縮方案,通常有zlib, bzip2
headers : 為任務添加額外的消息;
link : 任務成功執行後的回調方法;是一個signature對象;可以用作關聯任務;
link_error : 任務失敗後的回調方法,是一個signature對象;
# 其他參數參考用法如下:
add.apply_async((2, 2), retry=True, retry_policy={
'max_retries': 3,
'interval_start': 0,
'interval_step': 0.2,
'interval_max': 0.2,
})
自定義釋出者、交換機、路由鍵、隊列、優先級、序列方案和壓縮方法:
task.apply_async((2,2),
compression='zlib',
serialize='json',
queue='priority.high',
routing_key='web.add',
priority=0,
exchange='web_exchange')
四、擷取任務結果和狀态
由于 celery 發送的都是去其他程序執行的任務,如果需要在用戶端監控任務的狀态,有如下方法:
r = task.apply_async()
r.ready() # 檢視任務狀态,傳回布爾值, 任務執行完成, 傳回 True, 否則傳回 False.
r.wait() # 會阻塞等待任務完成, 傳回任務執行結果,很少使用;
r.get(timeout=1) # 擷取任務執行結果,可以設定等待時間,如果逾時但任務未完成傳回None;
r.result # 任務執行結果,未完成傳回None;
r.state # PENDING, START, SUCCESS,任務目前的狀态
r.status # PENDING, START, SUCCESS,任務目前的狀态
r.successful # 任務成功傳回true
r.traceback # 如果任務抛出了一個異常,可以擷取原始的回溯資訊
但是一般業務中很少用到,因為擷取任務執行的結果需要阻塞,celery使用場景一般是不關心結果的。
# seting.py
# 設定配置
BROKER_URL = 'amqp://username:password@localhost:5672/yourvhost'
CELERY_RESULT_BACKEND = 'redis://localhost:6379/0'
CELERY_TASK_SERIALIZER = 'msgpack'
CELERY_RESULT_SERIALIZER = 'msgpack'
CELERY_TASK_RESULT_EXPIRES = 60 * 60 * 24
CELERY_ACCEPT_CONTENT = ["msgpack"]
CELERY_DEFAULT_QUEUE = "default"
CELERY_QUEUES = {
"default": { # 這是上面指定的預設隊列
"exchange": "default",
"exchange_type": "direct",
"routing_key": "default"
}
}
# app.py --- 初始化celery對象
from celery import Celery
import seting
from task import test_one, test_two
celery = Celery(__name__, include=["task"]) # 設定需要導入的子產品
# 引入配置檔案
celery.config_from_object(seting)
if __name__ == '__main__':
test_one.apply_async((2,2),
routing_key='default',
priority=0,
exchange='default')
# task.py --- 定義需要執行的任務
from app import celery
@celery.task
def test_one(x, y):
return x + y
@celery.task(name="one_name")
def test_two(x, y):
return x * y