天天看點

Python-subprocess執行指令并将輸出劫持實作實時記錄到日志

Python-subprocess執行指令并将輸出劫持實作實時記錄到日志

前言

在寫我自己的練手項目的時候,需要寫一系列Python腳本來幫助我進行運維/環境配置,我希望這些腳本能夠有比較好的日志記錄。

這一篇部落格中,我實作了日志同時向控制台和日志中進行輸出,并且二者的日志等級、日志格式不相同。

這一篇部落格中,我通過自定義日志的格式描述符,實作了定長的日志等級。

解決完日志問題後,我需要解決這些腳本的另一個問題:執行指令。

我寫這一系列Python腳本就是因為我不擅長寫shell腳本,而且Python腳本在我的開發環境(Windows)和運作環境(Linux)都可以運作。其需要擔當的很重要的一個功能就是執行一些指令,如

kubectl apply

mvn test

等。

我希望執行這些指令的時候,能夠實時地将指令的輸出,記錄到日志中。

本部落格,我實作了執行指令,并将指令執行的輸出進行劫持,實時記錄到日志中。

參考

  • Python 官方文檔
  • Python 官方文檔 | logging子產品
  • Python 官方文檔 | 日志 HOWTO

subprocess

關于多程序協同,Python主要有三個手段:

  1. os.system()函數;
  2. multiprocessing子產品;
  3. subprocess子產品;

其中

multiprocessing

主要用于解決計算密集型計算中,Python由于GIL(全局解釋器鎖)的存在,多線程無法提升性能,而需要建立多個程序來加快計算的場景。

os.system()

則是阻塞式的,而且似乎無法将其建立的子程序的輸出進行劫持。

subprocess

子產品幾乎是唯一的選擇。

Popen

subprocess

雖然提供了簡易使用的

subprocess.run()

方法,但是這個方法無法做到實時輸出,也就是指令執行完成之後才能一次性擷取指令的輸出,即使傳入了

stdout=subprocess.PIPE

要求其建立一個管道,仍舊是阻塞式的讀取。

stdout

也可以指定為一個流,是以我們可以将其直接重定向到本程式的标準輸出,但是logger并不是一個流。

是以我們隻能使用更為底層的subprocess.Popen對象來進行操作。

process = subprocess.Popen(
    cmd,
    shell=True,
    text=True,
    stdout=subprocess.PIPE,
    stderr=subprocess.PIPE,
    )
           

通過這樣的參數,建立子程序運作

cmd

指定的指令,同時建立與子程序關聯的

process

對象,此時

process.stdout

process.stderr

都是可以調用

readline

方法來一行行讀取字元串的管道。

實作

通過

subprocess.Popen()

建立

Popen

對象後,子程序開始運作,我們可以通過管道來讀取子程序的輸出。因為有兩個管道都需要讀取,并且它們沒有提供檢查管道是否讀取的方法,隻能阻塞地嘗試進行讀取,是以我們需要建立讀取線程來分别讀取這兩個管道。

實作如下:

import logging
import subprocess
import shlex
import threading

class CommandExecutionException(Exception):
    def __init__(self, command: str, exit_code: int) -> None:
        super().__init__(f"command executed fail with exit-code={exit_code}: {command}")

_logger = logging.getLogger(__name__)

class TextReadLineThread(threading.Thread):
    def __init__(self, readline, callback, *args, **kargs) -> None:
        super().__init__(*args, **kargs)
        self.readline = readline
        self.callback = callback

    def run(self):
        for line in iter(self.readline, ""):
            if len(line) == 0:
                break
            self.callback(line)

def cmd_exec(command: str, ensure_success: bool=True) -> int:
    _logger.info("executing command: {}".format(command))

    cmd = shlex.split(command)

    process = subprocess.Popen(
        cmd,
        shell=True,
        text=True,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        )

    _logger.debug("started command")

    def log_warp(func):
        def _wrapper(line: str):
            return func("\t" + line.strip())
        return _wrapper

    read_stdout = TextReadLineThread(process.stdout.readline, log_warp(_logger.info))
    read_stderr = TextReadLineThread(process.stderr.readline, log_warp(_logger.warning))
    read_stdout.start()
    read_stderr.start()

    read_stdout.join()
    _logger.debug("stdout reading finish")
    read_stderr.join()
    _logger.debug("stderr reading finish")
    ret = process.wait()
    _logger.debug("process finish")

    _logger.info("executed command with exit-code={}".format(ret))
    if ensure_success and ret != 0:
        raise CommandExecutionException(command=command, exit_code=ret)
    return ret

if __name__ == '__main__':
    _logger_trans = {
            "DEBUG": "DBG",
            "INFO": "INF",
            "WARNING": "WAR",
            "CRITICAL": "ERR"
        }
    _old_factory = logging.getLogRecordFactory()
    def factory(name, level, fn, lno, msg, args, exc_info, func=None, sinfo=None, **kwargs)->logging.LogRecord:
        record = _old_factory(name, level, fn, lno, msg, args, exc_info, func, sinfo, **kwargs)
        record.shortlevelname = _logger_trans[record.levelname]
        return record
    logging.setLogRecordFactory(factory)
    logging.basicConfig(
        level=logging.DEBUG,
        format='[%(asctime)s %(shortlevelname)s] %(message)s',
        datefmt="%Y/%m/%d %H:%M:%S"
    )
    cmd_exec("ping localhost", ensure_success=False)
           

TextReadLineThread

的run方法中,我們使用了内置函數

iter()

的兩個參數的形式,其第一個參數為一個可調用對象,第二個參數為該可調用對象可能的輸出,通過

iter(func, sentinel)

函數産生的疊代器,每次疊代時都無參數地調用

func

,直到

func()

傳回

sentinel

時結束疊代。

關于

main

中日志配置,參考這一篇部落格和這一篇部落格,對日志格式進行增強,使得我們能夠明确看到子程序的輸出被劫持了。

這裡我建立了兩個線程,一個用于讀取

stdout

,一個用于讀取

stderr

,然後阻塞掉目前線程。其實可以隻建立一個新的線程用于讀取

stderr

,用目前線程讀取

stdout

我使用了

ping

指令來進行測試,因為

ping

明顯是隔一段時間輸出一段,可以明顯看出是不是實時進行日志記錄。但是linux上

ping

預設是不會停止的,需要注意一下。