天天看点

高效实现Java并发编程之Queue

作者:博文小火柴

在实际的软件开发过程中,经常会遇到如下场景:某个模块负责产生数据,这些数据由另一个模块负责处理。

产生数据的模块形象地称为生产者;

而处理数据的模块就称为消费者。

生产者和消费者之间通常还有一个缓冲,生产者把数据放入缓冲,而消费读取缓冲中的数据,这样的好处是:

1.支持解耦:生产者和消费者不需要知道对方的信息,比如邮件投递,只需要把邮件交给邮递员就行,邮递员如何把邮件送到,用户不需要关心。

2.异步:生产者把数据放入缓存即可返回,不需要等待消费者处理完毕。这样的好处是提高了生产者的性能。

3.隔离:生产模块和消费模块出现异常不会相互影响。比如生产者出现异常无法生产数据,但不影响消费者消费。

反之,如果消费模块暂时出了问题不能消费,那么生产者还可以把数据放入缓冲,消费者从故障中恢复后可以继续消费数据。

这种缓冲在 Java 中通过 BlockingQueue 子类实现。消息中间件(比如 RabbitMQ)也提供了这种缓冲功能,如下图所示。

高效实现Java并发编程之Queue

示意图

尽管提供了隔离特性,但当消费者出现故障或消费数据缓慢时,生产者产生的数据放入队列逐渐累积成海量数据,将导致队列所在的系统出现内存溢出等故障。

即使消费者端的故障恢复,但也可能消费过时的数据。一般处理方法是丢弃,或者在丢弃前保存数据,比如 RabbitMQ提供死信来保存丢弃的数据。

BlockingQueue 类实现了缓冲,线程生成的数据放到 BlockingQueue 中,消费线程从BlockingQueue 中获得数据。

BlockingQueue 的核心方法如下:

一、 放入数据:

m offer(anObject):表示如果可能的话,将 anObject 加到 BlockingQueue 中,即如果BlockingQueue 可以容纳,则返回 true,否则返回 false。

m offer(E o, long timeout, TimeUnit unit):可以设定等待的时间,如果在指定的时间内,还不能往队列中加入 BlockingQueue,则返回失败。

m put(anObject):把 anObject 加到 BlockingQueue 中,如果 BlockQueue 没有空间,则调用此方法的线程被阻断,直到 BlockingQueue 里面有空间再继续加入。

二、获取数据:

m poll(time):取走 BlockingQueue 中排在首位的对象,若不能立即取出,则可以等 time参数规定的时间过后再取,取不到时返回 null。

m poll(long timeout, TimeUnit unit):从 BlockingQueue 取出一个队首的对象,如果在指定时间内,队列一旦有数据可取,则立即返回队列中的数据。否则直到时间超时还没有数据可取,返回失败。

m take():取走 BlockingQueue 中排在首位的对象,若 BlockingQueue 为空,则阻断进入等待状态直到 BlockingQueue 有新的数据被加入。

m drainTo():一次性从 BlockingQueue 中获取所有可用的数据对象(还可以指定获取数据的个数),通过该方法可以提升获取数据效率;不需要多次分批加锁或释放锁。

常见的 BlockingQueue 有如下图所示的子类。

高效实现Java并发编程之Queue

ArrayBlockingQueue:基于数组的阻塞队列实现,在 ArrayBlockingQueue 内部维护了一个定长数组,以便缓存队列中的数据对象。

因此 ArrayBlockingQueue 容纳的数据是有限的,如果队列满,则生产者线程无法再放入新的数据,线程阻塞。

我们还可以控制 ArrayBlockingQueue 的内部锁是否采用公平锁,默认采用非公平锁。

LinkedBlockingQueue:基于链表的阻塞队列,同 ArrayListBlockingQueue 类似,其内部也维持着一个数据缓冲队列,该队列由一个链表构成,链表既可以固定大小,也可以不限制容量。

在不限制容量情况下,如果生产者的速度大于消费者的速度,则LinkedBlockingQueue 的容量会不断增加,系统内存就有可能被消耗殆尽。

实际系统建议使用固定大小的 ArrayListBlockingQueue。

因为 LinkedBlockingQueue 内部是链表构成的,所以每次新增数据都会创建一个 Node对象,消费数据后删除 Node 对象,这样会造成垃圾回收负担。

从性能角度看,不如ArrayBlockingQueue 性能好,对虚拟机影响较大。

PriorityBlockingQueue:基于优先级的无大小限制的队列,通过构造函数传入的Comparator 对象来决定优先级。

BlockingQueue 都是先进先出的,PriorityBlockingQueue可以设定元素的优先级,比如优先级高的请求放入 PriorityBlockingQueue,会优先被处理。

DelayQueue:DelayQueue 是一个没有限制容量的队列,只有当元素指定的延迟时间到了,才能够从队列中获取到该元素。

SynchronousQueue:一种无缓冲的等待队列,线程调用 take 获取元素,必须等待另外一个线程调用 put 设置 SynchronousQueue 队列的元素,队列没有任何容量,这是一种快速传递元素的方式。

也就是说,在这种情况下,元素总是以最快的方式从插入者(生产者)传递给移除者(消费者),这在多任务队列中是最快的处理任务的方式。HikariCP连接池使用 SynchronousQueue 来获取一个可用的数据库连接。

内容摘自《高性能Java系统权威指南》第三章

高效实现Java并发编程之Queue

李家智 著

本书特点:

内容上,总结作者从事Java开发20年来在头部IT企业的高并发系统经历的真实案例,极具参考意义和可读性。

对于程序员和架构师而言,Java 系统的性能优化是一个超常规的挑战。这是因为 Java 语言和 Java 运行平台,以及 Java 生态的复杂性决定了 Java 系统的性能优化不再是简单的升级配置或者简单的 “空间换时间”的技术实现,这涉及 Java 的各种知识点。

本书从高性能、易维护、代码增强以及在微服务系统中编写Java代码的角度来描述如何实现高性能Java系统,结合真实案例,让读者能够快速上手实战。

风格上,本书的风格偏实战,读者可以下载书中的示例代码并运行测试。读者可以从任意一章开始阅读,掌握性能优化知识为公司的系统所用。

本书适合:

中高级程序员和架构师;

以及有志从事基础技术研发、开源工具研发的极客阅读;

也可以作为 Java 笔试和面试的参考书。

继续阅读