背景
實驗背景,擁有一堆任務需要分組分發執行。但是,糟糕的情況是每個任務的執行,大機率都是失敗的。那麼,如果對失敗任務進行數次重試,以及如何對儲存失敗和成功的異步任務資訊。是需要搞定的問題。
實驗要求:模拟子任務的數次失敗。一堆任務按照分組進行執行,且每個分組之間需要順次執行,不可以混亂執行。并且,保證每個失敗任務擁有三次的重試操作。
實驗
服務端源碼:
# -*- utf-8 -*-
from __future__ import absolute_import, unicode_literals, print_function
from .config import app
import os
import time
import random
import fcntl
import json
from datetime import datetime, timedelta
from celery.utils.log import get_task_logger
from celery import Task, group
logger = get_task_logger(__name__)
def func(x):
if x in [1, 2, 3, 4]:
raise
def write_file(x):
date = datetime.now()
date = '%04d%02d%02d' % (date.year, date.month, date.day)
filename = os.path.join('./', 'test_log_%s.log' % date)
with open(filename, 'a') as fh:
fcntl.flock(fh.fileno(), fcntl.LOCK_EX)
fh.write(json.dumps(x, ensure_ascii = False) + '\n')
# 繼承Task類,重寫on_success, on_failure, on_retry方法
class MyTask(Task):
def on_success(self, retval, task_id, *args, **kwargs):
_info = 'save %s success' % args[0]
print(_info)
write_file(_info)
def on_failure(self, exc, task_id, *args, **kwargs):
_info = 'save %s failed, roll back' % args[0]
print(_info)
write_file(_info)
def on_retry(self, exc, task_id, *args, **kwargs):
_info = 'retry %s' % args[0]
print(_info)
# 綁定任務上下文,并定義基類
@app.task(bind = True, base = MyTask)
def save(self, s):
#logger.info(self.request.__dict__)
try:
# 模拟任務的大機率異常報錯
x = random.randint(1, 5)
func(x)
except Exception as e:
# 添加任務的失敗重試
raise self.retry(exc = e, countdown = 1, max_retries = 3)
@app.task()
def distribution(indexs):
for i in range(0, len(indexs), 5):
start = i
end = (i + 5) if (i + 5) <= len(indexs) else len(indexs)
L = []
for j in range(start, end):
L.append(save.s(indexs[j]))
res = group(L)()
while not res.ready():
time.sleep(1)
用戶端源碼:
from proj.tasks_1 import distribution
from celery import group
import time
from threading import Thread
def async(f):
def wrapper(*args, **kwargs):
thr = Thread(target = f, args = args, kwargs = kwargs)
thr.start()
return wrapper
@async
def func(indexs):
res = distribution(indexs)
def middle_func(indexs):
func(indexs)
return 'hello world'
if __name__ == '__main__':
indexs = [1, 2, 3, 4, 5, 6, 7, 8]
res = middle_func(indexs)
print(res)
實驗結果:
cat test_log_20200715.log
"save [1] success"
"save [3] success"
"save [2] failed, roll back"
"save [4] failed, roll back"
"save [5] failed, roll back"
"save [6] success"
"save [7] success"
"save [8] success"