
Async Tasks: Scheduled and Asynchronous Tasks with Django + Celery

Introduction:

Celery is a simple, flexible, and reliable distributed system to process vast amounts of messages, while providing operations with the tools required to maintain such a system.

It’s a task queue with focus on real-time processing, while also supporting task scheduling.


Overview:

Django is an excellent Python web framework. In day-to-day development you will inevitably run into work that needs to happen asynchronously or on a schedule, or long-running operations that should not block a request. Celery is a powerful tool for exactly this kind of asynchronous work.

Configuration:

1. First install the required packages; here we use djcelery directly:

```shell
pip install django-celery   # the djcelery integration package
pip install celery          # for now, avoid celery 4.1 or 4.2
pip install django-redis
```

2. Register the app in the project's settings file by adding "djcelery" to INSTALLED_APPS.

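Concretely, the registration is one entry in INSTALLED_APPS; a minimal sketch (every app name other than 'djcelery' is a placeholder for your own project's apps):

```python
# settings.py (sketch) -- register djcelery next to your own apps.
INSTALLED_APPS = [
    'django.contrib.admin',
    'django.contrib.auth',
    'django.contrib.contenttypes',
    'django.contrib.sessions',
    'django.contrib.messages',
    'django.contrib.staticfiles',
    'djcelery',  # the django-celery integration: models, admin pages, DB-backed beat scheduler
    'common',    # hypothetical app that will hold celery.py and tasks.py
]
```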

3. Write the celery configuration file. A celery.py file is usually created under one of the apps that uses celery, since each app may have different celery requirements; you can also centralize everything in a single celery.py. Configure it according to your actual needs.

Here redis is used directly as the message broker, and task results are stored in the local database.

```python
# ~*~ coding: utf-8 ~*~
from __future__ import absolute_import, unicode_literals

import os

from celery import Celery

# Set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'jumpserver.settings')

from django.conf import settings

app = Celery('app')

# Using a string here means the worker will not have to
# pickle the object when using Windows.
app.config_from_object('django.conf:settings')
app.autodiscover_tasks(lambda: [app_config.split('.')[0]
                                for app_config in settings.INSTALLED_APPS])

# NOTE: CONFIG below is this project's own configuration object; replace it
# with wherever you keep your redis credentials.
app.conf.update(
    CELERY_CREATE_MISSING_QUEUES=True,  # if a program references a queue that does not exist on the broker, create it immediately
    CELERYD_CONCURRENCY=30,             # number of concurrent worker processes
    CELERYD_FORCE_EXECV=True,           # very important: prevents deadlocks in some situations
    CELERYD_MAX_TASKS_PER_CHILD=100,    # recycle each worker after at most 100 tasks, to guard against memory leaks
    # BROKER_URL format: 'redis://:password@host:port/db'
    BROKER_URL='redis://:%(password)s@%(host)s:%(port)s/6' % {
        'password': CONFIG.REDIS_PASSWORD if CONFIG.REDIS_PASSWORD else '',
        'host': CONFIG.REDIS_HOST or '127.0.0.1',
        'port': CONFIG.REDIS_PORT or 6379,
    },
    CELERYBEAT_SCHEDULER='djcelery.schedulers.DatabaseScheduler',
    # CELERY_RESULT_BACKEND can also point at the broker URL; here results go to the database:
    CELERY_RESULT_BACKEND='djcelery.backends.database:DatabaseBackend',
    CELERY_ACCEPT_CONTENT=['application/json'],
    CELERY_TASK_SERIALIZER='json',
    CELERY_RESULT_SERIALIZER='json',
    CELERY_TIMEZONE='Asia/Shanghai',
    BROKER_TRANSPORT_OPTIONS={'visibility_timeout': 120},
    CELERY_DISABLE_RATE_LIMITS=True,
    # Scheduled (beat) tasks
    CELERYBEAT_SCHEDULE={
        'refresh-asset-hardware-info': {
            'task': 'app.tasks.update_assets_hardware_period',  # task name
            'schedule': 60 * 60 * 24,  # interval in seconds (once a day); see the official docs for other schedule rules
            'args': (),                # positional arguments passed to the task
        },
    },
)
```
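The 'schedule' value is a plain interval in seconds; as the inline comment hints, celery also accepts crontab-style rules. A sketch (the task path is reused from the config above):

```python
from celery.schedules import crontab

CELERYBEAT_SCHEDULE = {
    'refresh-asset-hardware-info': {
        'task': 'app.tasks.update_assets_hardware_period',
        # run every day at 02:30 rather than every N seconds
        'schedule': crontab(minute=30, hour=2),
        'args': (),
    },
}
```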

4. Write the tasks. Task functions must be defined in a tasks.py file under each app; when celery starts, it automatically loads the tasks defined in every app's tasks.py. Here an asynchronous email-sending task serves as the example.

```python
from __future__ import absolute_import

from django.core.mail import send_mail
from django.conf import settings

from app import celery_app as app


@app.task
def send_mail_async(*args, **kwargs):
    """Use celery to send email asynchronously.

    You can use it like django's send_mail function:

        send_mail_async.delay(subject, message, from_email, recipient_list,
                              fail_silently=False, html_message=None)

    Unlike django's send_mail, from_email is not a required argument;
    if you omit it, settings.EMAIL_HOST_USER is inserted:

        send_mail_async.delay(subject, message, recipient_list,
                              fail_silently=False, html_message=None)
    """
    if len(args) == 3:
        args = list(args)
        args[0] = settings.EMAIL_SUBJECT_PREFIX + args[0]
        args.insert(2, settings.EMAIL_HOST_USER)  # supply the default sender
        args = tuple(args)
    send_mail(*args, **kwargs)
```
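The argument fix-up above is easy to check in isolation. A small sketch, with hypothetical stand-ins for the django settings values, mirroring the function's logic:

```python
EMAIL_SUBJECT_PREFIX = '[Demo] '          # stands in for settings.EMAIL_SUBJECT_PREFIX
EMAIL_HOST_USER = 'noreply@example.com'   # stands in for settings.EMAIL_HOST_USER

def normalize_mail_args(*args):
    """Mirror send_mail_async's fix-up: prefix the subject and insert
    the default sender when the caller omitted from_email."""
    if len(args) == 3:
        args = list(args)
        args[0] = EMAIL_SUBJECT_PREFIX + args[0]
        args.insert(2, EMAIL_HOST_USER)
        args = tuple(args)
    return args

print(normalize_mail_args('Hi', 'body', ['a@example.com']))
# → ('[Demo] Hi', 'body', 'noreply@example.com', ['a@example.com'])
```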

5. Start celery:

```shell
celery -A common worker -B -l debug  # -A names the app that contains celery.py
```
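The -B flag embeds the beat scheduler inside the worker, which is convenient during development; in production the usual advice is to run them as separate processes so the schedule is not duplicated across workers (the commands below assume the same 'common' app):

```shell
celery -A common worker -l info   # task workers only
celery -A common beat -l info     # a single beat process driving the schedule
```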

At this point, a minimal celery async task setup is complete!
