開始
本文不是針對分析celery或者教學的,隻是在學習之餘對自己在項目中使用的總結,董老師知乎上有篇文章寫的非常好,大家可以移步,我也是看了這篇文章了解了很多。
如果想直接看項目的直接移步github項目。
項目中Celery是使用redis最為代理的,功能主要是:
1. 發送郵件;
2. 定時更新一些有時效性的資料,判斷是否到期;
配置
官方文檔的配置清單在這裡。
下面是項目中的配置,使用的crontab的方式執行周期任務:
# config/settings.py
...
# Celery.
CELERY_BROKER_URL = 'redis://:[email protected]:6379/0' # redis作為消息代理
CELERY_RESULT_BACKEND = CELERY_BROKER_URL # 任務結果存儲在redis
CELERY_ACCEPT_CONTENT = ['json'] # 接受的内容類型
CELERY_TASK_SERIALIZER = 'json' # 任務序列化和反序列化使用json
CELERY_RESULT_SERIALIZER = 'json' # 同上,結果使用json
CELERY_REDIS_MAX_CONNECTIONS = # 允許redis連接配接池用于發取消息的連接配接數
CELERYBEAT_SCHEDULE = { # 任務排程,定期将任務發送到隊列中
'mark-soon-to-expire-credit-cards': {
'task': 'snakeeyes.blueprints.billing.tasks.mark_old_credit_cards',
'schedule': crontab(hour=, minute=)
},
'expire-old-coupons': {
'task': 'snakeeyes.blueprints.billing.tasks.expire_old_coupons',
'schedule': crontab(hour=, minute=)
},
}
docker-compose.yml配置
celery:
build: . # 指定Dockerfile檔案所在位置
command: celery worker -B -l info -A snakeeyes.blueprints.contact.tasks # 運作celery worker,-A是指定應用,-l是指定消息級别為info,-B啟動Beat定時任務
env_file: # 環境配置
- '.env'
volumes: # 挂載路徑
- '.:/snakeeyes'
app.py中celery的構造函數
這個構造函數是根據Flask官方配置寫的,目的是增加上下文支援等。
# 為了後續友善管理和修改,單獨将Celery的include參數值放到這裡,這裡就是所有通過Celery排程的任務清單
CELERY_TASK_LIST = [
'snakeeyes.blueprints.contact.tasks',
'snakeeyes.blueprints.feedback.tasks',
'snakeeyes.blueprints.user.tasks',
'snakeeyes.blueprints.billing.tasks',
]
def create_celery_app(app=None):
"""
Create a new Celery object and tie together the Celery config to the app's
config. Wrap all tasks in the context of the application.
:param app: Flask app
:return: Celery app
"""
app = app or create_app()
celery = Celery(app.import_name, broker=app.config['CELERY_BROKER_URL'],
include=CELERY_TASK_LIST)
celery.conf.update(app.config)
TaskBase = celery.Task
class ContextTask(TaskBase):
abstract = True
def __call__(self, *args, **kwargs):
with app.app_context():
return TaskBase.__call__(self, *args, **kwargs)
celery.Task = ContextTask
return celery
tasks
項目中的任務一般都放在各個藍圖的tasks.py中。比如重置密碼的郵件:
celery = create_celery_app() # 構造celery執行個體
@celery.task() # 用過裝飾器告訴celery任務函數
def deliver_password_reset_email(user_id, reset_token):
"""
Send a reset password e-mail to a user.
:param user_id: The user id
:type user_id: int
:param reset_token: The reset token
:type reset_token: str
:return: None if a user was not found
"""
user = User.query.get(user_id)
if user is None:
return
ctx = {'user': user, 'reset_token': reset_token}
send_template_message(subject='Password reset from Snake Eyes',
recipients=[user.email],
template='user/mail/password_reset', ctx=ctx) # 實際渲染郵件和發送郵件的函數
return None
實際應用是在User model下面一個重置密碼的類方法:
@classmethod
def initialize_password_reset(cls, identity):
"""
Generate a token to reset the password for a specific user.
:param identity: User e-mail address or username
:type identity: str
:return: User instance
"""
u = User.find_by_identity(identity) # 查找使用者
reset_token = u.serialize_token() # 序列化token
# This prevents circular imports.
from snakeeyes.blueprints.user.tasks import (
deliver_password_reset_email)
deliver_password_reset_email.delay(u.id, reset_token) # 通過調用delay将任務發送
return u
在views裡面使用者觸發是通過送出一個密碼重置的表單,如果使用者在網頁上點選了重置密碼就會送出這個表單,觸發後面的操作,并通過flash告知使用者資訊,将使用者重定向到user.login這個view下面:
@user.route('/account/begin_password_reset', methods=['GET', 'POST'])
@anonymous_required()
def begin_password_reset():
form = BeginPasswordResetForm()
if form.validate_on_submit():
u = User.initialize_password_reset(request.form.get('identity'))
flash('An email has been sent to {0}.'.format(u.email), 'success')
return redirect(url_for('user.login'))
return render_template('user/begin_password_reset.html', form=form)
下面是郵件中通過jinja2渲染的連結:
{{ url_for('user.password_reset', reset_token=reset_token, _external=True) }}
可以看到連結會将使用者定位到user.password_reset這個view。我們再看看這個view是怎麼操作的。
可以看到初次get請求會得到password_reset.html,如果在這個頁面有送出重置密碼的表單的話,會用表單的資料替換掉對應User的資料并儲存,達到修改密碼的效果。
@user.route('/account/password_reset', methods=['GET', 'POST'])
@anonymous_required()
def password_reset():
form = PasswordResetForm(reset_token=request.args.get('reset_token'))
if form.validate_on_submit():
u = User.deserialize_token(request.form.get('reset_token'))
if u is None:
flash('Your reset token has expired or was tampered with.',
'error')
return redirect(url_for('user.begin_password_reset'))
form.populate_obj(u)
u.password = User.encrypt_password(request.form.get('password'))
u.save()
if login_user(u):
flash('Your password has been reset.', 'success')
return redirect(url_for('user.settings'))
return render_template('user/password_reset.html', form=form)
定時任務
上面的task說的是使用者觸發的操作,将任務推送到隊列中。定時任務我們已經在配置中設定好了(有關crontab的參數詳見這裡):
CELERYBEAT_SCHEDULE = { # 任務排程,定期将任務發送到隊列中
'mark-soon-to-expire-credit-cards': { # 名稱
'task': 'snakeeyes.blueprints.billing.tasks.mark_old_credit_cards', # 任務
'schedule': crontab(hour=, minute=) # 每天淩晨執行
},
'expire-old-coupons': {
'task': 'snakeeyes.blueprints.billing.tasks.expire_old_coupons',
'schedule': crontab(hour=, minute=) # 每天零點一分執行
},
}
這裡說明第一個任務,每天淩晨檢查并标記即将到期的信用卡。根據任務路徑可以看到代碼片段:
@celery.task()
def mark_old_credit_cards():
"""
Mark credit cards that are going to expire soon or have expired.
:return: Result of updating the records
"""
return CreditCard.mark_old_credit_cards() # 調用的是CreditCard這個類下面的一個類方法并傳回結果
可以看到和生産任務的方式類似,隻是在配置是将其放入
CELERYBEAT_SCHEDULE
這個字典。