天天看點

Python Django Celery redis 調用阿裡雲短信接口

# 我的項目 tree
"""
環境:
	celery
	celery-with-redis
	Django
	redis
	aliyun-python-sdk-core
"""
--DjCelery
	--celery_tasks
		--sms.py
		--tasks.py
		--urls.py
		--views.py
	--DjCelery
		--__init__.py
		--celery.py
		--setting.py
		--urls.py
	manage.py
           

DjCelery下

配置:

setting.py中添加

from __future__ import absolute_import 

### celery 配置
BROKER_URL = 'redis://localhost:6379/0'
CELERY_ACCEPT_CONTENT = ['json'] 	# 接受内容
CELERY_TASK_SERIALIZER = 'json'		# 任務
CELERY_RESULT_SERIALIZER = 'json'	# 結果
###
           

init.py中添加

# 引入 celery 執行個體對象
from .celery import app as celery_app

__all__ = ('celery_app',)
           

urls.py中

urlpatterns = [
    path('admin/', admin.site.urls),
    path('',include('celery_tasks.urls')),
]
           

注冊 celery 應用

celery.py中

from __future__ import absolute_import, unicode_literals
from celery import Celery
from django.conf import settings

import os

# 設定環境變量  (您不需要此行,但它可以避免您始終将設定子產品傳遞給celery程式。它必須始終在建立)
os.environ.setdefault('DJANGO_SETTINGS_MODULE','DjCelery.settings')

# 建立 celery 應用 并起别名
app = Celery('celerytest')

# 導入 celery 配置
app.config_from_object('django.conf:settings')

# 加載所有注冊應用
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
# 手動注冊 celery 任務
# app.autodiscover_tasks(['DjCelery.celery_test'])

           

celery_tasks下

sms.py 可以檢視點進去有個Demo,就是用的那個

tasks.py中

from celery_tasks.sms import send_sms
from celery import shared_task

# logger = logging.getLogger('django')


# Create your tests here.
@shared_task
def sms_to(mobile):
    """
        mobile: 手機号
    """
    send_result = send_sms(int(mobile))
    # print(send_result)
    return send_result
           

urls.py中

from django.urls import path
from . import views

urlpatterns = [
    path('sms/', views.sms_view),
]
           

最後 veiws 中調用

from django.shortcuts import render, HttpResponse
from celery_tasks.tasks import sms_to

# Create your views here.

def sms_view(request):
    # info = request.POST
    # p = info['p']
    result = sms_to.delay('這裡是手機号')
    return HttpResponse('ok!')
           

運作:

# 在項目下運作:

Celery -A DjCelery worker -l debug  
# debug 可以檢視運作時 print 的内容;寫 info 也可以,檢視關鍵資訊。

python manage.py runserver
# 上面2條指令同時運作,通路:http://127.0.0.1:8000/sms/  就可以檢視結果了。
           

成功運作結果圖:

Python Django Celery redis 調用阿裡雲短信接口

後端輸出結果:

Python Django Celery redis 調用阿裡雲短信接口