當我們需要執行外部指令或自己寫一個自動化執行器時,需要使用到啟動程序并擷取輸出的操作
首先,我們啟動程序采用Python的subprocess子產品,為了保證标準輸出和标準錯誤輸出能夠正常運作,啟動兩個線程來檢測輸出結果部分
class Daemon(threading.Thread):
def __init__(self, workDir, logFunction=None, *args):
threading.Thread.__init__(self)
self.process = None
self.args = args
self.workDir = workDir
self.errList = []
self.stdOut = ""
self.stdErr = ""
self.isEnd = False
self.logFunction = logFunction
def _start_process(self):
if self.workDir == "":
self.workDir = None
allParas = []
for arg in self.args:
if arg:
allParas.extend(arg)
commandStr = " ".join(allParas)
try:
commandStr = commandStr.encode("gbk")
except:
pass
try:
self.runPath = self.runPath.encode("gbk")
except:
pass
lines = ["@echo off"]
lines.append(commandStr)
tempFilePath = util.get_temp_file("bat")
tempScriptFile = open(tempFilePath, "w")
tempScriptFile.write("\n".join(lines))
tempScriptFile.close()
self.process = subprocess.Popen("\"" + tempFilePath + "\"", stdin=subprocess.PIPE,
stdout=subprocess.PIPE, stderr=subprocess.PIPE, cwd=self.workDir, shell=True)
# 循環輸出标準輸出内容,避免阻塞
stdoutThread = ReadLogThread(self.process.stdout)
stdoutThread.start()
stderrThread = ReadLogThread(self.process.stderr)
stderrThread.start()
while self.process.poll() is None:
time.sleep(0.01)
self.isEnd = True
self.stdOut, self.stdErr = stdoutThread.get_log(), stderrThread.get_log()
def run(self):
try:
self._start_process()
except:
log.exception(traceback.format_exc())
def stop(self):
try:
if self.process.pid == 0:
return
os.system("TaskKill /PID %s /T /F" % self.process.pid)
except:
log.exception(traceback.format_exc())
pass
在看下讀取輸出的線程代碼:
class ReadLogThread(threading.Thread):
def __init__(self, _pipe):
threading.Thread.__init__(self)
self._pipe = _pipe
self.logList = []
def run(self):
while True:
try:
line = self._pipe.readline()
if line == "":
break
self.logList.append(line)
except:
break
def get_log(self):
return "".join(self.logList)
ok,程序啟動函數就封裝完成了,那麼提供給外部調用的接口怎麼來寫呢?
我們除了啟動程序外,一般還會給程序加一個逾時時間,那麼代碼如下:
def start_process(workDir, timeout=5 * 60, logFunction=None, *args):
daemon = Daemon(workDir, logFunction, *args)
daemon.start()
start = 0
isTimeout = False
daemonStd = ""
daemonErr = ""
while not daemon.isEnd:
if start > timeout:
daemon.stop()
isTimeout = True
daemonErr = "\n".join(daemon.errList)
break
time.sleep(0.1)
start += 0.1
if daemon.stdOut != "":
daemonStd = daemon.stdOut
if daemon.stdErr != "":
daemonErr = daemon.stdErr
return util.get_unicode_str(daemonStd), util.get_unicode_str(daemonErr), isTimeout
這樣,外部調用方式就是process.start_process(workDir, timeout, logFunction, args),然後就可以擷取到标準輸出、錯誤輸出及是否執行逾時等相關資訊了
歡迎關注“搜狗測試”公衆号,每天一篇測試相關的文章與您分享,共同讨論軟體測試的技術與發展
轉載請注明:http://blog.csdn.net/sogouauto