天天看點

Python異步處理——Celery

文章目錄

          • 為什麼要使用異步
        • 一、Celery簡介
          • Celery使用場景
          • Celery構架
          • Celery安裝
        • 二、Celery的使用
          • 第一個Celery程式
          • Celery配置
為什麼要使用異步

首先,我們要知道計算機的處理分為兩種,CPU處理和IO處理。一般來說,處理一個任務,需要CPU和IO互相協調。

CPU的處理速度遠遠快于IO的,是以當一個任務的CPU處理部分完成後,還需要等待IO的完成。這個等待的過程也就是程序阻塞。程序占着CPU空等沒活幹。

而異步就是把程序在等待過程中占用的CPU釋放掉,讓它去處理其他的東西。那正在處理的IO怎麼辦?系統提供了一種通知機制,使得在IO處理完成後通知系統,該程序進入就緒隊列,等待CPU配置設定。

是以異步使得CPU資源得以充分使用。

一、Celery簡介

Celery 是一個基于 Python 開發的分布式異步消息任務隊列,通過它可以輕松的實作任務的異步處理,

Celery使用場景

▷ 異步任務:解放CPU,将耗時的IO任務交給Celery去異步執行,比如發送郵件、音頻處理

▷ 定時任務:類似于crontab,比如每日資料統計

Celery構架
Python異步處理——Celery

▶ Producer:任務委托方

▶ Broker:任務中心(中介),如RabbitMQ、Redis等1

▶ Beat:任務排程器

▶ Worker:任務執行者,可以有多個(分布式)

▶ Result:任務中心的資料庫,儲存任務執行結果2

▶ Backend:因為任務經由中介,而非直接委派到Worker手上,是以Producer并不知道任務被委派給了誰,以及任務的完成結果,是以這時候需要一個Backend(了解成手機,通過手機檢視任務完成情況)

Celery安裝

安裝消息中間件Redis

sudo apt-get install redis-server

sudo pip3 install redis
           

安裝Celery

sudo pip3 install celery
           

二、Celery的使用

第一個Celery程式

建立一個

task.py

from celery import Celery
 
app = Celery('add_task',
             broker='redis://localhos:6379/1',
             backend='redis://localhost:6379/2') 
            
@app.task
def add(x, y):
    print("running...",x,y)
    return x+y
           

确定Redis服務已經開啟

redis-server
           

啟動 Celery Worker 來開始監聽并執行任務

celery -A tasks worker -l info

# -A 參數表示app名稱,-l 參數表示日志類型
           

好了,現在任務給定了,Workers也已經就緒,再建立一個

test.py

檔案

from task import add

result = add.delay(2, 3)    # 送出任務,傳回任務id
print(result.ready())    # 傳回True的話,表示送出成功
print(result.get())    # 擷取任務執行結果
           
Celery配置

從上面的例子可以看出,每次建立app都需要指定broker和backend。

怎樣簡化這個過程呢?

我們可以将相關的檔案子產品化(子產品名celery_app,再添加__init__.py檔案)

子產品下建立一個

celeryconfig.py

檔案

BROKER_URL = 'redis://localhos:6379/1'

CELERY_RESULT_BACKEND = 'redis://localhost:6379/2')

# 導入指定的任務子產品
CELERY_TASKS = {
	'celery_app.task1',    # task1.py是子產品下的任務檔案
	'celery_app.task2',
}
           

__init__.py

from celery import Celery

app = Celery('demo')
app.config_from_object('celery_app.celeryconfig')
           

以上配置完成後,則在task1.py等檔案(存在@app.task)中添加一行,即可

from celery_app import app
           
  1. Celery 本身不提供消息服務,但是可以友善的和第三方提供的消息中間件內建。包括,RabbitMQ, Redis, MongoDB (experimental), Amazon SQS (experimental),CouchDB (experimental), SQLAlchemy (experimental),Django ORM (experimental), IronMQ ↩︎
  2. Task result store 用來存儲 Worker 執行的任務的結果,Celery 支援以不同方式存儲任務的結果,包括 AMQP, redis,memcached, mongodb,SQLAlchemy, Django ORM,Apache Cassandra, IronCache 等。 ↩︎