天天看点

高阶应用-celery

一、问题

用户发起request,并且要等待response返回。但是在视图中有一些耗时的操作,导致用户可能会等待很长时间才能接受response,这样用户体验很差

网站每隔一段时间要同步一次数据,但是http请求是需要触发的

celery网址:http://docs.jinkan.org/docs/celery/

二、celery模块包含

  • 任务task

    本质是一个python函数,将耗时操作封装成一个函数

  • 队列queue

    将要执行的任务放队列里

  • 工人worker

    负责执行队列中的任务

  • 代理broker

    负责调度,在部署环境中使用redis

三、解决

  • 将耗时的操作放到celery中执行
  • 定时执行

四、安装

  • pip install celery==3.1.26
  • pip install celery-with-redis=3.0
  • pip install django-celery=3.3.1
  • pip install redis==2.10.6

五、配置settings.py

INSTALLED_APPS 添加

djcelery

在settings下方添加如下代码

<span class="hljs-keyword">import</span> djcelery
djcelery.setup_loader()<span class="hljs-comment">#初始化</span>
BROKER_URL=<span class="hljs-string">'redis://:密码@127.0.0.1:6379/0'</span><span class="hljs-comment">#0带表16个库中使用第0个库</span>
CELERY_IMPORTS=(<span class="hljs-string">'App.task'</span>) <span class="hljs-comment">#myApp是项目名</span>
           

复制

六、创建task.py文件

路径 App/task.py

<span class="hljs-keyword">from</span> celery <span class="hljs-keyword">import</span> task
<span class="hljs-keyword">import</span> time  

<span class="hljs-meta">@task</span>
<span class="hljs-function"><span class="hljs-keyword">def</span> <span class="hljs-title">tt</span><span class="hljs-params">()</span>:</span>
    print(<span class="hljs-string">"lucky is a good man"</span>)
    time.sleep(<span class="hljs-number">5</span>)
    print(<span class="hljs-string">"lucky is a nice man"</span>)
           

复制

七、迁移,生成celery需要的数据库表

python manage.py migrate

八、在工程project目录下创建celery.py的文件

<span class="hljs-keyword">from</span> __future__ <span class="hljs-keyword">import</span> absolute_import

<span class="hljs-keyword">import</span> os
<span class="hljs-keyword">from</span> celery <span class="hljs-keyword">import</span> Celery
<span class="hljs-keyword">from</span> django.conf <span class="hljs-keyword">import</span> settings

os.environ.setdefault(<span class="hljs-string">'DJANGO_SETTINGS_MODULE'</span>, <span class="hljs-string">'whthas_home.settings'</span>)

app = Celery(<span class="hljs-string">'portal'</span>)

app.config_from_object(<span class="hljs-string">'django.conf:settings'</span>)
app.autodiscover_tasks(<span class="hljs-keyword">lambda</span>: settings.INSTALLED_APPS)


<span class="hljs-meta">@app.task(bind=True)</span>
<span class="hljs-function"><span class="hljs-keyword">def</span> <span class="hljs-title">debug_task</span><span class="hljs-params">(self)</span>:</span>
    print(<span class="hljs-string">'Request: {0!r}'</span>.format(self.request))
           

复制

九、在工程目录下的project目录下的__init__.py文件中添加

from .celery import app as celery_app

十、视图

<span class="hljs-keyword">from</span> .task <span class="hljs-keyword">import</span> tt
<span class="hljs-function"><span class="hljs-keyword">def</span> <span class="hljs-title">celery</span><span class="hljs-params">(request)</span>:</span>
    tt.delay()  <span class="hljs-comment"># 添加到celery中执行,不会阻塞 </span>
    <span class="hljs-keyword">return</span> HttpResponse(<span class="hljs-string">"测试celery"</span>)
           

复制

传参:

可以在tt.delay([参数]) delay中添加参数

十一、启动顺序

  • 启动redis

    localhost:~ xialigang$ redis-cli

  • 启动服务

    python manange.py runserver

  • 启动worker

    python manage.py celery worker --loglevel=info

十二、修改报错(解释器为3.7版本)

报错信息

File <span class="hljs-string">"/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/celery/utils/timer2.py"</span>, line <span class="hljs-number">19</span>
    <span class="hljs-keyword">from</span> kombu.<span class="hljs-keyword">async</span>.timer <span class="hljs-keyword">import</span> Entry, Timer <span class="hljs-keyword">as</span> Schedule, to_timestamp, logger
                   ^
           

复制

原因:python3.7版本中async是关键字,所以报错

解决方法有两种:

  • 降python版本,重新安装Python版本为3.7以下 如3.6
  • 把kombu.async模块重命名,再把引用处修改过来

    把所有async的引入位置改为async1

    将site-packages\kombu\async -> site-packages\kombu\async1

    site-packages/celery/concurrency/asynpool.py", line 42

    site-packages/kombu/transport/redis.py"

    line 815 862 871 876 877 914 920 925

    site-packages/celery/worker/autoscale.py", line 21

    site-packages/celery/worker/components.py", line 14

    site-packages/celery/worker/strategy.py", line 13

十三、定时执行

定时执行一个任务

在settings.py文件中添加如下代码

<span class="hljs-keyword">from</span> datetime <span class="hljs-keyword">import</span> timedelta
<span class="hljs-comment"># 在settings.py文件添加</span>
CELERYBEAT_SCHEDULE = { 
    <span class="hljs-string">'schedule-test'</span>: {
        <span class="hljs-string">'task'</span>: <span class="hljs-string">'App.task.test'</span>, <span class="hljs-comment">#App的名称 task为你创建任务的py文件名 test为你的任务的名称</span>
        <span class="hljs-string">'schedule'</span>: timedelta(seconds=<span class="hljs-number">3</span>),
        <span class="hljs-string">'args'</span>: (<span class="hljs-number">2</span>,)
    },
}
           

复制

启动顺序:

  • 启动Django

    Python3 manage.py runserver

  • 启动worker

    python manage.py celery worker --loglevel=info

  • 开启定时任务

    python manage.py celery beat --loglevel=info(或者celery -A 你的工程名称 beat -l info)

定时多个任务

settings.py添加如下代码

<span class="hljs-keyword">from</span> datetime <span class="hljs-keyword">import</span> timedelta
<span class="hljs-comment"># 在settings.py文件添加</span>
CELERYBEAT_SCHEDULE = {
    <span class="hljs-comment">#第一个任务</span>
    <span class="hljs-string">'schedule-test1'</span>: {
        <span class="hljs-string">'task'</span>: <span class="hljs-string">'App.mytask.test'</span>,
        <span class="hljs-string">'schedule'</span>: timedelta(seconds=<span class="hljs-number">3</span>),
        <span class="hljs-string">'args'</span>: (<span class="hljs-number">2</span>,)
    },
    <span class="hljs-comment">#第二个任务</span>
    <span class="hljs-string">'schedule-test2'</span>: {
        <span class="hljs-string">'task'</span>: <span class="hljs-string">'App.mytask.test2'</span>,
        <span class="hljs-string">'schedule'</span>: timedelta(seconds=<span class="hljs-number">3</span>),
    },
}
           

复制

App/task.py

<span class="hljs-keyword">from</span> celery <span class="hljs-keyword">import</span> task
<span class="hljs-keyword">import</span> time

<span class="hljs-meta">@task</span>
<span class="hljs-function"><span class="hljs-keyword">def</span> <span class="hljs-title">test1</span><span class="hljs-params">(i)</span>:</span>
    print(<span class="hljs-string">'打印'</span>,i)
    time.sleep(<span class="hljs-number">5</span>)
    print(<span class="hljs-string">'打印'</span>,i)

<span class="hljs-meta">@task</span>
<span class="hljs-function"><span class="hljs-keyword">def</span> <span class="hljs-title">test2</span><span class="hljs-params">()</span>:</span>
    print(<span class="hljs-string">'test2'</span>)
    time.sleep(<span class="hljs-number">5</span>)
    print(<span class="hljs-string">'test2'</span>)
           

复制