天天看點

celery+Rabbit MQ實戰記錄1.安裝 Rabbit MQ2.安裝celery3.任務過期3.參考

基于以前的一篇文章,celery+Rabbit MQ的安裝和使用,

本文更加詳細的介紹如何安裝和使用celey, Rabbit MQ。

并記錄在使用celery時遇到的一些問題。

1.安裝 Rabbit MQ

在 OS X上,直接執行如下指令:

$ brew install rabbitmq
           

其他作業系統下的安裝可以參考安裝 RabbitMQ

啟動RabbitMQ

$ sudo rabbitmq-server
           

你也可以添加 -detached 屬性來讓它在背景運作(注意:隻有一個破折号):

$ sudo rabbitmq-server -detached
           

檢視RabbitMQ狀态

$ sudo rabbitmqctl status
           

停止RabbitMQ

永遠不要用 kill 停止 RabbitMQ 伺服器,而是應該用 rabbitmqctl 指令:

$ sudo rabbitmqctl stop
           

添加使用者

預設使用者guest,密碼guest,隻允許本地通路,如需遠端通路,需要設定.

$ sudo rabbitmqctl add_user test 123456
Adding user "test" ...
           

添加虛拟主機,并賦予使用者test權限

$ sudo rabbitmqctl add_vhost myvhost
Adding vhost "myvhost" ...
           
$ sudo rabbitmqctl set_permissions -p myvhost test ".*" ".*" ".*"
Setting permissions for user "test" in vhost "myvhost" ...
           

2.安裝celery

2.1 建立虛拟環境,并安裝celery

$ mkdir celery_demo
$ cd celery_demo
$ virtualenv -p python3 venv3
           
$ ./venv3/bin/pip install celery
           

2.2 配置celery

建立配置檔案 celeryconfig.py,裡面包含

BROKER_URL

CELERYD_LOG_FORMAT

CELERY_ROUTES

.

# celeryconfig.py

RABBIT_MQ = {
    'HOST': '127.0.0.1',
    'PORT': 5672,
    'USER': 'test',
    'PASSWORD': '123456'
}

# broker
BROKER_URL = 'amqp://%s:%s@%s:%s/myvhost' % (RABBIT_MQ['USER'], RABBIT_MQ['PASSWORD'], RABBIT_MQ['HOST'], RABBIT_MQ['PORT'])

# celery日志格式
CELERYD_LOG_FORMAT = '[%(asctime)s] [%(levelname)s] %(message)s'

CELERY_ROUTES = {
        'demo_task.add': {'queue': 'sunday'},
}

           

其中,參數定義如下:

  • BROKER_URL

    指定了broker資訊,即消息隊列的位址。
  • CELERYD_LOG_FORMAT

    指定了日志格式。
  • CELERY_ROUTES

    指定了路由資訊,即調用

    demo_task.add

    後,消息具體放入哪個隊列,這裡是隊列名稱為

    sunday

2.3 啟動消費者

消費者

消費者代碼如下:

# demo_task.py
from celery import Celery


app = Celery("orange", backend='amqp')
app.config_from_object("celeryconfig")

@app.task
def add(x, y):
        return x + y
           

首先,建立

Celery

執行個體,從檔案中讀取配置。

接着,定義task。

其中,在建立

Celery

執行個體時,參數

backend

指定了結果存儲後端,用于追蹤task執行狀态和結果。這裡使用

amqp

,即使用RabbitMQ儲存結果。預設情況下,

backend

參數是關閉的。

使用

celery worker

啟動消費者

./venv3/bin/celery worker  -A demo_task -Q sunday --loglevel=info  -f app.log


[email protected] v4.3.0 (rhubarb)

Darwin-18.2.0-x86_64-i386-64bit 2019-04-20 17:49:40

[config]
.> app:         orange:0x109ca0400
.> transport:   amqp://test:**@127.0.0.1:5672/myvhost
.> results:     disabled://
.> concurrency: 8 (prefork)
.> task events: OFF (enable -E to monitor tasks in this worker)

[queues]
.> sunday           exchange=sunday(direct) key=sunday


[tasks]
  . demo_task.add
           

其中,參數定義如下:

  • 參數

    -A

    是app name,即定義celery的檔案。
  • 參數

    -Q

    指定了隊列的名稱,如果不指定,預設為

    celery

  • 參數

    -f

    指定了日志列印檔案。

可以通過以下指令檢視更多幫助資訊:

  • celery help

    檢視celery的選項
  • celery worker --help

    檢視worker的選項

2.4 啟動生産者

生産者

代碼如下api.py所示:

# api.py
from demo_task import add


print("start...")

result = add.apply_async((1, 2))

print("result:", result)
print(result.ready())

print("end...")
           

代碼中,将

1

2

參數放入消息隊列。

celery配置

backend

參數後, 調用任務時,會傳回

AsyncResult

執行個體。

AsyncResult

ready()

方法可以檢視任務是否完成處理。

執行api.py

執行結果分兩種情況:worker啟動和沒有啟動。

當celery worker啟動的時候,結果如下:

./venv3/bin/python api.py
start...
result: 60dd0ab6-5fa2-4190-954e-584eb519384f
True
end...
           

可以看到,task很快得到執行,結果狀态為True。

當celery worker沒有啟動的時候,結果如下:

./venv3/bin/python api.py
start...
result: 7280466f-73cd-44ec-85e7-5ad4f079a797
False
end...
           

可以看到,結果狀态為False。

直接擷取結果

可以使用

get()

函數等待任務完成,但這很少使用,因為它把異步調用變成了同步調用。

對于

get()

的使用,按照是否設定逾時參數,分為兩種:不使用逾時參數timeout和使用逾時參數timeout。

(1)不使用逾時參數timeout

預設timeout為

None

,不會逾時,而是阻塞住。

通過tcpdump抓包,可以發現,

get

調用後,每隔30s,RabbitMQ(server端)向生産者API(client端)發送心跳包,共發了5次。

最後,過了3分鐘,RabbitMQ(server端)關閉連接配接,生産者API(client端)也關閉連接配接,報錯:

start...
result: aae4fb20-823c-4674-8bff-28429f14d5d7
False
Traceback (most recent call last):
  File "api.py", line 13, in <module>
    result.get()
  File "/workspace/celery_demo/venv3/lib/python3.6/site-packages/celery/result.py", line 226, in get
    on_message=on_message,
  File "/workspace/celery_demo/venv3/lib/python3.6/site-packages/celery/backends/base.py", line 496, in wait_for_pending
    no_ack=no_ack,
  File "/workspace/celery_demo/venv3/lib/python3.6/site-packages/celery/backends/amqp.py", line 146, in wait_for
    on_interval=on_interval)
  File "/workspace/celery_demo/venv3/lib/python3.6/site-packages/celery/backends/amqp.py", line 223, in consume
    conn, consumer, timeout, on_interval)[task_id]
  File "/workspace/celery_demo/venv3/lib/python3.6/site-packages/celery/backends/amqp.py", line 204, in drain_events
    wait(timeout=1)
  File "/workspace/celery_demo/venv3/lib/python3.6/site-packages/kombu/connection.py", line 315, in drain_events
    return self.transport.drain_events(self.connection, **kwargs)
  File "/workspace/celery_demo/venv3/lib/python3.6/site-packages/kombu/transport/pyamqp.py", line 103, in drain_events
    return connection.drain_events(**kwargs)
  File "/workspace/celery_demo/venv3/lib/python3.6/site-packages/amqp/connection.py", line 500, in drain_events
    while not self.blocking_read(timeout):
  File "/workspace/celery_demo/venv3/lib/python3.6/site-packages/amqp/connection.py", line 505, in blocking_read
    frame = self.transport.read_frame()
  File "/workspace/celery_demo/venv3/lib/python3.6/site-packages/amqp/transport.py", line 256, in read_frame
    frame_header = read(7, True)
  File "/workspace/celery_demo/venv3/lib/python3.6/site-packages/amqp/transport.py", line 448, in _read
    raise IOError('Server unexpectedly closed connection')
OSError: Server unexpectedly closed connection
           

(2)使用逾時參數timeout

如果使用

timeout

參數,

get(timeout)

調用發起後, 當超過

timeout

指定的時間仍然沒有獲得結果,會逾時報錯。

例如,調用

get(timeout=1)

... ...

# sync
print(result.get(timeout=1))

print("end...")
           

如果worker沒有啟動或者worker處理逾時,

get

會報逾時錯誤:

start...
result: 190ad871-aae2-48ac-a18e-962cd2d537b7
False
Traceback (most recent call last):
  File "/workspace/celery_demo/venv3/lib/python3.6/site-packages/celery/backends/amqp.py", line 146, in wait_for
    on_interval=on_interval)
  File "/workspace/celery_demo/venv3/lib/python3.6/site-packages/celery/backends/amqp.py", line 223, in consume
    conn, consumer, timeout, on_interval)[task_id]
  File "/workspace/celery_demo/venv3/lib/python3.6/site-packages/celery/backends/amqp.py", line 202, in drain_events
    raise socket.timeout()
socket.timeout

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "api.py", line 14, in <module>
    print(result.get(timeout=1))
  File "/workspace//celery_demo/venv3/lib/python3.6/site-packages/celery/result.py", line 226, in get
    on_message=on_message,
  File "/workspace/celery_demo/venv3/lib/python3.6/site-packages/celery/backends/base.py", line 496, in wait_for_pending
    no_ack=no_ack,
  File "/workspace/celery_demo/venv3/lib/python3.6/site-packages/celery/backends/amqp.py", line 148, in wait_for
    raise TimeoutError('The operation timed out.')
celery.exceptions.TimeoutError: The operation timed out.
           

2.5 檢視結果

檢視app.log中的日志

[2019-04-20 20:53:59,327] [INFO] Connected to amqp://test:**@127.0.0.1:5672/myvhost
[2019-04-20 20:53:59,345] [INFO] mingle: searching for neighbors
[2019-04-20 20:54:00,379] [INFO] mingle: all alone
[2019-04-20 20:54:00,441] [INFO] [email protected] ready.
[2019-04-20 20:55:46,498] [INFO] Received task: demo_task.add[1c8d47bd-449d-4bd9-b4db-819777081d23]
[2019-04-20 20:55:46,561] [INFO] Task demo_task.add[1c8d47bd-449d-4bd9-b4db-819777081d23] succeeded in 0.060540573904290795s: 3
           

3.任務過期

調用

apply_async

時使用參數

expires

,則表示任務有逾時時間,超過這個時間後,task不會得到執行。

result = add.apply_async((1, 2), expires=10)
           

當worker沒有啟動或者其他異常情況下,會出現任務逾時,不被執行。

檢視日志,可以看到task過期,不會被執行。

[2019-04-20 21:32:45,828] [INFO] Received task: demo_task.add[12691c27-3f2f-4c96-9b4e-54636e20d0eb]   expires:[2019-04-20 13:31:51.030013+00:00]
[2019-04-20 21:32:45,829] [INFO] Discarding revoked task: demo_task.add[12691c27-3f2f-4c96-9b4e-54636e20d0eb]
           

3.參考

Celery 初步

calling API

Configuration and defaults

expiration