
孤独,所有人都是孤独的,没有人能独自超脱这一切。
——玛娅·安杰格
0 前言
LinkedBlockingQueue - 单链表实现的阻塞队列。该队列按 FIFO(先进先出)排序元素,新元素从队列尾部插入,从队首获取元素.是深入并发编程的基础数据结构.
1 继承体系
- Queue 作为最基础的接口,定义了队列的三大类基本操作:
LinkedBlockingQueue 核心源码解析0 前言1 继承体系2 属性3 构造方法4 新增5 取出6 总结 - BlockingQueue 即在 Queue 的基础上加上了阻塞的概念,比如
- 一直阻塞
- 阻塞一定时间,返回特殊值
remove 方法,BlockingQueue 类注释中定义的是抛异常,但 LinkedBlockingQueue 中 remove 方法实际是返回 false。
2 属性
2.1 链式存储
- 节点的数据结构
LinkedBlockingQueue 核心源码解析0 前言1 继承体系2 属性3 构造方法4 新增5 取出6 总结 - next 为当前元素的下一个,为 null 表示当前节点是最后一个
- 链表容量 默认大小为 Integer.MAX_VALUE !显然太大了!禁用!
LinkedBlockingQueue 核心源码解析0 前言1 继承体系2 属性3 构造方法4 新增5 取出6 总结 LinkedBlockingQueue 核心源码解析0 前言1 继承体系2 属性3 构造方法4 新增5 取出6 总结 -
链表已有元素大小
使用了 AtomicInteger,线程安全
LinkedBlockingQueue 核心源码解析0 前言1 继承体系2 属性3 构造方法4 新增5 取出6 总结 - 链表头、尾
LinkedBlockingQueue 核心源码解析0 前言1 继承体系2 属性3 构造方法4 新增5 取出6 总结 LinkedBlockingQueue 核心源码解析0 前言1 继承体系2 属性3 构造方法4 新增5 取出6 总结
2.2 锁
- take 时的锁
LinkedBlockingQueue 核心源码解析0 前言1 继承体系2 属性3 构造方法4 新增5 取出6 总结 - take 的条件队列,condition 可以简单理解为基于 ASQ 建立的条件队列
LinkedBlockingQueue 核心源码解析0 前言1 继承体系2 属性3 构造方法4 新增5 取出6 总结 - put 时的锁
LinkedBlockingQueue 核心源码解析0 前言1 继承体系2 属性3 构造方法4 新增5 取出6 总结 - put 的条件队列
LinkedBlockingQueue 核心源码解析0 前言1 继承体系2 属性3 构造方法4 新增5 取出6 总结
锁有 take 锁和 put 锁,是为了保证队列操作时的线程安全,设计两种锁,是为了 take 和 put 两种操作可以同时进行,而互不影响。
2.3 迭代器
- 实现了自己的迭代器
LinkedBlockingQueue 核心源码解析0 前言1 继承体系2 属性3 构造方法4 新增5 取出6 总结
3 构造方法
3.1 无参
- 默认为 Integer 的最大值
LinkedBlockingQueue 核心源码解析0 前言1 继承体系2 属性3 构造方法4 新增5 取出6 总结
3.2 有参
- 指定容量大小.链表头尾相等,节点值(item)都是 null
LinkedBlockingQueue 核心源码解析0 前言1 继承体系2 属性3 构造方法4 新增5 取出6 总结 -
已有集合数据public LinkedBlockingQueue(Collection<? extends E> c) {
this(Integer.MAX_VALUE);
final ReentrantLock putLock = this.putLock;
putLock.lock(); // Never contended, but necessary for visibility
try {
int n = 0;
for (E e : c) {
// 集合元素不能为 null
if (e == null)
throw new NullPointerException();
// capacity 代表链表的大小,在这为 Integer.MAX_VALUE
// 若集合大小 > IntegerMAX_VALUE,直接抛异常.
if (n == capacity)
throw new IllegalStateException("Queue full");
enqueue(new Node<E>(e));
++n;
}
count.set(n);
} finally {
putLock.unlock();
}
}
4 新增
以 offer 方法为例.
将元素E添加到队列的末尾.
public boolean offer(E e) {
if (e == null) throw new NullPointerException();
// 如果“队列已满”,则返回false,表示插入失败。
final AtomicInteger count = this.count;
if (count.get() == capacity) return false;
int c = -1;
// 新建“节点e”
Node<E> node = new Node(e);
final ReentrantLock putLock = this.putLock;
// 获取“插入锁putLock”
putLock.lock();
try {
// 再次对“队列是不是满”的进行判断。
// 若“队列未满”,则插入节点。
if (count.get() < capacity) {
// 插入节点 enqueue(node);
// 将“当前节点数量”+1,并返回“原始的数量”
c = count.getAndIncrement();
// 如果在插入元素之后,队列仍然未满,则唤醒notFull上的等待线程。
if (c + 1 < capacity) notFull.signal();
}
} finally {
// 释放“插入锁putLock”
putLock.unlock();
}
// 如果在插入节点前,队列为空;则插入节点后,唤醒notEmpty上的等待线程
if (c == 0)
signalNotEmpty();
return c >= 0;
}
复制
enqueue
- 将node添加到队列末尾,并设置node为新的尾节点
LinkedBlockingQueue 核心源码解析0 前言1 继承体系2 属性3 构造方法4 新增5 取出6 总结
signalNotEmpty
- 发出等待信号。 仅能从put/offer调用(否则通常不锁定takeLock),唤醒notEmpty上的等待线程.
LinkedBlockingQueue 核心源码解析0 前言1 继承体系2 属性3 构造方法4 新增5 取出6 总结
新增数据成功后,在适当时机,会唤起 put 的等待线程(队列不满时),或者 take 的等待线程(队列不为空时),这样保证队列一旦满足 put 或者 take 条件时,立马就能唤起阻塞线程,继续运行,保证了唤起的时机不被浪费。
5 取出
以take()为例.取出并返回队列的头。若队列为空,则一直等待。
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
// 获取“取出锁”,若当前线程是中断状态,则抛出InterruptedException异常
takeLock.lockInterruptibly();
try {
// 若“队列为空”,则一直等待。
while (count.get() == 0) {
notEmpty.await();
}
// 取出元素
x = dequeue();
// 取出元素之后,将“节点数量”-1;并返回“原始的节点数量”。
c = count.getAndDecrement();
if (c > 1) notEmpty.signal();
} finally {
// 释放“取出锁”
takeLock.unlock();
}
// 如果在“取出元素之前”,队列是满的;则在取出元素之后,唤醒notFull上的等待线程。
if (c == capacity)
signalNotFull();
return x;
}
复制
dequeue
- 删除队列的头节点,并将表头指向“原头节点的下一个节点”。
LinkedBlockingQueue 核心源码解析0 前言1 继承体系2 属性3 构造方法4 新增5 取出6 总结
signalNotFull
- 发出等待的信号。 仅能从take/poll调用.唤醒notFull上的等待线程.
LinkedBlockingQueue 核心源码解析0 前言1 继承体系2 属性3 构造方法4 新增5 取出6 总结
6 总结
理解好阻塞队列很重要,经常会被用在线程池的场景中.