天天看点

python自动安装apk包(单线程和线程池两种代码)

1、readme

1、把需要安装的应用放在apps里,包名嵌套apk的形式
2、连接设备,保证adb devices能连上
3、运行main.py
4、安装包较大,请耐心等待
           

2、main.py(单线程 4个app88秒,后面有线程池的写法)

import datetime
import os
import subprocess


class App:
    def install_app(self, app_path):
        # 安装app
        # [b'Performing Streamed Install\r\n', b'Success\r\n']
        print('===开始安装{}'.format(app_path))
        status = False  # True安装成功,False 失败
        installCmd = r'adb install -r %s' % (app_path)
        result = subprocess.Popen(installCmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        for i in result.stdout.readlines():
            info = i.decode('utf-8')
            if 'Success' in info:
                status = True
        return status, app_path

    def run(self, app_path):
        return self.install_app(app_path)


def get_devices():
    # 获取设备
    cmd = r'adb devices'
    devList = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
    devices = []
    for line in devList.stdout.readlines()[1:]:
        line = line.decode('utf-8')
        if 'List' not in line:
            devices.append((line.split('\t'))[0])
    devices.pop()
    return devices


if __name__ == '__main__':

    start = datetime.datetime.now()
    devices = get_devices()
    if len(devices) == 0:
        print('error:请连接设备再运行!')

    # apps根路径,在当前包的apps里
    url = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'apps')
    app_paths = []
    for package in os.listdir(url):
        for app in os.listdir(os.path.join(url, package)):
            if '.apk' in app:
                app_paths.append(os.path.join(url, package, app))
    success = []
    error = []
    for app_path in app_paths:
        status, app_path = App().run(app_path)
        if status:
            success.append(app_path)
        else:
            error.append(app_path)

    print('成功安装:{}'.format(','.join(success)))
    if len(error) == 0:
        error.append('无')
    if len(success) == 0:
        success.append('无')
    print('失败安装:{}'.format(','.join(error)))

    end = datetime.datetime.now()
    print('===cost:{}s'.format((end - start).seconds))
           

3、目录结构

python自动安装apk包(单线程和线程池两种代码)

4、线程池(4个app55秒)

import datetime
import os
import subprocess
import time
from concurrent.futures import ThreadPoolExecutor, as_completed

class App:

    def install_app(self, app_path):
        # 安装app
        # [b'Performing Streamed Install\r\n', b'Success\r\n']
        print('===开始安装{}'.format(app_path))
        status = False  # True安装成功,False 失败
        installCmd = r'adb install -r %s' % (app_path)
        result = subprocess.Popen(installCmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        for i in result.stdout.readlines():
            info = i.decode('utf-8')
            if 'Success' in info:
                status = True
        return status, app_path

    def run(self, app_path):
        return self.install_app(app_path)


def get_devices():
    # 获取设备
    cmd = r'adb devices'
    devList = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
    devices = []
    for line in devList.stdout.readlines()[1:]:
        line = line.decode('utf-8')
        if 'List' not in line:
            devices.append((line.split('\t'))[0])
    devices.pop()
    return devices


if __name__ == '__main__':

    start = datetime.datetime.now()
    devices = get_devices()
    if len(devices) == 0:
        print('error:请连接设备再运行!')

    # apps根路径,在当前包的apps里
    url = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'apps')
    app_paths = []
    for package in os.listdir(url):
        for app in os.listdir(os.path.join(url, package)):
            if '.apk' in app:
                app_paths.append(os.path.join(url, package, app))
    success = []
    error = []

    executor = ThreadPoolExecutor(max_workers=4)
    tasks = [executor.submit(App().run,app_path) for app_path in app_paths]
    for future in as_completed(tasks):
        try:
            status,app_path = future.result()
            if status:
                success.append(app_path)
            else:
                error.append(app_path)
        except Exception as e:
            print('===error:{}'.format(e))

    if len(error) == 0:
        error.append('无')
    if len(success) == 0:
        success.append('无')
    print('成功安装:{}'.format(','.join(success)))
    print('失败安装:{}'.format(','.join(error)))

    end = datetime.datetime.now()
    print('===cost:{}s'.format((end - start).seconds))


           

继续阅读