天天看点

【python系统编程1】多进程1 进程的含义2 Process类3 进程池Pool

【python系统编程1】多进程

  • 1 进程的含义
    • 1.1 程序、进程与线程的管理
    • 1.2 多进程的运行原理
    • 1.3 并发与并行
    • 1.4 pid和ppid
  • 2 Process类
    • 2.1 入门案例
    • 2.2 传递参数
    • 2.3 常用方法与属性
    • 2.4 pid和ppid
    • 2.5 类进程
    • 2.6 Process进程通信
      • 2.6.1 进程通信的问题
      • 2.6.2 简单的通信案例
      • 2.6.3 队列的常用方法与参数
    • 2.7 Process进程的生命周期
  • 3 进程池Pool
    • 3.1 池的概念
    • 3.2 Pool的使用
    • 3.3 Pool的常用方法
    • 3.4 Pool通信
    • 3.5 Pool的生命周期

1 进程的含义

1.1 程序、进程与线程的管理

一个运行的程序,会包括至少一个进程,每一个进程中包括至少一个线程

1.2 多进程的运行原理

每个cpu在单位时间只能运行一个进程

cpu在进程间的切换速度飞快,宏观表现为多个进程同时运行,但微观上仍有先后顺序

1.3 并发与并行

并发:看似一起运行,宏观上的感念

并行:真正一起运行,微观的概念

1.4 pid和ppid

每个运行的进程,都有一个唯一的pid(类似身份证)

父进程的pid,称为ppid(parent pid:爸爸的身份证)

2 Process类

2.1 入门案例

from multiprocessing import Process
import time # 用于延时
# 定义子线程运行的目标函数
def func():
    for i in range(3):
        print(i)

# main:表示主进程
if __name__ == '__main__':
    # 实例化process进程对象,目标执行func函数
    process = Process(target=func)
    # 运行processs进程
    process.start()
    # 主进程也运行func函数
    func() 
    
           

2.2 传递参数

args:根据元组中的索引对应函数接受的参数

kwargs:根据字典中的key对应函数接受的参数

from multiprocessing import Process

# 目标方法,接受两个参数
def func(param1,param2):
    print(param1,param2)

if __name__ == '__main__':
    # 根据索引对应传入参数
    process1 = Process(target=func,args=('value1','value2'))
    process1.start()
    # 根据字典的键对应传入参数
    process2 = Process(target=func,kwargs={'param1':'value1','param2':'value2'})
    process2.start()
    
           

2.3 常用方法与属性

命令 说明
name 设置或获取进程的别名
pid 获取进程的pid
start() 运行
is_alive() 判断进程是否正在运行
join(timeout) 子进程运行完,或已堵塞timeout秒后,放行主进程
terminate() 停止子进程

例如:

from multiprocessing import Process
import time # 用于延时

# 子进程运行的目标方法
def func():
    for i in range(10000):
        print(i)   
        time.sleep(1)

if __name__ == '__main__':
    process = Process(
                    target=func,
                    name='子进程1', # 1.name 为进程起别名
                )
    # 2.start 运行进程
    process.start()
    # 3.pid 得到子进程的pid
    print('子进程的pid=',process.pid)
    # 4.is_alive() 判断进程是否正在运行
    print(process.is_alive())
    # 5.join() 堵塞主进程
    process.join(timeout=2)
    # 6.terminate() 停止子进程
    process.terminate()
           

2.4 pid和ppid

命令 说明
os.getpid() 得到当前进程的pid
os.getppid() 得到当前进程的父进程的pid
process.pid 得到process进程的pid
from multiprocessing import Process
import os

def func():
    print('子进程的pid=',os.getpid())
    print('子进程的ppid=',os.getppid())

if __name__ == '__main__':
    process = Process(target=func)
    process.start()
    print('process的pid=',process.pid)
    print('主进程的pid=',os.getppid())
           

2.5 类进程

步骤:

  1. 定义一个类对象,继承Process类
  2. init中先调用父类的init方法,然后处理接受的参数
  3. 重写run方法
  4. 实例化类对象,通过start运行类进程

例如:

from multiprocessing import Process

# 1.定义类对象,继承Process
class MyProcess(Process):
    def __init__(self,param):
        # 2.1 调用父类的init方法
        Process.__init__(self)
        # 2.2 处理接受参数 
        self.param = param
    # 3. 定义run方法(相当于进程执行的目标方法)
    def run(self):
        print(self.param)
if __name__ == '__main__':
    # 4.1 实例化类对象
    myProcess = MyProcess('value')
    myProcess.start()
           

注意:

  • start()是以多进程的方式,去运行类中的run()方法
  • 如果外部直接调用run()方法,则不是以多进程的方式运行的

2.6 Process进程通信

2.6.1 进程通信的问题

注意:进程之间不能共享全局变量,所以Global关键字不能作为进程通信的桥梁

进程通信的方法:

  • 实例化进程通信队列
  • 进程执行的目标函数,或类对象需要接受通信队列

2.6.2 简单的通信案例

from multiprocessing import Process,Queue # 1.导入进程通信队列

# 2.进程目标函数需要接受队列
def func(q):
    while True:
        print(q.get())

if __name__ == '__main__':
    # 3.实例化进程通信队列
    queue = Queue()
    # 4.传递队列
    Process(target=func,args=(queue,)).start()
    # 5.队列通信例子:
    import time
    for i in range(5):
        queue.put(i)
        time.sleep(1)
           

2.6.3 队列的常用方法与参数

1.queue = Queue(maxsize=-1):

  • 实例化进程通信队列
  • maxsize:队列中数据的最大个数,-1表示个数不限

2.queue.get(block=True,timeout=None)

  • 从队列中取数据
  • block:是否堵塞(如果队列中没有数据,是否等待)
  • timeout:最大堵塞的时间(只有设置block=True才能发挥作用),超过最大堵塞时间会报异常

3.queue.get_nowait()

  • 以不堵塞的方式取队列中的数据,如果队列中没有数据,则报异常

4.queue.put(obj,block=True,timeout=None)

  • 把obj对象放入队列
  • block:是否堵塞(如果队列中的数据达到最大个数,是否堵塞)
  • timeout:堵塞的最大时间,超过则报异常

5.queue.put_nowait(obj)

  • 以不堵塞的方式放入队列数据,如果队列数据个数达到最大值,则报异常

6.queue.qsize()

  • 返回当前队列中数据的个数

7.queue.empty()

  • 判断队列是否为空,返回True|Flase

8.queue.full()

  • 判断队列中数据是否达到最大值,返回True|False

2.7 Process进程的生命周期

生命周期:

  • 关闭主进程,子进程也会关闭
  • 主进程没有需要执行的代码,而子进程还在执行,主进程会等待子进程执行完再关闭

3 进程池Pool

3.1 池的概念

Process进程执行的流程:

  1. 向操作系统索要创建进程的资源–>创建进程
  2. 执行目标函数
  3. 进程结束,操作系统回收进程垃圾

如果某个程序需要频繁的通过进程的方式运行多种目标函数,如果使用Process,频繁的创建和销毁进程会浪费大量的资源,所有就有了池的概念

Pool进程池

  1. 预先创建多个进程,放置在进程池中
  2. 需要运行目标函数时,从池中拿出一个进程去执行
  3. 执行完目标函数后,进程不销毁,而是返回进程池

注:如果进程池中没有空闲进程,那么目标函数无法执行

优势:省去了频繁创建进程和销毁进程的资源

3.2 Pool的使用

步骤:

  1. 实例化进程池
  2. 告知进程池需要执行的目标函数
  3. 关闭进程池
  4. 堵塞主进程

例如:

from multiprocessing import Pool
import time

def func(param):
    for i in range(3):
        print(param)
        time.sleep(1)

if __name__ == '__main__':
    # 1.实例化进程池(池中有两个进程)
    pool = Pool(2)
    # 2.告诉进程池要执行的目标函数
    for i in range(10):
        pool.apply_async(
            func=func,      # 指定目标函数
            args=(i,)       # 传递参数
        )
    # 3.关闭进程池
    pool.close()
    # 4.堵塞主进程
    pool.join()
           

3.3 Pool的常用方法

  1. pool = Pool(processes=None)
  • 实例化进程池
  • processes是池中进程的数量,默认None表示数量不限
  1. pool.apply_async(func,args=(),kwds=(),callback=None,error_back=None)
  • 告知进程池要执行的目标函数
  • args,kwds分别是元组和字典传值
  • callback:回调函数
  • error_callback:异常会掉函数
  • 注意:callback和error_callback也会接收传递给func的args和kwds,所以参数要对应
  1. pool.close()
  • 不允许新的任务再进入进程池
  1. pool.join()
  • 堵塞主进程
  • 注意:pool.join()必须在pool.close()后执行

3.4 Pool通信

类似Process进程通信,唯一不同的地方是导包:

from multiprocessing import Manager

queue = Manager().Queue()
           

Pool通信队列Queue的常用方法与上面Process中列举的相同

3.5 Pool的生命周期

生命周期:

  • 关闭主进程,Pool中的进程也关闭
  • 如果主进程中没有可执行的代码,不会等待Pool中的进程执行完,而会直接关闭

这也是为什么需要使用pool.join()的原因

联系方式:[email protected]