一.Celery簡介
Celery是一個功能完備即插即用的異步任務隊列系統。它适用于異步處理問題,當發送郵件、或者檔案上傳, 圖像處理等等一些比較耗時的操作,我們可将其異步執行,這樣使用者不需要等待很久,提高使用者體驗。
文檔:http://docs.jinkan.org/docs/celery/getting-started/index.html
Celery的特點是:
- 簡單,易于使用和維護,有豐富的文檔。
- 高效,單個celery程序每分鐘可以處理數百萬個任務。
- 靈活,celery中幾乎每個部分都可以自定義擴充。
任務隊列是一種跨線程、跨機器工作的一種機制.
任務隊列中包含稱作任務的工作單元。有專門的工作程序持續不斷的監視任務隊列,并從中獲得新的任務并處理.
celery通過消息進行通信,通常使用一個叫Broker(中間人)來協client(任務的發出者)和worker(任務的處理者). clients發出消息到隊列中,broker将隊列中的資訊派發給worker來處理。
Celery的架構
Celery的架構由三部分組成,消息隊列(message broker),任務執行單元(worker)和任務執行結果存儲(task result store)組成。

一個celery系統可以包含很多的worker和broker
Celery本身不提供消息隊列功能,但是可以很友善地和第三方提供的消息中間件進行內建,包括RabbitMQ,Redis,MongoDB等
二.安裝
pip install -U celery #-U是update的意思,有就進行更新,沒有就安裝
#後面單獨将celery運作起來就可以了
也可從官方直接下載下傳安裝包:https://pypi.python.org/pypi/celery/
tar xvfz celery-0.0.0.tar.gz
cd celery-0.0.0
python setup.py
python setup.py install
使用
使用celery第一件要做的最為重要的事情是需要先建立一個Celery執行個體,我們一般叫做celery應用,或者更簡單直接叫做一個app。app應用是我們使用celery所有功能的入口,比如建立任務,管理任務等,在使用celery的時候,app必須能夠被其他的子產品導入。
一般celery任務目錄直接放在項目的根目錄下即可,路徑:
luffyapi/
├── mycelery/
├── config.py # 配置檔案
├── __init__.py
├── main.py # 主程式
└── sms/ # 一個目錄可以放置多個任務,該目錄下存放目前任務執行時需要的子產品或依賴,也可以每個任務單獨一個目錄
└── tasks.py # 任務的檔案,名稱必須是這個!!!
main.py,代碼:
# 主程式
from celery import Celery
# 建立celery執行個體對象
app = Celery("luffy")
# 通過app對象加載配置,檔案路徑
app.config_from_object("mycelery.config")
# 自動搜尋并加載任務
# 參數必須必須是一個清單,裡面的每一個任務都是任務的路徑名稱
# app.autodiscover_tasks(["任務1","任務2"])
app.autodiscover_tasks(["mycelery.sms","mycelery.cache"]) #會自動識别sms目錄下面的tasks.py檔案中的任務,是以不需寫成mycelery.sms.tasks
# 啟動Celery的指令
# 強烈建議切換目錄到項目的根目錄下啟動celery!!
# celery -A mycelery.main worker --loglevel=info
配置檔案config.py,代碼:(檔案形式,json形式,對象形式都行)
# 任務隊列的連結位址(變量名必須叫這個)
broker_url = 'redis://127.0.0.1:6379/14'
# 結果隊列的連結位址(變量名必須叫這個)
result_backend = 'redis://127.0.0.1:6379/15'
建立一個任務檔案sms/tasks.py,并建立任務,代碼:
# celery的任務必須寫在tasks.py的檔案中,别的檔案名稱不識别!!!
from mycelery.main import app
@app.task(name="send_sms") # name表示設定任務的名稱,如果不填寫,則預設使用函數名(路徑)做為任務名
def send_sms():
print("發送短信!!!")
@app.task # name表示設定任務的名稱,如果不填寫,則預設使用函數名做為任務名
def send_sms2():
print("發送短信任務2!!!")
接下來,我們運作celery,在終端,項目根目錄下(也就是mycelery的外層目錄裡面)執行指令
celery -A mycelery.main worker --loglevel=info (或者直接寫info也行) #-A是指定celery啟動入口
效果如下:
[外鍊圖檔轉存失敗,源站可能有防盜鍊機制,建議将圖檔儲存下來直接上傳(img-fZ4cNNo3-1604837761033)(assets/1562037230098.png)]
- ** ---------- [config]
- ** ---------- .> app: __main__:0x10b24ba50
- ** ---------- .> transport: redis://127.0.0.1:6379/14
- ** ---------- .> results: redis://127.0.0.1:6379/15
- *** --- * --- .> concurrency: 16 (prefork) #表示它開啟了16個線程準備來來執行任務,可以在後面執行任務的時候自行測試一下,一共可以有16個任務同時執行
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker) #有沒有開啟其他的事件(比如事件監聽等等一些東西)
運作起來之後,如果又添加了新的任務,需要重新啟動celery。
然後執行任務,可以在mycelery下面建立一個py檔案進行測試,名字随便起,比如叫做runtask.py檔案,内容如下
#引入任務
from mycelery.sms.tasks import send_sms
#執行任務
send_sms.delay() #這就是将任務交給worker去執行了,這個任務在上面的時候已經加到隊列中了,是以調用它的意思就是讓worker去隊列中找到send_sms這個任務去執行
#然後運作我們這個檔案,右鍵運作就行,celery會在背景一直運作着
去redis中檢視,就能看到任務執行結果了
如果想擷取任務結果可以通過get方法,或者AsyncResult這個類來拿
方式1:
import time
from mycelery.sms.tasks import send_sms
from mycelery.mail.tasks import send_email
ret = send_sms.delay()
print(ret,type(ret))
print(ret.ready())
print(ret.id)
# time.sleep(3)
print(ret.ready())
print(ret.get(timeout=1),)
方式2
import time
from mycelery.sms.tasks import send_sms
from mycelery.mail.tasks import send_email
from celery.result import AsyncResult
ret = send_sms.delay() #執行的任務如果需要參數,那麼就直接在delay方法裡面寫:send_sms(mobile,sms_code),執行時:delay(mobile,sms_code)
async_task = AsyncResult(id=ret.id,app=send_sms)
print(async_task.successful())
result = async_task.get()
print(result)
celery還有很多可配置的項,還可以拓展很多的方法,并且還能完成定時任務:定時備份資料庫,定時分析日志檔案等。關于這些,還是建議大家學習一下。
其他參考文檔:
http://docs.celeryproject.org/en/latest/getting-started/introduction.html
https://github.com/celery/celery/tree/master/examples/django/
https://www.jianshu.com/p/1840035cb510
https://flower.readthedocs.io/en/latest/screenshots.html
三.django和celery進行結合使用
在main.py主程式中對django的配置檔案進行加載
# 主程式
import os
from celery import Celery
# 建立celery執行個體對象
app = Celery("luffy") #celery對象可以建立多個,是以我們最好給我們目前的celery應用起個名字,比如叫做luffy
# 把celery和django進行組合,需要識别和加載django的配置檔案
import os
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'luffyapi.settings.dev')
#如果隻是使用了logging日志功能的話可以不寫以下兩句,因為logging是python提供的子產品,但是将來可能使用celery來執行其他的django任務,是以我們先寫上
import django
django.setup()
# 通過app對象加載配置
app.config_from_object("mycelery.config")
# 加載任務
# 參數必須必須是一個清單,裡面的每一個任務都是任務的路徑名稱
# app.autodiscover_tasks(["任務1","任務2"])
app.autodiscover_tasks(["mycelery.sms","mycelery.mail"])
# 啟動Celery的指令
# 切換目錄到mycelery根目錄下啟動
# celery -A mycelery.main worker --loglevel=info
在需要使用django配置的任務中,直接加載配置,是以我們把注冊的短信發送功能,整合成一個任務函數,代碼:
from mycelery.main import app
from luffyapi.libs.yuntongxun.sms import CCP
from luffyapi.settings import constants
import logging
log = logging.getLogger("django")
@app.task(name="send_sms")
def send_sms(mobile, sms_code):
"""發送短信"""
ccp = CCP()
ret = ccp.send_template_sms(mobile, [sms_code, constants.SMS_EXPIRE_TIME//60], constants.SMS_TEMPLATE_ID)
if not ret:
log.error("使用者注冊短信發送失敗!手機号:%s" % mobile)
在這個任務中,我們需要加載短信發送的sdk和相關的配置常量,是以我們可以直接把django中的短信發送子產品和相關的常量配置檔案直接剪切到目前sms任務目錄中
mycelery/
├── config.py
├── __init__.py
├── main.py
└── sms/
├── __init__.py
├── tasks.py
再次啟動項目即可。
最終在django裡面,我們調用Celery來異步執行任務。需要完成2個步驟:
# 1. 聲明一個和celery一模一樣的任務函數,但是我們可以導包來解決
from mycelery.sms.tasks import send_sms
# 2. 調用任務函數,釋出任務
send_sms.delay(mobile,sms_code)
# send_sms.delay() 如果調用的任務函數沒有參數,則不需要填寫任何内容
改完之後的views.py
class SMSAPIView(APIView):
def get(self,request,mobile):
# todo 1. 判斷手機号是否在60秒曾經發送過短信
redis_conn = get_redis_connection('sms_code')
ret = redis_conn.get("mobile_%s" % mobile)
if ret is not None:
return Response({'msg':'60秒内已經發送過短信了'},status=status.HTTP_400_BAD_REQUEST)
# 2. 建立驗證碼
sms_code = "%06d" % random.randint(1,999999)
pipe = redis_conn.pipeline()
pipe.multi()
pipe.setex('sms_%s' % mobile, constants.SMS_EXPIRE_TIME , sms_code)
pipe.setex("mobile_%s" % mobile,constants.SMS_INTERVAL_TIME,'_')
pipe.execute() #執行事務
try:
from mycelery.sms.tasks import send_sms
ret = send_sms.delay(mobile,sms_code) #執行任務
#其實短信發送,沒有必要擷取它的傳回結果,日志中已經記錄了它發送成功與否的狀态,如果我們想擷取結果,那麼可以使用我筆記裡面擷取celery任務結果的方法。
print('ret>>>',ret.get())
# ccp = CCP()
# #由于短信發送那個有效期是分鐘為機關的,是以我們SMS_EXPIRE_TIME//60
# ret = ccp.send_template_sms(mobile,[sms_code,constants.SMS_EXPIRE_TIME//60],constants.SMS_TEMPLATE_ID)
# if not ret:
# logger = logging.getLogger('django')
# logger.error('使用者注冊短信發送失敗,手機号為%s' % mobile)
# return Response({'msg':'短信發送錯誤!'})
except:
return Response({'msg':'發送短信失敗'},status=status.HTTP_500_INTERNAL_SERVER_ERROR)
return Response({'msg':'發送短信成功'})