天天看点

django 使用mongodb数据库 celery并发自增字段

背景:django操作mongodb数据库,celery定时并发创建任务,任务的id存入到mongodb数据库,这个字段能从10000自增。

问题:任务id在并发入库的时候,要实现这个字段有序的自增,所以要查询mongodb数据库最新的任务id,然后设置这个任务的id,然后再入到mongodb数据库,当celery多并发时,同一时间执行,就会出现id重复的情况。

解决思路:1.定义一个全局变量,在项目启动时,获取最新的任务id,赋值给这个全局变量,在调用的时候,采用迭代器,每调用一次,任务id更新一次。(经测试,大量并发还是会出现任务id重复的情况)

2.通过MySQL数据库的id自增特性来完成task_id在并发时的唯一。(使用的)

mongodb数据表结构py

class cc_ReportTask(mongoengine.Document):
    task_id = mongoengine.LongField()
    task_name = mongoengine.StringField()
    template_id = mongoengine.LongField()
    report_inst_total_num = mongoengine.LongField()
    report_branch_count = mongoengine.LongField()
    branch_max_num = mongoengine.LongField()
    # 0:手动 1:定时
    report_type = mongoengine.StringField()
    # 0:未开始 1:执行中 2:执行失败 3:执行成功
    task_status = mongoengine.LongField()
    spec = mongoengine.StringField()
    day = mongoengine.LongField()
    frequency = mongoengine.LongField()
    time = mongoengine.StringField()
    workflow_id = mongoengine.LongField()
    creator = mongoengine.StringField()
    create_time = mongoengine.DateTimeField()
    start_time = mongoengine.DateTimeField()
    end_time = mongoengine.DateTimeField()
    # 耗时 单位秒  保留3位小数
    duration = mongoengine.FloatField()

    meta = {'collection': 'cc_ReportTask'}
           

mysql任务自增表设计

class TaskIndex(models.Model):
    unique_id = models.IntegerField(verbose_name='表的id', null=True)
    task_id = models.IntegerField(verbose_name='任务的id', null=True)
    right_task_id = models.IntegerField(verbose_name='修改后的任务id', null=True)

    class Meta:
        app_label = 'celery'
        verbose_name = '任务序号'
        verbose_name_plural = '任务序号'
           

为了解耦MySQL和mongodb数据表数据强关联,不强制要求这两类型表的数据一一对应,防止出现不可控数据,或者手动操作了mongodb表等情况,两个表数据不对应。

每当重启项目的时候,就将MySQL的任务表数据重置一条最新要使用任务id,并且在使用的时候,也要确保MySQL的任务序号表数据正确,所有在入mongodb数据库的时候,要检测以及更新MySQL任务序号表数据。

检测更新MySQL任务序号表数据操作

def init_index_func():

    from apps.celery.models import TaskIndex
    if not TaskIndex.objects.filter(id=1):
        task_queryset = cc_ReportTask.objects.all()
        task_id_list = [int(i.task_id) for i in task_queryset if i.task_id]
        task_id = max(task_id_list) if task_id_list else 10000
        task_id = task_id if task_id > 10000 else 10000
        TaskIndex.objects.create(
            id=1,
            unique_id=0,
            task_id=task_id
        )
    else:
        task_queryset = cc_ReportTask.objects.all()
        task_id_list = [int(i.task_id) for i in task_queryset if i.task_id]
        task_id = max(task_id_list) if task_id_list else 10000
        task_id = task_id if task_id > 10000 else 10000
        if not TaskIndex.objects.filter(task_id=task_id) or not task_queryset:
            cursor = connection.cursor()
            cursor.execute("show tables;")
            TaskIndex.objects.exclude(id=1).all().delete()
            cursor.execute("alter table `celery_taskindex` auto_increment=1")
            TaskIndex.objects.filter(id=1).update(
                unique_id=0,
                task_id=task_id
            )
           

使用

@task()
def run_function(**kwargs):
    from apps.celery.utils import init_index_func
    logger.info('开始执行')
    # 检测纠正任务序列或者初始化task_id
    init_index_func()
    template_id = kwargs.get('template_id', None)
    if not template_id:
        logger.error('任务模板id是空,请检查schedules表的参数')
        return False, '任务模板id是空,请检查schedules表的参数'
    task_template_objects = cc_ReportTaskTemplate.objects.filter(template_id=template_id)
    if not task_template_objects:
        logger.error('在ReportTaskTemplate表中任务模板id:{0}不存在'.format(template_id))
        return False, '在ReportTaskTemplate表中任务模板id:{0}不存在'.format(template_id)
    task_template_object = task_template_objects[0]
    if task_template_object.report_status != 'enable':
        logger.error('任务模板id:{0}的状态:{1},处于未启用状态,不执行上报'.format(
            task_template_object.template_id,
            task_template_object.report_status))
        return False, '任务模板id:{0}的状态:{1},处于未启用状态,不执行上报'.format(
            task_template_object.template_id,
            task_template_object.report_status)

    # 根据模型标识符bk_obj_id过滤实例对象  并且没有被冻结的
    inst_objects = cc_ObjectBase.objects.filter(
        bk_obj_id=task_template_object.bk_obj_id,
        inst_status='1',
        need_report__in=['new', 'update', 'delete'],
        is_frozen=False
    )
    inst_ids = [i.bk_inst_id for i in inst_objects]
    inst_objects.update(is_frozen=True)
    if not inst_ids:
        logger.error('当前模型没有符合条件的实例')
        return False, '当前模型没有符合条件的实例'
    inst_objects = cc_ObjectBase.objects.filter(bk_obj_id=task_template_object.bk_obj_id,
                                                bk_inst_id__in=inst_ids)

    report_inst_total_num = inst_objects.count()

    # 模板配置的每个批次的数量
    branch_max_num = task_template_object.branch_max_num
    # 根据任务模板和实例对象计算有多少批次(向上取整)
    report_branch_count = math.ceil(report_inst_total_num / (branch_max_num * 1.0))

    # task_id 自增
    # 上报任务信息入库

    # 获取初始化任务id
    init_index_obj = TaskIndex.objects.get(id=1)
    init_task_id = init_index_obj.task_id

    task_index_obj = TaskIndex.objects.create(
        unique_id=0
    )
    task_index_obj.unique_id = task_index_obj.id - 1
    task_index_obj.save()
    task_index_obj.task_id = task_index_obj.id - 1 + init_task_id
    task_index_obj.save()

    task_id = task_index_obj.task_id
    logger.info('任务id:{0}, 模板id:{1}'.format(task_id, template_id))
    if task_template_object.report_type == 'manual':
        task_name = task_template_object.bk_obj_name + '手动报送' + datetime.datetime.now().strftime('%Y%m%d') + str(
            task_id)
    else:
        task_name = task_template_object.bk_obj_name + '定时报送' + datetime.datetime.now().strftime('%Y%m%d') + str(
            task_id)
    with transaction.atomic():
        # 任务报送入库
        try:
            logger.info('开始创建cc_ReportTask')
            task_obj = cc_ReportTask.objects.create(
                # 自增
                task_id=task_id,
                task_name=task_name,
                template_id=task_template_object.template_id,
                report_inst_total_num=report_inst_total_num,
                report_branch_count=report_branch_count,
                branch_max_num=task_template_object.branch_max_num,
                report_type=task_template_object.report_type,
                task_status=0,
                spec=task_template_object.spec,
                day=task_template_object.day,
                frequency=task_template_object.frequency,
                time=task_template_object.time,
                workflow_id=workflow_id,
                creator='system',
                create_time=datetime.datetime.utcfromtimestamp(time.time())
            )

            # 循环批次数量
            logger.info('循环批次次数:{0}'.format(report_branch_count))
            for i in range(1, report_branch_count + 1):
                # 计算当前批次实例数
                current_branch_inst_num = branch_max_num
                # 如果当前批次是最后一个批次,计算最后一批次的实例数量
                if i == report_branch_count:
                    current_branch_inst_num = report_inst_total_num - (branch_max_num * (report_branch_count - 1))

                # # 当前批次的上报数据封装
                report_inst_metadata_objs = inst_objects[current_branch_inst_num * (i - 1):current_branch_inst_num * i]
                logger.info('开始执行branch_operator')
                branch_operator(
                    task_obj=task_obj,
                    current_branch_inst_num=current_branch_inst_num,
                    report_inst_metadata_objs=report_inst_metadata_objs
                )

            return True, 'success'
        except Exception as e:
            logger.error('任务报送入库函数run_function错误{0}'.format(str(e)))
            return False, str(e)