天天看点

Python从门到精通(六):线程-03-线程间通信

一、共享队列

1.1、​简单队列

from queue import Queue
from threading import Thread

def producer(out_q):
    while True:
        data = 'hello world!'
        out_q.put(data)

def consumer(in_q):
    while True:
        data = in_q.get()
        print(f'get data is: {data}')

q = Queue()
t1 = Thread(target=consumer, args=(q,))
t2 = Thread(target=producer, args=(q,))
t1.start()
t2.start()      

1.2、协调者队列

from queue import Queue
from threading import Thread

_sentinel = object()

def producer(out_q):
    put_time = 0
    while True:
        data = 'hello world!'
        out_q.put(data)

        put_time += 1
        if put_time == 5:
            out_q.put(_sentinel)

def consumer(in_q):
    while True:
        data = in_q.get()
        print(f'get data is: {data}')

        if data is _sentinel:
            in_q.put(_sentinel)
            break

q = Queue()
t1 = Thread(target=consumer, args=(q,))
t2 = Thread(target=producer, args=(q,))
t1.start()
t2.start()      

1.3、优先级队列

import heapq
import threading

class PriorityQueue:
    def __init__(self):
        self._queue = []
        self._count = 0
        self._cv = threading.Condition()
    def put(self, item, priority):
        with self._cv:
            heapq.heappush(self._queue, (-priority, self._count, item))
            self._count += 1
            self._cv.notify()

    def get(self):
        with self._cv:
            while len(self._queue) == 0:
                self._cv.wait()
            return heapq.heappop(self._queue)[-1]      

1.4、队列异常

import queue
q = queue.Queue()

try:
    data = q.get(block=False)
except queue.Empty:
    ...

try:
    item = ''
    q.put(item, block=False)
except queue.Full:
    ...

try:
    data = q.get(timeout=5.0)
except queue.Empty:
    ...      

二、JOIN方法的使用

from queue import Queue
from threading import Thread

_sentinel = object()

def producer(out_q):
    put_time = 0
    while True:
        data = 'hello world!'
        out_q.put(data)

        put_time += 1
        if put_time == 5:
            out_q.put(_sentinel)

# A thread that consumes data
def consumer(in_q):
    while True:
        data = in_q.get()
        print(f'get data is: {data}')

        if data is _sentinel:
            in_q.put(_sentinel)
            break
        in_q.task_done()

q = Queue()
t1 = Thread(target=consumer, args=(q,))
t2 = Thread(target=producer, args=(q,))
t1.start()
t2.start()

q.join()      

三、线程监听

from queue import Queue
from threading import Thread, Event

# A thread that produces data
def producer(out_q):
    while True:
        # Produce some data
        ...
        # Make an (data, event) pair and hand it to the consumer
        evt = Event()
        data = ''
        out_q.put((data, evt))
        ...
        # Wait for the consumer to process the item
        evt.wait()

def consumer(in_q):
    while True:
        data, evt = in_q.get()
        # Process the data
        ...
        # Indicate completion
        evt.set()      

四、线程间复制

from queue import Queue
from threading import Thread
import copy

# A thread that produces data
def producer(out_q):
    while True:
        # Produce some data
        ...
        data = ''
        out_q.put(copy.deepcopy(data))

# A thread that consumes data
def consumer(in_q):
    while True:
        # Get some data
        data = in_q.get()
        # Process the data
        ...