天天看點

django 使用mongodb資料庫 celery并發自增字段

背景:django操作mongodb資料庫,celery定時并發建立任務,任務的id存入到mongodb資料庫,這個字段能從10000自增。

問題:任務id在并發入庫的時候,要實作這個字段有序的自增,是以要查詢mongodb資料庫最新的任務id,然後設定這個任務的id,然後再入到mongodb資料庫,當celery多并發時,同一時間執行,就會出現id重複的情況。

解決思路:1.定義一個全局變量,在項目啟動時,擷取最新的任務id,指派給這個全局變量,在調用的時候,采用疊代器,每調用一次,任務id更新一次。(經測試,大量并發還是會出現任務id重複的情況)

2.通過MySQL資料庫的id自增特性來完成task_id在并發時的唯一。(使用的)

mongodb資料表結構py

class cc_ReportTask(mongoengine.Document):
    task_id = mongoengine.LongField()
    task_name = mongoengine.StringField()
    template_id = mongoengine.LongField()
    report_inst_total_num = mongoengine.LongField()
    report_branch_count = mongoengine.LongField()
    branch_max_num = mongoengine.LongField()
    # 0:手動 1:定時
    report_type = mongoengine.StringField()
    # 0:未開始 1:執行中 2:執行失敗 3:執行成功
    task_status = mongoengine.LongField()
    spec = mongoengine.StringField()
    day = mongoengine.LongField()
    frequency = mongoengine.LongField()
    time = mongoengine.StringField()
    workflow_id = mongoengine.LongField()
    creator = mongoengine.StringField()
    create_time = mongoengine.DateTimeField()
    start_time = mongoengine.DateTimeField()
    end_time = mongoengine.DateTimeField()
    # 耗時 機關秒  保留3位小數
    duration = mongoengine.FloatField()

    meta = {'collection': 'cc_ReportTask'}
           

mysql任務自增表設計

class TaskIndex(models.Model):
    unique_id = models.IntegerField(verbose_name='表的id', null=True)
    task_id = models.IntegerField(verbose_name='任務的id', null=True)
    right_task_id = models.IntegerField(verbose_name='修改後的任務id', null=True)

    class Meta:
        app_label = 'celery'
        verbose_name = '任務序号'
        verbose_name_plural = '任務序号'
           

為了解耦MySQL和mongodb資料表資料強關聯,不強制要求這兩類型表的資料一一對應,防止出現不可控資料,或者手動操作了mongodb表等情況,兩個表資料不對應。

每當重新開機項目的時候,就将MySQL的任務表資料重置一條最新要使用任務id,并且在使用的時候,也要確定MySQL的任務序号表資料正确,所有在入mongodb資料庫的時候,要檢測以及更新MySQL任務序号表資料。

檢測更新MySQL任務序号表資料操作

def init_index_func():

    from apps.celery.models import TaskIndex
    if not TaskIndex.objects.filter(id=1):
        task_queryset = cc_ReportTask.objects.all()
        task_id_list = [int(i.task_id) for i in task_queryset if i.task_id]
        task_id = max(task_id_list) if task_id_list else 10000
        task_id = task_id if task_id > 10000 else 10000
        TaskIndex.objects.create(
            id=1,
            unique_id=0,
            task_id=task_id
        )
    else:
        task_queryset = cc_ReportTask.objects.all()
        task_id_list = [int(i.task_id) for i in task_queryset if i.task_id]
        task_id = max(task_id_list) if task_id_list else 10000
        task_id = task_id if task_id > 10000 else 10000
        if not TaskIndex.objects.filter(task_id=task_id) or not task_queryset:
            cursor = connection.cursor()
            cursor.execute("show tables;")
            TaskIndex.objects.exclude(id=1).all().delete()
            cursor.execute("alter table `celery_taskindex` auto_increment=1")
            TaskIndex.objects.filter(id=1).update(
                unique_id=0,
                task_id=task_id
            )
           

使用

@task()
def run_function(**kwargs):
    from apps.celery.utils import init_index_func
    logger.info('開始執行')
    # 檢測糾正任務序列或者初始化task_id
    init_index_func()
    template_id = kwargs.get('template_id', None)
    if not template_id:
        logger.error('任務模闆id是空,請檢查schedules表的參數')
        return False, '任務模闆id是空,請檢查schedules表的參數'
    task_template_objects = cc_ReportTaskTemplate.objects.filter(template_id=template_id)
    if not task_template_objects:
        logger.error('在ReportTaskTemplate表中任務模闆id:{0}不存在'.format(template_id))
        return False, '在ReportTaskTemplate表中任務模闆id:{0}不存在'.format(template_id)
    task_template_object = task_template_objects[0]
    if task_template_object.report_status != 'enable':
        logger.error('任務模闆id:{0}的狀态:{1},處于未啟用狀态,不執行上報'.format(
            task_template_object.template_id,
            task_template_object.report_status))
        return False, '任務模闆id:{0}的狀态:{1},處于未啟用狀态,不執行上報'.format(
            task_template_object.template_id,
            task_template_object.report_status)

    # 根據模型辨別符bk_obj_id過濾執行個體對象  并且沒有被當機的
    inst_objects = cc_ObjectBase.objects.filter(
        bk_obj_id=task_template_object.bk_obj_id,
        inst_status='1',
        need_report__in=['new', 'update', 'delete'],
        is_frozen=False
    )
    inst_ids = [i.bk_inst_id for i in inst_objects]
    inst_objects.update(is_frozen=True)
    if not inst_ids:
        logger.error('目前模型沒有符合條件的執行個體')
        return False, '目前模型沒有符合條件的執行個體'
    inst_objects = cc_ObjectBase.objects.filter(bk_obj_id=task_template_object.bk_obj_id,
                                                bk_inst_id__in=inst_ids)

    report_inst_total_num = inst_objects.count()

    # 模闆配置的每個批次的數量
    branch_max_num = task_template_object.branch_max_num
    # 根據任務模闆和執行個體對象計算有多少批次(向上取整)
    report_branch_count = math.ceil(report_inst_total_num / (branch_max_num * 1.0))

    # task_id 自增
    # 上報任務資訊入庫

    # 擷取初始化任務id
    init_index_obj = TaskIndex.objects.get(id=1)
    init_task_id = init_index_obj.task_id

    task_index_obj = TaskIndex.objects.create(
        unique_id=0
    )
    task_index_obj.unique_id = task_index_obj.id - 1
    task_index_obj.save()
    task_index_obj.task_id = task_index_obj.id - 1 + init_task_id
    task_index_obj.save()

    task_id = task_index_obj.task_id
    logger.info('任務id:{0}, 模闆id:{1}'.format(task_id, template_id))
    if task_template_object.report_type == 'manual':
        task_name = task_template_object.bk_obj_name + '手動報送' + datetime.datetime.now().strftime('%Y%m%d') + str(
            task_id)
    else:
        task_name = task_template_object.bk_obj_name + '定時報送' + datetime.datetime.now().strftime('%Y%m%d') + str(
            task_id)
    with transaction.atomic():
        # 任務報送入庫
        try:
            logger.info('開始建立cc_ReportTask')
            task_obj = cc_ReportTask.objects.create(
                # 自增
                task_id=task_id,
                task_name=task_name,
                template_id=task_template_object.template_id,
                report_inst_total_num=report_inst_total_num,
                report_branch_count=report_branch_count,
                branch_max_num=task_template_object.branch_max_num,
                report_type=task_template_object.report_type,
                task_status=0,
                spec=task_template_object.spec,
                day=task_template_object.day,
                frequency=task_template_object.frequency,
                time=task_template_object.time,
                workflow_id=workflow_id,
                creator='system',
                create_time=datetime.datetime.utcfromtimestamp(time.time())
            )

            # 循環批次數量
            logger.info('循環批次次數:{0}'.format(report_branch_count))
            for i in range(1, report_branch_count + 1):
                # 計算目前批次執行個體數
                current_branch_inst_num = branch_max_num
                # 如果目前批次是最後一個批次,計算最後一批次的執行個體數量
                if i == report_branch_count:
                    current_branch_inst_num = report_inst_total_num - (branch_max_num * (report_branch_count - 1))

                # # 目前批次的上報資料封裝
                report_inst_metadata_objs = inst_objects[current_branch_inst_num * (i - 1):current_branch_inst_num * i]
                logger.info('開始執行branch_operator')
                branch_operator(
                    task_obj=task_obj,
                    current_branch_inst_num=current_branch_inst_num,
                    report_inst_metadata_objs=report_inst_metadata_objs
                )

            return True, 'success'
        except Exception as e:
            logger.error('任務報送入庫函數run_function錯誤{0}'.format(str(e)))
            return False, str(e)