celery是一個分布式的任務排程子產品,那麼celery是如何和分布式挂鈎呢?
celery可以支援多台不同的計算機執行不同的任務或者相同的任務。
如果要說celery的分布式應用的話,就要提到celery的消息路由機制,提到AMQP協定。
具體可以檢視AMQP文檔詳細了解。
簡單了解:
可以有多個"消息隊列"(message Queue),不同的消息可以指定發送給不同的Message Queue,
而這是通過Exchange來實作的,發送消息到"消息隊列"中時,可以指定routiing_key,Exchange通過routing_key來吧消息路由(routes)到不同的"消息隊列"中去。
如圖:
exchange 對應 一個消息隊列(queue),即:通過"消息路由"的機制使exchange對應queue,每個queue對應每個worker
寫個例子:
vim demon3.py
<code>from</code> <code>celery </code><code>import</code> <code>Celery</code>
<code>app </code><code>=</code> <code>Celery()</code>
<code>app.config_from_object(</code><code>"celeryconfig"</code><code>)</code>
<code>@app</code><code>.task</code>
<code>def</code> <code>taskA(x, y):</code>
<code> </code><code>return</code> <code>x </code><code>*</code> <code>y</code>
<code>def</code> <code>taskB(x, y, z):</code>
<code> </code><code>return</code> <code>x </code><code>+</code> <code>y </code><code>+</code> <code>z</code>
<code>def</code> <code>add(x, y):</code>
<code> </code><code>return</code> <code>x </code><code>+</code> <code>y</code>
vim celeryconfig.py
<code>from</code> <code>kombu </code><code>import</code> <code>Queue</code>
<code>BORKER_URL </code><code>=</code> <code>"redis://192.168.48.131:6379/1"</code> <code>#1庫</code>
<code>CELERY_RESULT_BACKEND </code><code>=</code> <code>"redis://192.168.48.131:6379/2"</code> <code>#2庫</code>
<code>CELERY_QUEUES </code><code>=</code> <code>{</code>
<code> </code><code>Queue(</code><code>"default"</code><code>, Exchange(</code><code>"default"</code><code>), routing_key </code><code>=</code> <code>"default"</code><code>),</code>
<code> </code><code>Queue(</code><code>"for_task_A"</code><code>, Exchange(</code><code>"for_task_A"</code><code>), routing_key </code><code>=</code> <code>"for_task_A"</code><code>),</code>
<code> </code><code>Queue(</code><code>"for_task_B"</code><code>, Exchange(</code><code>"for_task_B"</code><code>), routing_key </code><code>=</code> <code>"for_task_B"</code><code>)</code>
<code>}</code>
<code>#路由</code>
<code>CELERY_ROUTES </code><code>=</code> <code>{</code>
<code> </code><code>"demon3.taskA"</code><code>:{</code><code>"queue"</code><code>: </code><code>"for_task_A"</code><code>, </code><code>"routing_key"</code><code>: </code><code>"for_task_A"</code><code>},</code>
<code> </code><code>"demon3.taskB"</code><code>:{</code><code>"queue"</code><code>: </code><code>"for_task_B"</code><code>, </code><code>"routing_key"</code><code>: </code><code>"for_task_B"</code><code>}</code>
下面把兩個腳本導入伺服器:
指定taskA啟動一個worker:
<code># celery -A demon3 worker -l info -n workerA.%h -Q for_task_A</code>
同理:
<code># celery -A demon3 worker -l info -n workerB.%h -Q for_task_B</code>
下面遠端用戶端調用:新檔案
vim remote.py
<code>from</code> <code>demon3 </code><code>import</code> <code>*</code>
<code>r1 </code><code>=</code> <code>taskA.delay(</code><code>10</code><code>, </code><code>20</code><code>)</code>
<code>print</code> <code>(r1.result)</code>
<code>print</code> <code>(r1.status)</code>
<code>r2 </code><code>=</code> <code>taskB.delay(</code><code>10</code><code>, </code><code>20</code><code>, </code><code>30</code><code>)</code>
<code>time.sleep(</code><code>1</code><code>)</code>
<code>prnit (r2.result)</code>
<code>print</code> <code>(r2.status)</code>
<code>#print (dir(r2))</code>
<code>r3 </code><code>=</code> <code>add.delay(</code><code>100</code><code>, </code><code>200</code><code>)</code>
<code>print</code> <code>(r3.result)</code>
<code>print</code> <code>(r3.status) </code><code>#PENDING</code>
看到狀态是PENDING,表示沒有執行,這個是因為沒有celeryconfig.py檔案中指定改route到哪一個Queue中,是以會被發動到預設的名字celery的Queue中,但是我們還沒有啟動worker執行celery中的任務。
下面,我們來啟動一個worker來執行celery隊列中的任務
<code># celery -A tasks worker -l info -n worker.%h -Q celery ##預設的</code>
可以看到這行的結果為success
print(re3.status) #SUCCESS
定時任務:
Celery 與 定時任務
在celery中執行定時任務非常簡單,隻需要設定celery對象中的CELERYBEAT_SCHEDULE屬性即可。
下面我們接着在配置檔案:celeryconfig.py,添加關于 CELERYBEAT_SCHEDULE 變量到腳本中去:
<code>CELERY_TIMEZONE </code><code>=</code> <code>'UTC'</code>
<code>CELERYBEAT_SCHEDULE </code><code>=</code> <code>{</code>
<code> </code><code>'taskA_schedule'</code> <code>: {</code>
<code> </code><code>'task'</code><code>:</code><code>'tasks.taskA'</code><code>,</code>
<code> </code><code>'schedule'</code><code>:</code><code>20</code><code>,</code>
<code> </code><code>'args'</code><code>:(</code><code>5</code><code>,</code><code>6</code><code>)</code>
<code> </code><code>},</code>
<code>'taskB_scheduler'</code> <code>: {</code>
<code> </code><code>'task'</code><code>:</code><code>"tasks.taskB"</code><code>,</code>
<code> </code><code>"schedule"</code><code>:</code><code>200</code><code>,</code>
<code> </code><code>"args"</code><code>:(</code><code>10</code><code>,</code><code>20</code><code>,</code><code>30</code><code>)</code>
<code>'add_schedule'</code><code>: {</code>
<code> </code><code>"task"</code><code>:</code><code>"tasks.add"</code><code>,</code>
<code> </code><code>"schedule"</code><code>:</code><code>10</code><code>,</code>
<code> </code><code>"args"</code><code>:(</code><code>1</code><code>,</code><code>2</code><code>)</code>
<code> </code><code>}</code>
注意格式,否則會有問題
啟動:
celery -A demon3 worker -l info -n workerA.%h -Q for_task_A
celery -A demon3 worker -l info -n workerB.%h -Q for_task_B
celery -A tasks worker -l info -n worker.%h -Q celery
celery -A demon3 beat
本文轉自 聽丶飛鳥說 51CTO部落格,原文連結:http://blog.51cto.com/286577399/2052690