用到的python包:celery--3.1.25;django--1.11.26;python2.7.15;reids--2.10.6;
redis:4.0.11 (後來換的)
一、celery文檔
首先,附上官方文檔連接配接https://docs.celeryproject.org/en/latest/
celery3.1.7文檔:http://docs.jinkan.org/docs/celery/index.html
二、安裝及調試(centos7環境)
由于centos7中不直接支援ifconfig指令,是以,先解決了ifconfig及yum指令支援。此處,我是通過使用Xshell進行的遠端通路,習慣用ifconfig進行檢視ip建立連接配接。Xshell下載下傳位址,支援個人使用者免費使用。
1、Xshell的使用簡單說明
安裝後點選“檔案”--“建立”,即可見下方視窗

在使用者身份認證中,填寫使用者名密碼
在左側生成的圖示處進行輕按兩下,以打開會話界面。支援打開多個。
2、安裝redis作為後續celery的broker使用
下載下傳及解壓安裝(下載下傳安裝的位址建議自行建立一個檔案夾)
wget http://download.redis.io/releases/redis-5.0.5.tar.gz (具體版本根據情況自行選擇)
tar -zxvf redis-5.0.5.tar.gz (如果要解壓到指定位置,後跟 "-C 目标位置",例如:"-C /usr/local")
cd redis-5.0.5 (如果解壓縮到了指定位置,先調轉到解壓的位置後,執行此指令)
make
make過程中遇到了錯誤及解決辦法(參考:CentOS7中編譯安裝redis5.0)
此處問題解決是參考了上述文章,跳轉到deps目錄中,執行 make hiredis lua jemalloc linenoise 指令。
make成功後顯示:
配置檔案修改:
1、注釋掉bind 127.0.0.1,以開發給其他計算機使用
2、關閉保護模式
啟動redis(假設redis解壓到了usr/local/ 目錄下):
/usr/local/redis-5.0.5/src/redis-server /usr/local/redis-5.0.5/redis.conf(服務端啟動)
/usr/local/redis-5.0.5/src/redis-cli (客服端)(新的互動視窗開啟)
服務端啟動成功會出現如下畫面:
注:關閉Redis服務指令:src/resdis-cli shutdown
3、安裝支援的Python包
pip install redis
pip install -U "celery[redis]" # pip install "celery[librabbitmq,redis,auth,msgpack]"
4、建立一個檔案夾,建立一個tasks.py檔案,作為celery application定義任務清單
# -*- coding:utf-8 -*-
from celery import Celery
# 使用redis作為broker及backend
app = Celery('tasks', # 名字
broker='redis://127.0.0.1:6379/1', # broker
backend='redis://127.0.0.1:6379/2') # backend存儲結果
# 建立任務函數
@app.task
def add(x, y):
print("running...")
return x + y
儲存檔案。
注:這裡Celery的第一個參數“名字”,需要確定其與celery釋出任務的“名稱字首(最後一個點前的部分)”保持一緻,否則會報錯“celery.exceptions.NotRegistered”。
5、啟動一個celery worker開始監聽,并等待執行任務
celery -A tasks worker -l info (在tasks.py目錄中執行此指令)
另外,windows環境中,上述語句後還需要參數“-P eventlet”(如果有模拟浏覽器的task,一定不要加,血的教訓)
遇到問題(沒有記錄是啥問題),解決是通過安裝了一個Python的包 pip install 'more-intertools<=5.0.0'。
成功的界面:
6、調用任務
打開新的終端頁面,cd到tasks.py所在的目錄,進入Python互動界面。調用tasks.py中的add函數,通過delay()或apply_async()函數,将任務釋出到broker。
在 worker 監聽終端(執行過 celery -A tasks worker --loglevel=info 的終端頁面),可以看到worker收到并執行了任務。
三、在項目中使用celery
同樣的需要啟動Redis作為broker,方法同上,這裡不再贅述。
建立項目,這裡我使用的是在windows環境中,使用pycharm進行建立項目,同步到遠端伺服器中的方法。pycharm同步伺服器代碼的方法參考:Pycharm連接配接遠端伺服器實作代碼同步
項目結構如下圖:
這裡建立的項目名為celeryProject1,包含檔案:“__init__.py,celery.py(app), config.py(配置檔案),tasks.py(任務)”。
1、celery.py
# -*- coding:utf-8 -*-
from __future__ import absolute_import, unicode_literals
from celery import Celery
# 建立celery執行個體
app = Celery('demo')
# 導入配置檔案
app.config_from_object('celeryProject1.config')
# 自動發現task
app.autodiscover_tasks(['celeryProject1'])
2、config.py (官方文檔)
# -*- coding:utf-8 -*-
"""參數配置見官方文檔說明:http://docs.celeryproject.org/en/latest/userguide/configuration.html#configuration"""
# from kombu import Exchange, Queue
BROKER_URL = 'redis://127.0.0.1:6379/3'
CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/4'
CELERY_IMPORTS = ('celeryProject1.tasks1',)
3、task1.py
# -*- coding:utf-8 -*-
from celeryProject1.celery import app
# 建立任務
@app.task
def celery_task1():
print "this is celery task1..."
@app.task
def celery_task2(x, y):
print "this is celery task2..."
return x+y
@app.task
def celery_task3(x, y):
print "this is celery task3..."
return x*y
項目檔案完成之後,同步到伺服器中。
使用Xshell工具,打開伺服器互動界面,cd到項目所在的檔案夾。執行:
celery -A celeryProject1 worker -l info
開啟celery任務。
打開新的Xshell互動視窗,cd到項目所在目錄,開啟Python環境:
釋出任務:
此時,我們可以在celery服務監控界面,看到任務的執行情況:
由上可見,celery項目必要的内容包括:app、配置檔案、任務。
需要開啟的視窗界面:Redis(作為broker及backend)、啟動worker的監聽界面(celery -A XXX worker -l info)、Python釋出任務的界面。
四、在Django中使用Celery(Windows)
因為項目本身的緣故,這裡将Django環境搭到了windows環境中。Django環境的建立:Django建立項目記錄。
本部分同時參考了博文:Django中使用Celery;Django+redis+celery 實作異步任務
建立好Django及app後,現在預設你已經啟動了Django服務,後續修改中不需要重新開機Django,它會自動識别修改過的部分。
1、接下來,修改/添加必要的檔案
修改Django的settings檔案添加(這裡我是用的redis作為了broker及backend,配置的ip位址,此處與實際對應):
# ------------------------------------------------------------------------------------
CELERY_BROKER_URL = 'redis://192.168.236.129:6379/1' # Broker配置,使用Redis作為消息中間件
CELERY_RESULT_BACKEND = 'redis://192.168.236.129:6379/2' # BACKEND配置,這裡使用redis
CELERY_RESULT_SERIALIZER = 'json' # 結果序列化方案
# ------------------------------------------------------------------------------------
在與項目的同名python package中添加celery.py檔案:
from __future__ import absolute_import
import os
from celery import Celery
from django.conf import settings
os.environ.setdefault('DJANGO_SETTINGS_MODULE', '項目名稱.settings') # 設定django環境
app = Celery('項目名稱')
app.config_from_object('django.conf:settings')
# 發現任務檔案每個app下的tasks.py
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
修改__init__.py檔案(根據此部分參考的博文):
from __future__ import absolute_import
from .celery import app
__all__ = ['app']
在你的app的python package包中添加tasks.py檔案(一個執行個體,根據你的實際需要定義tasks的内容):
# -*- coding: utf-8 -*-
from DjangoCeleryForCrawler.celery import app
@app.task
def celery_task1():
print "task1 test!"
# return 111
現在,我的項目目錄結構如下(為了不造成不必要的幹擾,将非必須檔案打碼了):
2、開啟Celery服務
好了,我們現在在pycharm點選下邊框處的“terminal”打開一個新的交換視窗,執行
celery -A 你的項目名 worker -l debug
可以觀察到成功開啟了Celery服務。
打開新的Terminal視窗,執行我們定義的tasks中的任務進行測試:
可以看到監視視窗收到并執行了task:
3、通過配置設定路由,在浏覽器頁面調用任務
同此部分“1”,在tasks.py中添加一個task:
@app.task
def test_task1(a, b):
print "task1 test!"
return a+b
在同級目錄下的views.py中, 調用此task:
# -*- coding: utf-8 -*-
from __future__ import unicode_literals
import json
import tasks
from django.http import HttpResponse
def test_celery(request, *args, **kwargs):
sums = tasks.test_task1.delay(1, 2)
result = {'code': 0, 'msg': str(sums)}
return HttpResponse(json.dumps(result, ensure_ascii=False), content_type='application/json')
接下來,在項目的urls.py中定義路由:
在浏覽器位址欄輸入此位址進行通路:http://127.0.0.1:8000/test_celery/.
寫在後面:
開始在windows上啟動服務時,加了個 -P eventlet 的參數(不知道在哪個教程看到的)。其他task都沒問題,有一個task包括啟動Chrome浏覽器的代碼,就有了如下報錯:
[2019-11-27 11:03:47,292: ERROR/MainProcess] Task proj.tasks.chrome_test[03e31b41-42b4-46ae-96cc-b856a7f7ae00] raised unexpected: WebD
riverException()
Traceback (most recent call last):
File "d:\software\professional\python27\lib\site-packages\celery\app\trace.py", line 240, in trace_task
R = retval = fun(*args, **kwargs)
File "d:\software\professional\python27\lib\site-packages\celery\app\trace.py", line 438, in __protected_call__
return self.run(*args, **kwargs)
File "E:\MyProjects\PythonProject\CeleryChromeTest\proj\tasks.py", line 23, in chrome_test
options=chrome_options)
File "d:\software\professional\python27\lib\site-packages\selenium\webdriver\chrome\webdriver.py", line 73, in __init__
self.service.start()
File "d:\software\professional\python27\lib\site-packages\selenium\webdriver\common\service.py", line 95, in start
(os.path.basename(self.path), self.start_error_message, str(e)))
WebDriverException: Message: The executable chromedriver.exe needs to be available in the path. Please see https://sites.google.com/a/chromium.org/chromedriver/home
set_nonblocking() on a file object with no setblocking() method (Windows pipes don't support non-blocking I/O)
這個錯誤找個N天, 始終不得其解。誤打誤撞,去掉了“-P eventlet”參數,....,問題解決了。看文檔的該部分,是該任務有阻塞調用(blocking calls)
# celery的一些task參數
1. (time_limit=20)設定任務最大執行時間
達到時間上限若沒有執行完,報錯“Process 'Worker-x' pid:xxxx exited with 'signal 15 (SIGTERM)'”,并退出任務。