文章目錄
-
-
-
-
- 為什麼要使用異步
- 一、Celery簡介
-
- Celery使用場景
- Celery構架
- Celery安裝
- 二、Celery的使用
-
- 第一個Celery程式
- Celery配置
-
-
-
為什麼要使用異步
首先,我們要知道計算機的處理分為兩種,CPU處理和IO處理。一般來說,處理一個任務,需要CPU和IO互相協調。
CPU的處理速度遠遠快于IO的,是以當一個任務的CPU處理部分完成後,還需要等待IO的完成。這個等待的過程也就是程序阻塞。程序占着CPU空等沒活幹。
而異步就是把程序在等待過程中占用的CPU釋放掉,讓它去處理其他的東西。那正在處理的IO怎麼辦?系統提供了一種通知機制,使得在IO處理完成後通知系統,該程序進入就緒隊列,等待CPU配置設定。
是以異步使得CPU資源得以充分使用。
一、Celery簡介
Celery 是一個基于 Python 開發的分布式異步消息任務隊列,通過它可以輕松的實作任務的異步處理,
Celery使用場景
▷ 異步任務:解放CPU,将耗時的IO任務交給Celery去異步執行,比如發送郵件、音頻處理
▷ 定時任務:類似于crontab,比如每日資料統計
Celery構架
![](https://img.laitimes.com/img/9ZDMuAjOiMmIsIjOiQnIsICM38CXlZHbvN3cpR2Lc1TPB10QGtWUCpEMJ9CXsxWam9CXwADNvwVZ6l2c052bm9CXUJDT1wkNhVzLcRnbvZ2Lc1TPn1kenRVTwkleOpHOsJGcohVYsR2MMBjVtJWd0ckW65UbM5WOHJWa5kHT20ESjBjUIF2LcRHelR3LcJzLctmch1mclRXY39DM0YjMygTMxATMyATM4EDMy8CX0Vmbu4GZzNmLn9Gbi1yZtl2Lc9CX6MHc0RHaiojIsJye.jpg)
▶ Producer:任務委托方
▶ Broker:任務中心(中介),如RabbitMQ、Redis等1
▶ Beat:任務排程器
▶ Worker:任務執行者,可以有多個(分布式)
▶ Result:任務中心的資料庫,儲存任務執行結果2
▶ Backend:因為任務經由中介,而非直接委派到Worker手上,是以Producer并不知道任務被委派給了誰,以及任務的完成結果,是以這時候需要一個Backend(了解成手機,通過手機檢視任務完成情況)
Celery安裝
安裝消息中間件Redis
sudo apt-get install redis-server
sudo pip3 install redis
安裝Celery
sudo pip3 install celery
二、Celery的使用
第一個Celery程式
建立一個
task.py
from celery import Celery
app = Celery('add_task',
broker='redis://localhos:6379/1',
backend='redis://localhost:6379/2')
@app.task
def add(x, y):
print("running...",x,y)
return x+y
确定Redis服務已經開啟
redis-server
啟動 Celery Worker 來開始監聽并執行任務
celery -A tasks worker -l info
# -A 參數表示app名稱,-l 參數表示日志類型
好了,現在任務給定了,Workers也已經就緒,再建立一個
test.py
檔案
from task import add
result = add.delay(2, 3) # 送出任務,傳回任務id
print(result.ready()) # 傳回True的話,表示送出成功
print(result.get()) # 擷取任務執行結果
Celery配置
從上面的例子可以看出,每次建立app都需要指定broker和backend。
怎樣簡化這個過程呢?
我們可以将相關的檔案子產品化(子產品名celery_app,再添加__init__.py檔案)
子產品下建立一個
celeryconfig.py
檔案
BROKER_URL = 'redis://localhos:6379/1'
CELERY_RESULT_BACKEND = 'redis://localhost:6379/2')
# 導入指定的任務子產品
CELERY_TASKS = {
'celery_app.task1', # task1.py是子產品下的任務檔案
'celery_app.task2',
}
__init__.py
from celery import Celery
app = Celery('demo')
app.config_from_object('celery_app.celeryconfig')
以上配置完成後,則在task1.py等檔案(存在@app.task)中添加一行,即可
from celery_app import app
- Celery 本身不提供消息服務,但是可以友善的和第三方提供的消息中間件內建。包括,RabbitMQ, Redis, MongoDB (experimental), Amazon SQS (experimental),CouchDB (experimental), SQLAlchemy (experimental),Django ORM (experimental), IronMQ ↩︎
- Task result store 用來存儲 Worker 執行的任務的結果,Celery 支援以不同方式存儲任務的結果,包括 AMQP, redis,memcached, mongodb,SQLAlchemy, Django ORM,Apache Cassandra, IronCache 等。 ↩︎