使用步驟
1、安裝
pip install django django-celery
2、建立工程
$ django-admin.py startproject celery_project
$ python manage.py startapp course
$ cd celery_project
項目結構
├── celery_project
│ ├── __init__.py
│ ├── celery_config.py
│ ├── settings.py
│ ├── urls.py
│ └── wsgi.py
├── course
│ ├── __init__.py
│ ├── admin.py
│ ├── apps.py
│ ├── migrations
│ │ ├── __init__.py
│ ├── models.py
│ ├── tasks.py
│ ├── tests.py
│ └── views.py
├── db.sqlite3
└── manage.py
3、建立任務
course/tasks.py
# -*- coding: utf-8 -*-
import time
from celery.task import Task
class CourseTask(Task):
name = "course-task"
def run(self, *args, **kwargs):
print("start course task")
time.sleep(4) # 模拟耗時
print(args, kwargs)
print("end course task")
4、配置celery
celery_project/celery_config.py
# -*- coding: utf-8 -*-
import djcelery
from datetime import timedelta
djcelery.setup_loader()
BROKER_BACKEND = "redis"
BROKER_URL = 'redis://localhost:6379/1'
CELERY_RESULT_BACKEND = 'redis://localhost:6379/2'
# 設定任務隊列
CELERY_QUEUES = {
"beat_queue": {
"exchange": "beat_queue",
"exchange_type": "direct",
"binding_key": "beat_queue"
},
"work_queue": {
"exchange": "work_queue",
"exchange_type": "direct",
"binding_key": "work_queue"
}
}
# 設定預設隊列
CELERY_DEFAULT_QUEUE = "work_queue"
# 注冊任務函數
CELERY_IMPORTS = (
"course.tasks",
)
# 設定時區,預設UTC
CELERY_TIMEZONE = 'Asia/Shanghai'
# 有些情況下防止死鎖
CELERYD_FORCE_EXECV = True
# 設定并發的worker數量
CELERYD_CONCURRENCY = 4
# 允許重試
CELERYD_ACKS_LATE = True
# 每個worker最多執行100個任務,防止記憶體洩漏
CELERYD_MAX_TASKS_PER_CHILD = 100
# 單個任務最大運作時間
CELERYD_TASK_TIME_LIMIT = 6 * 60
5、配置settings
celery_project/settings.py
# 添加app
INSTALLED_APPS = [
...
'djcelery',
'course'
]
# 引入 Celery設定
from .celery_config import *
6、編寫視圖函數
course/views.py
# -*- coding: utf-8 -*-
from course.tasks import CourseTask
from django.http import JsonResponse
def do(reqeust):
# 執行異步任務
print("start do")
CourseTask.apply_async(args=("hello", ), queue="work_queque")
print("end do")
return JsonResponse({"result": "ok"})
7、配置路由
celery_project/urls.py
from course import views
urlpatterns = [
url(r'^do/$', views.do)
]
8、啟動
Django服務和 worker
$ python manage.py help
$ python manage.py runserver
$ python manage.py celery worker -l INFO
# 或者
$ python manage.py celery worker -Q queue
通路測試:
http://127.0.0.1:8000/do/9、設定定時任務
# 設定定時任務
CELERYBEAT_SCHEDULE = {
"course-task": {
"task": "course-task",
"schedule": timedelta(seconds=5),
"options": {
"queue": "beat_queue"
}
}
}
10、啟動beat
$ python manage.py celery beat -l INFO
監控工具flower
安裝
pip install flower
啟動
celery flower --address=0.0.0.0 --port=5555 --broker=redis://localhost:6379/1 --basic_auth=user:123
django中啟動
python manage.py celery flower
通路管理界面
http://localhost:5555總結
Django中使用Celery 隻是多了3個步驟:
1、task任務編寫
2、celery配置
3、啟動celery-worker