天天看點

一文搞懂Celery一.Celery簡介二.安裝三.django和celery進行結合使用

一.Celery簡介

Celery是一個功能完備即插即用的異步任務隊列系統。它适用于異步處理問題,當發送郵件、或者檔案上傳, 圖像處理等等一些比較耗時的操作,我們可将其異步執行,這樣使用者不需要等待很久,提高使用者體驗。

文檔:http://docs.jinkan.org/docs/celery/getting-started/index.html

Celery的特點是:

  • 簡單,易于使用和維護,有豐富的文檔。
  • 高效,單個celery程序每分鐘可以處理數百萬個任務。
  • 靈活,celery中幾乎每個部分都可以自定義擴充。
任務隊列是一種跨線程、跨機器工作的一種機制.
任務隊列中包含稱作任務的工作單元。有專門的工作程序持續不斷的監視任務隊列,并從中獲得新的任務并處理.
celery通過消息進行通信,通常使用一個叫Broker(中間人)來協client(任務的發出者)和worker(任務的處理者). clients發出消息到隊列中,broker将隊列中的資訊派發給worker來處理。
           

Celery的架構

Celery的架構由三部分組成,消息隊列(message broker),任務執行單元(worker)和任務執行結果存儲(task result store)組成。

一文搞懂Celery一.Celery簡介二.安裝三.django和celery進行結合使用
一個celery系統可以包含很多的worker和broker

Celery本身不提供消息隊列功能,但是可以很友善地和第三方提供的消息中間件進行內建,包括RabbitMQ,Redis,MongoDB等
           

二.安裝

pip install -U celery  #-U是update的意思,有就進行更新,沒有就安裝
#後面單獨将celery運作起來就可以了
           

也可從官方直接下載下傳安裝包:https://pypi.python.org/pypi/celery/

tar xvfz celery-0.0.0.tar.gz
cd celery-0.0.0
python setup.py 
python setup.py install
           

使用

使用celery第一件要做的最為重要的事情是需要先建立一個Celery執行個體,我們一般叫做celery應用,或者更簡單直接叫做一個app。app應用是我們使用celery所有功能的入口,比如建立任務,管理任務等,在使用celery的時候,app必須能夠被其他的子產品導入。

一般celery任務目錄直接放在項目的根目錄下即可,路徑:

luffyapi/
├── mycelery/
    ├── config.py     # 配置檔案
    ├── __init__.py   
    ├── main.py       # 主程式
    └── sms/          # 一個目錄可以放置多個任務,該目錄下存放目前任務執行時需要的子產品或依賴,也可以每個任務單獨一個目錄
        └── tasks.py  # 任務的檔案,名稱必須是這個!!!
           

main.py,代碼:

# 主程式
from celery import Celery
# 建立celery執行個體對象
app = Celery("luffy")

# 通過app對象加載配置,檔案路徑
app.config_from_object("mycelery.config")

# 自動搜尋并加載任務
# 參數必須必須是一個清單,裡面的每一個任務都是任務的路徑名稱
# app.autodiscover_tasks(["任務1","任務2"])
app.autodiscover_tasks(["mycelery.sms","mycelery.cache"]) #會自動識别sms目錄下面的tasks.py檔案中的任務,是以不需寫成mycelery.sms.tasks

# 啟動Celery的指令
# 強烈建議切換目錄到項目的根目錄下啟動celery!!
# celery -A mycelery.main worker --loglevel=info
           

配置檔案config.py,代碼:(檔案形式,json形式,對象形式都行)

# 任務隊列的連結位址(變量名必須叫這個)
broker_url = 'redis://127.0.0.1:6379/14'  
# 結果隊列的連結位址(變量名必須叫這個)
result_backend = 'redis://127.0.0.1:6379/15'
           

建立一個任務檔案sms/tasks.py,并建立任務,代碼:

# celery的任務必須寫在tasks.py的檔案中,别的檔案名稱不識别!!!
from mycelery.main import app

@app.task(name="send_sms")  # name表示設定任務的名稱,如果不填寫,則預設使用函數名(路徑)做為任務名
def send_sms():
    print("發送短信!!!")

@app.task  # name表示設定任務的名稱,如果不填寫,則預設使用函數名做為任務名
def send_sms2():
    print("發送短信任務2!!!")
           

接下來,我們運作celery,在終端,項目根目錄下(也就是mycelery的外層目錄裡面)執行指令

celery -A mycelery.main worker --loglevel=info (或者直接寫info也行) #-A是指定celery啟動入口
           

效果如下:

[外鍊圖檔轉存失敗,源站可能有防盜鍊機制,建議将圖檔儲存下來直接上傳(img-fZ4cNNo3-1604837761033)(assets/1562037230098.png)]

- ** ---------- [config]
- ** ---------- .> app:         __main__:0x10b24ba50
- ** ---------- .> transport:   redis://127.0.0.1:6379/14
- ** ---------- .> results:     redis://127.0.0.1:6379/15
- *** --- * --- .> concurrency: 16 (prefork)  #表示它開啟了16個線程準備來來執行任務,可以在後面執行任務的時候自行測試一下,一共可以有16個任務同時執行
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker) #有沒有開啟其他的事件(比如事件監聽等等一些東西)

           

運作起來之後,如果又添加了新的任務,需要重新啟動celery。

然後執行任務,可以在mycelery下面建立一個py檔案進行測試,名字随便起,比如叫做runtask.py檔案,内容如下

#引入任務
from mycelery.sms.tasks import send_sms  
#執行任務
send_sms.delay() #這就是将任務交給worker去執行了,這個任務在上面的時候已經加到隊列中了,是以調用它的意思就是讓worker去隊列中找到send_sms這個任務去執行
#然後運作我們這個檔案,右鍵運作就行,celery會在背景一直運作着
           

去redis中檢視,就能看到任務執行結果了

如果想擷取任務結果可以通過get方法,或者AsyncResult這個類來拿

方式1:
import time
from mycelery.sms.tasks import send_sms
from mycelery.mail.tasks import send_email

ret = send_sms.delay()
print(ret,type(ret))
print(ret.ready())
print(ret.id)
# time.sleep(3)
print(ret.ready())
print(ret.get(timeout=1),)

方式2
import time
from mycelery.sms.tasks import send_sms
from mycelery.mail.tasks import send_email

from celery.result import AsyncResult
ret = send_sms.delay()  #執行的任務如果需要參數,那麼就直接在delay方法裡面寫:send_sms(mobile,sms_code),執行時:delay(mobile,sms_code)
async_task = AsyncResult(id=ret.id,app=send_sms)

print(async_task.successful())
result = async_task.get()
print(result)

           

celery還有很多可配置的項,還可以拓展很多的方法,并且還能完成定時任務:定時備份資料庫,定時分析日志檔案等。關于這些,還是建議大家學習一下。

其他參考文檔:

http://docs.celeryproject.org/en/latest/getting-started/introduction.html

https://github.com/celery/celery/tree/master/examples/django/

https://www.jianshu.com/p/1840035cb510

https://flower.readthedocs.io/en/latest/screenshots.html

三.django和celery進行結合使用

在main.py主程式中對django的配置檔案進行加載

# 主程式
import os
from celery import Celery
# 建立celery執行個體對象
app = Celery("luffy") #celery對象可以建立多個,是以我們最好給我們目前的celery應用起個名字,比如叫做luffy


# 把celery和django進行組合,需要識别和加載django的配置檔案
import os
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'luffyapi.settings.dev')
#如果隻是使用了logging日志功能的話可以不寫以下兩句,因為logging是python提供的子產品,但是将來可能使用celery來執行其他的django任務,是以我們先寫上
import django
django.setup()

# 通過app對象加載配置
app.config_from_object("mycelery.config")

# 加載任務
# 參數必須必須是一個清單,裡面的每一個任務都是任務的路徑名稱
# app.autodiscover_tasks(["任務1","任務2"])
app.autodiscover_tasks(["mycelery.sms","mycelery.mail"])

# 啟動Celery的指令
# 切換目錄到mycelery根目錄下啟動
# celery -A mycelery.main worker --loglevel=info
           

在需要使用django配置的任務中,直接加載配置,是以我們把注冊的短信發送功能,整合成一個任務函數,代碼:

from mycelery.main import app
from luffyapi.libs.yuntongxun.sms import CCP
from luffyapi.settings import constants
import logging

log = logging.getLogger("django")

@app.task(name="send_sms")
def send_sms(mobile, sms_code):
    """發送短信"""
    ccp = CCP()
    ret = ccp.send_template_sms(mobile, [sms_code, constants.SMS_EXPIRE_TIME//60], constants.SMS_TEMPLATE_ID)
    if not ret:
        log.error("使用者注冊短信發送失敗!手機号:%s" % mobile)
           

在這個任務中,我們需要加載短信發送的sdk和相關的配置常量,是以我們可以直接把django中的短信發送子產品和相關的常量配置檔案直接剪切到目前sms任務目錄中

mycelery/
├── config.py
├── __init__.py
├── main.py
└── sms/
    ├── __init__.py
    ├── tasks.py

           

再次啟動項目即可。

最終在django裡面,我們調用Celery來異步執行任務。需要完成2個步驟:

# 1. 聲明一個和celery一模一樣的任務函數,但是我們可以導包來解決
from mycelery.sms.tasks import send_sms

# 2. 調用任務函數,釋出任務
send_sms.delay(mobile,sms_code)
# send_sms.delay() 如果調用的任務函數沒有參數,則不需要填寫任何内容
           

改完之後的views.py

class SMSAPIView(APIView):

    def get(self,request,mobile):

        # todo 1. 判斷手機号是否在60秒曾經發送過短信
        redis_conn = get_redis_connection('sms_code')
        ret = redis_conn.get("mobile_%s" % mobile)
        if ret is not None:
            return Response({'msg':'60秒内已經發送過短信了'},status=status.HTTP_400_BAD_REQUEST)
        # 2. 建立驗證碼
        sms_code = "%06d" % random.randint(1,999999)

        pipe = redis_conn.pipeline()
        pipe.multi()
        pipe.setex('sms_%s' % mobile, constants.SMS_EXPIRE_TIME , sms_code)
        pipe.setex("mobile_%s" % mobile,constants.SMS_INTERVAL_TIME,'_')  
        pipe.execute() #執行事務
        try:

            from mycelery.sms.tasks import send_sms
            ret = send_sms.delay(mobile,sms_code)  #執行任務
            #其實短信發送,沒有必要擷取它的傳回結果,日志中已經記錄了它發送成功與否的狀态,如果我們想擷取結果,那麼可以使用我筆記裡面擷取celery任務結果的方法。
            print('ret>>>',ret.get())
            # ccp = CCP()
            # #由于短信發送那個有效期是分鐘為機關的,是以我們SMS_EXPIRE_TIME//60
            # ret = ccp.send_template_sms(mobile,[sms_code,constants.SMS_EXPIRE_TIME//60],constants.SMS_TEMPLATE_ID)
            # if not ret:
            #     logger = logging.getLogger('django')
            #     logger.error('使用者注冊短信發送失敗,手機号為%s' % mobile)
            #     return Response({'msg':'短信發送錯誤!'})
        except:

            return Response({'msg':'發送短信失敗'},status=status.HTTP_500_INTERNAL_SERVER_ERROR)
        return Response({'msg':'發送短信成功'})