Celery is a simple, flexible, and reliable distributed system to process vast amounts of messages, while providing operations with the tools required to maintain such a system.
It’s a task queue with focus on real-time processing, while also supporting task scheduling.
Celery是一個簡單、靈活和可靠的分布式系統,可以處理大量的消息,同時提供維護這樣一個系統所需的工具。它是一個任務隊列,關注實時處理,同時支援任務排程。
内容:Django是一個很好用的python的web開發架構,日常開發任務中,難免會遇到一些需要用到異步或者定時執行的任務,或者是要執行一些耗時操作。Celery是一個很強大的異步任務工具。
配置:1、首先需要安裝響應的包,這裡我們直接使用djcelery
pip install django-celery
pip install celery # 建議目前先不要使用4.1或者4.2版本的celery
pip install django-redis
2、在settings項目配置檔案注冊app ,加入“djcelery“
3、編輯celery配置檔案,一般會在某個需要使用celery的app下面建立celery.py檔案,為了每個app可能對celery配置有不同需求;也可以統一配置,使用一個celery.py檔案,這裡大家根據實際需求配置。
這裡直接使用redis作為消息中間件,任務資料直接存儲到本地資料庫。
# ~*~ coding: utf-8 ~*~
from __future__ import absolute_import, unicode_literals
import os
from datetime import timedelta
from celery import Celery
from celery.schedules import crontab
# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'jumpserver.settings')
from django.conf import settings
app = Celery('app')
# Using a string here means the worker will not have to
# pickle the object when using Windows.
app.config_from_object('django.conf:settings')
app.autodiscover_tasks(lambda: [app_config.split('.')[0]
for app_config in settings.INSTALLED_APPS])
CELERY_CREATE_MISSING_QUEUES = True # 某個程式中出現的隊列,在broker中不存在,則立刻建立它
CELERYD_CONCURRENCY = 30 # 并發worker數
CELERYD_FORCE_EXECV = True # 非常重要,有些情況下可以防止死鎖
CELERYD_MAX_TASKS_PER_CHILD = 100 # 每個worker最多執行萬100個任務就會被銷毀,可防止記憶體洩露
BROKER_URL = 'redis://:%(password)[email protected]%(host)s:%(port)s/6' % {
'password': CONFIG.REDIS_PASSWORD if CONFIG.REDIS_PASSWORD else '',
'host': CONFIG.REDIS_HOST or '127.0.0.1',
'port': CONFIG.REDIS_PORT or 6379,
}
# BROKER_URL = 'redis://:密碼@主機位址:端口号/資料庫号'
CELERYBEAT_SCHEDULER = 'djcelery.schedulers.DatabaseScheduler'
# CELERY_RESULT_BACKEND = BROKER_URL
CELERY_RESULT_BACKEND = 'djcelery.backends.database:DatabaseBackend'
CELERY_ACCEPT_CONTENT = ['application/json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERYD_MAX_TASKS_PER_CHILD = 40
CELERY_TIMEZONE = 'Asia/Shanghai'
BROKER_TRANSPORT_OPTIONS = {'visibility_timeout': 120}
CELERY_DISABLE_RATE_LIMITS = True
# 配置定時任務
app.conf.update(
CELERYBEAT_SCHEDULE={
'refresh-asset-hardware-info': {
'task': 'app.tasks.update_assets_hardware_period', # 任務名
'schedule': 60*60*60*24, # 執行時間(可以去官網看,執行規則的配置方式)
'args': (), # 執行任務方法所需參數
}
}
)
4、編輯任務,任務方法必須定義在每個app下面的tasks.py檔案,當啟動celery時,會自動去加載每個app下面tasks.py檔案所定義的任務,這裡以一個異步發送郵箱的任務為例子。
from __future__ import absolute_import
# from celery import shared_task
from django.core.mail import send_mail
from django.conf import settings
from app import celery_app as app
@app.task
def send_mail_async(*args, **kwargs): """ Using celery to send email async You can use it as django send_mail function Example: send_mail_sync.delay(subject, message, from_mail, recipient_list, fail_silently=False, html_message=None) Also you can ignore the from_mail, unlike django send_mail, from_email is not a require args: Example: send_mail_sync.delay(subject, message, recipient_list, fail_silently=False, html_message=None) """
if len(args) == 3:
args = list(args)
args[0] = settings.EMAIL_SUBJECT_PREFIX + args[0]
args.insert(2, settings.EMAIL_HOST_USER)
args = tuple(args)
send_mail(*args, **kwargs)
4 啟動celery
celery -A common worker -B -l debug #app為celery.py檔案所在的app名
到這裡,一個最簡單的celery異步任務已經完成了!