天天看點

Django項目中使用celery分布式任務方法

項目架構目錄樹:

django_celery
|
├── course  # 建立應用
│   ├── admin.py
│   ├── __init__.py
│   ├── migrations
│   ├── models.py
│   ├── tasks.py  # 任務子產品
│   ├── urls.py  # 路由檔案
│   └── views.py
├── db.sqlite3
├── django_celery
│   ├── celeryconfig.py  # celery配置檔案
│   ├── __init__.py
│   ├── settings.py
│   ├── urls.py
│   └── wsgi.py
└── manage.py

           

安裝依賴包:

pip3 install django-celery
pip3 install redis
           

第一步:在項目

settings.py

同級目錄下建立

celeryconfig.py

檔案儲存celery配置資訊,内容如下:

import djcelery
from datetime import timedelta


# 加載django中注冊的應用中的任務(例如我們此項目中course應用中的tasks.py任務)
djcelery.setup_loader()


# 手動定義queue隊列
CELERY_QUEUES = {
    # 定時任務隊列
    'beat_tasks-1': {
        'exchange': 'beat_tasks-1', 
        'exchange_type': 'direct',
        'binding_key': 'beat_tasks-1'
     },
    'beat_tasks-2': {
        'exchange': 'beat_tasks-2',
        'exchange_type': 'direct',
        'binding_key': 'beat_tasks-2',
    },
}


# 設定預設隊列,不指定任務隊列時預設使用普通任務隊列
CELERY_DEFAULT_QUEUE = 'work_queue'


# 導入應用中任務檔案
CELERY_IMPORTS = (
    'course.tasks',
)


# 有些情況下可以防止死鎖
CELERYD_FORCE_EXECV = True

# 設定并發的worker數量
CELERYD_CONCURRENCY = 4

# 設定失敗允許重試
CELERYD_ACKS_LATE = True

# 每個worker最多執行100個任務被銷毀,可以防止記憶體洩漏
CELERYD_MAX_TASKS_PER_CHILD = 100

# 單個任務的最大運作時間,逾時會被殺死
CELERYD_TASK_TIME_LIMIT = 12 * 30


# 定時任務
CELERYBEAT_SCHEDULE = {
    'task1': {
        'task': 'course-task',  # 任務名
        'schedule': timedelta(seconds=10),  # 設定每10秒執行一次任務
        # 'args': (10, 100)  # 任務函數傳參
        'options': {
			'queue': 'beat_tasks-1'  # 指定任務隊列
		}
    },
    'task2': {
        'task': 'course-task',  # 任務名
        'schedule': timedelta(seconds=10),  # 設定每10秒執行一次任務
        # 'args': (100, 1000)  # 任務函數傳參
        'options': {
			'queue': 'beat_tasks-2'  # 指定任務隊列
		}
    }
}
           

第二步:在項目

settings.py

配置檔案中添加以下内容

# 注冊應用中添加我們的course應用和要使用的異步隊列應用djcelery
INSTALLED_APPS = (
    'django.contrib.admin',
    'django.contrib.auth',
    'django.contrib.contenttypes',
    'django.contrib.sessions',
    'django.contrib.messages',
    'django.contrib.staticfiles',
    'djcelery',  # 注冊djcelery
    'course',  # 注冊我們的
)
           
# 導入celery配置資訊
from .celeryconfig import *

# 以下配置可以放在celeryconfig.py檔案中,但為了友善配置,放在settings檔案中更好
BROKER_BACKEND = 'redis'
BROKER_URL = 'redis://localhost:6379/1'
CELERY_RESULT_BACKEND = 'redis://localhost:6379/2'
           

第三步:在項目跟路由配置檔案

urls.py

檔案中配置我們course應用路由:

from django.conf.urls import include, url
from django.contrib import admin

urlpatterns = [
    url(r'^admin/', include(admin.site.urls)),
    # 當通路位址為http://127.0.0.1:8000時對應到我們course應用路由
    url(r'', include('course.urls', namespace='course')),
]
           

第四步:在

course

應用的

urls.py

檔案中,定義視圖處理函數路由

from django.conf.urls import url
from .views import do  # 導入視圖處理函數

urlpatterns = [
	# 當通路位址為http://127.0.0.1:8000/do時,對應到course應用中views.py檔案中的視圖處理函數do
    url(r'^do/', do, name='do'),  # 為視圖函數添加通路路由
]
           

第五步:在應用中建立

tasks.py

任務檔案,測試内容如下:

import time
from celery.task import Task


class CourseTask(Task):
    # 給任務起名字
    name = 'course-task'

    def run(self, *args, **kwargs):
        print("start course task")
        # 模拟耗時
        time.sleep(4)
        print("args={}, kwargs={}".format(args, kwargs))
        print("end course task")
           

第六步:在

course

應用

views.py

視圖檔案中定義視圖函數,在視圖函數中調用任務

from django.http import JsonResponse
from django.shortcuts import render
from .tasks import CourseTask


def do(request):
    # 執行異步任務
    print('start do request')
    # 調用任務
    CourseTask.delay()
    # apply_async() 方法可以直接傳參并且指定任務隊列,使用更友善
    # CourseTask.apply_async(args=('hello',), queue=('work_queue'))
    print('end do request')
    return JsonResponse({'result': 'OK'})
           

說明:在任務函數中,我們模拟了4秒的延遲,正常情況下,我們通路該視圖處理函數時執行到任務調用将會等待4秒,直到任務子產品完成,才會繼續向下執行,但我們使用了celery異步任務隊列,此處将不會阻塞,直接傳回response資訊。

第七步:啟動任務隊列

# 由于定時任務發送給了兩個不同的隊列,是以worker隊列必須同時監聽這兩個隊列,來進行消費

python3 manage.py celery worker -l info -n workerA.%h -Q beat_tasks-1

python3 manage.py celery worker -l info -n workerA.%h -Q beat_tasks-2
           
  • 注意點:

如果使用supervisor進行管理時,啟動指令應該如下(注意是兩個%号):

python3 manage.py celery worker -l info -n workerA.%%h -Q beat_tasks-1

第八步:觸發定時任務

python3 manage.py celery beat -l info
           

第九步:啟動django項目

python3 manage.py runserver
           

第九步:打開浏覽器通路位址

http://127.0.0.1:8000/do

Django項目中使用celery分布式任務方法

說明:我們在任務函數中設定的4秒延遲并沒有阻塞我們任務函數後面的代碼執行,當我們回車後直接傳回了json資料。

使用flower管理監控celery任務

(1)安裝依賴包

flower

pip3 install flower
           

(2)啟動

django manage.py celery flower
           

(3)在浏覽器中檢視

(4)為了安全起見,通常啟動時設定賬号密碼

django manage.py celery flower --basic_auth=使用者名:密碼