天天看點

Celery 任務分多隊列運作Celery 任務分多隊列運作

Celery 任務分多隊列運作

需要安裝 python插件Celery, RabbitMq

  • 代碼結構:
    Celery 任務分多隊列運作Celery 任務分多隊列運作
  • 建立:celery_app.py
from celery import Celery

app = Celery("TestTask")

app.config_from_object("settings")

           
  • 建立:settings.py
from datetime import timedelta
from kombu import Queue
from kombu import Exchange

BROKER_URL = 'amqp://{}:{}@{}:{}'.format(
    'admin',
    'admin',
    '127.0.0.1',
    '5672'
)

# 啟動Celery預設的定時任務
# CELERYBEAT_SCHEDULER = 'djcelery.schedulers.DatabaseScheduler'

CELERY_ACCEPT_CONTENT = ['application/json']
# 
CELERY_TASK_SERIALIZER = 'json'
# 傳回資料序列化格式
CELERY_RESULT_SERIALIZER = 'json'
# 時區
CELERY_TIMEZONE = 'Asia/Shanghai'
CELERY_ENABLE_UTC = False

# 預設開啟10程序
CELERYD_CONCURRENCY = 10
# CELERYD_MAX_TASKS_PER_CHILD = 3  #  每個worker最多執行1個任務就會被銷毀,可防止記憶體洩露


CELERY_QUEUES = (
    Queue('Default', exchange=Exchange('default'), routing_key='default'),
    Queue('Tasks_Main', exchange=Exchange('Tasks_Main'), routing_key='Tasks_Main'),
    )

# Celery 路由設定
CELERY_ROUTES = {
    'tasks.main': {'queue': 'Tasks_Main', 'routing_key': 'Tasks_Main'}
}

# 如果不指定QUEUE 那麼就用Default
CELERY_DEFAULT_QUEUE = 'Default'
CELERY_DEFAULT_EXCHANGE = 'default'
CELERY_DEFAULT_ROUTING_KEY = 'default'

CELERY_IMPORTS = ('tasks',)
           
  • 建立:tasks.py
from celery_app import app

@app.task(name='tasks.main')
def task_main(param):
    print('調用成功:' + str(param))
           
  • 運作
celery -A celery_app worker  -Q Default  -n Queue_Default@%d
celery -A celery_app worker  -Q Tasks_Main  -n Queue_Tasks_Main@%d 
           
  • test.py
from tasks import task_main

task_main.apply_async(args=['Tasks_Main'], queue='Tasks_Main', routing_key='Tasks_Main')
task_main.apply_async(args=['Default'], queue='Default', routing_key='default')
           
  • 運作結果如下:
Celery 任務分多隊列運作Celery 任務分多隊列運作
Celery 任務分多隊列運作Celery 任務分多隊列運作