轉載請标明出處:http://blog.csdn.net/zhaoyanjun6/article/details/120833494
本文出自【趙彥軍的部落格】
Java隊列 Queue
Java隊列 Deque
Java隊列 PriorityQueue
Java棧 Stack
Java阻塞隊列 LinkedBlockingDeque
文章目錄
- LinkedBlockingDeque
- 源碼
-
- 增加操作
- 删除操作
- 通路操作
- BlockingQueue
- 核心要點
- 實戰
LinkedBlockingDeque
LinkedBlockingDeque
類實作了
BlockingDeque
接口。閱讀
BlockingDeque
文本以擷取有關的更多資訊。
Deque
來自“雙端隊列” 這個詞。
Deque
是一個隊列,你可以在插入和删除隊列兩端的元素。
LinkedBlockingDeque
是一個
Deque
,如果一個線程試圖從中擷取一個元素,而隊列空的,不管線程從哪一端試圖擷取元素,都會被阻塞。
以下是執行個體化和使用LinkedBlockingDeque的例子:
BlockingDeque<String> deque = new LinkedBlockingDeque<String>();
deque.addFirst("1");
deque.addLast("2");
String two = deque.takeLast();
String one = deque.takeFirst();
LinkedBlockingDeque的底層資料結構是一個雙端隊列,該隊列使用連結清單實作,其結構圖如下:
源碼
LinkedBlockingDeque
與
LinkedBlockingQueue
的實作大體上類似,差別在于
LinkedBlockingDeque
提供的操作更多。并且
LinkedBlockingQueue
内置兩個鎖分别用于
put
和
take
操作,而
LinkedBlockingDeque
隻使用一個鎖控制所有操作。因為隊列能夠同時在頭尾進行
put
和
take
操作,是以使用兩個鎖也需要将兩個鎖同時加鎖才能保證操作的同步性,不如隻使用一個鎖的性能好。
同步節點相比
LinkedBlockingQueue
多了一個
prev
字段。
static final class Node<E> {
E item;
Node<E> prev;
Node<E> next;
Node(E x) {
item = x;
}
}
增加操作
增加操作相比
LinkedBlockingQueue
隻能在隊列尾部增加,它能在隊列的頭尾兩端都進行增加操作。
public void addFirst(E e) {
// 複用offer方法
if (!offerFirst(e))
throw new IllegalStateException("Deque full");
}
public void addLast(E e) {
if (!offerLast(e))
throw new IllegalStateException("Deque full");
}
public boolean offerFirst(E e) {
if (e == null) throw new NullPointerException();
// 構造節點
Node<E> node = new Node<E>(e);
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 插入到隊列頭部
return linkFirst(node);
} finally {
lock.unlock();
}
}
private boolean linkFirst(Node<E> node) {
// assert lock.isHeldByCurrentThread();
// 如果隊列已滿,傳回false
if (count >= capacity)
return false;
// 擷取頭節點,将自己的 next字段指向頭節點,然後設定自己為頭節點
Node<E> f = first;
node.next = f;
first = node;
// 如果隊列為空,尾節點也指向自己
if (last == null)
last = node;
else
f.prev = node;
++count;
// 喚醒等待擷取元素的線程
notEmpty.signal();
return true;
}
public boolean offerLast(E e) {
if (e == null) throw new NullPointerException();
Node<E> node = new Node<E>(e);
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 插入到隊列尾部
return linkLast(node);
} finally {
lock.unlock();
}
}
private boolean linkLast(Node<E> node) {
// assert lock.isHeldByCurrentThread();
// 如果隊列已滿,傳回false
if (count >= capacity)
return false;
// 将自己設定為尾節點
Node<E> l = last;
node.prev = l;
last = node;
// 如果隊列為空,頭節點也指向自己
if (first == null)
first = node;
else
l.next = node;
++count;
// 喚醒等待擷取元素的線程
notEmpty.signal();
return true;
}
public void putFirst(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
Node<E> node = new Node<E>(e);
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 如果隊列已滿,等待
while (!linkFirst(node))
notFull.await();
} finally {
lock.unlock();
}
}
public void putLast(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
Node<E> node = new Node<E>(e);
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 如果隊列已滿,等待
while (!linkLast(node))
notFull.await();
} finally {
lock.unlock();
}
}
public boolean offerFirst(E e, long timeout, TimeUnit unit)
throws InterruptedException {
if (e == null) throw new NullPointerException();
Node<E> node = new Node<E>(e);
// 計算逾時時間
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
// 如果隊列已滿,逾時等待
while (!linkFirst(node)) {
if (nanos <= 0L)
return false;
nanos = notFull.awaitNanos(nanos);
}
return true;
} finally {
lock.unlock();
}
}
public boolean offerLast(E e, long timeout, TimeUnit unit)
throws InterruptedException {
if (e == null) throw new NullPointerException();
Node<E> node = new Node<E>(e);
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (!linkLast(node)) {
if (nanos <= 0L)
return false;
nanos = notFull.awaitNanos(nanos);
}
return true;
} finally {
lock.unlock();
}
}
删除操作
public E removeFirst() {
// 複用poll操作
E x = pollFirst();
if (x == null) throw new NoSuchElementException();
return x;
}
public E removeLast() {
E x = pollLast();
if (x == null) throw new NoSuchElementException();
return x;
}
public E pollFirst() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 擷取頭節點的值,并删除它
return unlinkFirst();
} finally {
lock.unlock();
}
}
private E unlinkFirst() {
// assert lock.isHeldByCurrentThread();
// 如果隊列為空,傳回null
Node<E> f = first;
if (f == null)
return null;
// 重置頭節點
Node<E> n = f.next;
E item = f.item;
f.item = null;
f.next = f; // help GC
first = n;
if (n == null)
last = null;
else
n.prev = null;
--count;
// 喚醒等待插入的線程
notFull.signal();
return item;
}
public E pollLast() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return unlinkLast();
} finally {
lock.unlock();
}
}
private E unlinkLast() {
// assert lock.isHeldByCurrentThread();
Node<E> l = last;
// 隊列為空,傳回null
if (l == null)
return null;
// 更新尾節點
Node<E> p = l.prev;
E item = l.item;
l.item = null;
l.prev = l; // help GC
last = p;
if (p == null)
first = null;
else
p.next = null;
--count;
notFull.signal();
return item;
}
public E takeFirst() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lock();
try {
E x;
// 如果隊列為空,等待
while ( (x = unlinkFirst()) == null)
notEmpty.await();
return x;
} finally {
lock.unlock();
}
}
public E takeLast() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lock();
try {
E x;
// 如果隊列為空,等待
while ( (x = unlinkLast()) == null)
notEmpty.await();
return x;
} finally {
lock.unlock();
}
}
public E pollFirst(long timeout, TimeUnit unit)
throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
E x;
while ( (x = unlinkFirst()) == null) {
if (nanos <= 0L)
return null;
nanos = notEmpty.awaitNanos(nanos);
}
return x;
} finally {
lock.unlock();
}
}
public E pollLast(long timeout, TimeUnit unit)
throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
E x;
while ( (x = unlinkLast()) == null) {
if (nanos <= 0L)
return null;
nanos = notEmpty.awaitNanos(nanos);
}
return x;
} finally {
lock.unlock();
}
}
通路操作
public E getFirst() {
// 複用peek方法
E x = peekFirst();
if (x == null) throw new NoSuchElementException();
return x;
}
public E getLast() {
E x = peekLast();
if (x == null) throw new NoSuchElementException();
return x;
}
public E peekFirst() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 如果隊列不為空,傳回頭元素
return (first == null) ? null : first.item;
} finally {
lock.unlock();
}
}
public E peekLast() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 如果隊列不為空,傳回尾元素
return (last == null) ? null : last.item;
} finally {
lock.unlock();
}
}
BlockingQueue
由于
BlockingDeque
繼承自
BlockingQueue
接口,是以需要實作
BlockingQueue
中的方法,具體隻需要複用前面提到的方法即可。
public boolean add(E e) {
addLast(e);
return true;
}
public boolean offer(E e) {
return offerLast(e);
}
public void put(E e) throws InterruptedException {
putLast(e);
}
public boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException {
return offerLast(e, timeout, unit);
}
public E remove() {
return removeFirst();
}
public E poll() {
return pollFirst();
}
public E take() throws InterruptedException {
return takeFirst();
}
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
return pollFirst(timeout, unit);
}
public E element() {
return getFirst();
}
public E peek() {
return peekFirst();
}
核心要點
- LinkedBlockingDeque 是基于連結清單的雙端阻塞隊列,線程安全,元素不允許為 null
- 内部使用一個雙向連結清單
- 可以在連結清單兩頭同時進行put和take操作,隻能使用一個鎖
- 插入線程在執行完操作後如果隊列未滿會喚醒其他等待插入的線程,同時隊列非空還會喚醒等待擷取元素的線程;take線程同理。
- 疊代器與内部的雙向連結清單保持弱一緻性,調用
方法删除一個元素後,不會解除其對下一個結點的next引用,否則疊代器将無法工作。remove(T)
- 疊代器的forEachRemaining(Consumer<? super E> action)以64個元素為一批進行操作
- forEach(Consumer<? super E> action),removeIf,removeAll,retainAll都是64個元素為一批進行操作
實戰
因為
LinkedBlockingDeque
取出是阻塞的,是以可以做一個
生産-消費
模型
package zyj;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.LinkedBlockingDeque;
public class Product {
//指定隊列最大值為100
BlockingDeque<Apple> deque = new LinkedBlockingDeque(100);
//生産,如果隊列滿了,則抛出 IllegalStateException
public void produce(Apple apple) {
deque.push(apple);
}
//消費,如果隊列為空,則線程阻塞
public Apple consume() {
try {
return deque.take();
} catch (InterruptedException e) {
e.printStackTrace();
}
return null;
}
}