天天看点

Python Django Celery redis 调用阿里云短信接口

# 我的项目 tree
"""
环境:
	celery
	celery-with-redis
	Django
	redis
	aliyun-python-sdk-core
"""
--DjCelery
	--celery_tasks
		--sms.py
		--tasks.py
		--urls.py
		--views.py
	--DjCelery
		--__init__.py
		--celery.py
		--setting.py
		--urls.py
	manage.py
           

DjCelery下

配置:

setting.py中添加

from __future__ import absolute_import 

### celery 配置
BROKER_URL = 'redis://localhost:6379/0'
CELERY_ACCEPT_CONTENT = ['json'] 	# 接受内容
CELERY_TASK_SERIALIZER = 'json'		# 任务
CELERY_RESULT_SERIALIZER = 'json'	# 结果
###
           

init.py中添加

# 引入 celery 实例对象
from .celery import app as celery_app

__all__ = ('celery_app',)
           

urls.py中

urlpatterns = [
    path('admin/', admin.site.urls),
    path('',include('celery_tasks.urls')),
]
           

注册 celery 应用

celery.py中

from __future__ import absolute_import, unicode_literals
from celery import Celery
from django.conf import settings

import os

# 设置环境变量  (您不需要此行,但它可以避免您始终将设置模块传递给celery程序。它必须始终在创建)
os.environ.setdefault('DJANGO_SETTINGS_MODULE','DjCelery.settings')

# 创建 celery 应用 并起别名
app = Celery('celerytest')

# 导入 celery 配置
app.config_from_object('django.conf:settings')

# 加载所有注册应用
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
# 手动注册 celery 任务
# app.autodiscover_tasks(['DjCelery.celery_test'])

           

celery_tasks下

sms.py 可以查看点进去有个Demo,就是用的那个

tasks.py中

from celery_tasks.sms import send_sms
from celery import shared_task

# logger = logging.getLogger('django')


# Create your tests here.
@shared_task
def sms_to(mobile):
    """
        mobile: 手机号
    """
    send_result = send_sms(int(mobile))
    # print(send_result)
    return send_result
           

urls.py中

from django.urls import path
from . import views

urlpatterns = [
    path('sms/', views.sms_view),
]
           

最后 veiws 中调用

from django.shortcuts import render, HttpResponse
from celery_tasks.tasks import sms_to

# Create your views here.

def sms_view(request):
    # info = request.POST
    # p = info['p']
    result = sms_to.delay('这里是手机号')
    return HttpResponse('ok!')
           

运行:

# 在项目下运行:

Celery -A DjCelery worker -l debug  
# debug 可以查看运行时 print 的内容;写 info 也可以,查看关键信息。

python manage.py runserver
# 上面2条命令同时运行,访问:http://127.0.0.1:8000/sms/  就可以查看结果了。
           

成功运行结果图:

Python Django Celery redis 调用阿里云短信接口

后端输出结果:

Python Django Celery redis 调用阿里云短信接口