
Celery - Task: executing different operations based on task state, and a simulation of retry behaviour

Background

Experiment background: there is a batch of tasks that must be split into groups and dispatched for execution. Unfortunately, each individual task is very likely to fail. The problems to solve are therefore how to retry failed tasks several times, and how to record the information of both failed and successful asynchronous tasks.

Experiment requirements: simulate repeated failures of the sub-tasks. The batch of tasks is executed group by group, and the groups must run strictly in order rather than being interleaved. In addition, every failed task must get up to three retries.

Experiment
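
The server module below imports the Celery application from a proj/config.py that the article does not show. A minimal sketch of such a config module, assuming a local Redis broker and result backend (the URLs and the include list are assumptions, not taken from the original):

# -*- coding: utf-8 -*-
# proj/config.py -- minimal Celery application (sketch; URLs are assumed)
from __future__ import absolute_import, unicode_literals
from celery import Celery

app = Celery(
    'proj',
    broker = 'redis://127.0.0.1:6379/0',    # assumed broker URL
    backend = 'redis://127.0.0.1:6379/1',   # assumed result backend URL
    include = ['proj.tasks_1'],             # module that defines the tasks below
)

if __name__ == '__main__':
    app.start()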

Server-side source code:

# -*- coding: utf-8 -*-
from __future__ import absolute_import, unicode_literals, print_function
from .config import app

import os
import time
import random
import fcntl
import json
from datetime import datetime

from celery.utils.log import get_task_logger
from celery import Task, group

logger = get_task_logger(__name__)

def func(x):
    # Simulate an operation that fails with high probability
    # (four of the five possible values raise).
    if x in [1, 2, 3, 4]:
        raise Exception('simulated failure for %s' % x)

def write_file(x):
    # Append one JSON line to a per-day log file, holding an exclusive lock
    # so that concurrent workers do not interleave their writes.
    date = datetime.now()
    date = '%04d%02d%02d' % (date.year, date.month, date.day)
    filename = os.path.join('./', 'test_log_%s.log' % date)

    with open(filename, 'a') as fh:
        fcntl.flock(fh.fileno(), fcntl.LOCK_EX)
        fh.write(json.dumps(x, ensure_ascii = False) + '\n')

# Subclass Task and override the on_success, on_failure and on_retry handlers.
# Celery passes the task's positional arguments as the first extra positional
# parameter, so args[0] below is the argument list of the save() call.
class MyTask(Task):
    def on_success(self, retval, task_id, *args, **kwargs):
        _info = 'save %s success' % args[0]
        print(_info)
        write_file(_info)

    def on_failure(self, exc, task_id, *args, **kwargs):
        # Only called once all retries have been exhausted.
        _info = 'save %s failed, roll back' % args[0]
        print(_info)
        write_file(_info)

    def on_retry(self, exc, task_id, *args, **kwargs):
        _info = 'retry %s' % args[0]
        print(_info)

# Bind the task context (self) and use MyTask as the base class.
@app.task(bind = True, base = MyTask)
def save(self, s):
    #logger.info(self.request.__dict__)
    try:
        # Simulate the task failing with high probability.
        x = random.randint(1, 5)
        func(x)
    except Exception as e:
        # Retry the failed task up to three times, one second apart.
        raise self.retry(exc = e, countdown = 1, max_retries = 3)

@app.task()
def distribution(indexs):
    # Dispatch the indexes in groups of five: each group runs in parallel,
    # and the next group is not dispatched until the current one has finished
    # (including retries), so the groups execute strictly in order.
    for i in range(0, len(indexs), 5):
        start = i
        end = min(i + 5, len(indexs))

        L = []
        for j in range(start, end):
            L.append(save.s(indexs[j]))

        res = group(L)()

        # Poll until every sub-task in the group has reached a final state.
        # (The Celery docs discourage waiting on sub-task results inside a
        # task, but it is what keeps the groups sequential here.)
        while not res.ready():
            time.sleep(1)
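
To run the experiment, a worker has to be started to consume the save and distribution tasks. Assuming the application is defined in proj/config.py as in the sketch above, the standard Celery CLI does it; --concurrency=5 lets a whole group of five run in parallel:

celery -A proj.config worker --loglevel=info --concurrency=5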
           

Client-side source code:

from proj.tasks_1 import distribution

from threading import Thread

# 'async' is a reserved keyword since Python 3.7, so the decorator is named
# run_async instead: it runs the wrapped function in a background thread and
# returns to the caller immediately.
def run_async(f):
    def wrapper(*args, **kwargs):
        thr = Thread(target = f, args = args, kwargs = kwargs)
        thr.start()
    return wrapper

@run_async
def func(indexs):
    # Calling distribution() directly runs the task body in this thread;
    # the save() sub-tasks it dispatches still run on the Celery workers.
    res = distribution(indexs)

def middle_func(indexs):
    # Kick off the distribution in the background and return right away.
    func(indexs)
    return 'hello world'

if __name__ == '__main__':
    indexs = [1, 2, 3, 4, 5, 6, 7, 8]
    res = middle_func(indexs)
    print(res)
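
A design note: because distribution() is called as a plain function, the chunking loop runs in the client's background thread rather than on a worker; only the save() sub-tasks go through the broker. If the orchestration itself should run on a worker instead, the call can be sent asynchronously (a sketch; keep in mind the distribution task would then occupy a worker slot while it waits on each group):

# Sketch: send the whole distribution task to a worker instead of running it locally.
async_result = distribution.delay(indexs)
print(async_result.id)    # task id, can be used later to check the state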
           

Experiment results:

cat test_log_20200715.log
"save [1] success"
"save [3] success"
"save [2] failed, roll back"
"save [4] failed, roll back"
"save [5] failed, roll back"
"save [6] success"
"save [7] success"
"save [8] success"
           
