天天看點

celery實測之Django調用

1、建立并配置一個celery檔案,需同setting在同一級。

from celery import Celery
from django.conf import settings
import os

# 為celery設定環境變量
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'AIMilling.settings.dev')    # 注意此為項目的名稱

# 建立應用
app = Celery("demo")
# 配置應用
app.conf.update(
    # 配置broker, 這裡我們用redis作為broker
    BROKER_URL='redis://@127.0.0.1:6379/1',    # 無密碼寫法
    BROKER_URL='redis://:[email protected]:6379/1',   # 密碼寫法

)
# 設定app自動加載任務
# 從已經安裝的app中查找任務
app.autodiscover_tasks(settings.INSTALLED_APPS)    
           

2、在app下面建立一個tasks檔案。

from AIMilling.celery import app    # 調用自己執行個體化的app


# 建立任務函數
@app.task
def my_task(number1,number2):
    print(number1+number2)
           

3、視圖調用。

class TestView(APIView):
    authentication_classes = []

    def post(self, request, *args, **kwargs):
        """add function"""
        number1=1
        number2=2
        my_task.delay(number1, number2)
        return Response({'info': 'ok'})
           

4、啟動celery。

C:\Users\Administrator\Desktop\AIMilling>celery -A AIMilling worker -l info --pool=solo
           

5、調用api接口即可啟動程式。

post  http://127.0.0.1:8000/user/v1/test/