天天看點

Python中任務隊列-芹菜celery的使用

芹菜celery是一個python實作的異步任務隊列,可以用于爬蟲、web背景查詢、計算等等。通過任務隊列,當一個任務來臨時不再傻傻等待。

一、關于celery

他的架構如下:

Python中任務隊列-芹菜celery的使用
  • Broker

我們的生産者建立任務後會進入celery的任務排程隊列中間件Broker,Broker通過排程規則将消息(任務)排程消息隊列,Broker依賴第三方隊列消息代理如

rabbitmq

redis

等。

  • Worker

廣大勞動者,盯着消息隊列,當隊列中有消息時把它拿過來給處理了。

  • Backend

用于結果存儲經worker處理的結果,比如常用的資料庫等。

使用celery

在本文中咱們使用

rabbitmq

(celery推薦)作為消息代理中間件。

我們建立的celery目錄如下

learn_celery/
...celery_env/
...celery.py
...my_task1.py
...my_task2.py
...task1_run.py
...task2_run.py
           
1. 建立虛拟環境并安裝celery、flower(web監控),這裡不做贅述。
2.安裝咱們的消息隊列中間件

rabbitmq

這裡以docker的方式運作并配置,指定主機名為

rabbit

(rabbitmq是以主機名來通路的,是以這是必須的),容器名稱為

celery_rabbitmq

docker run -d -p 5672:5672 -h rabbit --name celery_rabbitmq rabbitmq
           

添加用于celery通路的使用者,以及配置

configure

write

read

權限,在下面我們配置rabbit_user擁有所有配置、寫入和讀取權限。

docker exec -it celery_rabbitmq rabbitmqctl add_user rabbit_user rabbit_pass
docker exec -it celery_rabbitmq rabbitmqctl add_vhost rabbit_vhost
docker exec -it celery_rabbitmq rabbitmqctl set_user_tags rabbit_user celery
docker exec -it celery_rabbitmq rabbitmqctl  set_permissions -p rabbit_vhost rabbit_user ".*" ".*" ".*"
           
3.建立celery應用
#celery.py
from celery import Celery

broker_rabbitmq="amqp://rabbit_user:rabbit_pass@i-k9pwet2d/rabbit_vhost"
app=Celery("learn_celery",broker=broker_rabbitmq,backend="rpc://",include=["learn_celery.my_task2","learn_celery.my_task2"])
           

我們通過建立app來執行個體化Celery,項目包的名稱為

learn_celery

,通過

broker_rabbitmq

來連接配接rabbitmq,rabbitmq的amqp協定格式為

amqp://userid:password@hostname:port/virtual_host
           

由于我們是在docker中啟動的rabbitmq,是以我們的hostname應該為主控端的hostname。

指定後端通過rpc回傳資料,include加載帶worker處理的任務

learn_celery.my_task1

learn_celery.my_task2

4.建立兩個任務(消息)
#my_task1.py
from .celery import app
import time

@app.task
def args_add1(x,y):
    print("start task no.1 now!")
    time.sleep(10)
    print("task no.1 end!")
    return x+y

#my_task12.py
from .celery import app
import time

@app.task
def args_add2(x,y):
    print("start task no.2 now!")
    time.sleep(20)
    print("task no.2 end!")
    return x+y
           

在這裡我們導入了celery中的app,并用它來裝飾我們的方法

args_add

,在args_add中模拟任務處理時間分别為10s、20s然後傳回結果。

5.發送任務給celery
#tasks1_run.py
from .my_task1 import args_add1
import time

reslut=args_add1.delay(11,22)
print("task over?{}".format(reslut.ready()))
print("task reslut:{}".format(reslut.result))
time.sleep(15)
print("task over?{}".format(reslut.ready()))
print("task reslut:{}".format(reslut.result))

#tasks2_run.py
from .my_task2 import args_add2
import time

reslut=args_add2.delay(33,44)
print("task over?{}".format(reslut.ready()))
print("task reslut:{}".format(reslut.result))
time.sleep(25)
print("task over?{}".format(reslut.ready()))
print("task reslut:{}".format(reslut.result))

           

關于任務的

delay

,官方文檔(參考)是這樣描述的,我把它了解為發送任務給celery或者celery調用待進來的任務。

Python中任務隊列-芹菜celery的使用

reslut.ready()

傳回任務執行是否執行完成

True

or

False

reslut.result

傳回任務執行結果

我們在任務進入celery和結束分别檢查一次。

二、看看結果

1.啟動worker

進入learn_celery的父目錄。啟動learn_celery的這個應用worker,并指定并發數為10個

celery -A learn_celery worker --loglevel=info --concurrency=10
           

若celery連接配接rabbitmq正常,我們可以看到如下的info

Python中任務隊列-芹菜celery的使用
2.執行任務

為了便于觀察,我們另外開啟一個視窗2,到learn_celery父目錄運作task1_run子產品

python -m learn_celery.tasks1_run
           
Python中任務隊列-芹菜celery的使用

開啟視窗3,到learn_celery父目錄運作task2_run子產品

python -m learn_celery.tasks2_run
           
Python中任務隊列-芹菜celery的使用

可以看到經過各自任務的等待時間後,兩個任務都順利執行結束,并得到結果,接下來我們到worker上看一下info

Python中任務隊列-芹菜celery的使用

由于celery的并發性,收到任務馬上被調入執行,任務1耗時10s結果為33,任務2耗時20s結果為77

三、使用Flower監控celery

1.啟動flower
celery -A learn_celery flower
           
2. 檢視web監控 http://ip:5555

Tasks

中可以檢視到目前任務隊列的狀态、參數、接收和啟動、執行時間。

Python中任務隊列-芹菜celery的使用

Dashborad

中檢視目前worker節點的相關資訊

Python中任務隊列-芹菜celery的使用

文章有不足的地方歡迎指出。

歡迎收藏、點贊、提問。關注頂級飲水機管理者,除了管燒熱水,有時還做點别的。

NEXT

  • celery的深入了解
  • celery在django中的使用