天天看點

Celery完成定時任務

1.什麼是Celery

Celery是一個簡單、靈活且可靠的,處理大量消息的分布式系統

專注于實時處理的異步任務隊列

同時也支援任務排程

celery支援linux,如果windows使用celery出了問題不解決

Celery架構

Celery的架構由三部分組成,消息中間件(message broker),任務執行單元(worker)和任務執行結果存儲(task result store)組成。

消息中間件

Celery本身不提供消息服務,但是可以友善的和第三方提供的消息中間件內建。包括,RabbitMQ, Redis等等

任務執行單元

Worker是Celery提供的任務執行的單元,worker并發的運作在分布式的系統節點中。

任務結果存儲

Task result store用來存儲Worker執行的任務的結果,Celery支援以不同方式存儲任務的結果,包括AMQP, redis等

版本支援情況

Celery version 4.0 runs on
        Python ❨2.7, 3.4, 3.5❩
        PyPy ❨5.4, 5.5❩
    This is the last version to support Python 2.7, and from the next version (Celery 5.x) Python 3.5 or newer is required.

    If you’re running an older version of Python, you need to be running an older version of Celery:

        Python 2.6: Celery series 3.1 or earlier.
        Python 2.5: Celery series 3.0 or earlier.
        Python 2.4 was Celery series 2.2 or earlier.

    Celery is a project with minimal funding, so we don’t support Microsoft Windows. Please don’t open any issues related to that platform.      

2.使用場景

異步任務:将耗時操作任務送出給Celery去異步執行,比如發送短信/郵件、消息推送、音視訊處理等等

定時任務:定時執行某件事情,比如每天資料統計

3.Celery的安裝配置

pip install celery

消息中間件:RabbitMQ/Redis

app=Celery('任務名',backend='xxx',broker='xxx')

4.Celery執行異步任務

基本使用

建立項目celerytest

建立py檔案:celery_app_task.py

import celery
import time

#消息中間件,任務送出2裡面有資料
# broker='redis://127.0.0.1:6379/2' 不加密碼
broker='redis://:[email protected]:6379/2'
#結果存儲,任務完畢1裡面有資料
backend='redis://:[email protected]:6379/1'

#第一個參數,給celery命名,可以是任意命名
cel=celery.Celery('test',backend=backend,broker=broker)

#假設它是一個耗時任務
#用裝飾器修飾任務,這個任務才能被celery排程,否則不能排程
@cel.task
def add(x,y):
    return x+y      

建立py檔案:add_task.py,添加任務

from celery_app_task import add
#以前同步執行,等待結果
#result = add(4,5)
#print(result)

#現在做成異步,把任務送出到消息隊列中去,用celery異步執行
#這句話隻是把任務送出到消息中間件了,其實并沒有執行
result = add.delay(4,5)
print(result.id)      

啟動勞工執行任務

建立py檔案:run.py,執行任務,或者使用指令執行:celery worker -A celery_app_task -l info

注:windows下:celery worker -A celery_app_task -l info -P eventlet

#windows下使用需要安裝eventlet子產品

from celery_app_task import cel
if __name__ == '__main__':
    cel.worker_main()
    # cel.worker_main(argv=['--loglevel=info')      

建立py檔案:result.py,檢視任務執行結果

from celery.result import AsyncResult
from celery_app_task import cel

#id的值為add_task.py檔案中列印的result.id的值
async = AsyncResult(id="e5e25734-ba93-423f-a3d4-bd87574e9be4", app=cel)

#當它值為True,表示任務執行完成,取出結果
if async.successful():
    result = async.get()
    print(result)
    # result.forget() # 将結果删除
elif async.failed():
    print('執行失敗')
elif async.status == 'PENDING':
    print('任務等待中被執行')
elif async.status == 'RETRY':
    print('任務異常後正在重試')
elif async.status == 'STARTED':
    print('任務已經開始被執行')      

執行 add_task.py,添加任務,并擷取任務ID

執行 run.py ,或者執行指令:celery worker -A celery_app_task -l info

執行 result.py,檢查任務狀态并擷取結果

多任務結構

pro_cel
    ├── celery_task# celery相關檔案夾
    │   ├── celery.py   # celery連接配接和配置相關檔案,必須叫這個名字
    │   └── tasks1.py    #  所有任務函數
    │    └── tasks2.py    #  所有任務函數
    ├── check_result.py # 檢查結果
    └── send_task.py    # 觸發任務      

celery.py

from celery import Celery

cel = Celery('celery_demo',
             broker='redis://:[email protected]:6379/1',
             backend='redis://:[email protected]:6379/2',
             # 包含以下兩個任務檔案,去相應的py檔案中找任務,對多個任務做分類
             include=['celery_task.tasks1',
                      'celery_task.tasks2'
                      ])


#如果設定了下列參數,定時任務時就不用将時間轉化為UTC時間 
# 時區
cel.conf.timezone = 'Asia/Shanghai'
# 是否使用UTC
cel.conf.enable_utc = False      

tasks1.py

import time
from celery_task.celery import cel

@cel.task
def test_celery(res):
    time.sleep(5)
    return "test_celery任務結果:%s"%res      

tasks2.py

import time
from celery_task.celery import cel
@cel.task
def test_celery2(res):
    time.sleep(5)
    return "test_celery2任務結果:%s"%res      

check_result.py

from celery.result import AsyncResult
from celery_task.celery import cel

async = AsyncResult(id="08eb2778-24e1-44e4-a54b-56990b3519ef", app=cel)

if async.successful():
    result = async.get()
    print(result)
    # result.forget() # 将結果删除,執行完成,結果不會自動删除
    # async.revoke(terminate=True)  # 無論現在是什麼時候,都要終止
    # async.revoke(terminate=False) # 如果任務還沒有開始執行呢,那麼就可以終止。
elif async.failed():
    print('執行失敗')
elif async.status == 'PENDING':
    print('任務等待中被執行')
elif async.status == 'RETRY':
    print('任務異常後正在重試')
elif async.status == 'STARTED':
    print('任務已經開始被執行')      

send_task.py

from celery_task.tasks1 import test_celery
from celery_task.tasks2 import test_celery2

# 立即告知celery去執行test_celery任務,并傳入一個參數
result = test_celery.delay('第一個的執行')
print(result.id)
result = test_celery2.delay('第二個的執行')
print(result.id)      

添加任務(執行send_task.py),開啟work:celery worker -A celery_task -l info -P eventlet (celery_task檔案夾的名字),檢查任務執行結果(執行check_result.py)

5.Celery執行定時任務

設定時間讓celery執行一個任務

add_task.py

from celery_app_task import add
from datetime import datetime

# 方式一
#設定任務執行時間,2019年4月18日18時55分56秒執行該任務
v1 = datetime(2019, 4, 18, 18, 55, 56)
print(v1)
#将時間轉為utc時間
v2 = datetime.utcfromtimestamp(v1.timestamp())
print(v2)
#調用apply_async方法,args是add函數的參數,eta是時間對象
result = add.apply_async(args=[1, 3], eta=v2)
print(result.id)

# 方式二
#取出目前時間
ctime = datetime.now()
#将目前時間轉化為utc時間
utc_ctime = datetime.utcfromtimestamp(ctime.timestamp())
from datetime import timedelta
#延時30s
time_delay = timedelta(seconds=30)
#得到執行add函數的時間
task_time = utc_ctime + time_delay

# 使用apply_async并設定時間
result = add.apply_async(args=[4, 3], eta=task_time)
print(result.id)      

類似于crontab的定時任務

作用:每年/每月/每天/每秒執行定時任務

多任務結構中celery.py修改如下

from datetime import timedelta
from celery import Celery
from celery.schedules import crontab

cel = Celery('tasks',
             broker='redis://:[email protected]:6379/1',
             backend='redis://:[email protected]:6379/2',
             # 包含以下兩個任務檔案,去相應的py檔案中找任務,對多個任務做分類
             include=['celery_task.tasks1',
                      'celery_task.tasks2'
                      ])

cel.conf.timezone = 'Asia/Shanghai'
cel.conf.enable_utc = False

cel.conf.beat_schedule = {
    # 名字随意命名
    'add-every-2-seconds': {
        # 執行tasks1下的test_celery函數
        'task': 'celery_task.tasks1.test_celery',
        
        # 每隔2秒執行一次
        'schedule': timedelta(seconds=2),
        
        # 傳遞參數
        'args': ('給函數要傳遞的參數')
    },
    
    #名字随意命名
    'add-every-12-seconds': {
         'task': 'celery_task.tasks1.test_celery',
    #     每年4月11号,8點42分執行
         'schedule': crontab(minute=42, hour=8, day_of_month=11, month_of_year=4),
         'args': ('給函數要傳遞的參數')
     },
}

#在pro_cel檔案夾下執行
#發送任務
啟動一個beat:celery beat -A celery_task -l info

#執行任務
啟動work執行:celery worker -A celery_task -l info -P  eventlet

#在cmd中退出任務 ctrl+c      

6.Django中使用Celery

在Python腳本中調用Django環境(Celery是一個腳本)

import os
if __name__ == '__main__':
    os.environ.setdefault("DJANGO_SETTINGS_MODULE", "untitled15.settings")
    import django
    django.setup()

    from app01 import models

    books = models.Book.objects.all()
    print(books)      

安裝包

版本特别重要,對上才能用,對不上各種出錯
celery==3.1.25
django-celery==3.1.20      

在項目目錄下建立celeryconfig.py

import djcelery
djcelery.setup_loader()

#其中任務可以寫多個
CELERY_IMPORTS=(
    'app01.tasks',
)
#有些情況可以防止死鎖
CELERYD_FORCE_EXECV=True
# 設定并發worker數量
CELERYD_CONCURRENCY=4
#允許重試
CELERY_ACKS_LATE=True
# 每個worker最多執行100個任務被銷毀,可以防止記憶體洩漏
CELERYD_MAX_TASKS_PER_CHILD=100
# 逾時時間
CELERYD_TASK_TIME_LIMIT=12*30      

在app01目錄下建立tasks.py

from celery import task
@task
def add(a,b):
    return a+b      

視圖函數views.py

from django.shortcuts import render,HttpResponse
from app01.tasks import add
from datetime import datetime
def test(request):
    ctime = datetime.now()
    # 預設用utc時間
    utc_ctime = datetime.utcfromtimestamp(ctime.timestamp())
    from datetime import timedelta
    time_delay = timedelta(seconds=5)
    task_time = utc_ctime + time_delay
    result = add.apply_async(args=[4, 3], eta=task_time)
    print(result.id)
    return HttpResponse('ok')      

路由層url.py

from app01 import views
urlpatterns = [
    url(r'^test/$', views.test),
]      

settings.py

INSTALLED_APPS = [
    ...
    'djcelery',
    'app01'
]

...

from djagocele import celeryconfig
BROKER_BACKEND='redis'
#BOOKER_URL='redis://127.0.0.1:6379/1' 不加密
BOOKER_URL='redis://:[email protected]:6379/1'
#CELERY_RESULT_BACKEND='redis://127.0.0.1:6379/2' 不加密
CELERY_RESULT_BACKEND='redis://:[email protected]:6379/2'      

轉載于:https://www.cnblogs.com/lizeqian1994/p/10751514.html