天天看点

Java阻塞队列 LinkedBlockingDequeLinkedBlockingDeque源码BlockingQueue核心要点实战

转载请标明出处:http://blog.csdn.net/zhaoyanjun6/article/details/120833494

本文出自【赵彦军的博客】

Java队列 Queue

Java队列 Deque

Java队列 PriorityQueue

Java栈 Stack

Java阻塞队列 LinkedBlockingDeque

文章目录

  • LinkedBlockingDeque
  • 源码
    • 增加操作
    • 删除操作
    • 访问操作
  • BlockingQueue
  • 核心要点
  • 实战

LinkedBlockingDeque

LinkedBlockingDeque

类实现了

BlockingDeque

接口。阅读

BlockingDeque

文本以获取有关的更多信息。

Deque

来自“双端队列” 这个词。

Deque

是一个队列,你可以在插入和删除队列两端的元素。

LinkedBlockingDeque

是一个

Deque

,如果一个线程试图从中获取一个元素,而队列空的,不管线程从哪一端试图获取元素,都会被阻塞。

以下是实例化和使用LinkedBlockingDeque的例子:

BlockingDeque<String> deque = new LinkedBlockingDeque<String>();

deque.addFirst("1");
deque.addLast("2");

String two = deque.takeLast();
String one = deque.takeFirst();
           

LinkedBlockingDeque的底层数据结构是一个双端队列,该队列使用链表实现,其结构图如下:

Java阻塞队列 LinkedBlockingDequeLinkedBlockingDeque源码BlockingQueue核心要点实战

源码

LinkedBlockingDeque

LinkedBlockingQueue

的实现大体上类似,区别在于

LinkedBlockingDeque

提供的操作更多。并且

LinkedBlockingQueue

内置两个锁分别用于

put

take

操作,而

LinkedBlockingDeque

只使用一个锁控制所有操作。因为队列能够同时在头尾进行

put

take

操作,所以使用两个锁也需要将两个锁同时加锁才能保证操作的同步性,不如只使用一个锁的性能好。

同步节点相比

LinkedBlockingQueue

多了一个

prev

字段。

static final class Node<E> {
    E item;

    Node<E> prev;

    Node<E> next;

    Node(E x) {
        item = x;
    }
}

           

增加操作

增加操作相比

LinkedBlockingQueue

只能在队列尾部增加,它能在队列的头尾两端都进行增加操作。

public void addFirst(E e) {
    // 复用offer方法
    if (!offerFirst(e))
        throw new IllegalStateException("Deque full");
}

public void addLast(E e) {
    if (!offerLast(e))
        throw new IllegalStateException("Deque full");
}

public boolean offerFirst(E e) {
    if (e == null) throw new NullPointerException();
    // 构造节点
    Node<E> node = new Node<E>(e);
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        // 插入到队列头部
        return linkFirst(node);
    } finally {
        lock.unlock();
    }
}

private boolean linkFirst(Node<E> node) {
    // assert lock.isHeldByCurrentThread();

    // 如果队列已满,返回false
    if (count >= capacity)
        return false;
    // 获取头节点,将自己的 next字段指向头节点,然后设置自己为头节点
    Node<E> f = first;
    node.next = f;
    first = node;
    // 如果队列为空,尾节点也指向自己
    if (last == null)
        last = node;
    else
        f.prev = node;
    ++count;
    // 唤醒等待获取元素的线程
    notEmpty.signal();
    return true;
}

public boolean offerLast(E e) {
    if (e == null) throw new NullPointerException();
    Node<E> node = new Node<E>(e);
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        // 插入到队列尾部
        return linkLast(node);
    } finally {
        lock.unlock();
    }
}

private boolean linkLast(Node<E> node) {
    // assert lock.isHeldByCurrentThread();
    
    // 如果队列已满,返回false
    if (count >= capacity)
        return false;
    // 将自己设置为尾节点
    Node<E> l = last;
    node.prev = l;
    last = node;
    // 如果队列为空,头节点也指向自己
    if (first == null)
        first = node;
    else
        l.next = node;
    ++count;
    // 唤醒等待获取元素的线程
    notEmpty.signal();
    return true;
}

public void putFirst(E e) throws InterruptedException {
    if (e == null) throw new NullPointerException();
    Node<E> node = new Node<E>(e);
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        // 如果队列已满,等待
        while (!linkFirst(node))
            notFull.await();
    } finally {
        lock.unlock();
    }
}

public void putLast(E e) throws InterruptedException {
    if (e == null) throw new NullPointerException();
    Node<E> node = new Node<E>(e);
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        // 如果队列已满,等待
        while (!linkLast(node))
            notFull.await();
    } finally {
        lock.unlock();
    }
}

public boolean offerFirst(E e, long timeout, TimeUnit unit)
    throws InterruptedException {
    if (e == null) throw new NullPointerException();
    Node<E> node = new Node<E>(e);
    // 计算超时时间
    long nanos = unit.toNanos(timeout);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        // 如果队列已满,超时等待
        while (!linkFirst(node)) {
            if (nanos <= 0L)
                return false;
            nanos = notFull.awaitNanos(nanos);
        }
        return true;
    } finally {
        lock.unlock();
    }
}

public boolean offerLast(E e, long timeout, TimeUnit unit)
    throws InterruptedException {
    if (e == null) throw new NullPointerException();
    Node<E> node = new Node<E>(e);
    long nanos = unit.toNanos(timeout);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (!linkLast(node)) {
            if (nanos <= 0L)
                return false;
            nanos = notFull.awaitNanos(nanos);
        }
        return true;
    } finally {
        lock.unlock();
    }
}

           

删除操作

public E removeFirst() {
    // 复用poll操作
    E x = pollFirst();
    if (x == null) throw new NoSuchElementException();
    return x;
}

public E removeLast() {
    E x = pollLast();
    if (x == null) throw new NoSuchElementException();
    return x;
}

public E pollFirst() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        // 获取头节点的值,并删除它
        return unlinkFirst();
    } finally {
        lock.unlock();
    }
}

private E unlinkFirst() {
    // assert lock.isHeldByCurrentThread();

    // 如果队列为空,返回null
    Node<E> f = first;
    if (f == null)
        return null;
    // 重置头节点
    Node<E> n = f.next;
    E item = f.item;
    f.item = null;
    f.next = f; // help GC
    first = n;
    if (n == null)
        last = null;
    else
        n.prev = null;
    --count;
    // 唤醒等待插入的线程
    notFull.signal();
    return item;
}

public E pollLast() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        return unlinkLast();
    } finally {
        lock.unlock();
    }
}

private E unlinkLast() {
    // assert lock.isHeldByCurrentThread();
    Node<E> l = last;
    // 队列为空,返回null
    if (l == null)
        return null;
    // 更新尾节点
    Node<E> p = l.prev;
    E item = l.item;
    l.item = null;
    l.prev = l; // help GC
    last = p;
    if (p == null)
        first = null;
    else
        p.next = null;
    --count;
    notFull.signal();
    return item;
}

public E takeFirst() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        E x;
        // 如果队列为空,等待
        while ( (x = unlinkFirst()) == null)
            notEmpty.await();
        return x;
    } finally {
        lock.unlock();
    }
}

public E takeLast() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        E x;
        // 如果队列为空,等待
        while ( (x = unlinkLast()) == null)
            notEmpty.await();
        return x;
    } finally {
        lock.unlock();
    }
}

public E pollFirst(long timeout, TimeUnit unit)
    throws InterruptedException {
    long nanos = unit.toNanos(timeout);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        E x;
        while ( (x = unlinkFirst()) == null) {
            if (nanos <= 0L)
                return null;
            nanos = notEmpty.awaitNanos(nanos);
        }
        return x;
    } finally {
        lock.unlock();
    }
}

public E pollLast(long timeout, TimeUnit unit)
    throws InterruptedException {
    long nanos = unit.toNanos(timeout);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        E x;
        while ( (x = unlinkLast()) == null) {
            if (nanos <= 0L)
                return null;
            nanos = notEmpty.awaitNanos(nanos);
        }
        return x;
    } finally {
        lock.unlock();
    }
}

           

访问操作

public E getFirst() {
    // 复用peek方法
    E x = peekFirst();
    if (x == null) throw new NoSuchElementException();
    return x;
}

public E getLast() {
    E x = peekLast();
    if (x == null) throw new NoSuchElementException();
    return x;
}

public E peekFirst() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        // 如果队列不为空,返回头元素
        return (first == null) ? null : first.item;
    } finally {
        lock.unlock();
    }
}

public E peekLast() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        // 如果队列不为空,返回尾元素
        return (last == null) ? null : last.item;
    } finally {
        lock.unlock();
    }
}

           

BlockingQueue

由于

BlockingDeque

继承自

BlockingQueue

接口,所以需要实现

BlockingQueue

中的方法,具体只需要复用前面提到的方法即可。

public boolean add(E e) {
    addLast(e);
    return true;
}

public boolean offer(E e) {
    return offerLast(e);
}

public void put(E e) throws InterruptedException {
    putLast(e);
}

public boolean offer(E e, long timeout, TimeUnit unit)
    throws InterruptedException {
    return offerLast(e, timeout, unit);
}

public E remove() {
    return removeFirst();
}

public E poll() {
    return pollFirst();
}

public E take() throws InterruptedException {
    return takeFirst();
}

public E poll(long timeout, TimeUnit unit) throws InterruptedException {
    return pollFirst(timeout, unit);
}

public E element() {
    return getFirst();
}

public E peek() {
    return peekFirst();
}

           

核心要点

  • LinkedBlockingDeque 是基于链表的双端阻塞队列,线程安全,元素不允许为 null
  • 内部使用一个双向链表
  • 可以在链表两头同时进行put和take操作,只能使用一个锁
  • 插入线程在执行完操作后如果队列未满会唤醒其他等待插入的线程,同时队列非空还会唤醒等待获取元素的线程;take线程同理。
  • 迭代器与内部的双向链表保持弱一致性,调用

    remove(T)

    方法删除一个元素后,不会解除其对下一个结点的next引用,否则迭代器将无法工作。
  • 迭代器的forEachRemaining(Consumer<? super E> action)以64个元素为一批进行操作
  • forEach(Consumer<? super E> action),removeIf,removeAll,retainAll都是64个元素为一批进行操作

实战

因为

LinkedBlockingDeque

取出是阻塞的,所以可以做一个

生产-消费

模型

package zyj;

import java.util.concurrent.BlockingDeque;
import java.util.concurrent.LinkedBlockingDeque;

public class Product {

    //指定队列最大值为100
    BlockingDeque<Apple> deque = new LinkedBlockingDeque(100);

    //生产,如果队列满了,则抛出 IllegalStateException
    public void produce(Apple apple) {
        deque.push(apple);
    }

    //消费,如果队列为空,则线程阻塞
    public Apple consume() {
        try {
            return deque.take();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return null;
    }
}

           

继续阅读