天天看點

Celery 配置MQ 安排不同的worker 去完成不同的任務

在使用celery的時候,想讓不同的worker去做不同的任務,因為每個任務對機器的性能要求不同。

如要是需要運作較為複雜的算法的話,可能需要CPU 比較強勁的機器,反之如果隻是簡單計算,或者程式對CPU要求不高,對其他硬體有要求,那就可能需要按需配置設定worker, 進而減少成本。

具體步驟如下,在同一個task.py 檔案下寫入相應的 任務

from celery import Celery
import time
from celery.exceptions import SoftTimeLimitExceeded


import os
from celery import Celery

#: Set default configuration module name
os.environ.setdefault('CELERY_CONFIG_MODULE', 'celeryconfig')
str1 =os.environ.setdefault('CELERY_CONFIG_MODULE', 'celeryconfig')

app = Celery()
app.config_from_envvar('CELERY_CONFIG_MODULE')


# 視訊壓縮
@app.task(time_limit=20)
def video_compress(video_name):
    time.sleep(10)
    print ('Compressing the:'+ video_name)
    return 'success'


@app.task(bind=True)
def hello(self, a, b):
    time.sleep(1)
    self.update_state(state="PROGRESS", meta={'progress': 50})
    time.sleep(1)
    self.update_state(state="PROGRESS", meta={'progress': 90})
    time.sleep(1)
    return 'hello world: %i' % (a+b)
#如果是一個大型項目中的一個檔案的話需要寫明這個task的名字,否則route不會識别 如“app.celerytask.tasks.video_upload”
@app.task(name='檔案目錄結構') 
def video_upload(video_name):
    time.sleep(5)
    print ('正在上傳視訊')
    return 'success'

# 壓縮照片
@app.task
def image_compress(image_name):
    time.sleep(10)
    print ('Compressing the:', image_name)
    return 'success'

# 其他任務
@app.task
def other(str):
    time.sleep(10)
    print ('Do other things')
    return 'success'

           

配置檔案 celeryconfig.py

from kombu import Exchange, Queue
# 配置時區
CELERY_TIMEZONE = 'Asia/Shanghai'

BROKER_URL = '(你的RabbitMQ)'
RESULT_BACKEND = 'redis://(你的redis):6379'
# 定義一個預設交換機
default_exchange = Exchange('dedfault', type='direct')

# 定義一個媒體交換機
media_exchange = Exchange('media', type='direct')
timeout_exchange = Exchange('timeoutExchange', type='direct')

# 建立三個隊列,一個是預設隊列,一個是video、一個image
CELERY_QUEUES = (
    Queue('default', default_exchange, routing_key='default'),
    Queue('videos', media_exchange, routing_key='media.video'),
    Queue('images', media_exchange, routing_key='media.image'),
    Queue('timeoutQueue', timeout_exchange, routing_key='timeout')

)

CELERY_DEFAULT_QUEUE = 'default'
CELERY_DEFAULT_EXCHANGE = 'default'
CELERY_DEFAULT_ROUTING_KEY = 'default'
#
CELERY_ROUTES = (
    {'tasks.image_compress': {
        'queue': 'images',
        'routing_key': 'media.image'
    }
    },
    {'app.celerytask.tasks.video_upload': {#相應的 在 route 配置中寫明這個 task 的名字 以便 route 成功識别
        'queue': 'videos',
        'routing_key': 'media.video'
    }
    },
    {'tasks.video_compress': {
        'queue': 'videos',
        'routing_key': 'media.video'
    }
    },
    {'tasks.timeout_test': {
        'queue': 'timeoutQueue',
        'routing_key': 'timeout'
    }
    },
)

# 在出現worker接受到的message出現沒有注冊的錯誤時,使用下面一句能解決
CELERY_IMPORTS = ("tasks",)
           

啟動worker時使用 -Q 來限定Worker 的subscribe 的MQ

# 啟動預設的worker
celery worker1 -Q default --loglevel=info
# 啟動處理視訊的worker
celery worker2 -Q videos --loglevel=info
# 啟動處理圖檔的worker
celery worker3 -Q images --loglevel=info
           

這樣 worker1 監聽的就是預設的queue

worker2 監聽的就是 video queue, 負責處理 video_upload  以及 video_compress  任務,相應的worker3 就負責image_compress

這樣我們就可以在不同的節點上分别部署不同的worker 讓他們分别處理不同的任務。

代碼來自網上,我以前看到的博文

具體出處忘了 可能是這裡https://www.jianshu.com/p/11b420aea529