背景:django操作mongodb数据库,celery定时并发创建任务,任务的id存入到mongodb数据库,这个字段能从10000自增。
问题:任务id在并发入库的时候,要实现这个字段有序的自增,所以要查询mongodb数据库最新的任务id,然后设置这个任务的id,然后再入到mongodb数据库,当celery多并发时,同一时间执行,就会出现id重复的情况。
解决思路:1.定义一个全局变量,在项目启动时,获取最新的任务id,赋值给这个全局变量,在调用的时候,采用迭代器,每调用一次,任务id更新一次。(经测试,大量并发还是会出现任务id重复的情况)
2.通过MySQL数据库的id自增特性来完成task_id在并发时的唯一。(使用的)
mongodb数据表结构py
class cc_ReportTask(mongoengine.Document):
task_id = mongoengine.LongField()
task_name = mongoengine.StringField()
template_id = mongoengine.LongField()
report_inst_total_num = mongoengine.LongField()
report_branch_count = mongoengine.LongField()
branch_max_num = mongoengine.LongField()
# 0:手动 1:定时
report_type = mongoengine.StringField()
# 0:未开始 1:执行中 2:执行失败 3:执行成功
task_status = mongoengine.LongField()
spec = mongoengine.StringField()
day = mongoengine.LongField()
frequency = mongoengine.LongField()
time = mongoengine.StringField()
workflow_id = mongoengine.LongField()
creator = mongoengine.StringField()
create_time = mongoengine.DateTimeField()
start_time = mongoengine.DateTimeField()
end_time = mongoengine.DateTimeField()
# 耗时 单位秒 保留3位小数
duration = mongoengine.FloatField()
meta = {'collection': 'cc_ReportTask'}
mysql任务自增表设计
class TaskIndex(models.Model):
unique_id = models.IntegerField(verbose_name='表的id', null=True)
task_id = models.IntegerField(verbose_name='任务的id', null=True)
right_task_id = models.IntegerField(verbose_name='修改后的任务id', null=True)
class Meta:
app_label = 'celery'
verbose_name = '任务序号'
verbose_name_plural = '任务序号'
为了解耦MySQL和mongodb数据表数据强关联,不强制要求这两类型表的数据一一对应,防止出现不可控数据,或者手动操作了mongodb表等情况,两个表数据不对应。
每当重启项目的时候,就将MySQL的任务表数据重置一条最新要使用任务id,并且在使用的时候,也要确保MySQL的任务序号表数据正确,所有在入mongodb数据库的时候,要检测以及更新MySQL任务序号表数据。
检测更新MySQL任务序号表数据操作
def init_index_func():
from apps.celery.models import TaskIndex
if not TaskIndex.objects.filter(id=1):
task_queryset = cc_ReportTask.objects.all()
task_id_list = [int(i.task_id) for i in task_queryset if i.task_id]
task_id = max(task_id_list) if task_id_list else 10000
task_id = task_id if task_id > 10000 else 10000
TaskIndex.objects.create(
id=1,
unique_id=0,
task_id=task_id
)
else:
task_queryset = cc_ReportTask.objects.all()
task_id_list = [int(i.task_id) for i in task_queryset if i.task_id]
task_id = max(task_id_list) if task_id_list else 10000
task_id = task_id if task_id > 10000 else 10000
if not TaskIndex.objects.filter(task_id=task_id) or not task_queryset:
cursor = connection.cursor()
cursor.execute("show tables;")
TaskIndex.objects.exclude(id=1).all().delete()
cursor.execute("alter table `celery_taskindex` auto_increment=1")
TaskIndex.objects.filter(id=1).update(
unique_id=0,
task_id=task_id
)
使用
@task()
def run_function(**kwargs):
from apps.celery.utils import init_index_func
logger.info('开始执行')
# 检测纠正任务序列或者初始化task_id
init_index_func()
template_id = kwargs.get('template_id', None)
if not template_id:
logger.error('任务模板id是空,请检查schedules表的参数')
return False, '任务模板id是空,请检查schedules表的参数'
task_template_objects = cc_ReportTaskTemplate.objects.filter(template_id=template_id)
if not task_template_objects:
logger.error('在ReportTaskTemplate表中任务模板id:{0}不存在'.format(template_id))
return False, '在ReportTaskTemplate表中任务模板id:{0}不存在'.format(template_id)
task_template_object = task_template_objects[0]
if task_template_object.report_status != 'enable':
logger.error('任务模板id:{0}的状态:{1},处于未启用状态,不执行上报'.format(
task_template_object.template_id,
task_template_object.report_status))
return False, '任务模板id:{0}的状态:{1},处于未启用状态,不执行上报'.format(
task_template_object.template_id,
task_template_object.report_status)
# 根据模型标识符bk_obj_id过滤实例对象 并且没有被冻结的
inst_objects = cc_ObjectBase.objects.filter(
bk_obj_id=task_template_object.bk_obj_id,
inst_status='1',
need_report__in=['new', 'update', 'delete'],
is_frozen=False
)
inst_ids = [i.bk_inst_id for i in inst_objects]
inst_objects.update(is_frozen=True)
if not inst_ids:
logger.error('当前模型没有符合条件的实例')
return False, '当前模型没有符合条件的实例'
inst_objects = cc_ObjectBase.objects.filter(bk_obj_id=task_template_object.bk_obj_id,
bk_inst_id__in=inst_ids)
report_inst_total_num = inst_objects.count()
# 模板配置的每个批次的数量
branch_max_num = task_template_object.branch_max_num
# 根据任务模板和实例对象计算有多少批次(向上取整)
report_branch_count = math.ceil(report_inst_total_num / (branch_max_num * 1.0))
# task_id 自增
# 上报任务信息入库
# 获取初始化任务id
init_index_obj = TaskIndex.objects.get(id=1)
init_task_id = init_index_obj.task_id
task_index_obj = TaskIndex.objects.create(
unique_id=0
)
task_index_obj.unique_id = task_index_obj.id - 1
task_index_obj.save()
task_index_obj.task_id = task_index_obj.id - 1 + init_task_id
task_index_obj.save()
task_id = task_index_obj.task_id
logger.info('任务id:{0}, 模板id:{1}'.format(task_id, template_id))
if task_template_object.report_type == 'manual':
task_name = task_template_object.bk_obj_name + '手动报送' + datetime.datetime.now().strftime('%Y%m%d') + str(
task_id)
else:
task_name = task_template_object.bk_obj_name + '定时报送' + datetime.datetime.now().strftime('%Y%m%d') + str(
task_id)
with transaction.atomic():
# 任务报送入库
try:
logger.info('开始创建cc_ReportTask')
task_obj = cc_ReportTask.objects.create(
# 自增
task_id=task_id,
task_name=task_name,
template_id=task_template_object.template_id,
report_inst_total_num=report_inst_total_num,
report_branch_count=report_branch_count,
branch_max_num=task_template_object.branch_max_num,
report_type=task_template_object.report_type,
task_status=0,
spec=task_template_object.spec,
day=task_template_object.day,
frequency=task_template_object.frequency,
time=task_template_object.time,
workflow_id=workflow_id,
creator='system',
create_time=datetime.datetime.utcfromtimestamp(time.time())
)
# 循环批次数量
logger.info('循环批次次数:{0}'.format(report_branch_count))
for i in range(1, report_branch_count + 1):
# 计算当前批次实例数
current_branch_inst_num = branch_max_num
# 如果当前批次是最后一个批次,计算最后一批次的实例数量
if i == report_branch_count:
current_branch_inst_num = report_inst_total_num - (branch_max_num * (report_branch_count - 1))
# # 当前批次的上报数据封装
report_inst_metadata_objs = inst_objects[current_branch_inst_num * (i - 1):current_branch_inst_num * i]
logger.info('开始执行branch_operator')
branch_operator(
task_obj=task_obj,
current_branch_inst_num=current_branch_inst_num,
report_inst_metadata_objs=report_inst_metadata_objs
)
return True, 'success'
except Exception as e:
logger.error('任务报送入库函数run_function错误{0}'.format(str(e)))
return False, str(e)