天天看點

apscheduler的使用

 最近一個程式要用到背景定時任務,看了看python背景任務,一般2個選擇,一個是apscheduler,一個celery。apscheduler比較直覺簡單一點,就選說說這個庫吧。網上一搜尋,暈死,好多寫apscheduler的都是超級老的版本,而且部落格之間互相亂抄,錯誤一大堆。還是自己讀官方文檔,為大家理一遍吧。

  先安裝一下吧,最新版本的apscheduler是3.0.5版

  1. 安裝  
pip install apscheduler      

  

apscheduler的使用

  安裝完畢

  2. 簡單任務

  首先,來個最簡單的例子,看看它的威力。

apscheduler的使用
1 # coding:utf-8
 2 from apscheduler.schedulers.blocking import BlockingScheduler
 3 import datetime
 4 
 5 
 6 def aps_test():
 7     print datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), '你好'
 8 
 9 
10 scheduler = BlockingScheduler()
11 scheduler.add_job(func=aps_test, trigger='cron', second='*/5')
12 scheduler.start()      
apscheduler的使用

  看代碼,定義一個函數,然後定義一個scheduler類型,添加一個job,然後執行,就可以了,代碼是不是超級簡單,而且非常清晰。看看結果吧。

apscheduler的使用

5秒整倍數,就執行這個函數,是不是超級超級簡單?對了,apscheduler就是通俗易懂。

再寫一個帶參數的。

apscheduler的使用
1 # coding:utf-8
 2 from apscheduler.schedulers.blocking import BlockingScheduler
 3 import datetime
 4 
 5 
 6 def aps_test(x):
 7     print datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), x
 8 
 9 scheduler = BlockingScheduler()
10 scheduler.add_job(func=aps_test, args=('你好',), trigger='cron', second='*/5')
11 scheduler.start()      
apscheduler的使用

結果跟上面一樣的。

好了,上面隻是給大家看的小例子,我們先從頭到位梳理一遍吧。apscheduler分為4個子產品,分别是Triggers,Jobstores,Executors,Schedulers.從上面的例子我們就可以看出來了,triggers就是觸發器,上面的代碼中,用了cron,其實還有其他觸發器,看看它的源碼解釋。

The ``trigger`` argument can either be:
          #. the alias name of the trigger (e.g. ``date``, ``interval`` or ``cron``), in which case any extra keyword
             arguments to this method are passed on to the trigger's constructor
          #. an instance of a trigger class      

看見沒有,源碼中解釋說,有date, interval, cron可供選擇,其實看字面意思也可以知道,date表示具體的一次性任務,interval表示循環任務,cron表示定時任務,好了,分别寫個代碼看看效果最明顯。

apscheduler的使用
1 # coding:utf-8
 2 from apscheduler.schedulers.blocking import BlockingScheduler
 3 import datetime
 4 
 5 
 6 def aps_test(x):
 7     print datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), x
 8 
 9 scheduler = BlockingScheduler()
10 scheduler.add_job(func=aps_test, args=('定時任務',), trigger='cron', second='*/5')
11 scheduler.add_job(func=aps_test, args=('一次性任務',), next_run_time=datetime.datetime.now() + datetime.timedelta(seconds=12))
12 scheduler.add_job(func=aps_test, args=('循環任務',), trigger='interval', seconds=3)
13 
14 scheduler.start()      
apscheduler的使用

看看結果

apscheduler的使用

其實應該不用我解釋代碼,大家也可以看出結果了,非常清晰。除了一次性任務,trigger是不要寫的,直接定義next_run_time就可以了,關于date這部分,官網沒有解釋,但是去看看源碼吧,看這行代碼。

apscheduler的使用
1     def _create_trigger(self, trigger, trigger_args):
 2         if isinstance(trigger, BaseTrigger):
 3             return trigger
 4         elif trigger is None:
 5             trigger = 'date'
 6         elif not isinstance(trigger, six.string_types):
 7             raise TypeError('Expected a trigger instance or string, got %s instead' % trigger.__class__.__name__)
 8 
 9         # Use the scheduler's time zone if nothing else is specified
10         trigger_args.setdefault('timezone', self.timezone)
11 
12         # Instantiate the trigger class
13         return self._create_plugin_instance('trigger', trigger, trigger_args)      
apscheduler的使用

第4行,如果trigger為None,直接定義trigger為'date'類型。其實弄到這裡,大家應該自己拓展一下,如果實作web的異步任務。假設接到一個移動端任務,任務完成後,發送一個推送到移動端,用date類型的trigger完成可以做的很好。

  3.日志

  好了,scheduler的基本應用,我想大家已經會了,但這僅僅隻是開始。如果代碼有意外咋辦?會阻斷整個任務嗎?如果我要計算密集型的任務咋辦?下面有個代碼,我們看看會發生什麼情況。

apscheduler的使用
1 # coding:utf-8
 2 from apscheduler.schedulers.blocking import BlockingScheduler
 3 import datetime
 4 
 5 
 6 def aps_test(x):
 7     print 1/0
 8     print datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), x
 9 
10 scheduler = BlockingScheduler()
11 scheduler.add_job(func=aps_test, args=('定時任務',), trigger='cron', second='*/5')
12 
13 scheduler.start()      
apscheduler的使用

還是上面代碼,但我們中間故意加了個錯誤,看看會發生什麼情況。

apscheduler的使用

說我們沒有log檔案,好吧,我們添加一個log檔案,看看寫的什麼。

apscheduler的使用
apscheduler的使用
1 # coding:utf-8
 2 from apscheduler.schedulers.blocking import BlockingScheduler
 3 import datetime
 4 import logging
 5 
 6 logging.basicConfig(level=logging.INFO,
 7                     format='%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s',
 8                     datefmt='%Y-%m-%d %H:%M:%S',
 9                     filename='log1.txt',
10                     filemode='a')
11 
12 
13 def aps_test(x):
14     print 1/0
15     print datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), x
16 
17 scheduler = BlockingScheduler()
18 scheduler.add_job(func=aps_test, args=('定時任務',), trigger='cron', second='*/5')
19 scheduler._logger = logging
20 scheduler.start()      
apscheduler的使用
apscheduler的使用

終于可以看到了,這時候才看到錯誤,這個是一定要注意的。

其實,到這裡,完全可以執行大多數任務了,但我們為了效率,安全性,再往下面看看,還有什麼。

  4.删除任務

假設我們有個奇葩任務,要求執行一定階段任務以後,删除某一個循環任務,其他任務照常進行。有如下代碼:

apscheduler的使用
apscheduler的使用
1 # coding:utf-8
 2 from apscheduler.schedulers.blocking import BlockingScheduler
 3 import datetime
 4 import logging
 5 
 6 logging.basicConfig(level=logging.INFO,
 7                     format='%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s',
 8                     datefmt='%Y-%m-%d %H:%M:%S',
 9                     filename='log1.txt',
10                     filemode='a')
11 
12 
13 def aps_test(x):
14     print datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), x
15 
16 
17 def aps_date(x):
18     scheduler.remove_job('interval_task')
19     print datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), x
20     
21 
22 scheduler = BlockingScheduler()
23 scheduler.add_job(func=aps_test, args=('定時任務',), trigger='cron', second='*/5', id='cron_task')
24 scheduler.add_job(func=aps_date, args=('一次性任務,删除循環任務',), next_run_time=datetime.datetime.now() + datetime.timedelta(seconds=12), id='date_task')
25 scheduler.add_job(func=aps_test, args=('循環任務',), trigger='interval', seconds=3, id='interval_task')
26 scheduler._logger = logging
27 
28 scheduler.start()      
apscheduler的使用

看看結果,

apscheduler的使用

在運作過程中,成功删除某一個任務,其實就是為每個任務定義一個id,然後remove_job這個id,是不是超級簡單,直覺?那還有什麼呢?

  5.停止任務,恢複任務

看看官方文檔,還有pause_job, resume_job,用法跟remove_job一樣,這邊就不詳細介紹了,就寫個代碼。

apscheduler的使用
apscheduler的使用
1 # coding:utf-8
 2 from apscheduler.schedulers.blocking import BlockingScheduler
 3 import datetime
 4 import logging
 5 
 6 logging.basicConfig(level=logging.INFO,
 7                     format='%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s',
 8                     datefmt='%Y-%m-%d %H:%M:%S',
 9                     filename='log1.txt',
10                     filemode='a')
11 
12 
13 def aps_test(x):
14     print datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), x
15 
16 
17 def aps_pause(x):
18     scheduler.pause_job('interval_task')
19     print datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), x
20 
21 
22 def aps_resume(x):
23     scheduler.resume_job('interval_task')
24     print datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), x
25 
26 scheduler = BlockingScheduler()
27 scheduler.add_job(func=aps_test, args=('定時任務',), trigger='cron', second='*/5', id='cron_task')
28 scheduler.add_job(func=aps_pause, args=('一次性任務,停止循環任務',), next_run_time=datetime.datetime.now() + datetime.timedelta(seconds=12), id='pause_task')
29 scheduler.add_job(func=aps_resume, args=('一次性任務,恢複循環任務',), next_run_time=datetime.datetime.now() + datetime.timedelta(seconds=24), id='resume_task')
30 scheduler.add_job(func=aps_test, args=('循環任務',), trigger='interval', seconds=3, id='interval_task')
31 scheduler._logger = logging
32 
33 scheduler.start()      
apscheduler的使用
apscheduler的使用

 是不是很容易?好了,删除任務,停止任務,恢複任務就介紹到這,下面我們看看監聽任務。

  6.意外

任何代碼都可能發生意外,關鍵是,發生意外了,如何第一時間知道,這才是公司最關心的,apscheduler已經為我們想到了這些。

看下面的代碼,

apscheduler的使用
apscheduler的使用
1 # coding:utf-8
 2 from apscheduler.schedulers.blocking import BlockingScheduler
 3 from apscheduler.events import EVENT_JOB_EXECUTED, EVENT_JOB_ERROR
 4 import datetime
 5 import logging
 6 
 7 logging.basicConfig(level=logging.INFO,
 8                     format='%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s',
 9                     datefmt='%Y-%m-%d %H:%M:%S',
10                     filename='log1.txt',
11                     filemode='a')
12 
13 
14 def aps_test(x):
15     print datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), x
16 
17 
18 def date_test(x):
19     print datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), x
20     print 1/0
21 
22 
23 def my_listener(event):
24     if event.exception:
25         print '任務出錯了!!!!!!'
26     else:
27         print '任務照常運作...'
28 
29 scheduler = BlockingScheduler()
30 scheduler.add_job(func=date_test, args=('一定性任務,會出錯',), next_run_time=datetime.datetime.now() + datetime.timedelta(seconds=15), id='date_task')
31 scheduler.add_job(func=aps_test, args=('循環任務',), trigger='interval', seconds=3, id='interval_task')
32 scheduler.add_listener(my_listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR)
33 scheduler._logger = logging
34 
35 scheduler.start()      
apscheduler的使用
apscheduler的使用

是不是很直覺,在生産環境中,你可以把出錯資訊換成發送一封郵件或者發送一個短信,這樣定時任務出錯就可以立馬就知道了。

  好了,今天就講到這,以後我們有機會再來拓展這個apscheduler,這個非常強大而且直覺的背景任務庫

異常處理

        當job抛出異常時,APScheduler會默默的把他吞掉,不提供任何提示,這不是一種好的實踐,我們必須知曉程式的任何差錯。APScheduler提供注冊listener,可以監聽一些事件,包括:job抛出異常、job沒有來得及執行等。

Constant Event class Triggered when...
EVENT_SCHEDULER_START SchedulerEvent The scheduler is started
EVENT_SCHEDULER_SHUTDOWN The scheduler is shut down
EVENT_JOBSTORE_ADDED JobStoreEvent A job store is added to the scheduler
EVENT_JOBSTORE_REMOVED A job store is removed from the scheduler
EVENT_JOBSTORE_JOB_ADDED A job is added to a job store
EVENT_JOBSTORE_JOB_REMOVED A job is removed from a job store
EVENT_JOB_EXECUTED JobEvent A job is executed successfully
EVENT_JOB_ERROR A job raised an exception during execution
EVENT_JOB_MISSED A job’s execution time is missed

        看下面的例子,監聽異常和miss事件,這裡用logging子產品列印日志,logger.exception()可以列印出異常堆棧資訊。

[python] ​​view plain​​​​copy​​

  1. def err_listener(ev):  
  2.     err_logger = logging.getLogger('schedErrJob')  
  3.     if ev.exception:  
  4.         err_logger.exception('%s error.', str(ev.job))  
  5.     else:  
  6.         err_logger.info('%s miss', str(ev.job))  
  7. schedudler.add_listener(err_listener, apscheduler.events.EVENT_JOB_ERROR | apscheduler.events.EVENT_JOB_MISSED)  

        事件的屬性包括:

  • job – the job instance in question
  • scheduled_run_time – the time when the job was scheduled to be run
  • retval – the return value of the successfully executed job
  • exception – the exception raised by the job
  • traceback – the traceback object associated with the exception

        最後,需要注意一點當job不以daemon模式運作時,并且APScheduler也不是daemon的,那麼在關閉腳本時,Ctrl + C是不奏效的,必須kill才可以。可以通過指令實作關閉腳本:

[plain] ​​view plain​​​​copy​​

  1. ps axu | grep {腳本名} | grep -v grep | awk '{print $2;}' | xargs kill