天天看點

Python 程序間通信- Queue, Pipe, Manager

回顧線程間通信(Python多線程程式設計之線程間通信),線程間通信用共享變量和queue.Queue。但這兩者都不能用于程序間通信。不同程序之間,變量就算同名也是位于不同的記憶體位址上,也是不同的變量。queue.Queue 也不能用于程序間通信,将不同程序的變量put進queue.Queue對象中,會抛棄常。下面示例代碼:

from queue import Queue

def producer(queue):
    queue.put("a")
    time.sleep(2)

def consumer(queue):
    time.sleep(2)
    data = queue.get()
    print(data)

if __name__ == "__main__":
    queue = Queue(10)
    my_producer = Process(target=producer, args=(queue,))
    my_consumer = Process(target=consumer, args=(queue,))
    my_producer.start()
    my_consumer.start()
    my_producer.join()
    my_consumer.join()

# 運作結果,抛異常退出
           

1. from multiprocessing import Queue ,multiprocessing 中的 Queue, 不能用于 pool 程序池。

import time
from multiprocessing import Process,Queue

 
def producer(queue):
    queue.put("a")
    time.sleep(2)
 
def consumer(queue):
    time.sleep(2)
    data = queue.get()
    print(data)
 
if __name__ == "__main__":
    #queue = queue.Queue()
    queue = Queue()
    my_producer = Process(target=producer,args=(queue,))
    my_consumer = Process(target=consumer,args=(queue,))
    my_producer.start()
    my_consumer.start()
    my_producer.join()
    my_consumer.join()

"""
運作結果:
a

Process finished with exit code 0
"""
# 使用 multiprocessing.Queue 可以正常實作程序間通信,正常列印出 “a”
           

2. from multiprocessing import Manager(執行個體化後調用Queue()可以用于多程序的程序池)

import time
from multiprocessing import Process,Queue,Pool,Manager
 
 
def producer(queue):
    queue.put("a")
    time.sleep(2)
 
def consumer(queue):
    time.sleep(2)
    data = queue.get()
    print(data)
 
if __name__ == "__main__":
	#pool中的程序間通信需要先執行個體化Manager,再調用Queue()
    queue = Manager().Queue()
    pool = Pool()

    pool.apply_async(producer,args=(queue,))
    pool.apply_async(consumer, args=(queue,))
    pool.close()
    pool.join()

"""
a

Process finished with exit code 0
"""
           

3. 管道 Pipe (兩程序間的通信優先考慮), pipe的性能高于queue, 因為 queue要加了很多鎖。

import time
from multiprocessing import Pool, Pipe


def producer(pipe):
    pipe.send("a")
    time.sleep(3)
    print(pipe.recv())

def consumer(pipe):
    time.sleep(2)
    data = pipe.recv()
    pipe.send("b")
    print(data)

if __name__ == "__main__":
    # Pipe實作兩程序間通信
    s_pipe, r_pipe = Pipe()
    pool = Pool()
    pool.apply_async(producer, args=(s_pipe,))
    pool.apply_async(consumer, args=(r_pipe,))
    pool.close()
    pool.join()

"""
運作結果:
a
b

Process finished with exit code 0
"""
           

繼續閱讀