天天看點

學習Celery在項目中使用的總結開始配置tasks定時任務

開始

本文不是針對分析celery或者教學的,隻是在學習之餘對自己在項目中使用的總結,董老師知乎上有篇文章寫的非常好,大家可以移步,我也是看了這篇文章了解了很多。

如果想直接看項目的直接移步github項目。

項目中Celery是使用redis最為代理的,功能主要是:

1. 發送郵件;

2. 定時更新一些有時效性的資料,判斷是否到期;

配置

官方文檔的配置清單在這裡。

下面是項目中的配置,使用的crontab的方式執行周期任務:

# config/settings.py
...
# Celery.
CELERY_BROKER_URL = 'redis://:[email protected]:6379/0'  # redis作為消息代理
CELERY_RESULT_BACKEND = CELERY_BROKER_URL  # 任務結果存儲在redis
CELERY_ACCEPT_CONTENT = ['json']  # 接受的内容類型
CELERY_TASK_SERIALIZER = 'json'  # 任務序列化和反序列化使用json
CELERY_RESULT_SERIALIZER = 'json'  # 同上,結果使用json
CELERY_REDIS_MAX_CONNECTIONS =   # 允許redis連接配接池用于發取消息的連接配接數
CELERYBEAT_SCHEDULE = {  # 任務排程,定期将任務發送到隊列中 
    'mark-soon-to-expire-credit-cards': {
        'task': 'snakeeyes.blueprints.billing.tasks.mark_old_credit_cards',
        'schedule': crontab(hour=, minute=)
    },
    'expire-old-coupons': {
        'task': 'snakeeyes.blueprints.billing.tasks.expire_old_coupons',
        'schedule': crontab(hour=, minute=)
    },
}
           

docker-compose.yml配置

celery:
  build: .  # 指定Dockerfile檔案所在位置
  command: celery worker -B -l info -A snakeeyes.blueprints.contact.tasks  # 運作celery worker,-A是指定應用,-l是指定消息級别為info,-B啟動Beat定時任務
  env_file:  # 環境配置
    - '.env'
  volumes:  # 挂載路徑
    - '.:/snakeeyes'
           

app.py中celery的構造函數

這個構造函數是根據Flask官方配置寫的,目的是增加上下文支援等。

# 為了後續友善管理和修改,單獨将Celery的include參數值放到這裡,這裡就是所有通過Celery排程的任務清單
CELERY_TASK_LIST = [
    'snakeeyes.blueprints.contact.tasks',
    'snakeeyes.blueprints.feedback.tasks',
    'snakeeyes.blueprints.user.tasks',
    'snakeeyes.blueprints.billing.tasks',
]


def create_celery_app(app=None):
    """
    Create a new Celery object and tie together the Celery config to the app's
    config. Wrap all tasks in the context of the application.

    :param app: Flask app
    :return: Celery app
    """
    app = app or create_app()

    celery = Celery(app.import_name, broker=app.config['CELERY_BROKER_URL'],
                    include=CELERY_TASK_LIST)
    celery.conf.update(app.config)
    TaskBase = celery.Task

    class ContextTask(TaskBase):
        abstract = True

        def __call__(self, *args, **kwargs):
            with app.app_context():
                return TaskBase.__call__(self, *args, **kwargs)

    celery.Task = ContextTask
    return celery
           

tasks

項目中的任務一般都放在各個藍圖的tasks.py中。比如重置密碼的郵件:

celery = create_celery_app()  # 構造celery執行個體


@celery.task()  # 用過裝飾器告訴celery任務函數
def deliver_password_reset_email(user_id, reset_token):
    """
    Send a reset password e-mail to a user.

    :param user_id: The user id
    :type user_id: int
    :param reset_token: The reset token
    :type reset_token: str
    :return: None if a user was not found
    """
    user = User.query.get(user_id)

    if user is None:
        return

    ctx = {'user': user, 'reset_token': reset_token}

    send_template_message(subject='Password reset from Snake Eyes',
                          recipients=[user.email],
                          template='user/mail/password_reset', ctx=ctx)  # 實際渲染郵件和發送郵件的函數

    return None
           

實際應用是在User model下面一個重置密碼的類方法:

@classmethod
    def initialize_password_reset(cls, identity):
        """
        Generate a token to reset the password for a specific user.

        :param identity: User e-mail address or username
        :type identity: str
        :return: User instance
        """
        u = User.find_by_identity(identity)  # 查找使用者
        reset_token = u.serialize_token()  # 序列化token

        # This prevents circular imports.
        from snakeeyes.blueprints.user.tasks import (
            deliver_password_reset_email)
        deliver_password_reset_email.delay(u.id, reset_token)  # 通過調用delay将任務發送

        return u
           

在views裡面使用者觸發是通過送出一個密碼重置的表單,如果使用者在網頁上點選了重置密碼就會送出這個表單,觸發後面的操作,并通過flash告知使用者資訊,将使用者重定向到user.login這個view下面:

@user.route('/account/begin_password_reset', methods=['GET', 'POST'])
@anonymous_required()
def begin_password_reset():
    form = BeginPasswordResetForm()

    if form.validate_on_submit():
        u = User.initialize_password_reset(request.form.get('identity'))

        flash('An email has been sent to {0}.'.format(u.email), 'success')
        return redirect(url_for('user.login'))

    return render_template('user/begin_password_reset.html', form=form)
           

下面是郵件中通過jinja2渲染的連結:

{{ url_for('user.password_reset', reset_token=reset_token, _external=True) }}
           

可以看到連結會将使用者定位到user.password_reset這個view。我們再看看這個view是怎麼操作的。

可以看到初次get請求會得到password_reset.html,如果在這個頁面有送出重置密碼的表單的話,會用表單的資料替換掉對應User的資料并儲存,達到修改密碼的效果。

@user.route('/account/password_reset', methods=['GET', 'POST'])
@anonymous_required()
def password_reset():
    form = PasswordResetForm(reset_token=request.args.get('reset_token'))

    if form.validate_on_submit():
        u = User.deserialize_token(request.form.get('reset_token'))

        if u is None:
            flash('Your reset token has expired or was tampered with.',
                  'error')
            return redirect(url_for('user.begin_password_reset'))

        form.populate_obj(u)
        u.password = User.encrypt_password(request.form.get('password'))
        u.save()

        if login_user(u):
            flash('Your password has been reset.', 'success')
            return redirect(url_for('user.settings'))

    return render_template('user/password_reset.html', form=form)
           

定時任務

上面的task說的是使用者觸發的操作,将任務推送到隊列中。定時任務我們已經在配置中設定好了(有關crontab的參數詳見這裡):

CELERYBEAT_SCHEDULE = {  # 任務排程,定期将任務發送到隊列中 
    'mark-soon-to-expire-credit-cards': {  # 名稱
        'task': 'snakeeyes.blueprints.billing.tasks.mark_old_credit_cards',  # 任務
        'schedule': crontab(hour=, minute=)  # 每天淩晨執行
    },
    'expire-old-coupons': {
        'task': 'snakeeyes.blueprints.billing.tasks.expire_old_coupons',
        'schedule': crontab(hour=, minute=)  # 每天零點一分執行
    },
}
           

這裡說明第一個任務,每天淩晨檢查并标記即将到期的信用卡。根據任務路徑可以看到代碼片段:

@celery.task()
def mark_old_credit_cards():
    """
    Mark credit cards that are going to expire soon or have expired.

    :return: Result of updating the records
    """
    return CreditCard.mark_old_credit_cards()  # 調用的是CreditCard這個類下面的一個類方法并傳回結果
           

可以看到和生産任務的方式類似,隻是在配置是将其放入

CELERYBEAT_SCHEDULE

這個字典。