天天看點

Python程式設計:Django中使用Celery執行異步任務和定時任務使用步驟監控工具flower總結

使用步驟

1、安裝

pip install django django-celery      

2、建立工程

$ django-admin.py startproject celery_project
$ python manage.py startapp course
$ cd celery_project      

項目結構

├── celery_project
│   ├── __init__.py
│   ├── celery_config.py
│   ├── settings.py
│   ├── urls.py
│   └── wsgi.py
├── course
│   ├── __init__.py
│   ├── admin.py
│   ├── apps.py
│   ├── migrations
│   │   ├── __init__.py
│   ├── models.py
│   ├── tasks.py
│   ├── tests.py
│   └── views.py
├── db.sqlite3
└── manage.py
      

3、建立任務

course/tasks.py

# -*- coding: utf-8 -*-

import time
from celery.task import Task


class CourseTask(Task):
    name = "course-task"

    def run(self, *args, **kwargs):
        print("start course task")
        time.sleep(4)  # 模拟耗時
        print(args, kwargs)
        print("end course task")
      

4、配置celery

celery_project/celery_config.py

# -*- coding: utf-8 -*-

import djcelery
from datetime import timedelta

djcelery.setup_loader()


BROKER_BACKEND = "redis"

BROKER_URL = 'redis://localhost:6379/1'

CELERY_RESULT_BACKEND = 'redis://localhost:6379/2'


# 設定任務隊列
CELERY_QUEUES = {
    "beat_queue": {
        "exchange": "beat_queue",
        "exchange_type": "direct",
        "binding_key": "beat_queue"
    },
    "work_queue": {
        "exchange": "work_queue",
        "exchange_type": "direct",
        "binding_key": "work_queue"
    }
}

# 設定預設隊列
CELERY_DEFAULT_QUEUE = "work_queue"

# 注冊任務函數
CELERY_IMPORTS = (
    "course.tasks",
)

# 設定時區,預設UTC
CELERY_TIMEZONE = 'Asia/Shanghai'

# 有些情況下防止死鎖
CELERYD_FORCE_EXECV = True

# 設定并發的worker數量
CELERYD_CONCURRENCY = 4

# 允許重試
CELERYD_ACKS_LATE = True

# 每個worker最多執行100個任務,防止記憶體洩漏
CELERYD_MAX_TASKS_PER_CHILD = 100

# 單個任務最大運作時間
CELERYD_TASK_TIME_LIMIT = 6 * 60

      

5、配置settings

celery_project/settings.py

# 添加app
INSTALLED_APPS = [
    ...
    'djcelery',
    'course'
]

# 引入 Celery設定
from .celery_config import *
      

6、編寫視圖函數

course/views.py

# -*- coding: utf-8 -*-

from course.tasks import CourseTask
from django.http import JsonResponse


def do(reqeust):
    # 執行異步任務
    print("start do")
    CourseTask.apply_async(args=("hello", ), queue="work_queque")
    print("end do")
    return JsonResponse({"result": "ok"})

      

7、配置路由

celery_project/urls.py

from course import views

urlpatterns = [
    url(r'^do/$', views.do)
]      

8、啟動

Django

服務和 worker

$ python manage.py help
$ python manage.py runserver
$ python manage.py celery worker -l INFO

# 或者
$ python manage.py celery worker -Q queue      

通路測試:

http://127.0.0.1:8000/do/

9、設定定時任務

# 設定定時任務
CELERYBEAT_SCHEDULE = {
    "course-task": {
        "task": "course-task",
        "schedule": timedelta(seconds=5),
        "options": {
            "queue": "beat_queue"
        }
    }
}      

10、啟動beat

$ python manage.py celery beat -l INFO
      

監控工具flower

安裝

pip install flower      

啟動

celery flower --address=0.0.0.0 --port=5555 --broker=redis://localhost:6379/1 --basic_auth=user:123      

django中啟動

python manage.py celery flower      

通路管理界面

http://localhost:5555

總結

Django中使用Celery 隻是多了3個步驟:

1、task任務編寫

2、celery配置

3、啟動celery-worker