天天看点

python多进程_python多进程

当有多个非相关任务需要处理时,并行能大大提高处理速度。这里简要介绍python的multiprocessing模块。

简单多进程编写

当我们任务数量确定而且比较少的时候,可以手动为每个任务指定一个进程来运行。

import multiprocessing as mp

def f(a):

print(a)

if __name__ == '__main__':

# 这里有三个任务,手动指定3个进程

p1 = mp.Process(target=f, args=(1,))

p2 = mp.Process(target=f, args=(2,))

p3 = mp.Process(target=f, args=(3,))

p1.start()

p2.start()

p3.start()

p1.join()

p2.join()

p3.join()

# 输出

1

2

3

使用进程池来处理多任务

当我们任务比较多而且不确定数量(又或者想使得代码更简洁)的时候可以使用进程池Pool来编写多进程。

import multiprocessing as mp

def f(a):

return a

if __name__ == '__main__':

pool = mp.Pool()

res = pool.map(f, (1, 2, 3, 4))

print(res)

# 输出

[1, 2, 3, 4]

可以看到,进程池不仅可以方便的处理多进程,还将各个进程的的处理结果储存了在一个列表中。Pool默认使用计算机所有cpu核来进行运算,也可使用Pool(process=4)来指定并行的进程数。

另一个可以储存结果的函数是apply_async(),但是这个函数只支持传入一个参数,也即运行一个任务,运行多个任务需要多次指定。此外,他返回的结果是一个类似生成器的东西,需要通过get函数取出来。

import multiprocessing as mp

def f(a):

return a

if __name__ == '__main__':

pool = mp.Pool()

res = [pool.apply_async(f, (i,)) for i in range(4)]

print([r.get() for r in res])

# 输出

[0, 1, 2, 3]

多参数函数的多进程

上面处理任务的函数都只有一个参数,在实际情况这种情况是很少的,一般我们的函数都需要传入多个参数。python2中未能支持传多个参数,python3.3后则有starmap来支持传入多参数。

import multiprocessing as mp

def f(a, b):

return (a, b)

if __name__ == '__main__':

pool = mp.Pool()

res = pool.starmap(f, ((1, 2), ('a', 'b')))

print(res)

# 输出

[(1, 2), ('a', 'b')]

另一个传多个参数的方法是通过python中的魔术方法*args:

import multiprocessing as mp

def f(*l):

a, b = l[0] # 为什么这里是l[0]而不是l?因为这里传入的l是一个嵌套元组((a,b),)。

return (a, b)

if __name__ == '__main__':

pool = mp.Pool()

res = pool.map(f, ((1, 2), ('a', 'b')))

print(res)

# 输出

[(1, 2), ('a', 'b')]

亦或使用**kwargs:

import multiprocessing as mp

def f(*l):

d = l[0] # 这里l为({'a': 1, 'b': 2},)

return (d['a'], d['b'])

if __name__ == '__main__':

pool = mp.Pool()

res = pool.map(f, ({'a': 1, 'b': 2}, {'a': 3, 'b': 4}))

print(res)

# 输出

[(1, 2), (3, 4)]

共享内存和进程锁

一般情况下,各个进程中的数据变量是无法发生交流的,但我们可以通过使用Value数据存储在一个共享的内存表中。

import multiprocessing as mp

import time

def job(v, num):

for _ in range(5):

time.sleep(0.1) # 暂停0.1秒,让输出效果更明显

v.value += num # v.value获取共享变量值

print(v.value)

def multicore():

v = mp.Value('i', 0) # 定义共享变量

p1 = mp.Process(target=job, args=(v, 1))

p2 = mp.Process(target=job, args=(v, 3)) # 设定不同的number看如何抢夺内存

p1.start()

p2.start()

p1.join()

p2.join()

if __name__ == '__main__':

multicore()

# 输出

1

5

9

13

17

4

8

12

16

20

在上面的代码中,我们定义了一个共享变量v,两个进程都可以对它进行操作。 在job()中我们想让v每隔0.1秒输出一次累加num的结果,但是在两个进程p1和p2 中设定了不同的累加值。很明显可以看到他们发生了冲突。

为了解决上述不同进程抢共享资源的问题,我们可以用加进程锁来解决。

import multiprocessing as mp

def job(v, num, l):

l.acquire() # 锁住

for _ in range(5):

time.sleep(0.1)

v.value += num # 获取共享内存

print(v.value)

l.release() # 释放

def multicore():

l = mp.Lock() # 定义一个进程锁

v = mp.Value('i', 0) # 定义共享内存

p1 = mp.Process(target=job, args=(v, 1, l)) # 需要将lock传入

p2 = mp.Process(target=job, args=(v, 3, l))

p1.start()

p2.start()

p1.join()

p2.join()

if __name__ == '__main__':

multicore()

# 输出

3

6

9

12

15

16

17

18

19

20

需要主义的是上面可能仍然会发生冲突——p1先执行还是p2先执行的问题。为了解决这个问题我们可以在start,join中决定他们的顺序。

import multiprocessing as mp

def job(v, num, l):

l.acquire() # 锁住

for _ in range(5):

time.sleep(0.1)

v.value += num # 获取共享内存

print(v.value)

l.release() # 释放

def multicore():

l = mp.Lock() # 定义一个进程锁

v = mp.Value('i', 0) # 定义共享内存

p1 = mp.Process(target=job, args=(v, 1, l)) # 需要将lock传入

p2 = mp.Process(target=job, args=(v, 3, l))

p1.start()

p1.join()

p2.start()

p2.join()

if __name__ == '__main__':

multicore()

# 输出

1

2

3

4

5

8

11

14

17

20

继续阅读