本篇文章是 celery分布式任務隊列從入門到精通_大帥的部落格-CSDN部落格 中的一個章節,其中celery的安裝部署不再闡述,預設讀者有django和celery的基礎。
1. 簡單的django項結構如下:
其中隻定義了兩個路由,taskA和taskB,在 view中實作了兩個簡單的方法
url.py路由部分如下:
urlpatterns = [
path('admin/', admin.site.urls),
path('taskA', func_A),
path('taskB', func_B),
]
views.py視圖函數部分如下:
from django.http import HttpResponse
def func_A(request):
return HttpResponse("hello taskA!")
def func_B(request):
return HttpResponse("hello taskB!")
整個django項目十分簡單,通路路由http://127.0.0.1:8000/taskA和taskB路由傳回如下:
2. 建立celery工程
celery工程建立在django_demo工程目錄下,
其中main.py主要用來初始化celery應用的,setting.py用來儲存celery配置項,__init__.py保持為空即可,讓整個目錄成為一個可導入的python包。
其中還定義了兩個任務子產品:tasks_A 和 tasks_B
以下的目錄中的代碼
main.py
from celery import Celery
import os
# 讓celery在啟動時提前加載django配置檔案,就可以使用django中的子產品和變量了
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "django_demo.settings")
# 初始化app,導入任務
celery_app = Celery(
"celery_tasks",
include=["celery_tasks.tasks_A.tasks", "celery_tasks.tasks_B.tasks"]
)
# 初始化時,include的參數作用的導入任務,還有另一種方式自動發現注冊任務
# celery_app.autodiscover_tasks(['celery_tasks.tasks_A', 'celery_tasks.tasks_B'])
# 加載配置
celery_app.config_from_object("celery_tasks.settings")
setting.py
# 設定時區
timezone = "Asia/Shanghai"
# 使用redis 作為消息代理
broker_url = 'redis://127.0.0.1:6379/0'
# 任務結果存在Redis
result_backend = 'redis://127.0.0.1:6379/0'
# 讀取任務結果一般性能要求不高,是以使用了可讀性更好的JSON
result_serializer = 'json'
tasks_A/tasks.py
from celery_tasks.main import celery_app
import time
@celery_app.task
def taskA():
time.sleep(3)
print("taskA")
tasks_B/tasks.py
from celery_tasks.main import celery_app
import time
@celery_app.task
def taskB(x, y):
time.sleep(3)
print("taskB")
return x + y
taskA函數也是十分簡單,隻是設定了阻塞3秒,并列印任務名稱,沒有傳回值。taskB函數則是實作了兩個數相加并傳回之和
3. 在django工程中使用celery任務
修改views.py,導入celery的任務函數。修改如下
views.py
from django.http import HttpResponse
from celery_tasks.tasks_A.tasks import taskA # 導入任務
from celery_tasks.tasks_B.tasks import taskB
def func_A(request):
# 異步執行celery中的任務
# taskA函數中實作了time.sleep(3)阻塞3秒,但是這裡不會阻塞,會立即傳回hello taskA!
taskA.delay()
return HttpResponse("hello taskA!")
def func_B(request):
# 異步執行celery中的任務
taskB.delay(2,2)
return HttpResponse("hello taskB!")
啟動django項目
python3 manage.py runserver 0.0.0.0:8000
啟動celery的worker監視
因為這次我是在win10主機上運作celery,需要先安裝兩個python庫
pip3 install redis eventlet
安裝完後啟動celery
celery -A celery_tasks worker -l info -P eventlet
啟動成功如下圖所示
驗證: 打開浏覽器,輸入位址http://127.0.0.1:8000/taskA 和taskB的路由,可以發現立馬就傳回了結果,并不需要等待3秒,
再打開worker的終端檢視,分别列印taskA, taskB,4,兩個任務都成功執行了