# 我的项目 tree
"""
环境:
celery
celery-with-redis
Django
redis
aliyun-python-sdk-core
"""
--DjCelery
--celery_tasks
--sms.py
--tasks.py
--urls.py
--views.py
--DjCelery
--__init__.py
--celery.py
--setting.py
--urls.py
manage.py
DjCelery下
配置:
setting.py中添加
from __future__ import absolute_import
### celery 配置
BROKER_URL = 'redis://localhost:6379/0'
CELERY_ACCEPT_CONTENT = ['json'] # 接受内容
CELERY_TASK_SERIALIZER = 'json' # 任务
CELERY_RESULT_SERIALIZER = 'json' # 结果
###
init.py中添加
# 引入 celery 实例对象
from .celery import app as celery_app
__all__ = ('celery_app',)
urls.py中
urlpatterns = [
path('admin/', admin.site.urls),
path('',include('celery_tasks.urls')),
]
注册 celery 应用
celery.py中
from __future__ import absolute_import, unicode_literals
from celery import Celery
from django.conf import settings
import os
# 设置环境变量 (您不需要此行,但它可以避免您始终将设置模块传递给celery程序。它必须始终在创建)
os.environ.setdefault('DJANGO_SETTINGS_MODULE','DjCelery.settings')
# 创建 celery 应用 并起别名
app = Celery('celerytest')
# 导入 celery 配置
app.config_from_object('django.conf:settings')
# 加载所有注册应用
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
# 手动注册 celery 任务
# app.autodiscover_tasks(['DjCelery.celery_test'])
celery_tasks下
sms.py 可以查看点进去有个Demo,就是用的那个
tasks.py中
from celery_tasks.sms import send_sms
from celery import shared_task
# logger = logging.getLogger('django')
# Create your tests here.
@shared_task
def sms_to(mobile):
"""
mobile: 手机号
"""
send_result = send_sms(int(mobile))
# print(send_result)
return send_result
urls.py中
from django.urls import path
from . import views
urlpatterns = [
path('sms/', views.sms_view),
]
最后 veiws 中调用
from django.shortcuts import render, HttpResponse
from celery_tasks.tasks import sms_to
# Create your views here.
def sms_view(request):
# info = request.POST
# p = info['p']
result = sms_to.delay('这里是手机号')
return HttpResponse('ok!')
运行:
# 在项目下运行:
Celery -A DjCelery worker -l debug
# debug 可以查看运行时 print 的内容;写 info 也可以,查看关键信息。
python manage.py runserver
# 上面2条命令同时运行,访问:http://127.0.0.1:8000/sms/ 就可以查看结果了。
成功运行结果图:
后端输出结果: