reference:
http://skyrover.me/post/19/
Celery是一個實時處理和任務排程的分布式任務隊列。任務就是消息,消息中的有效載荷中包含要執行任務需要的全部資料。
這是其使用場景:
- web應用,需要較長時間完成的任務,就可以作為任務交給celery異步執行,執行完傳回給使用者。
- 網站的定時任務
- 異步執行的其他任務。比如清理/設定緩存
有以下特點:
- 任務執行情況
- 管理背景管理任務
- 任務和配置管理相關聯
- 多程序,EventLet和Gevent三種模式并發執行
- 錯誤處理機制
- 任務原語,任務分組,拆分和調用鍊
Celery的架構
- Celery Beat,任務排程器,Beat程序會讀取配置檔案的内容,周期性的将配置中到期需要執行的任務發送給任務隊列
- Celery Worker,執行任務的消費者
- Broker,消息代理,接受生産者的任務消息,存進隊列然後按序發送給消費者
- Producer,定時任務或者調用了API産生任務交給任務隊列進行處理
- Result Backend,任務處理完後儲存狀态資訊和結果
整體機制就是,任務釋出者(一般是web應用)或者任務排程,即定時任務将任務釋出到消息代理上(使用Redis或者RabbitMQ),然後消息代理将任務按序發送給Worker執行,Worker執行完後将結果存儲到Backend中,也可以用Redis。
Celery的序列化
一般使用json, yaml, msgpack,其中msgpack是一個二進制的json序列化方案,比json資料結構更小,更快。序列化的作用是在用戶端和消費者之間傳輸資料的途中需要序列化和反序列化。
一個例子
- 主程式(執行個體化Celery)
一種方式,直接進行,簡單粗暴
from celery import Celery
from config import REDISHOST, REDISDB, REDISPORT
celery = Celery('backend', broker='redis://%s:%d/%d' % (REDISHOST, REDISPORT, REDISDB),backend='redis://%s:%d/%d' % (REDISHOST, REDISPORT, REDISDB))
另一種方式,使用配置檔案,類似于Flask應用執行個體的方式吧
from celery import Celery
app = Celery('proj', include=['proj.tasks'])
app.config_from_object('proj.celeryconfig')
# celery.conf.update(app.config)
- Celery配置檔案
如果都在執行個體化的時候指定好了配置,那麼就不需要了,如果需要指定額外的參數,那麼就可以放在配置檔案裡,以下是幾個常用的參數:
# 導入tasks
from celery.schedules import crontab
CELERY_IMPORTS = ("tasks", "graph_data_tasks")
BROKER_URL = ''
CELERY_RESULT_BACKEND = ''
CELERY_TASK_SERIALIZER = 'msgpack'
CELERY_RESULT_SERIALIZER = 'json'
# A value of None or 0 means results will never expire
CELERY_TASK_RESULT_EXPIRES = * * # 任務結果過期時間
CELERY_ACCEPT_CONTENT = ['json', 'msgpack']
# celery worker的并發數
CELERYD_CONCURRENCY =
# 使用任務排程,使用Beat程序自動生成任務
CELERYBEAT_SCHEDULE = {
'graph_data': {
'task': 'graph_data_tasks.sync_graph',
'schedule': timedelta(minutes=),
'args': ()
},
'rank_for_guchong': {
'task': 'backend.celerytasks.rank_for_guchong.calc_ability_schedule',
'schedule': crontab(hour=, minute=),
'args': ()
}
}
- Celery的tasks
将執行個體化的celery執行個體導入進來之後,使用celery的裝飾器task()即可完成tasks的注冊
@celery.task()
def calc_capacity(strategy_id, start_date, end_date, assets, flows, stocks, output_path):
c = CalcAbility(strategy_id, assets, flows, stocks, output_path)
c.run(start_date, end_date)
return {'current': , 'total': , 'status': 'success', 'reason': ''}
- 使用tasks
@be_data.route('/data/graph', methods=['GET'])
# @token_required
def gen_megapool_from_summary():
'''直接從summary檔案計算這些彙總資訊'''
pie_tuple = Cache.get_summary_data(name='summary_data', data_type=float)
meta = {}
if not pie_tuple:
task = sync_graph.apply_async(args=[])
return gen_response(data=meta, message="success", errorcode=),
elif pie_tuple[] != db_clients.hsize('summary_ability'):
sync_graph.apply_async(args=[])
return gen_response(data=meta, message="success", errorcode=),
另外可以通過task.id來擷取task的運作狀态
task = calc_ability.AsyncResult(task_id)
response = {
'current': task.info.get('current', ),
'total': task.info.get('total', ),
'time': task.info.get('time', ),
'start_time': start_time,
'fundid_nums': calc_task.fundid_nums,
'status': task.info.get('status', ''),
'reason': task.info.get('reason', '')
}
- Celery的執行
celery --app=backend.celery worker --loglevel=DEBUG --config=celery_settings --beat
- 指定隊列
Celery通常使用預設名為celery的隊列來存放任務,可以通過CELERY_DEFAULT_QUEUE修改,可以使用優先級不同的隊列來確定高優先級的任務不需要等待就可以得到相應。
from kombu import Queue
CELERY_QUEUES = (
Queue('default', routing_key='task.#'),
Queue('web_tasks', routing_key='web.#')
)
# 預設交換機名字為tasks
CELERY_DEFAULT_EXCHANGE = 'tasks'
# 交換類型是topic
CELERY_DEFAULT_EXCHANGE_TYPE = 'topic'
# 預設的路由鍵是task.default
CELERY_DEFAULT_ROUTING_KEY = 'task.default'
CELERY_ROUTES = {
'projq.tasks.add': {
'queue': 'web_tasks',
'routing_key': 'web.add',
}
}
指定隊列方式啟動消費者程序:
celery -A projq worker -Q web_tasks -l info
celery中的任務綁定
任務可以通過 app.task 裝飾器進行注冊,需要注意的一點是,當函數有多個裝飾器時,為了保證 Celery 的正常運作,app.task 裝飾器需要在最外層。其中有一個bind 參數,當設定了 bind 參數,則會為這個任務綁定一個 Task 執行個體,通過第一個 self 參數傳入,可以通過這個 self 參數通路到 Task 對象的所有屬性。綁定任務用于嘗試重新執行任務(使用app.Task.retry()),綁定了任務就可以通路目前請求的任務資訊和任何你添加到指定任務基類中的方法。也可以使用
self.update_state()
方法來更新狀态資訊
@celery.task(bind=True)
def calc_ability(self, fundIds, start_date, end_date, increment_end_date, maxsize):
start_date = parse_date(start_date)
end_date = parse_date(end_date)
self.update_state(state='PROGRESS',
meta={'current': index, 'total': total,
'time': used_time.used_time() / , 'status': 'running', 'reason': ''})
停止任務
rs = add.delay(, )
rs.revoke() # 隻是撤銷,如果任務已經在執行,則撤銷無效
rs.task_id # 任務id
app.control.revoke(rs.task_id) # 通過task_id撤銷
app.control.revoke(rs.task_id, terminate=True) # 撤銷正在執行的任務,預設使用TERM信号
app.control.revoke(rs.task_id, terminate=True, signal='SIGKILL') # 撤銷正在執行的任務,使用KILL信号
# 而在最新的celery3版本中,這樣停止一個任務
celery.control.terminate(task_id)
# 其實本質還是調用了revoke
return self.revoke(task_id, destination=destination, terminate=True, signal=signal, **kwargs)
Celery監控工具Flower
pip install flower
需要在celery_settings中指定
CELERY_SEND_TASK_SENT_EVENT = True
,然後和啟動celery同樣的目錄下運作
flower -A backend.celery --port=5555
,即可看到管理界面了。通路
http://localhost:5555
使用自動擴充
celery -A proj worker -l info --autoscale=6,3
表示平時保持3個程序,最大并發程序數可以達到6個。
在Celery tasks裡面使用多程序
from celery import Celery
import time
import multiprocessing as mp
app = Celery('proj', broker='amqp://admin:[email protected]:5672', include="tasks")
def test_func(i):
print "beg...:", i
time.sleep()
print "....end:", i
return i *
@app.task
def fun_1(n):
curr_proc = mp.current_process()
curr_proc.daemon = False
p = mp.Pool(mp.cpu_count())
curr_proc.daemon = True
for i in range(n):
p.apply_async(test_func, args=(i,))
p.close()
p.join()
return
if __name__ == "__main__":
app.start()
直接啟動多程序是肯定不可以的,因為是守候程序curr_proc.daemon=True,是以啟多程序之前主動設定為非守候程序curr_proc.daemon=False,啟動了以後再設為守候程序