第1章 celery
1.1什麼是celery?
Celery是一個簡單靈活,且可靠,處理大量消息的分布式系統
用python寫的,用來執行定時任務和異步任務的架構
1.2celery的組成
消息中間àbroker
任務執行單元àworker
任務執行結果存儲àstore
1.3使用場景
異步任務:将耗時操作任務交給celery去異步執行,比如發送短信/郵件,消息推送,音視訊處理等
定時任務:定時執行某件事情,比如每天的資料統計
第2章 celery的安裝配置
pip install celery
消息中間件:rabbitmq/redis,這裡使用redis
docker run -p 6379:6379 --name=redis -d redis:latest redis-server
第3章 celery執行異步任務
3.1基本使用
定義任務:
from celery import Celery
import time
# 任務産生存儲
broker = 'redis://10.211.55.8:16379/1'
# 任務執行結果存儲
backend = 'redis://10.211.55.8:16379/2'
# 第一個參數是别名,随便寫即可
app = Celery('first_celery', backend=backend, broker=broker)
@app.task
def test_celery(x, y):
time.sleep(2)
return x + y
添加任務:
import task
if __name__ == '__main__':
# result不是函數的執行結果,是一個對象
result = tasks.add.delay(2,3)
print(result.id)
使用celery指令開啟work程序
celery –A tasks worker –l info
3.2多任務結構:
當有多個任務需要執行時,會有多個任務函數,位了友善解藕,我們可以使用這種方式,友善管理
celery.py
from celeryimport Celery
broker = 'redis://10.211.55.8:16379/2'
backend = 'redis://10.211.55.8:16379/3'
# include包含兩個任務的函數,去相應檔案中找對應的任務函數
cel = Celery('test', broker=broker, backend=backend, include=['celery_task.task', 'celery_task.task2'])
# 時區,有時候我們想在固定的時間來執行相關任務,就需要設定時區相關配置
cel.conf.timezone = 'Asia/Shanghai'
# 是否UTC
cel.conf.enable_utc = False
task.py
from .celeryimport app
def sum(x, y):
returnx+y
task2.py
def less(x, y):
returnx-y
在celery_task目錄下建立添加任務的py檔案
from celery_taskimport task
from celery_task import task2
task.sum.delay(1, 2)
task2.less.delay(3, 1)
第4章 celery執行定時任務
4.1設定時間讓celery執行一個任務
from datetime import datetime, timedelta
# # 方式一:
# #擷取時間對象
# time = datetime(2019, 8, 3, 15, 0, 0)
# print(time)
# # 将時間轉換成UTC時間
# utc_time = datetime.utcfromtimestamp(time.timestamp())
# print(utc_time)
# # 用apply_async,args是函數參數,eta是指定時間
# result = task.add.apply_async(args=[1, 2], eta=utc_time)
# 方式二:
ctime = datetime.now()
# 把目前時間轉換成UTC時間
utc_time = datetime.utcfromtimestamp(ctime.utcfromtimestamp())
# 目前時間延遲十秒
time_delta = timedelta(seconds=10)
task_time = utc_time + time_delta
result = task.add.apply_async(args=[2, 3], eta=task_time)
# task.sum.delay(1, 2)
# task2.less.delay(3, 1)
4.2類似crontab的定時任務
多任務結構中celery.py檔案如下
from datetime import timedelta
from celery.schedules import crontab
app = Celery('test', broker=broker, backend=backend, include=['celery_task.task', 'celery_task.task2'])
# 時區
app.conf.timezone = 'Asia/Shanghai'
app.conf.enable_utc = False
app.conf.beat_schedules = {
#别名,可随意編寫
'add_crontab': {
#執行task下的add函數
'task': 'celery_task.task.add',
#每隔兩秒執行
'schedules': timedelta(seconds=2),
#固定時間執行,每年4月11号,8點42分執行
#'schedules': crontab(minute=42, hour=8, day_of_month=11, month_of_year=4),
#傳遞參數
'args': [1, 2],
}
}
第5章 Django中使用celery
在項目的目錄下建立celeryconfig.py
import djcelery
djcelery.setup_loader()
CELERY_IMPORTS=(
'app01.tasks',
)
#有些情況可以防止死鎖
CELERYD_FORCE_EXECV=True
# 設定并發worker數量
CELERYD_CONCURRENCY=4
#允許重試
CELERY_ACKS_LATE=True
# 每個worker最多執行100個任務被銷毀,可以防止記憶體洩漏
CELERYD_MAX_TASKS_PER_CHILD=100
# 逾時時間
CELERYD_TASK_TIME_LIMIT=12*30
在app目錄下建立tasks.py
from celeryimport task
@task
def add(x, y):
試圖函數views中使用
from django.shortcutsimport render,HttpResponse
from app01.tasks import add
def test(request):
# result=add.delay(2,3)
ctime = datetime.now()
# 預設用utc時間
utc_ctime = datetime.utcfromtimestamp(ctime.timestamp())
time_delay= timedelta(seconds=5)
task_time = utc_ctime + time_delay
result = add.apply_async(args=[4, 3], eta=task_time)
returnHttpResponse('ok')
settings中注冊
INSTALLED_APPS= [
'django.contrib.admin',
'django.contrib.auth',
'django.contrib.contenttypes',
'django.contrib.sessions',
'django.contrib.messages',
'django.contrib.staticfiles',
'djcelery',
'app01',
]
from djceleryimport celeryconfig