![](https://img.laitimes.com/img/__Qf2AjLwojIjJCLyojI0JCLiMGc902byZ2P1IzY5Y2Y5QTO2UjNhdjN4ITYzQzYyUTM3IWMwYTZxgzLcBza5QTcsJja2FXLp1ibj1ycvR3Lc5Wanlmcv9CXt92YucWbp9WYpRXdvRnL2A3Lc9CX6MHc0RHaiojIsJye.jpg)
什么是生产者-消费者模式
又称有限缓冲问题(英语:Bounded-buffer problem)。
该问题描述了共享固定大小缓冲区的两个进程——即所谓的“生产者”和“消费者”——在实际运行时会发生的问题。
比如有两个进程A和B,它们共享一个固定大小的缓冲区,A进程产生数据放入缓冲区,B进程从缓冲区中取出数据进行计算,那么这里其实就是一个生产者和消费者的模式,A相当于生产者,B相当于消费者。
生产者Producer负责生产数据,消费者Consumer负责使用数据。多个生产者线程会在同一时间运行,生产数据,并放到内存中一个共享的区域。
为什么要使用生产者消费者模式
在多线程开发中,如果生产者生产数据的速度很快,而消费者消费数据的速度很慢,那么生产者就必须等待消费者消费完了数据才能够继续生产数据,因为生产那么多也没有地方放啊;同理如果消费者的速度大于生产者那么消费者就会经常处理等待状态,所以为了达到生产者和消费者生产数据和消费数据之间的平衡,那么就需要一个缓冲区用来存储生产者生产的数据,所以就引入了生产者-消费者模式。
简单来说这里的缓冲区的作用就是为了平衡生产者和消费者的处理能力,起到一个数据缓存的作用,同时也达到了一个解耦的作用。
生产者消费者模型的优点:
解耦:
将生产者类和消费者类进行解耦,消除代码之间的依赖性,简化工作负载的管理。
复用:
将生产者类和消费者类独立开来,可以对生产者类和消费者类进行独立的复用与扩展。
并发:
由于生产者和消费者的处理速度是不一样的,可以调整并发数,给予慢的一方多的并发数,来提高任务的处理速度。
异步:
对于生产者和消费者来说能够各司其职,生产者只需要关心缓冲区是否还有数据,不需要等待消费者处理完;同样的对于消费者来说,也只需要关注缓冲区的内容,不需要关注生产者,通过异步的方式支持高并发,将一个耗时的流程拆成生产和消费两个阶段,这样生产者因为执行的时间比较短,而支持高并发
分布式:
生产者和消费者通过队列进行通讯,所以不需要运行在同一台机器上,在分布式环境中可以通过redis的list作为队列,而消费者只需要轮询队列中是否有数据。同时还能支持集群的伸缩性,当某台机器宕掉的时候,不会导致整个集群宕掉。
简单示例:
from queue import Queue
import random
import threading
import time
# 生产者线程
class Producer(threading.Thread):
def __init__(self, t_name, queue):
threading.Thread.__init__(self, name=t_name)
self.data = queue
def run(self):
for i in range(5):
print("%s: %s is producing %d to the queue!" % (time.ctime(), self.getName(), i))
# 将生产的数据放入队列
self.data.put(i)
time.sleep(random.randrange(10) / 5)
print("%s: %s finished!" % (time.ctime(), self.getName()))
# 消费者线程
class Consumer(threading.Thread):
def __init__(self, t_name, queue):
threading.Thread.__init__(self, name=t_name)
self.data = queue
def run(self):
for i in range(5):
# 拿出已经生产好的数据
val = self.data.get()
print("%s: %s is consuming. %d in the queue is consumed!" % (time.ctime(), self.getName(), val))
time.sleep(random.randrange(5))
self.data.task_done() # 告诉队列有关这个数据的任务已经处理完成
print("%s: %s finished!" % (time.ctime(), self.getName()))
# 主线程
def main():
queue = Queue()
producer = Producer('Pro.', queue)
consumer = Consumer('Con.', queue)
producer.start()
consumer.start()
# 阻塞,直到生产者生产的数据全都被消费掉
queue.join()
# 等待生产者线程结束
producer.join()
# 等待消费者线程结束
consumer.join()
print('All threads terminate!')
if __name__ == '__main__':
main()
打印结果
参考:
https://zhuanlan.zhihu.com/p/73442055
https://www.cnblogs.com/lincappu/p/12890761.html