天天看點

celery (芹菜) 異步任務

第1章 celery

1.1什麼是celery?

Celery是一個簡單靈活,且可靠,處理大量消息的分布式系統

用python寫的,用來執行定時任務和異步任務的架構

1.2celery的組成

消息中間àbroker

任務執行單元àworker

任務執行結果存儲àstore

1.3使用場景

異步任務:将耗時操作任務交給celery去異步執行,比如發送短信/郵件,消息推送,音視訊處理等

定時任務:定時執行某件事情,比如每天的資料統計

第2章 celery的安裝配置

pip install celery

消息中間件:rabbitmq/redis,這裡使用redis

docker run -p 6379:6379 --name=redis -d redis:latest redis-server

第3章 celery執行異步任務

3.1基本使用

定義任務:

from celery import Celery

import time

# 任務産生存儲

broker = 'redis://10.211.55.8:16379/1'

# 任務執行結果存儲

backend = 'redis://10.211.55.8:16379/2'

# 第一個參數是别名,随便寫即可

app = Celery('first_celery', backend=backend, broker=broker)

@app.task

def test_celery(x, y):

    time.sleep(2)

    return x + y

添加任務:

import task

if __name__ == '__main__':

    # result不是函數的執行結果,是一個對象

    result = tasks.add.delay(2,3)

    print(result.id)

使用celery指令開啟work程序

celery –A tasks worker –l info

3.2多任務結構:

當有多個任務需要執行時,會有多個任務函數,位了友善解藕,我們可以使用這種方式,友善管理

celery.py

from celeryimport Celery

broker = 'redis://10.211.55.8:16379/2'

backend = 'redis://10.211.55.8:16379/3'

# include包含兩個任務的函數,去相應檔案中找對應的任務函數

cel = Celery('test', broker=broker, backend=backend, include=['celery_task.task', 'celery_task.task2'])

# 時區,有時候我們想在固定的時間來執行相關任務,就需要設定時區相關配置

cel.conf.timezone = 'Asia/Shanghai'

# 是否UTC

cel.conf.enable_utc = False

task.py

from .celeryimport app

def sum(x, y):

    returnx+y

task2.py

def less(x, y):

    returnx-y

在celery_task目錄下建立添加任務的py檔案

from celery_taskimport task

from celery_task import task2

task.sum.delay(1, 2)

task2.less.delay(3, 1)

第4章 celery執行定時任務

4.1設定時間讓celery執行一個任務

from datetime import datetime, timedelta

# # 方式一:

# #擷取時間對象

# time = datetime(2019, 8, 3, 15, 0, 0)

# print(time)

# # 将時間轉換成UTC時間

# utc_time = datetime.utcfromtimestamp(time.timestamp())

# print(utc_time)

# # 用apply_async,args是函數參數,eta是指定時間

# result = task.add.apply_async(args=[1, 2], eta=utc_time)

# 方式二:

ctime = datetime.now()

# 把目前時間轉換成UTC時間

utc_time = datetime.utcfromtimestamp(ctime.utcfromtimestamp())

# 目前時間延遲十秒

time_delta = timedelta(seconds=10)

task_time = utc_time + time_delta

result = task.add.apply_async(args=[2, 3], eta=task_time)

# task.sum.delay(1, 2)

# task2.less.delay(3, 1)

4.2類似crontab的定時任務

 多任務結構中celery.py檔案如下

from datetime import timedelta

from celery.schedules import crontab

app = Celery('test', broker=broker, backend=backend, include=['celery_task.task', 'celery_task.task2'])

# 時區

app.conf.timezone = 'Asia/Shanghai'

app.conf.enable_utc = False

app.conf.beat_schedules = {

    #别名,可随意編寫

    'add_crontab': {

        #執行task下的add函數

        'task': 'celery_task.task.add',

        #每隔兩秒執行

        'schedules': timedelta(seconds=2),

        #固定時間執行,每年4月11号,8點42分執行

        #'schedules': crontab(minute=42, hour=8, day_of_month=11, month_of_year=4),

        #傳遞參數

        'args': [1, 2],

    }

}

第5章 Django中使用celery

在項目的目錄下建立celeryconfig.py

import djcelery

djcelery.setup_loader()

CELERY_IMPORTS=(

    'app01.tasks',

)

#有些情況可以防止死鎖

CELERYD_FORCE_EXECV=True

# 設定并發worker數量

CELERYD_CONCURRENCY=4

#允許重試

CELERY_ACKS_LATE=True

# 每個worker最多執行100個任務被銷毀,可以防止記憶體洩漏

CELERYD_MAX_TASKS_PER_CHILD=100

# 逾時時間

CELERYD_TASK_TIME_LIMIT=12*30

在app目錄下建立tasks.py

from celeryimport task

@task

def add(x, y):

試圖函數views中使用

from django.shortcutsimport render,HttpResponse

from app01.tasks import add

def test(request):

    # result=add.delay(2,3)

    ctime = datetime.now()

    # 預設用utc時間

    utc_ctime = datetime.utcfromtimestamp(ctime.timestamp())

    time_delay= timedelta(seconds=5)

    task_time = utc_ctime + time_delay

    result = add.apply_async(args=[4, 3], eta=task_time)

    returnHttpResponse('ok')

settings中注冊

INSTALLED_APPS= [

    'django.contrib.admin',

    'django.contrib.auth',

    'django.contrib.contenttypes',

    'django.contrib.sessions',

    'django.contrib.messages',

    'django.contrib.staticfiles',

    'djcelery',

    'app01',

]

from djceleryimport celeryconfig

繼續閱讀