Installing and Configuring Celery under Django

Celery Installation and Configuration Process

Environment: Windows 10 (64-bit); Python 3.6.0; Django 2.0.3; redis-3.2

Development tool: PyCharm

  1. Install the packages: celery, django-celery, celery-with-redis (Redis is used as the message broker)
  2. Project configuration:
First, add the Celery configuration to "project\settings.py":
# Celery settings
import djcelery
djcelery.setup_loader()
BROKER_URL = 'redis://127.0.0.1:6379/0'
CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/0'
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERYBEAT_SCHEDULER = 'djcelery.schedulers.DatabaseScheduler'
CELERY_TIMEZONE = 'Asia/Shanghai'
BROKER_TRANSPORT_OPTIONS = {'visibility_timeout': 3600}  # 1 hour.
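
The broker and result backend URLs above follow the redis://host:port/db layout. As a rough illustration using only the standard library (describe_redis_url is a hypothetical helper, not part of Celery, which parses the URL itself), the parts can be pulled apart like this:

```python
from urllib.parse import urlparse


def describe_redis_url(url):
    """Split a redis:// URL into host, port and database number.

    Hypothetical helper for illustration only.
    """
    parts = urlparse(url)
    if parts.scheme != 'redis':
        raise ValueError('expected a redis:// URL, got %r' % url)
    # The path component ("/0") carries the Redis database index.
    db = int(parts.path.lstrip('/') or 0)
    return {'host': parts.hostname, 'port': parts.port or 6379, 'db': db}


info = describe_redis_url('redis://127.0.0.1:6379/0')
print(info)
```

Here the broker and the result backend share database 0 of the same local Redis server.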
           
Next, add djcelery to INSTALLED_APPS:
INSTALLED_APPS = [
    ...
    'djcelery',
]
           
Create celery.py to link Django with Celery:
from __future__ import absolute_import


import os
from celery import Celery
from django.conf import settings

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'ProjectName.settings')

app = Celery('APPName')

app.config_from_object('django.conf:settings')
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)


@app.task(bind=True)
def debug_task(self):
    print('Request: {0!r}'.format(self.request))
           

3. Configure the application:

Create tasks.py in the application with the following code:

from celery import task
from time import sleep, time
           
@task()
def Task_A(message):
    Task_A.update_state(state='PROGRESS', meta={'progress': 0})
    sleep(10)
    Task_A.update_state(state='PROGRESS', meta={'progress': 30})
    sleep(10)
    return message


def get_task_status(task_id):
    task = Task_A.AsyncResult(task_id)

    status = task.state
    progress = 0

    if status == u'SUCCESS':
        progress = 100
    elif status == u'FAILURE':
        progress = 0
    elif status == 'PROGRESS':
        progress = task.info['progress']

    return {'status': status, 'progress': progress}


@task
def sendmail(mail):
    print("++++++++++++++++++++++++++++++++++++")
    print('sending mail to %s...' % mail['to'])
    sleep(2.0)  # use the imported sleep(); the imported name "time" is the time() function
    print('mail sent.')
    print("------------------------------------")
    return mail['to']
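
The state-to-progress mapping in get_task_status does not depend on Celery itself, so it can be checked in isolation. A minimal sketch, assuming the state strings and the {'progress': ...} meta layout used above; progress_for is a hypothetical name:

```python
def progress_for(state, info=None):
    """Map a task state (and optional meta dict) to a 0-100 progress value."""
    if state == 'SUCCESS':
        return 100
    if state == 'PROGRESS' and isinstance(info, dict):
        # Meta dict as passed to update_state(meta={'progress': ...}).
        return info.get('progress', 0)
    # PENDING, FAILURE and any other state report no progress.
    return 0


for state, info in [('PENDING', None), ('PROGRESS', {'progress': 30}),
                    ('SUCCESS', 'test'), ('FAILURE', None)]:
    print(state, progress_for(state, info))
```

Keeping this logic in a plain function makes it easy to test without a running worker or broker.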
           

Test the application:

Run Celery in the Django environment:

python manage.py celeryd -l info

-------------- [email protected] v3.1.26.post2 (Cipater)
---- **** -----
--- * ***  * -- Windows-10-10.0.15063-SP0
-- * - **** ---
- ** ---------- [config]
- ** ---------- .> app:         default:0x17b6c901240 (djcelery.loaders.DjangoLoader)
- ** ---------- .> transport:   redis://127.0.0.1:6379/0
- ** ---------- .> results:     redis://127.0.0.1:6379/0
- *** --- * --- .> concurrency: 4 (prefork)
-- ******* ----
--- ***** ----- [queues]
 -------------- .> celery           exchange=celery(direct) key=celery
[tasks]
  . DocCheck.tasks.Task_A
  . DocCheck.tasks.sendmail

[2018-04-04 15:27:35,018: INFO/MainProcess] Connected to redis://127.0.0.1:6379/0
[2018-04-04 15:27:35,041: INFO/MainProcess] mingle: searching for neighbors
[2018-04-04 15:27:36,067: WARNING/MainProcess] C:\Python36\lib\site-packages\celery\app\contr
ol.py:36: DuplicateNodenameWarning: Received multiple replies from node name: [email protected]
Please make sure you give each node a unique nodename using the `-n` option.
  pluralize(len(dupes), 'name'), ', '.join(sorted(dupes)),
[2018-04-04 15:27:36,070: INFO/MainProcess] mingle: all alone
[2018-04-04 15:27:36,128: WARNING/MainProcess] C:\Python36\lib\site-packages\djcelery\loaders
.py:133: UserWarning: Using settings.DEBUG leads to a memory leak, never use this setting in
production environments!
  warn('Using settings.DEBUG leads to a memory leak, never '
[2018-04-04 15:27:36,156: WARNING/MainProcess] [email protected] ready.
[2018-04-04 15:27:36,233: INFO/Worker-1] child process 15852 calling self.run()
[2018-04-04 15:27:36,252: INFO/Worker-1] child process 20336 calling self.run()
[2018-04-04 15:27:36,283: INFO/Worker-1] child process 6920 calling self.run()
[2018-04-04 15:27:36,296: INFO/Worker-1] child process 19668 calling self.run()
           

Once the worker has started successfully, test task execution from the Django manage shell.

Start the shell: python manage.py shell

>>> from APPName.tasks import *
>>> sendmail.delay(dict(to='[email protected]'))
           
<AsyncResult: 20a8c817-a6fe-434e-8bc4-5efdb5356046>
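
Because settings.py restricts serialization to JSON (CELERY_TASK_SERIALIZER = 'json'), task arguments such as the dict passed to sendmail need to be JSON-serializable. A small sketch for checking a payload before calling delay(); is_json_safe is a hypothetical helper, and it uses the standard-library encoder, which is only an approximation of Celery's own JSON serializer:

```python
import json
from datetime import datetime


def is_json_safe(payload):
    """Return True if payload can be encoded by the standard json module."""
    try:
        json.dumps(payload)
    except (TypeError, ValueError):
        return False
    return True


plain = is_json_safe({'to': 'user@example.com'})
with_datetime = is_json_safe({'to': 'user@example.com',
                              'sent_at': datetime(2018, 4, 4)})
print(plain, with_datetime)
```

Values such as datetime objects would have to be converted (for example to an ISO string) before being passed as task arguments under this assumption.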
           

Check how the task executes:

>>> Task_A.delay("test")
<AsyncResult: c1c7c985-c0c4-48d6-9e21-b71ce87c3eb5>
>>> t = Task_A.delay("test1")
>>> t.ready()
False
>>> t.ready()
True
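
Calling t.ready() by hand works in the shell; in a script the same check is usually wrapped in a loop with a timeout. A minimal sketch, assuming only that the result object has a ready() method; wait_until_ready and FakeResult are hypothetical names, and FakeResult stands in for AsyncResult so the example runs without a worker:

```python
import time


def wait_until_ready(result, timeout=30.0, interval=0.5):
    """Poll result.ready() until it returns True or the timeout expires.

    Returns True if the task finished in time, otherwise False.
    """
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        if result.ready():
            return True
        time.sleep(interval)
    # One last check after the deadline has passed.
    return result.ready()


class FakeResult:
    """Stand-in for AsyncResult: becomes ready after a number of polls."""

    def __init__(self, ready_after_calls):
        self.calls = 0
        self.ready_after_calls = ready_after_calls

    def ready(self):
        self.calls += 1
        return self.calls >= self.ready_after_calls


fake = FakeResult(ready_after_calls=3)
finished = wait_until_ready(fake, timeout=2.0, interval=0.01)
print(finished, fake.calls)
```

With a real task, t.get(timeout=30) blocks in a similar way and returns the task's return value.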
           

The worker console (connected to the broker) shows the following:

[2018-04-04 15:30:59,903: INFO/MainProcess] Received task: DocCheck.tasks.Task_A[d7315376-5
cc4-4732-b1ea-ccfa513a0f54]
[2018-04-04 15:31:19,914: INFO/MainProcess] Task DocCheck.tasks.Task_A[d7315376-5cc4-4732-b
1ea-ccfa513a0f54] succeeded in 20.0s: test1
[2018-04-04 15:58:29,449: INFO/MainProcess] Received task: DocCheck.tasks.Task_A[39e903d1
-d9c1-4bab-96e4-1b7466538001]
[2018-04-04 15:58:49,673: INFO/MainProcess] Task DocCheck.tasks.Task_A[39e903d1-d9c1-4bab
-96e4-1b7466538001] succeeded in 20.156000000424683s: testget
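
Each "succeeded" entry carries the task name, the task id, the runtime and the return value. As an illustration of how to read these entries, here is a sketch that extracts those fields with a regular expression. The pattern is inferred from the lines shown above rather than from any documented log format, parse_success_line is a hypothetical helper, and the sample line has been re-joined because the console wrapped it:

```python
import re

# Pattern inferred from the worker output above; not an official format.
SUCCESS_RE = re.compile(
    r"Task (?P<name>[\w.]+)\[(?P<task_id>[0-9a-f-]+)\] "
    r"succeeded in (?P<seconds>[\d.]+)s: (?P<result>.*)"
)


def parse_success_line(line):
    """Return the fields of a 'succeeded' log line, or None if no match."""
    match = SUCCESS_RE.search(line)
    if match is None:
        return None
    fields = match.groupdict()
    fields['seconds'] = float(fields['seconds'])
    return fields


line = ("[2018-04-04 15:31:19,914: INFO/MainProcess] Task DocCheck.tasks.Task_A"
        "[d7315376-5cc4-4732-b1ea-ccfa513a0f54] succeeded in 20.0s: test1")
parsed = parse_success_line(line)
print(parsed)
```

The 20-second runtime matches the two sleep(10) calls in Task_A.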