天天看點

5種Python使用定時排程任務的方式

摘要:Python 有幾種方法可以定時排程一個任務,這就是我們将在本文中學習的内容。

本文分享自華為雲社群《Python中使用定時排程任務(Schedule Jobs)的5種方式)》,作者: Regan Yue 。

今天建構的大多數應用程式都需要某種方式的排程機制。輪詢 API 或資料庫、不斷檢查系統健康狀況、将日志存檔等是常見的例子。 Kubernetes和Apache Mesos等使用自動伸縮擴容技術(Auto-scaling)的軟體需要檢查部署的應用程式的狀态,為此它們使用定期運作的存活探針(Liveness Probe)。排程任務需要與業務邏輯解耦,是以我們要使用解耦的執行隊列,例如Redis隊列。

Python 有幾種方法可以定時排程一個任務,這就是我們将在本文中學習的内容。我将使用以下方式讨論排程任務:

  1. 簡單循環 (Simple Loops)
  2. 簡單循環但是使用了線程 (Simple Loops but Threaded)
  3. 排程庫 (Schedule Library)
  4. Python Crontab
  5. RQ 排程器作為解耦隊列 (RQ Scheduler as decoupled queues)

簡單循環 Simple loops

使用簡單循環來實作排程任務這是毫不費力的。使用無限運作的 while 循環定期調用函數可用于排程作業,但這不是最好的方法,不過它是很有效的。可以使用内置time子產品的slleep()來延遲執行。不過這并不是大多數作業的排程方式,因為,它看起來很難看,而且與其他方法相比,它的可讀性較差。

import time
​
def task():
    print("Job Completed!")
​
 while 1:
    task()
    time.sleep(10)      

當涉及到每天早上 9:00 或每周三晚上 7:45 等這些日程安排時,事情就變得比較棘手了。

import datetime
​
def task():
    print("Job Completed!")
​
 while 1:
    now = datetime.datetime.now()
    # schedule at every wednesday,7:45 pm
    if now.weekday == 3 and now.strftime("%H:%m") == "19:45":
        task()
    # sleep for 6 days
    time.sleep(6 * 24 * 60 * 60)      

這是我的第一時間想到的解決辦法,不用謝!這種方法的一個問題是這裡的邏輯是阻塞的,即一旦在 python 項目中發現這段代碼,它就會卡在 while 1 循環中,進而阻塞其他代碼的執行。

簡單循環但是使用了線程Simple loops but threaded

線程是計算機科學中的一個概念。具有自己指令的小程式由程序執行并獨立管理,這就可以解決我們第一種方法的阻塞情況,讓我們看看怎麼樣。

import time
import threading
​
def task():
    print("Job Completed!")
​
def schedule():
    while 1:
        task()
        time.sleep(10)
​
# makes our logic non blocking
thread = threading.Thread(target=schedule)
thread.start()      

線程啟動後,其底層邏輯無法被主線程修改,是以我們可能需要添加資源,程式通過這些資源可以檢查特定場景并根據它們執行邏輯。

定時排程庫 Schedule Library

早些時候,我說使用 while 循環進行排程看起來很醜陋,排程庫可以解決這個問題。

import schedule
import time
​
def task():
    print("Job Executing!")
​
# for every n minutes
schedule.every(10).minutes.do(task)
​
# every hour
schedule.every().hour.do(task)
​
# every daya at specific time
schedule.every().day.at("10:30").do(task)
​
# schedule by name of day
schedule.every().monday.do(task)
​
# name of day with time
schedule.every().wednesday.at("13:15").do(task)
​
while True:
    schedule.run_pending()
    time.sleep(1)      

正如您所見,通過這樣我們可以毫不費力地建立多個排程計劃。我特别喜歡建立作業的方式和方法鍊(Method Chaining),另一方面,這個片段有一個 while 循環,這意味着代碼被阻塞,不過我相信你已經知道什麼可以幫助我們解決這個問題。

Liunx 中的 crontab 實用程式是一種易于使用且被廣泛接受的排程解決方案。Python 庫python-crontab提供了一個 API 來使用 Python 中的 CLI 工具。在crontab中,一個定時排程使用 unix-cron字元串格式(* * * * *)來描述,它是一組五個值的一條線,這表明當作業應該被執行時,python-crontab 将在檔案中寫入 crontab 的計劃轉換為寫入程式設計方法。

5種Python使用定時排程任務的方式
from crontab import CronTab
​
cron = CronTab(user='root')
​
job = cron.new(command='my_script.sh')
​
job.hour.every(1)
cron.write()      

python-crontab 不會自動儲存計劃,需要執行 write() 方法來儲存計劃。還有更多功能,我強烈建議您檢視他們的文檔。

RQ 排程器 RQ Scheduler

有些任務不能立即執行,是以我們需要根據 LIFO 或 FIFO 等隊列系統建立任務隊列并彈出任務。python-rq允許我們做到這一點,使用 Redis 作為代理來排隊作業。新作業的條目存儲為帶有資訊的哈希映射,例如created_at, enqueued_at, origin, data, description.

排隊任務由名為 worker 的程式執行。workers 在 Redis 緩存中也有一個條目,負責将任務出列以及更新 Redis 中的任務狀态。任務可以在需要時排隊,但要安排它們,我們需要rq-scheduler。

from rq_scheduler import Scheduler
​
queue = Queue('circle', connection=Redis())
scheduler = Scheduler(queue=queue)
​
scheduler.schedule(
    scheduled_time=datetime.utcnow(), # Time for first execution, in UTC timezone
    func=func,                     # Function to be queued
    args=[arg1, arg2],             # Arguments passed into function when executed
    kwargs={'foo': 'bar'},         # Keyword arguments passed into function when executed
    interval=60,                   # Time before the function is called again, in seconds
    repeat=None,                     # Repeat this number of times (None means repeat forever)
    meta={'foo': 'bar'}            # Arbitrary pickleable data on the job itself
)      

RQ worker(RQ 工作器)必須在終端中單獨啟動或通過 python-rq 工作器啟動。一旦任務被觸發,就可以在工作終端中看到,在成功和失敗場景中都可以使用單獨的函數回調。

總結 Conclusion

還有一些用于排程的庫,但在這裡,我已經讨論了最常見的庫。值得一提的是Celery,celery 的另一個優點是使用者可以在多個代理之間進行選擇。我很感激你讀到最後。也可以看看我的其他文章。幹杯!

翻譯來源: https://python.plainenglish.io/5-ways-to-schedule-jobs-in-python-99de8a80f28e

點選關注,第一時間了解華為雲新鮮技術~