天天看點

python任務排程之schedule

簡介:

從最簡單的栗子看起:

import schedule
import time
 
def job():
    print("I'm working...")
 
schedule.every(10).minutes.do(job)
schedule.every().hour.do(job)
schedule.every().day.at("10:30").do(job)
schedule.every(5).to(10).days.do(job)
schedule.every().monday.do(job)
schedule.every().wednesday.at("13:15").do(job)
 
while True:
    schedule.run_pending()
    time.sleep(1)
           

這是在pypi上面給出的示例。這個栗子簡單到我不需要怎麼解釋。而且,通過這個栗子,我們也可以知道,schedule其實就隻是個定時器。在while True死循環中,schedule.run_pending()是保持schedule一直運作,去查詢上面那一堆的任務,在任務中,就可以設定不同的時間去運作。跟crontab是類似的。

參考官網:https://schedule.readthedocs.io/en/stable/

但是,如果是多個任務運作的話,實際上它們是按照順序從上往下挨個執行的。如果上面的任務比較複雜,會影響到下面任務的運作時間。比如我們這樣:

import datetime
import schedule
import time
 
def job1():
    print("I'm working for job1")
    time.sleep(2)
    print("job1:", datetime.datetime.now())
 
def job2():
    print("I'm working for job2")
    time.sleep(2)
    print("job2:", datetime.datetime.now())
 
def run():
    schedule.every(10).seconds.do(job1)
    schedule.every(10).seconds.do(job2)
 
    while True:
        schedule.run_pending()
        time.sleep(1)
           

接下來你就會發現,兩個定時任務并不是10秒運作一次,而是12秒。是的。由于job1和job2本身的執行時間,導緻任務延遲了。

其實解決方法也很簡單:用多線程/多程序。不要幼稚地問我“python中的多線程不是沒有用嗎?”這是兩碼事。開了一條線程,就把job獨立出去運作了,不會占主程序的cpu時間,schedule并沒有花掉執行一個任務的時間,它的開銷隻是開啟一條線程的時間,是以,下一次執行就變成了10秒後而不是12秒後。

import datetime
import schedule
import threading
import time
 
def job1():
    print("I'm working for job1")
    time.sleep(2)
    print("job1:", datetime.datetime.now())
 
def job2():
    print("I'm working for job2")
    time.sleep(2)
    print("job2:", datetime.datetime.now())
 
def job1_task():
    threading.Thread(target=job1).start()
 
def job2_task():
    threading.Thread(target=job2).start()
 
def run():
    schedule.every(10).seconds.do(job1_task)
    schedule.every(10).seconds.do(job2_task)
 
    while True:
        schedule.run_pending()
        time.sleep(1)
           

就是這麼簡單。

唯一要注意的是,這裡面job不應當是死循環類型的,也就是說,這個線程應該有一個執行完畢的出口。一是因為線程萬一僵死,會是非常棘手的問題;二是下一次定時任務還會開啟一個新的線程,執行次數多了就會演變成災難。如果schedule的時間間隔設定得比job執行的時間短,一樣會線程堆積形成災難,是以,還是需要注意一下的。

schedule源碼學習

源碼分析就圍繞這三個類:CancelJob,Scheduler,job

CancelJob
class CancelJob(object):
    pass
           

可以看到就是一個空類, 這個類的作用就是當你的job執行函數傳回一個CancelJob類型的對象,那麼執行完後就會被Scheduler移除. 簡單說就是隻會執行一次.

Scheduler

class Scheduler(object):
    def __init__(self):
        self.jobs = []
    def run_pending(self):
        runnable_jobs = (job for job in self.jobs if job.should_run)
        for job in sorted(runnable_jobs):
            self._run_job(job)
    def run_all(self, delay_seconds=0):
        for job in self.jobs:
            self._run_job(job)
            time.sleep(delay_seconds)
    def clear(self):
        del self.jobs[:]
    def cancel_job(self, job):
        try:
            self.jobs.remove(job)
        except ValueError:
            pass
    def every(self, interval=1):
        job = Job(interval)
        self.jobs.append(job)
        return job
    def _run_job(self, job):
        ret = job.run()
        if isinstance(ret, CancelJob) or ret is CancelJob:
            self.cancel_job(job)
    @property
    def next_run(self):
        if not self.jobs:
            return None
        return min(self.jobs).next_run
    @property
    def idle_seconds(self):
        return (self.next_run - datetime.datetime.now()).total_seconds()
           

Scheduler作用就是在job可以執行的時候執行它. 這裡的函數也都比較簡單:

  • run_pending:運作所有可以運作的任務
  • run_all:運作所有任務,不管是否應該運作
  • clear:删除所有排程的任務
  • cancel_job:删除一個任務
  • every: 建立一個排程任務, 傳回的是一個job
  • _run_job:運作一個job
  • next_run:擷取下一個要運作任務的時間, 這裡使用的是min去得到最近将執行的job, 之是以這樣使用,是Job重載了__lt_方法,這樣寫起來确實很簡潔.
  • idle_seconds:還有多少秒即将開始運作任務.
Job

Job是整個定時任務的核心. 主要功能就是根據建立Job時的參數,得到下一次運作的時間. 代碼如下,稍微有點長(會省略部分代碼,可以看源碼):

class Job(object):
    def __init__(self, interval):
        self.interval = interval  # pause interval * unit between runs
        self.job_func = None  # the job job_func to run
        self.unit = None  # time units, e.g. 'minutes', 'hours', ...
        self.at_time = None  # optional time at which this job runs
        self.last_run = None  # datetime of the last run
        self.next_run = None  # datetime of the next run
        self.period = None  # timedelta between runs, only valid for
        self.start_day = None  # Specific day of the week to start on
    def __lt__(self, other):
        return self.next_run < other.next_run
    def minute(self):
        assert self.interval == 1, 'Use minutes instead of minute'
        return self.minutes
    @property
    def minutes(self):
        self.unit = 'minutes'
        return self
    @property
    def hour(self):
        assert self.interval == 1, 'Use hours instead of hour'
        return self.hours
    @property
    def hours(self):
        self.unit = 'hours'
        return self
    @property
    def day(self):
        assert self.interval == 1, 'Use days instead of day'
        return self.days
    @property
    def days(self):
        self.unit = 'days'
        return self
    @property
    def week(self):
        assert self.interval == 1, 'Use weeks instead of week'
        return self.weeks
    @property
    def weeks(self):
        self.unit = 'weeks'
        return self
    @property
    def monday(self):
        assert self.interval == 1, 'Use mondays instead of monday'
        self.start_day = 'monday'
        return self.weeks
    def at(self, time_str):
        assert self.unit in ('days', 'hours') or self.start_day
        hour, minute = time_str.split(':')
        minute = int(minute)
        if self.unit == 'days' or self.start_day:
            hour = int(hour)
            assert 0 <= hour <= 23
        elif self.unit == 'hours':
            hour = 0
        assert 0 <= minute <= 59
        self.at_time = datetime.time(hour, minute)
        return self
    def do(self, job_func, *args, **kwargs):
        self.job_func = functools.partial(job_func, *args, **kwargs)
        try:
            functools.update_wrapper(self.job_func, job_func)
        except AttributeError:
            # job_funcs already wrapped by functools.partial won't have
            # __name__, __module__ or __doc__ and the update_wrapper()
            # call will fail.
            pass
        self._schedule_next_run()
        return self
    @property
    def should_run(self):
        return datetime.datetime.now() >= self.next_run
    def run(self):
        logger.info('Running job %s', self)
        ret = self.job_func()
        self.last_run = datetime.datetime.now()
        self._schedule_next_run()
        return ret
    def _schedule_next_run(self):
        assert self.unit in ('seconds', 'minutes', 'hours', 'days', 'weeks')
        self.period = datetime.timedelta(**{self.unit: self.interval})
        self.next_run = datetime.datetime.now() + self.period
           

首先看一下幾個參數的含義:

  • interval:間隔多久,每interval秒或分等.
  • job_func:job執行函數
  • unit : 間隔單元,比如minutes, hours
  • at_time :job具體執行時間點,比如10:30等
  • last_run:job上一次執行時間
  • next_run :job下一次即将運作時間
  • period: 距離下次運作間隔時間
  • start_day: 周的特殊天,也就是monday等的含義

再來看一下各個方法:

  • __lt__: 比較哪個job最先即将執行, Scheduler中next_run方法裡使用min會用到, 有時合适的使用python這些特殊方法可以簡化代碼,看起來更pythonic.
  • second、seconds的差別就是second時預設interval ==1,即schedule.every().second和schedule.every(1).seconds是等價的,作用就是設定unit為seconds. minute和minutes、hour和hours、day和days、week和weeks也類似.
  • monday: 設定start_day 為monday, unit 為weeks,interval為1. 含義就是每周一執行job. 類似 tuesday、wednesday、thursday、friday、saturday、sunday一樣.
  • at: 表示某天的某個時間點,是以不适合minutes、weeks且start_day 為空(即單純的周)這些unit. 對于unit為hours時,time_str中小時部分為0.
  • do: 設定job對應的函數以及參數, 這裡使用functools.update_wrapper去更新函數名等資訊.主要是functools.partial傳回的函數和原函數名稱不一樣.具體可以看看官網文檔. 然後調用_schedule_next_run去計算job下一次執行時間.
  • should_run: 判斷job是否可以運作了.依據是目前時間點大于等于job的next_run
  • _schedule_next_run: 這是整個job的定時的邏輯部分是計算job下次運作的時間點的.描述一下流程:
    • 計算下一次執行時間:

      這裡根據unit和interval計算出下一次運作時間. 舉個例子,比如schedule.every().hour.do(job, message='things')下一次運作時間就是目前時間加上一小時的間隔.

    • 但是當start_day不為空時,即表示某個星期. 這時period就不能直接加在目前時間了. 看代碼:

      其中days_ahead表示job表示的星期幾與當表示的星期幾差幾天. 比如今天是星期三,job表示的是星期五,那麼days_ahead就為2,最終self.next_run效果就是在now基礎上加了2天.

    • 當at_time不為空時, 需要更新執行的時間點,具體就是計算時、分、秒然後調用replace進行更新. 這裡對unit為days或hours進行特殊處理:

      當已經過了執行時間的話的話,unit為days的話減去一天, unit為hours的話減去一小時. 這樣可以保證任務今天運作.

    • 後面還有一句代碼:

      這句的含義時對于像monday這些定時任務特殊情況的處理. 舉個例子, 今天是星期四12:00,建立的job是星期四13:00, days_ahead <=7 這個條件滿足,最終next_run實際加了7,這樣的話這個任務就不會運作了. 是以這一步實際就是把7減掉. 看上去有點繞, 實際隻要把days_ahead <= 0改為days_ahead < 0這句代碼就不用了.

參考:

http://www.360doc.com/content/17/0911/21/17725421_686331723.shtml

https://www.cnblogs.com/anpengapple/p/8051923.html