# 我的項目 tree
"""
環境:
celery
celery-with-redis
Django
redis
aliyun-python-sdk-core
"""
--DjCelery
--celery_tasks
--sms.py
--tasks.py
--urls.py
--views.py
--DjCelery
--__init__.py
--celery.py
--setting.py
--urls.py
manage.py
DjCelery下
配置:
setting.py中添加
from __future__ import absolute_import
### celery 配置
BROKER_URL = 'redis://localhost:6379/0'
CELERY_ACCEPT_CONTENT = ['json'] # 接受内容
CELERY_TASK_SERIALIZER = 'json' # 任務
CELERY_RESULT_SERIALIZER = 'json' # 結果
###
init.py中添加
# 引入 celery 執行個體對象
from .celery import app as celery_app
__all__ = ('celery_app',)
urls.py中
urlpatterns = [
path('admin/', admin.site.urls),
path('',include('celery_tasks.urls')),
]
注冊 celery 應用
celery.py中
from __future__ import absolute_import, unicode_literals
from celery import Celery
from django.conf import settings
import os
# 設定環境變量 (您不需要此行,但它可以避免您始終将設定子產品傳遞給celery程式。它必須始終在建立)
os.environ.setdefault('DJANGO_SETTINGS_MODULE','DjCelery.settings')
# 建立 celery 應用 并起别名
app = Celery('celerytest')
# 導入 celery 配置
app.config_from_object('django.conf:settings')
# 加載所有注冊應用
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
# 手動注冊 celery 任務
# app.autodiscover_tasks(['DjCelery.celery_test'])
celery_tasks下
sms.py 可以檢視點進去有個Demo,就是用的那個
tasks.py中
from celery_tasks.sms import send_sms
from celery import shared_task
# logger = logging.getLogger('django')
# Create your tests here.
@shared_task
def sms_to(mobile):
"""
mobile: 手機号
"""
send_result = send_sms(int(mobile))
# print(send_result)
return send_result
urls.py中
from django.urls import path
from . import views
urlpatterns = [
path('sms/', views.sms_view),
]
最後 veiws 中調用
from django.shortcuts import render, HttpResponse
from celery_tasks.tasks import sms_to
# Create your views here.
def sms_view(request):
# info = request.POST
# p = info['p']
result = sms_to.delay('這裡是手機号')
return HttpResponse('ok!')
運作:
# 在項目下運作:
Celery -A DjCelery worker -l debug
# debug 可以檢視運作時 print 的内容;寫 info 也可以,檢視關鍵資訊。
python manage.py runserver
# 上面2條指令同時運作,通路:http://127.0.0.1:8000/sms/ 就可以檢視結果了。
成功運作結果圖:
後端輸出結果: