天天看點

Django項目中使用Celery1. 簡單的django項結構如下:2. 建立celery工程3. 在django工程中使用celery任務

本篇文章是 celery分布式任務隊列從入門到精通_大帥的部落格-CSDN部落格 中的一個章節,其中celery的安裝部署不再闡述,預設讀者有django和celery的基礎。

1. 簡單的django項結構如下:

Django項目中使用Celery1. 簡單的django項結構如下:2. 建立celery工程3. 在django工程中使用celery任務

         其中隻定義了兩個路由,taskA和taskB,在 view中實作了兩個簡單的方法

        url.py路由部分如下:

urlpatterns = [
    path('admin/', admin.site.urls),
    path('taskA', func_A),
    path('taskB', func_B),
]
           

         views.py視圖函數部分如下:

from django.http import HttpResponse


def func_A(request):
    return HttpResponse("hello taskA!")


def func_B(request):
    return HttpResponse("hello taskB!")
           

        整個django項目十分簡單,通路路由http://127.0.0.1:8000/taskA和taskB路由傳回如下:

Django項目中使用Celery1. 簡單的django項結構如下:2. 建立celery工程3. 在django工程中使用celery任務

Django項目中使用Celery1. 簡單的django項結構如下:2. 建立celery工程3. 在django工程中使用celery任務

​ 

2. 建立celery工程

        celery工程建立在django_demo工程目錄下,

Django項目中使用Celery1. 簡單的django項結構如下:2. 建立celery工程3. 在django工程中使用celery任務

        其中main.py主要用來初始化celery應用的,setting.py用來儲存celery配置項,__init__.py保持為空即可,讓整個目錄成為一個可導入的python包。

        其中還定義了兩個任務子產品:tasks_A 和 tasks_B

        以下的目錄中的代碼

main.py

from celery import Celery
import os

# 讓celery在啟動時提前加載django配置檔案,就可以使用django中的子產品和變量了
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "django_demo.settings")

# 初始化app,導入任務
celery_app = Celery(
    "celery_tasks",
    include=["celery_tasks.tasks_A.tasks", "celery_tasks.tasks_B.tasks"]
)
# 初始化時,include的參數作用的導入任務,還有另一種方式自動發現注冊任務
# celery_app.autodiscover_tasks(['celery_tasks.tasks_A', 'celery_tasks.tasks_B'])

# 加載配置
celery_app.config_from_object("celery_tasks.settings")
           

setting.py

# 設定時區
timezone = "Asia/Shanghai"

# 使用redis 作為消息代理
broker_url = 'redis://127.0.0.1:6379/0'

# 任務結果存在Redis
result_backend = 'redis://127.0.0.1:6379/0'

# 讀取任務結果一般性能要求不高,是以使用了可讀性更好的JSON
result_serializer = 'json'
           

tasks_A/tasks.py

from celery_tasks.main import celery_app
import time

@celery_app.task
def taskA():
    time.sleep(3)
    print("taskA")
           

tasks_B/tasks.py

from celery_tasks.main import celery_app
import time


@celery_app.task
def taskB(x, y):
    time.sleep(3)
    print("taskB")
    return x + y
           

        taskA函數也是十分簡單,隻是設定了阻塞3秒,并列印任務名稱,沒有傳回值。taskB函數則是實作了兩個數相加并傳回之和

3. 在django工程中使用celery任務

        修改views.py,導入celery的任務函數。修改如下

views.py

from django.http import HttpResponse
from celery_tasks.tasks_A.tasks import taskA  # 導入任務
from celery_tasks.tasks_B.tasks import taskB

def func_A(request):
    # 異步執行celery中的任務
    # taskA函數中實作了time.sleep(3)阻塞3秒,但是這裡不會阻塞,會立即傳回hello taskA!
    taskA.delay()
    return HttpResponse("hello taskA!")

def func_B(request):
    # 異步執行celery中的任務
    taskB.delay(2,2)
    return HttpResponse("hello taskB!")
           

        啟動django項目

python3 manage.py runserver 0.0.0.0:8000
           

        啟動celery的worker監視

        因為這次我是在win10主機上運作celery,需要先安裝兩個python庫

pip3 install redis eventlet
           

         安裝完後啟動celery

celery -A celery_tasks worker -l info -P eventlet
           

         啟動成功如下圖所示

Django項目中使用Celery1. 簡單的django項結構如下:2. 建立celery工程3. 在django工程中使用celery任務

驗證: 打開浏覽器,輸入位址http://127.0.0.1:8000/taskA 和taskB的路由,可以發現立馬就傳回了結果,并不需要等待3秒,

Django項目中使用Celery1. 簡單的django項結構如下:2. 建立celery工程3. 在django工程中使用celery任務

 再打開worker的終端檢視,分别列印taskA, taskB,4,兩個任務都成功執行了

Django項目中使用Celery1. 簡單的django項結構如下:2. 建立celery工程3. 在django工程中使用celery任務