Python-subprocess執行指令并将輸出劫持實作實時記錄到日志
前言
在寫我自己的練手項目的時候,需要寫一系列Python腳本來幫助我進行運維/環境配置,我希望這些腳本能夠有比較好的日志記錄。
這一篇部落格中,我實作了日志同時向控制台和日志中進行輸出,并且二者的日志等級、日志格式不相同。
這一篇部落格中,我通過自定義日志的格式描述符,實作了定長的日志等級。
解決完日志問題後,我需要解決這些腳本的另一個問題:執行指令。
我寫這一系列Python腳本就是因為我不擅長寫shell腳本,而且Python腳本在我的開發環境(Windows)和運作環境(Linux)都可以運作。其需要擔當的很重要的一個功能就是執行一些指令,如
kubectl apply
、
mvn test
等。
我希望執行這些指令的時候,能夠實時地将指令的輸出,記錄到日志中。
本部落格,我實作了執行指令,并将指令執行的輸出進行劫持,實時記錄到日志中。
參考
- Python 官方文檔
- Python 官方文檔 | logging子產品
- Python 官方文檔 | 日志 HOWTO
subprocess
關于多程序協同,Python主要有三個手段:
- os.system()函數;
- multiprocessing子產品;
- subprocess子產品;
其中
multiprocessing
主要用于解決計算密集型計算中,Python由于GIL(全局解釋器鎖)的存在,多線程無法提升性能,而需要建立多個程序來加快計算的場景。
而
os.system()
則是阻塞式的,而且似乎無法将其建立的子程序的輸出進行劫持。
subprocess
子產品幾乎是唯一的選擇。
Popen
subprocess
雖然提供了簡易使用的
subprocess.run()
方法,但是這個方法無法做到實時輸出,也就是指令執行完成之後才能一次性擷取指令的輸出,即使傳入了
stdout=subprocess.PIPE
要求其建立一個管道,仍舊是阻塞式的讀取。
stdout
也可以指定為一個流,是以我們可以将其直接重定向到本程式的标準輸出,但是logger并不是一個流。
是以我們隻能使用更為底層的subprocess.Popen對象來進行操作。
process = subprocess.Popen(
cmd,
shell=True,
text=True,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)
通過這樣的參數,建立子程序運作
cmd
指定的指令,同時建立與子程序關聯的
process
對象,此時
process.stdout
和
process.stderr
都是可以調用
readline
方法來一行行讀取字元串的管道。
實作
通過
subprocess.Popen()
建立
Popen
對象後,子程序開始運作,我們可以通過管道來讀取子程序的輸出。因為有兩個管道都需要讀取,并且它們沒有提供檢查管道是否讀取的方法,隻能阻塞地嘗試進行讀取,是以我們需要建立讀取線程來分别讀取這兩個管道。
實作如下:
import logging
import subprocess
import shlex
import threading
class CommandExecutionException(Exception):
def __init__(self, command: str, exit_code: int) -> None:
super().__init__(f"command executed fail with exit-code={exit_code}: {command}")
_logger = logging.getLogger(__name__)
class TextReadLineThread(threading.Thread):
def __init__(self, readline, callback, *args, **kargs) -> None:
super().__init__(*args, **kargs)
self.readline = readline
self.callback = callback
def run(self):
for line in iter(self.readline, ""):
if len(line) == 0:
break
self.callback(line)
def cmd_exec(command: str, ensure_success: bool=True) -> int:
_logger.info("executing command: {}".format(command))
cmd = shlex.split(command)
process = subprocess.Popen(
cmd,
shell=True,
text=True,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)
_logger.debug("started command")
def log_warp(func):
def _wrapper(line: str):
return func("\t" + line.strip())
return _wrapper
read_stdout = TextReadLineThread(process.stdout.readline, log_warp(_logger.info))
read_stderr = TextReadLineThread(process.stderr.readline, log_warp(_logger.warning))
read_stdout.start()
read_stderr.start()
read_stdout.join()
_logger.debug("stdout reading finish")
read_stderr.join()
_logger.debug("stderr reading finish")
ret = process.wait()
_logger.debug("process finish")
_logger.info("executed command with exit-code={}".format(ret))
if ensure_success and ret != 0:
raise CommandExecutionException(command=command, exit_code=ret)
return ret
if __name__ == '__main__':
_logger_trans = {
"DEBUG": "DBG",
"INFO": "INF",
"WARNING": "WAR",
"CRITICAL": "ERR"
}
_old_factory = logging.getLogRecordFactory()
def factory(name, level, fn, lno, msg, args, exc_info, func=None, sinfo=None, **kwargs)->logging.LogRecord:
record = _old_factory(name, level, fn, lno, msg, args, exc_info, func, sinfo, **kwargs)
record.shortlevelname = _logger_trans[record.levelname]
return record
logging.setLogRecordFactory(factory)
logging.basicConfig(
level=logging.DEBUG,
format='[%(asctime)s %(shortlevelname)s] %(message)s',
datefmt="%Y/%m/%d %H:%M:%S"
)
cmd_exec("ping localhost", ensure_success=False)
在
TextReadLineThread
的run方法中,我們使用了内置函數
iter()
的兩個參數的形式,其第一個參數為一個可調用對象,第二個參數為該可調用對象可能的輸出,通過
iter(func, sentinel)
函數産生的疊代器,每次疊代時都無參數地調用
func
,直到
func()
傳回
sentinel
時結束疊代。
關于
main
中日志配置,參考這一篇部落格和這一篇部落格,對日志格式進行增強,使得我們能夠明确看到子程序的輸出被劫持了。
這裡我建立了兩個線程,一個用于讀取
stdout
,一個用于讀取
stderr
,然後阻塞掉目前線程。其實可以隻建立一個新的線程用于讀取
stderr
,用目前線程讀取
stdout
。
我使用了
ping
指令來進行測試,因為
ping
明顯是隔一段時間輸出一段,可以明顯看出是不是實時進行日志記錄。但是linux上
ping
預設是不會停止的,需要注意一下。