1、建立并配置一個celery檔案,需同setting在同一級。
from celery import Celery
from django.conf import settings
import os
# 為celery設定環境變量
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'AIMilling.settings.dev') # 注意此為項目的名稱
# 建立應用
app = Celery("demo")
# 配置應用
app.conf.update(
# 配置broker, 這裡我們用redis作為broker
BROKER_URL='redis://@127.0.0.1:6379/1', # 無密碼寫法
BROKER_URL='redis://:[email protected]:6379/1', # 密碼寫法
)
# 設定app自動加載任務
# 從已經安裝的app中查找任務
app.autodiscover_tasks(settings.INSTALLED_APPS)
2、在app下面建立一個tasks檔案。
from AIMilling.celery import app # 調用自己執行個體化的app
# 建立任務函數
@app.task
def my_task(number1,number2):
print(number1+number2)
3、視圖調用。
class TestView(APIView):
authentication_classes = []
def post(self, request, *args, **kwargs):
"""add function"""
number1=1
number2=2
my_task.delay(number1, number2)
return Response({'info': 'ok'})
4、啟動celery。
C:\Users\Administrator\Desktop\AIMilling>celery -A AIMilling worker -l info --pool=solo
5、調用api接口即可啟動程式。
post http://127.0.0.1:8000/user/v1/test/