簡介:
從最簡單的栗子看起:
import schedule
import time
def job():
print("I'm working...")
schedule.every(10).minutes.do(job)
schedule.every().hour.do(job)
schedule.every().day.at("10:30").do(job)
schedule.every(5).to(10).days.do(job)
schedule.every().monday.do(job)
schedule.every().wednesday.at("13:15").do(job)
while True:
schedule.run_pending()
time.sleep(1)
這是在pypi上面給出的示例。這個栗子簡單到我不需要怎麼解釋。而且,通過這個栗子,我們也可以知道,schedule其實就隻是個定時器。在while True死循環中,schedule.run_pending()是保持schedule一直運作,去查詢上面那一堆的任務,在任務中,就可以設定不同的時間去運作。跟crontab是類似的。
參考官網:https://schedule.readthedocs.io/en/stable/
但是,如果是多個任務運作的話,實際上它們是按照順序從上往下挨個執行的。如果上面的任務比較複雜,會影響到下面任務的運作時間。比如我們這樣:
import datetime
import schedule
import time
def job1():
print("I'm working for job1")
time.sleep(2)
print("job1:", datetime.datetime.now())
def job2():
print("I'm working for job2")
time.sleep(2)
print("job2:", datetime.datetime.now())
def run():
schedule.every(10).seconds.do(job1)
schedule.every(10).seconds.do(job2)
while True:
schedule.run_pending()
time.sleep(1)
接下來你就會發現,兩個定時任務并不是10秒運作一次,而是12秒。是的。由于job1和job2本身的執行時間,導緻任務延遲了。
其實解決方法也很簡單:用多線程/多程序。不要幼稚地問我“python中的多線程不是沒有用嗎?”這是兩碼事。開了一條線程,就把job獨立出去運作了,不會占主程序的cpu時間,schedule并沒有花掉執行一個任務的時間,它的開銷隻是開啟一條線程的時間,是以,下一次執行就變成了10秒後而不是12秒後。
import datetime
import schedule
import threading
import time
def job1():
print("I'm working for job1")
time.sleep(2)
print("job1:", datetime.datetime.now())
def job2():
print("I'm working for job2")
time.sleep(2)
print("job2:", datetime.datetime.now())
def job1_task():
threading.Thread(target=job1).start()
def job2_task():
threading.Thread(target=job2).start()
def run():
schedule.every(10).seconds.do(job1_task)
schedule.every(10).seconds.do(job2_task)
while True:
schedule.run_pending()
time.sleep(1)
就是這麼簡單。
唯一要注意的是,這裡面job不應當是死循環類型的,也就是說,這個線程應該有一個執行完畢的出口。一是因為線程萬一僵死,會是非常棘手的問題;二是下一次定時任務還會開啟一個新的線程,執行次數多了就會演變成災難。如果schedule的時間間隔設定得比job執行的時間短,一樣會線程堆積形成災難,是以,還是需要注意一下的。
schedule源碼學習
源碼分析就圍繞這三個類:CancelJob,Scheduler,job
CancelJob
class CancelJob(object):
pass
可以看到就是一個空類, 這個類的作用就是當你的job執行函數傳回一個CancelJob類型的對象,那麼執行完後就會被Scheduler移除. 簡單說就是隻會執行一次.
Scheduler
class Scheduler(object):
def __init__(self):
self.jobs = []
def run_pending(self):
runnable_jobs = (job for job in self.jobs if job.should_run)
for job in sorted(runnable_jobs):
self._run_job(job)
def run_all(self, delay_seconds=0):
for job in self.jobs:
self._run_job(job)
time.sleep(delay_seconds)
def clear(self):
del self.jobs[:]
def cancel_job(self, job):
try:
self.jobs.remove(job)
except ValueError:
pass
def every(self, interval=1):
job = Job(interval)
self.jobs.append(job)
return job
def _run_job(self, job):
ret = job.run()
if isinstance(ret, CancelJob) or ret is CancelJob:
self.cancel_job(job)
@property
def next_run(self):
if not self.jobs:
return None
return min(self.jobs).next_run
@property
def idle_seconds(self):
return (self.next_run - datetime.datetime.now()).total_seconds()
Scheduler作用就是在job可以執行的時候執行它. 這裡的函數也都比較簡單:
- run_pending:運作所有可以運作的任務
- run_all:運作所有任務,不管是否應該運作
- clear:删除所有排程的任務
- cancel_job:删除一個任務
- every: 建立一個排程任務, 傳回的是一個job
- _run_job:運作一個job
- next_run:擷取下一個要運作任務的時間, 這裡使用的是min去得到最近将執行的job, 之是以這樣使用,是Job重載了__lt_方法,這樣寫起來确實很簡潔.
- idle_seconds:還有多少秒即将開始運作任務.
Job
Job是整個定時任務的核心. 主要功能就是根據建立Job時的參數,得到下一次運作的時間. 代碼如下,稍微有點長(會省略部分代碼,可以看源碼):
class Job(object):
def __init__(self, interval):
self.interval = interval # pause interval * unit between runs
self.job_func = None # the job job_func to run
self.unit = None # time units, e.g. 'minutes', 'hours', ...
self.at_time = None # optional time at which this job runs
self.last_run = None # datetime of the last run
self.next_run = None # datetime of the next run
self.period = None # timedelta between runs, only valid for
self.start_day = None # Specific day of the week to start on
def __lt__(self, other):
return self.next_run < other.next_run
def minute(self):
assert self.interval == 1, 'Use minutes instead of minute'
return self.minutes
@property
def minutes(self):
self.unit = 'minutes'
return self
@property
def hour(self):
assert self.interval == 1, 'Use hours instead of hour'
return self.hours
@property
def hours(self):
self.unit = 'hours'
return self
@property
def day(self):
assert self.interval == 1, 'Use days instead of day'
return self.days
@property
def days(self):
self.unit = 'days'
return self
@property
def week(self):
assert self.interval == 1, 'Use weeks instead of week'
return self.weeks
@property
def weeks(self):
self.unit = 'weeks'
return self
@property
def monday(self):
assert self.interval == 1, 'Use mondays instead of monday'
self.start_day = 'monday'
return self.weeks
def at(self, time_str):
assert self.unit in ('days', 'hours') or self.start_day
hour, minute = time_str.split(':')
minute = int(minute)
if self.unit == 'days' or self.start_day:
hour = int(hour)
assert 0 <= hour <= 23
elif self.unit == 'hours':
hour = 0
assert 0 <= minute <= 59
self.at_time = datetime.time(hour, minute)
return self
def do(self, job_func, *args, **kwargs):
self.job_func = functools.partial(job_func, *args, **kwargs)
try:
functools.update_wrapper(self.job_func, job_func)
except AttributeError:
# job_funcs already wrapped by functools.partial won't have
# __name__, __module__ or __doc__ and the update_wrapper()
# call will fail.
pass
self._schedule_next_run()
return self
@property
def should_run(self):
return datetime.datetime.now() >= self.next_run
def run(self):
logger.info('Running job %s', self)
ret = self.job_func()
self.last_run = datetime.datetime.now()
self._schedule_next_run()
return ret
def _schedule_next_run(self):
assert self.unit in ('seconds', 'minutes', 'hours', 'days', 'weeks')
self.period = datetime.timedelta(**{self.unit: self.interval})
self.next_run = datetime.datetime.now() + self.period
首先看一下幾個參數的含義:
- interval:間隔多久,每interval秒或分等.
- job_func:job執行函數
- unit : 間隔單元,比如minutes, hours
- at_time :job具體執行時間點,比如10:30等
- last_run:job上一次執行時間
- next_run :job下一次即将運作時間
- period: 距離下次運作間隔時間
- start_day: 周的特殊天,也就是monday等的含義
再來看一下各個方法:
- __lt__: 比較哪個job最先即将執行, Scheduler中next_run方法裡使用min會用到, 有時合适的使用python這些特殊方法可以簡化代碼,看起來更pythonic.
- second、seconds的差別就是second時預設interval ==1,即schedule.every().second和schedule.every(1).seconds是等價的,作用就是設定unit為seconds. minute和minutes、hour和hours、day和days、week和weeks也類似.
- monday: 設定start_day 為monday, unit 為weeks,interval為1. 含義就是每周一執行job. 類似 tuesday、wednesday、thursday、friday、saturday、sunday一樣.
- at: 表示某天的某個時間點,是以不适合minutes、weeks且start_day 為空(即單純的周)這些unit. 對于unit為hours時,time_str中小時部分為0.
- do: 設定job對應的函數以及參數, 這裡使用functools.update_wrapper去更新函數名等資訊.主要是functools.partial傳回的函數和原函數名稱不一樣.具體可以看看官網文檔. 然後調用_schedule_next_run去計算job下一次執行時間.
- should_run: 判斷job是否可以運作了.依據是目前時間點大于等于job的next_run
- _schedule_next_run: 這是整個job的定時的邏輯部分是計算job下次運作的時間點的.描述一下流程:
-
計算下一次執行時間:
這裡根據unit和interval計算出下一次運作時間. 舉個例子,比如schedule.every().hour.do(job, message='things')下一次運作時間就是目前時間加上一小時的間隔.
-
但是當start_day不為空時,即表示某個星期. 這時period就不能直接加在目前時間了. 看代碼:
其中days_ahead表示job表示的星期幾與當表示的星期幾差幾天. 比如今天是星期三,job表示的是星期五,那麼days_ahead就為2,最終self.next_run效果就是在now基礎上加了2天.
-
當at_time不為空時, 需要更新執行的時間點,具體就是計算時、分、秒然後調用replace進行更新. 這裡對unit為days或hours進行特殊處理:
當已經過了執行時間的話的話,unit為days的話減去一天, unit為hours的話減去一小時. 這樣可以保證任務今天運作.
-
後面還有一句代碼:
這句的含義時對于像monday這些定時任務特殊情況的處理. 舉個例子, 今天是星期四12:00,建立的job是星期四13:00, days_ahead <=7 這個條件滿足,最終next_run實際加了7,這樣的話這個任務就不會運作了. 是以這一步實際就是把7減掉. 看上去有點繞, 實際隻要把days_ahead <= 0改為days_ahead < 0這句代碼就不用了.
-
參考:
http://www.360doc.com/content/17/0911/21/17725421_686331723.shtml
https://www.cnblogs.com/anpengapple/p/8051923.html