当我们需要执行外部命令或自己写一个自动化执行器时,需要使用到启动进程并获取输出的操作
首先,我们启动进程采用Python的subprocess模块,为了保证标准输出和标准错误输出能够正常运行,启动两个线程来检测输出结果部分
class Daemon(threading.Thread):
def __init__(self, workDir, logFunction=None, *args):
threading.Thread.__init__(self)
self.process = None
self.args = args
self.workDir = workDir
self.errList = []
self.stdOut = ""
self.stdErr = ""
self.isEnd = False
self.logFunction = logFunction
def _start_process(self):
if self.workDir == "":
self.workDir = None
allParas = []
for arg in self.args:
if arg:
allParas.extend(arg)
commandStr = " ".join(allParas)
try:
commandStr = commandStr.encode("gbk")
except:
pass
try:
self.runPath = self.runPath.encode("gbk")
except:
pass
lines = ["@echo off"]
lines.append(commandStr)
tempFilePath = util.get_temp_file("bat")
tempScriptFile = open(tempFilePath, "w")
tempScriptFile.write("\n".join(lines))
tempScriptFile.close()
self.process = subprocess.Popen("\"" + tempFilePath + "\"", stdin=subprocess.PIPE,
stdout=subprocess.PIPE, stderr=subprocess.PIPE, cwd=self.workDir, shell=True)
# 循环输出标准输出内容,避免阻塞
stdoutThread = ReadLogThread(self.process.stdout)
stdoutThread.start()
stderrThread = ReadLogThread(self.process.stderr)
stderrThread.start()
while self.process.poll() is None:
time.sleep(0.01)
self.isEnd = True
self.stdOut, self.stdErr = stdoutThread.get_log(), stderrThread.get_log()
def run(self):
try:
self._start_process()
except:
log.exception(traceback.format_exc())
def stop(self):
try:
if self.process.pid == 0:
return
os.system("TaskKill /PID %s /T /F" % self.process.pid)
except:
log.exception(traceback.format_exc())
pass
在看下读取输出的线程代码:
class ReadLogThread(threading.Thread):
def __init__(self, _pipe):
threading.Thread.__init__(self)
self._pipe = _pipe
self.logList = []
def run(self):
while True:
try:
line = self._pipe.readline()
if line == "":
break
self.logList.append(line)
except:
break
def get_log(self):
return "".join(self.logList)
ok,进程启动函数就封装完成了,那么提供给外部调用的接口怎么来写呢?
我们除了启动进程外,一般还会给进程加一个超时时间,那么代码如下:
def start_process(workDir, timeout=5 * 60, logFunction=None, *args):
daemon = Daemon(workDir, logFunction, *args)
daemon.start()
start = 0
isTimeout = False
daemonStd = ""
daemonErr = ""
while not daemon.isEnd:
if start > timeout:
daemon.stop()
isTimeout = True
daemonErr = "\n".join(daemon.errList)
break
time.sleep(0.1)
start += 0.1
if daemon.stdOut != "":
daemonStd = daemon.stdOut
if daemon.stdErr != "":
daemonErr = daemon.stdErr
return util.get_unicode_str(daemonStd), util.get_unicode_str(daemonErr), isTimeout
这样,外部调用方式就是process.start_process(workDir, timeout, logFunction, args),然后就可以获取到标准输出、错误输出及是否执行超时等相关信息了
欢迎关注“搜狗测试”公众号,每天一篇测试相关的文章与您分享,共同讨论软件测试的技术与发展
转载请注明:http://blog.csdn.net/sogouauto