第一次写,刚学,折磨了我一下午。
import time
from multiprocessing import Pool, Manager, Queue
# 获取数据
def reader(queueRead, char, lock):
# 加锁
lock.acquire()
if not queueRead.full():
print("--Reader--" + char)
queueRead.put(char)
time.sleep(1)
# 释放
lock.release()
# 整理数据
def changer(queueRead, queueChange, lock):
lock.acquire()
if not queueRead.empty():
tempChar = queueRead.get()
tempChar = tempChar.lower()
print("--Changer--" + tempChar)
if not queueChange.full():
queueChange.put(tempChar)
time.sleep(1)
lock.release()
# 展示数据
def shower(queueChange):
if not queueChange.empty():
tempChar = queueChange.get()
print("--Shower--" + tempChar)
time.sleep(1)
def main():
charArray = "ABCDEFG"
manager = Manager()
# 读取数据队列
queueRead = manager.Queue(3)
# 整理数据队列
queueChange = manager.Queue(4)
# 多个进程控制一个队列,需要加锁,防堵塞
lock = manager.Lock()
# 创建 3 个子进程
processPool = Pool(3)
for char in charArray:
# 给各子进程指派任务
processPool.apply_async(reader, args=(queueRead, char, lock, ))
processPool.apply_async(changer, args=(queueRead, queueChange, lock, ))
processPool.apply_async(shower, args=(queueChange, ))
processPool.close() # 进程池不再接受任务
processPool.join() # 等待所有子进程结束
if __name__ == '__main__':
print("=== Start ===")
main()
print("=== End ===")
效果: