天天看點

Python程式設計:定時任務apscheduler架構安裝體系結構簡單示例其他示例關閉日志

APScheduler : Advanced Python

Scheduler

官方文檔:

https://apscheduler.readthedocs.io/en/3.3.1/userguide.html 本文隻做簡單總結,具體示例參考文章底部連結
Python程式設計:定時任務apscheduler架構安裝體系結構簡單示例其他示例關閉日志
圖檔來自: https://blog.csdn.net/somezz/article/details/83104368#_225

安裝

$ pip install apscheduler      

體系結構

schedulers(排程器)
    - BlockingScheduler : 主線程中運作,阻塞線程
    - BackgroundScheduler : 背景線程中運作,不會阻塞線程
    - AsyncIOScheduler
    - GeventScheduler
    - TornadoScheduler
    - TwistedScheduler
    - QtScheduler

triggers(觸發器)
    - date 一次性任務
        - run_date (datetime 或 str) 作業的運作日期或時間
        - timezone (datetime.tzinfo 或 str)    指定時區

    - interval 循環任務
        - weeks (int)   間隔幾周
        - days (int)  間隔幾天
        - hours (int) 間隔幾小時
        - minutes (int)   間隔幾分鐘
        - seconds (int)   間隔多少秒
        - start_date (datetime 或 str) 開始日期
        - end_date (datetime 或 str)   結束日期
        - timezone (datetime.tzinfo 或str) 時區
        - job stores(作業存儲器)

    - cron 定時任務
        - year (int 或 str)    年,4位數字
        - month (int 或 str)   月 (範圍1-12)
        - day (int 或 str) 日 (範圍1-31
        - week (int 或 str)    周 (範圍1-53)
        - day_of_week (int 或 str) 周内第幾天或者星期幾 (範圍0-6 或者 mon,tue,wed,thu,fri,sat,sun)
        - hour (int 或 str)    時 (範圍0-23)
        - minute (int 或 str)  分 (範圍0-59)
        - second (int 或 str)  秒 (範圍0-59)
        - start_date (datetime 或 str) 最早開始日期(包含)
        - end_date (datetime 或 str)   最晚結束時間(包含)
        - timezone (datetime.tzinfo 或str) 指定時區

job stores(作業存儲器)
    - 添加 job
        - add_job()
        - scheduled_job()
    - 移除 job
        - remove_job()
        - job.remove()
    - 修改 job
        - modify_job()
     - 擷取 job 清單
        - get_jobs()
    - 關閉 job
        - scheduler.shutdown(wait=false)

executors(執行器)
    - ProcessPoolExecutor
    - ThreadPoolExecutor
      

簡單示例

from apscheduler.schedulers.blocking import BlockingScheduler

scheduler = BlockingScheduler()

# 方式一:interval 間隔任務
scheduler.add_job(task_name, 'interval', seconds=5)

# 方式二:cron 定時任務 */3 0-10 * * *
scheduler.add_job(main, 'cron', minute="*/3", hour="0-10", day="*", month="*",day_of_week="*")

# 方式三:date 定時任務(執行一次)
scheduler.add_job(my_job, 'date', run_date=datetime(2009, 11, 6, 16, 30, 5))

scheduler.start()      

說明:

task_name 需要修改為自定義任務函數名稱

其他示例

from datetime import datetime
from apscheduler.schedulers.blocking import BlockingScheduler

scheduler = BlockingScheduler()


def jobA():
    print("{}: {}".format("job A", datetime.now().strftime('%Y-%m-%d %H:%M:%S')))

def jobC():
    print("{}: {}".format("job C", datetime.now().strftime('%Y-%m-%d %H:%M:%S')))


# 添加作業, 方式一,可指定job_id
scheduler.add_job(jobA, 'interval', seconds=5)
scheduler.add_job(jobC, 'interval', minutes=2, id="job_c")


# 添加作業, 方式二
@scheduler.scheduled_job('interval', seconds=5)
def jobB():
    print("{}: {}".format("job B", datetime.now().strftime('%Y-%m-%d %H:%M:%S')))


# 移除作業
scheduler.remove_job(job_id="job_c")
scheduler.start()      

關閉日志

如果日志太多,看不到自己的重要日志,那麼就關閉apscheduler的日志

import logging

logging.basicConfig()
logging.getLogger('apscheduler').setLevel(logging.ERROR)      

參考

  1. 定時任務架構APScheduler學習詳解
  2. apscheduler的使用
  3. Advanced Python Scheduler
  4. https://apscheduler.readthedocs.io/en/latest/index.html