一、Celery的定義
- Celery(芹菜)是一個簡單、靈活且可靠的,處理大量消息的分布式系統,并且提供維護這樣一個系統的必需工具。
- 我比較喜歡的一點是:Celery支援使用任務隊列的方式在分布的機器、程序、線程上執行任務排程。然後我接着去了解什麼是任務隊列。
- 任務隊列
任務隊列是一種線上程或機器間分發任務的機制。
-
消息隊列
消息隊列的輸入是工作的一個單元,稱為任務,獨立的職程(Worker)程序持續監視隊列中是否有需要處理的新任務。
Celery 用消息通信,通常使用中間人(Broker)在用戶端和職程間斡旋。這個過程從用戶端向隊列添加消息開始,之後中間人把消息派送給職程,職程對消息進行處理。如下圖所示:
Celery 系統可包含多個職程和中間人,以此獲得高可用性和橫向擴充能力。 -
Celery的架構
Celery的架構由三部分組成,消息中間件(message broker),任務執行單元(worker)和任務執行結果存儲(task result store)組成。
消息中間件
任務執行單元Celery本身不提供消息服務,但是可以友善的和第三方提供的消息中間件內建,包括,RabbitMQ,Redis,MongoDB等,這裡我先去了解RabbitMQ,Redis。
任務結果存儲Worker是Celery提供的任務執行的單元,worker并發的運作在分布式的系統節點中
Task result store用來存儲Worker執行的任務的結果,Celery支援以不同方式存儲任務的結果,包括Redis,MongoDB,Django ORM,AMQP等,這裡我先不去看它是如何存儲的,就先選用Redis來存儲任務執行結果。
二、簡單操作
cereryconfig.py
BROKER_URL = "redis://:@127.0.0.1/1"
CELERY_RESULT_BACKEND = "redis://:@127.0.0.1:6379/2"
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TASK_RESULT_EXPIRES = 60 * 60 * 24 # 任務過期時間
CELERY_ACCEPT_CONTENT = ["json"]
"""
CELERY_DEFAULT_QUEUE # 預設隊列
CELERY_BROKER_URL # Broker 位址
CELERY_RESULT_BACKEND # 結果存儲位址
CELERY_TASK_SERIALIZER # 任務序列化方式
CELERY_RESULT_SERIALIZER # 任務執行結果序列化方式
CELERY_TASK_RESULT_EXPIRES # 任務過期時間
CELERY_ACCEPT_CONTENT # 指定任務接受的内容類型(序列化)
"""
celerys.py
from celery import Celery
"""
1、Celery第一個參數是給其設定一個名字, 第二參數我們設定一個中間人broker, 在這裡我們使用Redis作為中間人。
my_task函數是我們編寫的一個任務函數, 通過加上裝飾器app.task, 将其注冊到broker的隊列中
2、如果我們想跟蹤任務的狀态,Celery需要将結果儲存到某個地方。
有幾種儲存的方案可選:SQLAlchemy、Django ORM、Memcached、 Redis、RPC (RabbitMQ/AMQP)。
配置backend參數
"""
app = Celery("demo")
# 從單獨的配置子產品中加載配置
app.config_from_object('celeryconfig')
# 建立任務函數
@app.task # 将其注冊到broker的隊列中。
def my_task(a, b):
print("任務在執行。。。。")
return a + b
tasks.py
from celerys import my_task
"""
任務加入到broker隊列中,以便剛才我們建立的celery workder伺服器能夠從隊列中取出任務并執行。
如何将任務函數加入到隊列中,可使用delay()。
解決錯誤:ask handler raised error: ValueError('not enough values to unpack (expected 3, got 0)',)
Traceback (most recent call last):
pip install eventlet
celery -A <mymodule> worker -l info -P eventlet
"""
ret = my_task.delay(1, 2)
ret.failed() # 錯誤結果
print(ret.result) # 擷取傳回值
"""
傳回值:
<AsyncResult: 2c0f6100-d499-44a0-aa0d-57ebec7fdd4c>:這個對象可以用來檢查任務的狀态或者獲得任務的傳回值。
"""
三、celery基本結構
cereryconfig.py
from celery.beat import crontab
BROKER_URL = "redis://:@127.0.0.1/1"
CELERY_RESULT_BACKEND = "redis://:@127.0.0.1:6379/2"
# Routing
CELERY_ROUTES = ({
'celery_proj.tasks.my_task1': {'queue': 'queue1'},
'celery_proj.tasks.my_task2': {'queue': 'queue1'},
'celery_proj.tasks.my_task3': {'queue': 'queue2'},
})
# 配置周期性任務,或者定時任務,5秒執行一次(celery beat)
BEAT_SCHEDULE = {
'every-5-seconds':
{
'task': 'celery_proj.tasks.my_task1',
'schedule': 5.0,
# 'args': (16, 16),
}
}
# 如果我們想指定在某天某時某分某秒執行某個任務,可以執行cron任務, 增加配置資訊如下:
beat_schedule = {
'every-5-minute':
{
'task': 'celery_proj.tasks.period_task',
'schedule': 5.0,
'args': (16, 16), # 參數函數
},
'add-every-monday-morning': {
'task': 'celery_proj.tasks.period_task',
'schedule': crontab(hour=7, minute=30, day_of_week=1),
'args': (16, 16),
},
}
"""
啟動隊列:
celery -A celery_proj.celerys worker -l info -Q queue1 -P eventlet
celery -A proj worker --loglevel=info -Q queue1,queue2 直接開啟兩個隊列
"""
celerys.py
from celery import Celery
# 建立celery執行個體
app = Celery('demo')
app.config_from_object('celery_proj.celeryconfig')
# 自動搜尋任務
app.autodiscover_tasks(['celery_proj'])
tasks.py
from celery_proj.celerys import app as celery_app
# 建立任務函數
@celery_app.task
def my_task1():
print("任務函數(my_task1)正在執行....")
@celery_app.task
def my_task2():
print("任務函數(my_task2)正在執行....")
@celery_app.task
def my_task3():
print("任務函數(my_task3)正在執行....")
@celery_app.task
def my_task4(a, b):
print("任務函數(my_task4)正在執行....")
return a + b
@celery_app.task
def my_task5(a, b):
print("任務函數(my_task5)正在執行....")
return a + b
@celery_app.task
def my_task6(a, b):
print("任務函數(my_task6)正在執行....")
return a + b
@celery_app.task
def period_task(a, b):
print(a + b)
test.py
from celery_proj.tasks import my_task1, my_task2, my_task3, my_task4, my_task5, my_task6
from celery import group
from celery import chain
# my_task1.delay()
"""
調用任務:
可以使用apply_async()方法,該方法可讓我們設定一些任務執行的參數,
例如,任務多久之後才執行,任務被發送到那個隊列中等等.
my_task.apply_async((2, 2), queue='my_queue', countdown=10)
任務my_task将會被發送到my_queue隊列中,并且在發送10秒之後執行。
如果我們直接執行任務函數,将會直接執行此函數在目前程序中,并不會向broker發送任何消息。
無論是delay()還是apply_async()方式都會傳回AsyncResult對象,
友善跟蹤任務執行狀态,但需要我們配置result_backend.
每一個被調用的任務都會被配置設定一個ID,我們叫Task ID.
queue="queue1": 指定用哪個隊列
"""
# my_task2.apply_async(queue="queue1", countdown=10)
"""
一個signature包裝了一個參數和執行選項的單個任務調用。我們可将這個signature傳遞給函數。
我們将my_task1()任務包裝稱一個signature:10秒後執行
"""
# t3 = my_task3.signature(countdown=10)
# t3.delay()
"""
Primitives:
這些primitives本身就是signature對象,是以它們可以以多種方式組合成複雜的工作流程。primitives如下:
group: 一組任務并行執行,傳回一組傳回值,并可以按順序檢索傳回值。
chain: 任務一個一個執行,一個執行完将執行return結果傳遞給下一個任務函數.
"""
# 将多個signature放入同一組中
# my_group = group((my_task4.s(11, 12), my_task5.s(1, 12), my_task6.s(11, 2)))
# ret = my_group() # 執行組任務
# print(ret.get()) # 輸出每個任務結果
"""
将多個signature組成一個任務鍊
my_task4的運作結果将會傳遞給my_task5
my_task5的運作結果會傳遞給my_task6
"""
# my_chain = chain(my_task4.s(10, 10) | my_task5.s(10) | my_task6.s(10))
# ret = my_chain() # 執行任務鍊
# print(ret.get()) # 輸出最終結果
"""
Routing(要寫配置檔案)
假如我們有兩個worker,一個worker專門用來處理郵件發送任務和圖像處理任務,一個worker專門用來處理檔案上傳任務。
我們建立兩個隊列,一個專門用于存儲郵件任務隊列和圖像處理,一個用來存儲檔案上傳任務隊列。
Celery支援AMQP(Advanced Message Queue)所有的路由功能,我們也可以使用簡單的路由設定将指定的任務發送到指定的隊列中.
"""
# my_task1.apply_async(queue='queue1')
# my_task3.apply_async(queue='queue2')
"""
Periodic Tasks:(要寫配置檔案)
celery beat是一個排程器,它可以周期内指定某個worker來執行某個任務。
如果我們想周期執行某個任務需要增加beat_schedule配置資訊.
不能在windows上運作
啟動woker處理周期性任務:celery -A proj worker --loglevel=info --beat
celery需要儲存上次任務運作的時間在資料檔案中,檔案在目前目錄下名字叫celerybeat-schedule. beat需要通路此檔案:
celery -A proj beat -s /home/celery/var/run/celerybeat-schedule
"""
三、celery在Django中的應用
tasks.py
from django_celery.celerys import app
import time
# 加上app對象的task裝飾器
# 此函數為任務函數
@app.task
def my_task():
print("任務開始執行....")
time.sleep(5)
print("任務執行結束....")
# 用于定時執行的任務
@app.task
def interval_task():
print("我每隔5秒鐘時間執行一次....")
views.py
from django.shortcuts import render
from django.http import HttpResponse
from .tasks import my_task
# Create your views here.
"""
django-admin startproject celery_demo 建立Django項目
python manage.py startapp demo 建立App
python -m pip install --upgrade pip
"""
def index(request):
# 将my_task任務加入到celery隊列中
# 如果my_task函數有參數,可通過delay()傳遞
# 例如 my_task(a, b), my_task.delay(10, 20)
my_task.delay()
return HttpResponse("<h1>伺服器傳回響應内容!</h1>")
celery.py
from celery import Celery
from django.conf import settings
import os
# 為celery設定環境變量
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'django_celery.settings')
# 建立應用
app = Celery("app1")
# 配置應用
app.conf.update(
# 配置broker, 這裡我們用redis作為broker
BROKER_URL='redis://:@127.0.0.1:6379/1',
# 使用項目資料庫存儲任務執行結果
CELERY_RESULT_BACKEND='django-db',
# 配置定時器子產品,定時器資訊存儲在資料庫中
CELERYBEAT_SCHEDULER='django_celery_beat.schedulers.DatabaseScheduler'
)
# 設定app自動加載任務
# 從已經安裝的app中查找任務
app.autodiscover_tasks(settings.INSTALLED_APPS)
# @app.task(bind=True)
# def test(self):
# print('Request:{0!r}'.format(self.request))
settings.py
INSTALLED_APPS = [
'django.contrib.admin',
'django.contrib.auth',
'django.contrib.contenttypes',
'django.contrib.sessions',
'django.contrib.messages',
'django.contrib.staticfiles',
'app1.apps.App1Config',
'django_celery_results', # 注意此處應用名為下劃線
'django_celery_beat', # 安裝應用
]
urls.py
from django.conf.urls import url
from django.contrib import admin
from app1.views import index
urlpatterns = [
url(r'^admin/', admin.site.urls),
url(r'^index/', index),
]