天天看点

python任务调度之schedule

简介:

从最简单的栗子看起:

import schedule
import time
 
def job():
    print("I'm working...")
 
schedule.every(10).minutes.do(job)
schedule.every().hour.do(job)
schedule.every().day.at("10:30").do(job)
schedule.every(5).to(10).days.do(job)
schedule.every().monday.do(job)
schedule.every().wednesday.at("13:15").do(job)
 
while True:
    schedule.run_pending()
    time.sleep(1)
           

这是在pypi上面给出的示例。这个栗子简单到我不需要怎么解释。而且,通过这个栗子,我们也可以知道,schedule其实就只是个定时器。在while True死循环中,schedule.run_pending()是保持schedule一直运行,去查询上面那一堆的任务,在任务中,就可以设置不同的时间去运行。跟crontab是类似的。

参考官网:https://schedule.readthedocs.io/en/stable/

但是,如果是多个任务运行的话,实际上它们是按照顺序从上往下挨个执行的。如果上面的任务比较复杂,会影响到下面任务的运行时间。比如我们这样:

import datetime
import schedule
import time
 
def job1():
    print("I'm working for job1")
    time.sleep(2)
    print("job1:", datetime.datetime.now())
 
def job2():
    print("I'm working for job2")
    time.sleep(2)
    print("job2:", datetime.datetime.now())
 
def run():
    schedule.every(10).seconds.do(job1)
    schedule.every(10).seconds.do(job2)
 
    while True:
        schedule.run_pending()
        time.sleep(1)
           

接下来你就会发现,两个定时任务并不是10秒运行一次,而是12秒。是的。由于job1和job2本身的执行时间,导致任务延迟了。

其实解决方法也很简单:用多线程/多进程。不要幼稚地问我“python中的多线程不是没有用吗?”这是两码事。开了一条线程,就把job独立出去运行了,不会占主进程的cpu时间,schedule并没有花掉执行一个任务的时间,它的开销只是开启一条线程的时间,所以,下一次执行就变成了10秒后而不是12秒后。

import datetime
import schedule
import threading
import time
 
def job1():
    print("I'm working for job1")
    time.sleep(2)
    print("job1:", datetime.datetime.now())
 
def job2():
    print("I'm working for job2")
    time.sleep(2)
    print("job2:", datetime.datetime.now())
 
def job1_task():
    threading.Thread(target=job1).start()
 
def job2_task():
    threading.Thread(target=job2).start()
 
def run():
    schedule.every(10).seconds.do(job1_task)
    schedule.every(10).seconds.do(job2_task)
 
    while True:
        schedule.run_pending()
        time.sleep(1)
           

就是这么简单。

唯一要注意的是,这里面job不应当是死循环类型的,也就是说,这个线程应该有一个执行完毕的出口。一是因为线程万一僵死,会是非常棘手的问题;二是下一次定时任务还会开启一个新的线程,执行次数多了就会演变成灾难。如果schedule的时间间隔设置得比job执行的时间短,一样会线程堆积形成灾难,所以,还是需要注意一下的。

schedule源码学习

源码分析就围绕这三个类:CancelJob,Scheduler,job

CancelJob
class CancelJob(object):
    pass
           

可以看到就是一个空类, 这个类的作用就是当你的job执行函数返回一个CancelJob类型的对象,那么执行完后就会被Scheduler移除. 简单说就是只会执行一次.

Scheduler

class Scheduler(object):
    def __init__(self):
        self.jobs = []
    def run_pending(self):
        runnable_jobs = (job for job in self.jobs if job.should_run)
        for job in sorted(runnable_jobs):
            self._run_job(job)
    def run_all(self, delay_seconds=0):
        for job in self.jobs:
            self._run_job(job)
            time.sleep(delay_seconds)
    def clear(self):
        del self.jobs[:]
    def cancel_job(self, job):
        try:
            self.jobs.remove(job)
        except ValueError:
            pass
    def every(self, interval=1):
        job = Job(interval)
        self.jobs.append(job)
        return job
    def _run_job(self, job):
        ret = job.run()
        if isinstance(ret, CancelJob) or ret is CancelJob:
            self.cancel_job(job)
    @property
    def next_run(self):
        if not self.jobs:
            return None
        return min(self.jobs).next_run
    @property
    def idle_seconds(self):
        return (self.next_run - datetime.datetime.now()).total_seconds()
           

Scheduler作用就是在job可以执行的时候执行它. 这里的函数也都比较简单:

  • run_pending:运行所有可以运行的任务
  • run_all:运行所有任务,不管是否应该运行
  • clear:删除所有调度的任务
  • cancel_job:删除一个任务
  • every: 创建一个调度任务, 返回的是一个job
  • _run_job:运行一个job
  • next_run:获取下一个要运行任务的时间, 这里使用的是min去得到最近将执行的job, 之所以这样使用,是Job重载了__lt_方法,这样写起来确实很简洁.
  • idle_seconds:还有多少秒即将开始运行任务.
Job

Job是整个定时任务的核心. 主要功能就是根据创建Job时的参数,得到下一次运行的时间. 代码如下,稍微有点长(会省略部分代码,可以看源码):

class Job(object):
    def __init__(self, interval):
        self.interval = interval  # pause interval * unit between runs
        self.job_func = None  # the job job_func to run
        self.unit = None  # time units, e.g. 'minutes', 'hours', ...
        self.at_time = None  # optional time at which this job runs
        self.last_run = None  # datetime of the last run
        self.next_run = None  # datetime of the next run
        self.period = None  # timedelta between runs, only valid for
        self.start_day = None  # Specific day of the week to start on
    def __lt__(self, other):
        return self.next_run < other.next_run
    def minute(self):
        assert self.interval == 1, 'Use minutes instead of minute'
        return self.minutes
    @property
    def minutes(self):
        self.unit = 'minutes'
        return self
    @property
    def hour(self):
        assert self.interval == 1, 'Use hours instead of hour'
        return self.hours
    @property
    def hours(self):
        self.unit = 'hours'
        return self
    @property
    def day(self):
        assert self.interval == 1, 'Use days instead of day'
        return self.days
    @property
    def days(self):
        self.unit = 'days'
        return self
    @property
    def week(self):
        assert self.interval == 1, 'Use weeks instead of week'
        return self.weeks
    @property
    def weeks(self):
        self.unit = 'weeks'
        return self
    @property
    def monday(self):
        assert self.interval == 1, 'Use mondays instead of monday'
        self.start_day = 'monday'
        return self.weeks
    def at(self, time_str):
        assert self.unit in ('days', 'hours') or self.start_day
        hour, minute = time_str.split(':')
        minute = int(minute)
        if self.unit == 'days' or self.start_day:
            hour = int(hour)
            assert 0 <= hour <= 23
        elif self.unit == 'hours':
            hour = 0
        assert 0 <= minute <= 59
        self.at_time = datetime.time(hour, minute)
        return self
    def do(self, job_func, *args, **kwargs):
        self.job_func = functools.partial(job_func, *args, **kwargs)
        try:
            functools.update_wrapper(self.job_func, job_func)
        except AttributeError:
            # job_funcs already wrapped by functools.partial won't have
            # __name__, __module__ or __doc__ and the update_wrapper()
            # call will fail.
            pass
        self._schedule_next_run()
        return self
    @property
    def should_run(self):
        return datetime.datetime.now() >= self.next_run
    def run(self):
        logger.info('Running job %s', self)
        ret = self.job_func()
        self.last_run = datetime.datetime.now()
        self._schedule_next_run()
        return ret
    def _schedule_next_run(self):
        assert self.unit in ('seconds', 'minutes', 'hours', 'days', 'weeks')
        self.period = datetime.timedelta(**{self.unit: self.interval})
        self.next_run = datetime.datetime.now() + self.period
           

首先看一下几个参数的含义:

  • interval:间隔多久,每interval秒或分等.
  • job_func:job执行函数
  • unit : 间隔单元,比如minutes, hours
  • at_time :job具体执行时间点,比如10:30等
  • last_run:job上一次执行时间
  • next_run :job下一次即将运行时间
  • period: 距离下次运行间隔时间
  • start_day: 周的特殊天,也就是monday等的含义

再来看一下各个方法:

  • __lt__: 比较哪个job最先即将执行, Scheduler中next_run方法里使用min会用到, 有时合适的使用python这些特殊方法可以简化代码,看起来更pythonic.
  • second、seconds的区别就是second时默认interval ==1,即schedule.every().second和schedule.every(1).seconds是等价的,作用就是设置unit为seconds. minute和minutes、hour和hours、day和days、week和weeks也类似.
  • monday: 设置start_day 为monday, unit 为weeks,interval为1. 含义就是每周一执行job. 类似 tuesday、wednesday、thursday、friday、saturday、sunday一样.
  • at: 表示某天的某个时间点,所以不适合minutes、weeks且start_day 为空(即单纯的周)这些unit. 对于unit为hours时,time_str中小时部分为0.
  • do: 设置job对应的函数以及参数, 这里使用functools.update_wrapper去更新函数名等信息.主要是functools.partial返回的函数和原函数名称不一样.具体可以看看官网文档. 然后调用_schedule_next_run去计算job下一次执行时间.
  • should_run: 判断job是否可以运行了.依据是当前时间点大于等于job的next_run
  • _schedule_next_run: 这是整个job的定时的逻辑部分是计算job下次运行的时间点的.描述一下流程:
    • 计算下一次执行时间:

      这里根据unit和interval计算出下一次运行时间. 举个例子,比如schedule.every().hour.do(job, message='things')下一次运行时间就是当前时间加上一小时的间隔.

    • 但是当start_day不为空时,即表示某个星期. 这时period就不能直接加在当前时间了. 看代码:

      其中days_ahead表示job表示的星期几与当表示的星期几差几天. 比如今天是星期三,job表示的是星期五,那么days_ahead就为2,最终self.next_run效果就是在now基础上加了2天.

    • 当at_time不为空时, 需要更新执行的时间点,具体就是计算时、分、秒然后调用replace进行更新. 这里对unit为days或hours进行特殊处理:

      当已经过了执行时间的话的话,unit为days的话减去一天, unit为hours的话减去一小时. 这样可以保证任务今天运行.

    • 后面还有一句代码:

      这句的含义时对于像monday这些定时任务特殊情况的处理. 举个例子, 今天是星期四12:00,创建的job是星期四13:00, days_ahead <=7 这个条件满足,最终next_run实际加了7,这样的话这个任务就不会运行了. 所以这一步实际就是把7减掉. 看上去有点绕, 实际只要把days_ahead <= 0改为days_ahead < 0这句代码就不用了.

参考:

http://www.360doc.com/content/17/0911/21/17725421_686331723.shtml

https://www.cnblogs.com/anpengapple/p/8051923.html