背景:django操作mongodb資料庫,celery定時并發建立任務,任務的id存入到mongodb資料庫,這個字段能從10000自增。
問題:任務id在并發入庫的時候,要實作這個字段有序的自增,是以要查詢mongodb資料庫最新的任務id,然後設定這個任務的id,然後再入到mongodb資料庫,當celery多并發時,同一時間執行,就會出現id重複的情況。
解決思路:1.定義一個全局變量,在項目啟動時,擷取最新的任務id,指派給這個全局變量,在調用的時候,采用疊代器,每調用一次,任務id更新一次。(經測試,大量并發還是會出現任務id重複的情況)
2.通過MySQL資料庫的id自增特性來完成task_id在并發時的唯一。(使用的)
mongodb資料表結構py
class cc_ReportTask(mongoengine.Document):
task_id = mongoengine.LongField()
task_name = mongoengine.StringField()
template_id = mongoengine.LongField()
report_inst_total_num = mongoengine.LongField()
report_branch_count = mongoengine.LongField()
branch_max_num = mongoengine.LongField()
# 0:手動 1:定時
report_type = mongoengine.StringField()
# 0:未開始 1:執行中 2:執行失敗 3:執行成功
task_status = mongoengine.LongField()
spec = mongoengine.StringField()
day = mongoengine.LongField()
frequency = mongoengine.LongField()
time = mongoengine.StringField()
workflow_id = mongoengine.LongField()
creator = mongoengine.StringField()
create_time = mongoengine.DateTimeField()
start_time = mongoengine.DateTimeField()
end_time = mongoengine.DateTimeField()
# 耗時 機關秒 保留3位小數
duration = mongoengine.FloatField()
meta = {'collection': 'cc_ReportTask'}
mysql任務自增表設計
class TaskIndex(models.Model):
unique_id = models.IntegerField(verbose_name='表的id', null=True)
task_id = models.IntegerField(verbose_name='任務的id', null=True)
right_task_id = models.IntegerField(verbose_name='修改後的任務id', null=True)
class Meta:
app_label = 'celery'
verbose_name = '任務序号'
verbose_name_plural = '任務序号'
為了解耦MySQL和mongodb資料表資料強關聯,不強制要求這兩類型表的資料一一對應,防止出現不可控資料,或者手動操作了mongodb表等情況,兩個表資料不對應。
每當重新開機項目的時候,就将MySQL的任務表資料重置一條最新要使用任務id,并且在使用的時候,也要確定MySQL的任務序号表資料正确,所有在入mongodb資料庫的時候,要檢測以及更新MySQL任務序号表資料。
檢測更新MySQL任務序号表資料操作
def init_index_func():
from apps.celery.models import TaskIndex
if not TaskIndex.objects.filter(id=1):
task_queryset = cc_ReportTask.objects.all()
task_id_list = [int(i.task_id) for i in task_queryset if i.task_id]
task_id = max(task_id_list) if task_id_list else 10000
task_id = task_id if task_id > 10000 else 10000
TaskIndex.objects.create(
id=1,
unique_id=0,
task_id=task_id
)
else:
task_queryset = cc_ReportTask.objects.all()
task_id_list = [int(i.task_id) for i in task_queryset if i.task_id]
task_id = max(task_id_list) if task_id_list else 10000
task_id = task_id if task_id > 10000 else 10000
if not TaskIndex.objects.filter(task_id=task_id) or not task_queryset:
cursor = connection.cursor()
cursor.execute("show tables;")
TaskIndex.objects.exclude(id=1).all().delete()
cursor.execute("alter table `celery_taskindex` auto_increment=1")
TaskIndex.objects.filter(id=1).update(
unique_id=0,
task_id=task_id
)
使用
@task()
def run_function(**kwargs):
from apps.celery.utils import init_index_func
logger.info('開始執行')
# 檢測糾正任務序列或者初始化task_id
init_index_func()
template_id = kwargs.get('template_id', None)
if not template_id:
logger.error('任務模闆id是空,請檢查schedules表的參數')
return False, '任務模闆id是空,請檢查schedules表的參數'
task_template_objects = cc_ReportTaskTemplate.objects.filter(template_id=template_id)
if not task_template_objects:
logger.error('在ReportTaskTemplate表中任務模闆id:{0}不存在'.format(template_id))
return False, '在ReportTaskTemplate表中任務模闆id:{0}不存在'.format(template_id)
task_template_object = task_template_objects[0]
if task_template_object.report_status != 'enable':
logger.error('任務模闆id:{0}的狀态:{1},處于未啟用狀态,不執行上報'.format(
task_template_object.template_id,
task_template_object.report_status))
return False, '任務模闆id:{0}的狀态:{1},處于未啟用狀态,不執行上報'.format(
task_template_object.template_id,
task_template_object.report_status)
# 根據模型辨別符bk_obj_id過濾執行個體對象 并且沒有被當機的
inst_objects = cc_ObjectBase.objects.filter(
bk_obj_id=task_template_object.bk_obj_id,
inst_status='1',
need_report__in=['new', 'update', 'delete'],
is_frozen=False
)
inst_ids = [i.bk_inst_id for i in inst_objects]
inst_objects.update(is_frozen=True)
if not inst_ids:
logger.error('目前模型沒有符合條件的執行個體')
return False, '目前模型沒有符合條件的執行個體'
inst_objects = cc_ObjectBase.objects.filter(bk_obj_id=task_template_object.bk_obj_id,
bk_inst_id__in=inst_ids)
report_inst_total_num = inst_objects.count()
# 模闆配置的每個批次的數量
branch_max_num = task_template_object.branch_max_num
# 根據任務模闆和執行個體對象計算有多少批次(向上取整)
report_branch_count = math.ceil(report_inst_total_num / (branch_max_num * 1.0))
# task_id 自增
# 上報任務資訊入庫
# 擷取初始化任務id
init_index_obj = TaskIndex.objects.get(id=1)
init_task_id = init_index_obj.task_id
task_index_obj = TaskIndex.objects.create(
unique_id=0
)
task_index_obj.unique_id = task_index_obj.id - 1
task_index_obj.save()
task_index_obj.task_id = task_index_obj.id - 1 + init_task_id
task_index_obj.save()
task_id = task_index_obj.task_id
logger.info('任務id:{0}, 模闆id:{1}'.format(task_id, template_id))
if task_template_object.report_type == 'manual':
task_name = task_template_object.bk_obj_name + '手動報送' + datetime.datetime.now().strftime('%Y%m%d') + str(
task_id)
else:
task_name = task_template_object.bk_obj_name + '定時報送' + datetime.datetime.now().strftime('%Y%m%d') + str(
task_id)
with transaction.atomic():
# 任務報送入庫
try:
logger.info('開始建立cc_ReportTask')
task_obj = cc_ReportTask.objects.create(
# 自增
task_id=task_id,
task_name=task_name,
template_id=task_template_object.template_id,
report_inst_total_num=report_inst_total_num,
report_branch_count=report_branch_count,
branch_max_num=task_template_object.branch_max_num,
report_type=task_template_object.report_type,
task_status=0,
spec=task_template_object.spec,
day=task_template_object.day,
frequency=task_template_object.frequency,
time=task_template_object.time,
workflow_id=workflow_id,
creator='system',
create_time=datetime.datetime.utcfromtimestamp(time.time())
)
# 循環批次數量
logger.info('循環批次次數:{0}'.format(report_branch_count))
for i in range(1, report_branch_count + 1):
# 計算目前批次執行個體數
current_branch_inst_num = branch_max_num
# 如果目前批次是最後一個批次,計算最後一批次的執行個體數量
if i == report_branch_count:
current_branch_inst_num = report_inst_total_num - (branch_max_num * (report_branch_count - 1))
# # 目前批次的上報資料封裝
report_inst_metadata_objs = inst_objects[current_branch_inst_num * (i - 1):current_branch_inst_num * i]
logger.info('開始執行branch_operator')
branch_operator(
task_obj=task_obj,
current_branch_inst_num=current_branch_inst_num,
report_inst_metadata_objs=report_inst_metadata_objs
)
return True, 'success'
except Exception as e:
logger.error('任務報送入庫函數run_function錯誤{0}'.format(str(e)))
return False, str(e)