天天看點

python筆記(celery架構初識)

一、Celery的定義

  1. Celery(芹菜)是一個簡單、靈活且可靠的,處理大量消息的分布式系統,并且提供維護這樣一個系統的必需工具。
  2. 我比較喜歡的一點是:Celery支援使用任務隊列的方式在分布的機器、程序、線程上執行任務排程。然後我接着去了解什麼是任務隊列。
  3. 任務隊列
    任務隊列是一種線上程或機器間分發任務的機制。
               
  4. 消息隊列

    消息隊列的輸入是工作的一個單元,稱為任務,獨立的職程(Worker)程序持續監視隊列中是否有需要處理的新任務。

    Celery 用消息通信,通常使用中間人(Broker)在用戶端和職程間斡旋。這個過程從用戶端向隊列添加消息開始,之後中間人把消息派送給職程,職程對消息進行處理。如下圖所示:

    python筆記(celery架構初識)
    python筆記(celery架構初識)
    Celery 系統可包含多個職程和中間人,以此獲得高可用性和橫向擴充能力。
  5. Celery的架構

    Celery的架構由三部分組成,消息中間件(message broker),任務執行單元(worker)和任務執行結果存儲(task result store)組成。

    消息中間件

    Celery本身不提供消息服務,但是可以友善的和第三方提供的消息中間件內建,包括,RabbitMQ,Redis,MongoDB等,這裡我先去了解RabbitMQ,Redis。
               
    任務執行單元
    Worker是Celery提供的任務執行的單元,worker并發的運作在分布式的系統節點中
               
    任務結果存儲
    Task result store用來存儲Worker執行的任務的結果,Celery支援以不同方式存儲任務的結果,包括Redis,MongoDB,Django ORM,AMQP等,這裡我先不去看它是如何存儲的,就先選用Redis來存儲任務執行結果。
               

二、簡單操作

cereryconfig.py

BROKER_URL = "redis://:@127.0.0.1/1"
CELERY_RESULT_BACKEND = "redis://:@127.0.0.1:6379/2"
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TASK_RESULT_EXPIRES = 60 * 60 * 24  # 任務過期時間
CELERY_ACCEPT_CONTENT = ["json"]

"""
CELERY_DEFAULT_QUEUE   # 預設隊列
CELERY_BROKER_URL # Broker 位址
CELERY_RESULT_BACKEND # 結果存儲位址
CELERY_TASK_SERIALIZER # 任務序列化方式
CELERY_RESULT_SERIALIZER # 任務執行結果序列化方式
CELERY_TASK_RESULT_EXPIRES # 	任務過期時間
CELERY_ACCEPT_CONTENT # 指定任務接受的内容類型(序列化)

"""
           

celerys.py

from celery import Celery


"""
1、Celery第一個參數是給其設定一個名字, 第二參數我們設定一個中間人broker, 在這裡我們使用Redis作為中間人。
my_task函數是我們編寫的一個任務函數, 通過加上裝飾器app.task, 将其注冊到broker的隊列中

2、如果我們想跟蹤任務的狀态,Celery需要将結果儲存到某個地方。
有幾種儲存的方案可選:SQLAlchemy、Django ORM、Memcached、 Redis、RPC (RabbitMQ/AMQP)。
配置backend參數

"""
app = Celery("demo")

# 從單獨的配置子產品中加載配置
app.config_from_object('celeryconfig')

# 建立任務函數
@app.task  # 将其注冊到broker的隊列中。
def my_task(a, b):
    print("任務在執行。。。。")
    return a + b
           

tasks.py

from celerys import my_task
"""
任務加入到broker隊列中,以便剛才我們建立的celery workder伺服器能夠從隊列中取出任務并執行。
如何将任務函數加入到隊列中,可使用delay()。

解決錯誤:ask handler raised error: ValueError('not enough values to unpack (expected 3, got 0)',)
Traceback (most recent call last):

pip install eventlet
celery -A <mymodule> worker -l info -P eventlet
"""
ret = my_task.delay(1, 2)
ret.failed()  # 錯誤結果
print(ret.result)  # 擷取傳回值
"""
傳回值:
    <AsyncResult: 2c0f6100-d499-44a0-aa0d-57ebec7fdd4c>:這個對象可以用來檢查任務的狀态或者獲得任務的傳回值。
    
"""
           

三、celery基本結構

python筆記(celery架構初識)

cereryconfig.py

from celery.beat import crontab


BROKER_URL = "redis://:@127.0.0.1/1"
CELERY_RESULT_BACKEND = "redis://:@127.0.0.1:6379/2"


# Routing
CELERY_ROUTES = ({
    'celery_proj.tasks.my_task1': {'queue': 'queue1'},
    'celery_proj.tasks.my_task2': {'queue': 'queue1'},
    'celery_proj.tasks.my_task3': {'queue': 'queue2'},

})


# 配置周期性任務,或者定時任務,5秒執行一次(celery beat)
BEAT_SCHEDULE = {
    'every-5-seconds':
        {
            'task': 'celery_proj.tasks.my_task1',
            'schedule': 5.0,
            # 'args': (16, 16),
        }
}

# 如果我們想指定在某天某時某分某秒執行某個任務,可以執行cron任務, 增加配置資訊如下:

beat_schedule = {
    'every-5-minute':
        {
            'task': 'celery_proj.tasks.period_task',
            'schedule': 5.0,
            'args': (16, 16),  # 參數函數
        },
    'add-every-monday-morning': {
        'task': 'celery_proj.tasks.period_task',
        'schedule': crontab(hour=7, minute=30, day_of_week=1),
        'args': (16, 16),
    },

}

"""
啟動隊列:
celery -A celery_proj.celerys worker -l info -Q queue1 -P eventlet
celery -A proj worker --loglevel=info -Q queue1,queue2 直接開啟兩個隊列
"""
           

celerys.py

from celery import Celery

# 建立celery執行個體
app = Celery('demo')
app.config_from_object('celery_proj.celeryconfig')

# 自動搜尋任務
app.autodiscover_tasks(['celery_proj'])
           

tasks.py

from celery_proj.celerys import app as celery_app


# 建立任務函數
@celery_app.task
def my_task1():
    print("任務函數(my_task1)正在執行....")


@celery_app.task
def my_task2():
    print("任務函數(my_task2)正在執行....")


@celery_app.task
def my_task3():
    print("任務函數(my_task3)正在執行....")


@celery_app.task
def my_task4(a, b):
    print("任務函數(my_task4)正在執行....")
    return a + b


@celery_app.task
def my_task5(a, b):
    print("任務函數(my_task5)正在執行....")
    return a + b


@celery_app.task
def my_task6(a, b):
    print("任務函數(my_task6)正在執行....")
    return a + b


@celery_app.task
def period_task(a, b):
    print(a + b)
           

test.py

from celery_proj.tasks import my_task1, my_task2, my_task3, my_task4, my_task5, my_task6
from celery import group
from celery import chain


# my_task1.delay()
"""
調用任務:
    可以使用apply_async()方法,該方法可讓我們設定一些任務執行的參數,
    例如,任務多久之後才執行,任務被發送到那個隊列中等等.
    my_task.apply_async((2, 2), queue='my_queue', countdown=10)
    任務my_task将會被發送到my_queue隊列中,并且在發送10秒之後執行。
    如果我們直接執行任務函數,将會直接執行此函數在目前程序中,并不會向broker發送任何消息。
    無論是delay()還是apply_async()方式都會傳回AsyncResult對象,
    友善跟蹤任務執行狀态,但需要我們配置result_backend.
    每一個被調用的任務都會被配置設定一個ID,我們叫Task ID.
    queue="queue1": 指定用哪個隊列
"""
# my_task2.apply_async(queue="queue1", countdown=10)

"""
一個signature包裝了一個參數和執行選項的單個任務調用。我們可将這個signature傳遞給函數。
我們将my_task1()任務包裝稱一個signature:10秒後執行
"""
# t3 = my_task3.signature(countdown=10)
# t3.delay()


"""
Primitives:
  這些primitives本身就是signature對象,是以它們可以以多種方式組合成複雜的工作流程。primitives如下:
  group: 一組任務并行執行,傳回一組傳回值,并可以按順序檢索傳回值。
  chain: 任務一個一個執行,一個執行完将執行return結果傳遞給下一個任務函數.

"""
# 将多個signature放入同一組中
# my_group = group((my_task4.s(11, 12), my_task5.s(1, 12), my_task6.s(11, 2)))
# ret = my_group()  # 執行組任務
# print(ret.get())  # 輸出每個任務結果

"""
将多個signature組成一個任務鍊
my_task4的運作結果将會傳遞給my_task5
my_task5的運作結果會傳遞給my_task6
"""
# my_chain = chain(my_task4.s(10, 10) | my_task5.s(10) | my_task6.s(10))
# ret = my_chain()  # 執行任務鍊
# print(ret.get())  # 輸出最終結果


"""
Routing(要寫配置檔案)
  假如我們有兩個worker,一個worker專門用來處理郵件發送任務和圖像處理任務,一個worker專門用來處理檔案上傳任務。
  我們建立兩個隊列,一個專門用于存儲郵件任務隊列和圖像處理,一個用來存儲檔案上傳任務隊列。
  Celery支援AMQP(Advanced Message Queue)所有的路由功能,我們也可以使用簡單的路由設定将指定的任務發送到指定的隊列中.

"""
# my_task1.apply_async(queue='queue1')
# my_task3.apply_async(queue='queue2')

"""
Periodic Tasks:(要寫配置檔案)
  celery beat是一個排程器,它可以周期内指定某個worker來執行某個任務。
  如果我們想周期執行某個任務需要增加beat_schedule配置資訊.  
  不能在windows上運作
  啟動woker處理周期性任務:celery -A proj worker --loglevel=info --beat
  
  celery需要儲存上次任務運作的時間在資料檔案中,檔案在目前目錄下名字叫celerybeat-schedule. beat需要通路此檔案:
    celery -A proj beat -s /home/celery/var/run/celerybeat-schedule
"""
           

三、celery在Django中的應用

python筆記(celery架構初識)

tasks.py

from django_celery.celerys import app
import time


# 加上app對象的task裝飾器
# 此函數為任務函數
@app.task
def my_task():
    print("任務開始執行....")
    time.sleep(5)
    print("任務執行結束....")


# 用于定時執行的任務
@app.task
def interval_task():
    print("我每隔5秒鐘時間執行一次....")
           

views.py

from django.shortcuts import render
from django.http import HttpResponse
from .tasks import my_task
# Create your views here.
"""
django-admin startproject celery_demo 建立Django項目
python manage.py startapp demo 建立App
python -m pip install --upgrade pip
"""


def index(request):
    # 将my_task任務加入到celery隊列中
    # 如果my_task函數有參數,可通過delay()傳遞
    # 例如 my_task(a, b), my_task.delay(10, 20)
    my_task.delay()
    return HttpResponse("<h1>伺服器傳回響應内容!</h1>")
           

celery.py

from celery import Celery
from django.conf import settings
import os

# 為celery設定環境變量
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'django_celery.settings')

# 建立應用
app = Celery("app1")
# 配置應用
app.conf.update(
    # 配置broker, 這裡我們用redis作為broker
    BROKER_URL='redis://:@127.0.0.1:6379/1',
    # 使用項目資料庫存儲任務執行結果
    CELERY_RESULT_BACKEND='django-db',
    # 配置定時器子產品,定時器資訊存儲在資料庫中
    CELERYBEAT_SCHEDULER='django_celery_beat.schedulers.DatabaseScheduler'
)
# 設定app自動加載任務
# 從已經安裝的app中查找任務
app.autodiscover_tasks(settings.INSTALLED_APPS)


# @app.task(bind=True)
# def test(self):
#     print('Request:{0!r}'.format(self.request))
           

settings.py

INSTALLED_APPS = [
    'django.contrib.admin',
    'django.contrib.auth',
    'django.contrib.contenttypes',
    'django.contrib.sessions',
    'django.contrib.messages',
    'django.contrib.staticfiles',
    'app1.apps.App1Config',
    'django_celery_results',  # 注意此處應用名為下劃線
    'django_celery_beat',  # 安裝應用
]
           

urls.py

from django.conf.urls import url
from django.contrib import admin
from app1.views import index

urlpatterns = [
    url(r'^admin/', admin.site.urls),
    url(r'^index/', index),
]