天天看点

python消息队列celery_CELERY 消息队列配置 | 学步园

消息队列配置文档:

使用 MongoDB数据库:

1. 安装 MongoDB.

2. easy_install pymongo

3. easy_install celery

4. easy_install django celery

5. settings.py 配置

增加 app INSTALLED_APPS = ('djcelery','tasks',)

# Using the database to store task state and results.

CELERY_RESULT_BACKEND = 'djcelery.backends.database:DatabaseBackend'

# CELERY_ANNOTATIONS = {'tasks.add': {'rate_limit': '10/s'}}

#: Only add pickle to this list if your broker is secured

#: from unwanted access (see userguide/security.html)

CELERY_ACCEPT_CONTENT = ['json']

CELERY_TASK_SERIALIZER = 'json'

CELERY_RESULT_SERIALIZER = 'json'

# CELERY_TIMEZONE = 'Asia/Shanghai'

# from celery.schedules import crontab

# CELERYBEAT_SCHEDULE = {

# Executes every Monday morning at 7:30 A.M

#     'add-every-monday-morning': {

#         'task': 'tasks.add',

#         'schedule': crontab(hour=7, minute=30, day_of_week=1),

#         'args': (16, 16),

#     },

# }

# mongodb 使用以下配置文件

BROKER_URL = 'mongodb://127.0.0.1:27017'

CELERY_RESULT_BACKEND = 'mongodb://localhost:27017/'

CELERY_MONGODB_BACKEND_SETTINGS = {

'database': 'daibang',

'taskmeta_collection': 'my_taskmeta_collection',

}

6. celery.py 内容:

#!/usr/bin/python

#coding=utf-8

from __future__ import absolute_import

import os

from celery import Celery

from django.conf import settings

# set the default Django settings module for the 'celery' program.

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'wei.settings')

app = Celery('tasks.tasks')

# Using a string here means the worker will not have to

# pickle the object when using Windows.

app.config_from_object('django.conf:settings')

app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)

@app.task(bind=True)

def debug_task(self):

print('Request: {0!r}'.format(self.request))

7. 在任意app目录下 建tasks.py 文件,构建自己的函数:

例:

#!/usr/bin/python

#coding=utf-8

import logging

import traceback

from set_environ import set_environ

set_environ()

# from __future__ import absolute_import

from celery import shared_task

from wei.celery import app

from django.shortcuts import get_object_or_404

# @app.task(ignore_result=True)

# @app.task(bind=True)

@app.task

def test_task():

# 插入一条记录 后发送邮件给相关人

tuser = User.objects.get(id=1)

msg = MessageSendRecord(

user=tuser,

loan_name='张三',

repay_type='repayment',

repay_amount='1',

is_send=False,

loan_url='http://www.test.com',

)

msg.save()

try:

mail_list = [tuser.email, ]

send_mail(

subject='测试邮件',

message='邮件内容',

from_email=MESSAGE_EMAIL_HOST_USER,  # 发件邮箱

recipient_list=mail_list,

fail_silently=False,

auth_user=MESSAGE_EMAIL_HOST_USER,  # SMTP服务器的认证用户名

auth_password=MESSAGE_EMAIL_HOST_PASSWORD,  # SMTP服务器的认证用户密码

connection=None

)

msg.is_send = True

msg.save()

except Exception:

# 发送失败 记录日志

return False

if __name__ == '__main__':

result = test_task.delay() # 加入队列中

print 'OK'

8. MongoDB数据库 Collections(General) - messages 表中查看队列数据

9. 开启服务 python manage.py celery -A wei worker -l info

详细配置请详见:http://docs.celeryproject.org/en/latest/getting-started/introduction.html

如果没有报错,程序已经在执行,查看数据库表MessageSendRecord  是否发生变化。

使用 mysql数据库:

执行上面2、3、4 步骤

5. settings.py 配置

增加 app INSTALLED_APPS = (

'djcelery',

'tasks',

'kombu.transport.django',  # mysql 增加此项

)

# Using the database to store task state and results.

CELERY_RESULT_BACKEND = 'djcelery.backends.database:DatabaseBackend'

CELERY_ACCEPT_CONTENT = ['json']

CELERY_TASK_SERIALIZER = 'json'

CELERY_RESULT_SERIALIZER = 'json'

mysql 使用以下配置文件

BROKER_URL = 'django://'

CELERY_RESULT_BACKEND = 'db+mysql://daybang:[email protected]:3306/wei_test'

增加:

easy_install sqlalchemy   #  mysql 数据库需要安装此项

python manage.py migrate kombu.transport.django   # 创建 消息队列所需表

python manage.py migrate djcelery   # 创建 消息队列所需表

6、7、8、9 与上面相同。

最后感谢喵-喵  同学的指导。