項目架構目錄樹:
django_celery
|
├── course # 建立應用
│ ├── admin.py
│ ├── __init__.py
│ ├── migrations
│ ├── models.py
│ ├── tasks.py # 任務子產品
│ ├── urls.py # 路由檔案
│ └── views.py
├── db.sqlite3
├── django_celery
│ ├── celeryconfig.py # celery配置檔案
│ ├── __init__.py
│ ├── settings.py
│ ├── urls.py
│ └── wsgi.py
└── manage.py
安裝依賴包:
pip3 install django-celery
pip3 install redis
第一步:在項目
settings.py
同級目錄下建立
celeryconfig.py
檔案儲存celery配置資訊,内容如下:
import djcelery
from datetime import timedelta
# 加載django中注冊的應用中的任務(例如我們此項目中course應用中的tasks.py任務)
djcelery.setup_loader()
# 手動定義queue隊列
CELERY_QUEUES = {
# 定時任務隊列
'beat_tasks-1': {
'exchange': 'beat_tasks-1',
'exchange_type': 'direct',
'binding_key': 'beat_tasks-1'
},
'beat_tasks-2': {
'exchange': 'beat_tasks-2',
'exchange_type': 'direct',
'binding_key': 'beat_tasks-2',
},
}
# 設定預設隊列,不指定任務隊列時預設使用普通任務隊列
CELERY_DEFAULT_QUEUE = 'work_queue'
# 導入應用中任務檔案
CELERY_IMPORTS = (
'course.tasks',
)
# 有些情況下可以防止死鎖
CELERYD_FORCE_EXECV = True
# 設定并發的worker數量
CELERYD_CONCURRENCY = 4
# 設定失敗允許重試
CELERYD_ACKS_LATE = True
# 每個worker最多執行100個任務被銷毀,可以防止記憶體洩漏
CELERYD_MAX_TASKS_PER_CHILD = 100
# 單個任務的最大運作時間,逾時會被殺死
CELERYD_TASK_TIME_LIMIT = 12 * 30
# 定時任務
CELERYBEAT_SCHEDULE = {
'task1': {
'task': 'course-task', # 任務名
'schedule': timedelta(seconds=10), # 設定每10秒執行一次任務
# 'args': (10, 100) # 任務函數傳參
'options': {
'queue': 'beat_tasks-1' # 指定任務隊列
}
},
'task2': {
'task': 'course-task', # 任務名
'schedule': timedelta(seconds=10), # 設定每10秒執行一次任務
# 'args': (100, 1000) # 任務函數傳參
'options': {
'queue': 'beat_tasks-2' # 指定任務隊列
}
}
}
第二步:在項目
settings.py
配置檔案中添加以下内容
# 注冊應用中添加我們的course應用和要使用的異步隊列應用djcelery
INSTALLED_APPS = (
'django.contrib.admin',
'django.contrib.auth',
'django.contrib.contenttypes',
'django.contrib.sessions',
'django.contrib.messages',
'django.contrib.staticfiles',
'djcelery', # 注冊djcelery
'course', # 注冊我們的
)
# 導入celery配置資訊
from .celeryconfig import *
# 以下配置可以放在celeryconfig.py檔案中,但為了友善配置,放在settings檔案中更好
BROKER_BACKEND = 'redis'
BROKER_URL = 'redis://localhost:6379/1'
CELERY_RESULT_BACKEND = 'redis://localhost:6379/2'
第三步:在項目跟路由配置檔案
urls.py
檔案中配置我們course應用路由:
from django.conf.urls import include, url
from django.contrib import admin
urlpatterns = [
url(r'^admin/', include(admin.site.urls)),
# 當通路位址為http://127.0.0.1:8000時對應到我們course應用路由
url(r'', include('course.urls', namespace='course')),
]
第四步:在
course
應用的
urls.py
檔案中,定義視圖處理函數路由
from django.conf.urls import url
from .views import do # 導入視圖處理函數
urlpatterns = [
# 當通路位址為http://127.0.0.1:8000/do時,對應到course應用中views.py檔案中的視圖處理函數do
url(r'^do/', do, name='do'), # 為視圖函數添加通路路由
]
第五步:在應用中建立
tasks.py
任務檔案,測試内容如下:
import time
from celery.task import Task
class CourseTask(Task):
# 給任務起名字
name = 'course-task'
def run(self, *args, **kwargs):
print("start course task")
# 模拟耗時
time.sleep(4)
print("args={}, kwargs={}".format(args, kwargs))
print("end course task")
第六步:在
course
應用
views.py
視圖檔案中定義視圖函數,在視圖函數中調用任務
from django.http import JsonResponse
from django.shortcuts import render
from .tasks import CourseTask
def do(request):
# 執行異步任務
print('start do request')
# 調用任務
CourseTask.delay()
# apply_async() 方法可以直接傳參并且指定任務隊列,使用更友善
# CourseTask.apply_async(args=('hello',), queue=('work_queue'))
print('end do request')
return JsonResponse({'result': 'OK'})
說明:在任務函數中,我們模拟了4秒的延遲,正常情況下,我們通路該視圖處理函數時執行到任務調用将會等待4秒,直到任務子產品完成,才會繼續向下執行,但我們使用了celery異步任務隊列,此處将不會阻塞,直接傳回response資訊。
第七步:啟動任務隊列
# 由于定時任務發送給了兩個不同的隊列,是以worker隊列必須同時監聽這兩個隊列,來進行消費
python3 manage.py celery worker -l info -n workerA.%h -Q beat_tasks-1
python3 manage.py celery worker -l info -n workerA.%h -Q beat_tasks-2
- 注意點:
如果使用supervisor進行管理時,啟動指令應該如下(注意是兩個%号):
python3 manage.py celery worker -l info -n workerA.%%h -Q beat_tasks-1
第八步:觸發定時任務
python3 manage.py celery beat -l info
第九步:啟動django項目
python3 manage.py runserver
第九步:打開浏覽器通路位址
http://127.0.0.1:8000/do
說明:我們在任務函數中設定的4秒延遲并沒有阻塞我們任務函數後面的代碼執行,當我們回車後直接傳回了json資料。
使用flower管理監控celery任務
(1)安裝依賴包
flower
pip3 install flower
(2)啟動
django manage.py celery flower
(3)在浏覽器中檢視
(4)為了安全起見,通常啟動時設定賬号密碼
django manage.py celery flower --basic_auth=使用者名:密碼