天天看點

Java阻塞隊列 LinkedBlockingDequeLinkedBlockingDeque源碼BlockingQueue核心要點實戰

轉載請标明出處:http://blog.csdn.net/zhaoyanjun6/article/details/120833494

本文出自【趙彥軍的部落格】

Java隊列 Queue

Java隊列 Deque

Java隊列 PriorityQueue

Java棧 Stack

Java阻塞隊列 LinkedBlockingDeque

文章目錄

  • LinkedBlockingDeque
  • 源碼
    • 增加操作
    • 删除操作
    • 通路操作
  • BlockingQueue
  • 核心要點
  • 實戰

LinkedBlockingDeque

LinkedBlockingDeque

類實作了

BlockingDeque

接口。閱讀

BlockingDeque

文本以擷取有關的更多資訊。

Deque

來自“雙端隊列” 這個詞。

Deque

是一個隊列,你可以在插入和删除隊列兩端的元素。

LinkedBlockingDeque

是一個

Deque

,如果一個線程試圖從中擷取一個元素,而隊列空的,不管線程從哪一端試圖擷取元素,都會被阻塞。

以下是執行個體化和使用LinkedBlockingDeque的例子:

BlockingDeque<String> deque = new LinkedBlockingDeque<String>();

deque.addFirst("1");
deque.addLast("2");

String two = deque.takeLast();
String one = deque.takeFirst();
           

LinkedBlockingDeque的底層資料結構是一個雙端隊列,該隊列使用連結清單實作,其結構圖如下:

Java阻塞隊列 LinkedBlockingDequeLinkedBlockingDeque源碼BlockingQueue核心要點實戰

源碼

LinkedBlockingDeque

LinkedBlockingQueue

的實作大體上類似,差別在于

LinkedBlockingDeque

提供的操作更多。并且

LinkedBlockingQueue

内置兩個鎖分别用于

put

take

操作,而

LinkedBlockingDeque

隻使用一個鎖控制所有操作。因為隊列能夠同時在頭尾進行

put

take

操作,是以使用兩個鎖也需要将兩個鎖同時加鎖才能保證操作的同步性,不如隻使用一個鎖的性能好。

同步節點相比

LinkedBlockingQueue

多了一個

prev

字段。

static final class Node<E> {
    E item;

    Node<E> prev;

    Node<E> next;

    Node(E x) {
        item = x;
    }
}

           

增加操作

增加操作相比

LinkedBlockingQueue

隻能在隊列尾部增加,它能在隊列的頭尾兩端都進行增加操作。

public void addFirst(E e) {
    // 複用offer方法
    if (!offerFirst(e))
        throw new IllegalStateException("Deque full");
}

public void addLast(E e) {
    if (!offerLast(e))
        throw new IllegalStateException("Deque full");
}

public boolean offerFirst(E e) {
    if (e == null) throw new NullPointerException();
    // 構造節點
    Node<E> node = new Node<E>(e);
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        // 插入到隊列頭部
        return linkFirst(node);
    } finally {
        lock.unlock();
    }
}

private boolean linkFirst(Node<E> node) {
    // assert lock.isHeldByCurrentThread();

    // 如果隊列已滿,傳回false
    if (count >= capacity)
        return false;
    // 擷取頭節點,将自己的 next字段指向頭節點,然後設定自己為頭節點
    Node<E> f = first;
    node.next = f;
    first = node;
    // 如果隊列為空,尾節點也指向自己
    if (last == null)
        last = node;
    else
        f.prev = node;
    ++count;
    // 喚醒等待擷取元素的線程
    notEmpty.signal();
    return true;
}

public boolean offerLast(E e) {
    if (e == null) throw new NullPointerException();
    Node<E> node = new Node<E>(e);
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        // 插入到隊列尾部
        return linkLast(node);
    } finally {
        lock.unlock();
    }
}

private boolean linkLast(Node<E> node) {
    // assert lock.isHeldByCurrentThread();
    
    // 如果隊列已滿,傳回false
    if (count >= capacity)
        return false;
    // 将自己設定為尾節點
    Node<E> l = last;
    node.prev = l;
    last = node;
    // 如果隊列為空,頭節點也指向自己
    if (first == null)
        first = node;
    else
        l.next = node;
    ++count;
    // 喚醒等待擷取元素的線程
    notEmpty.signal();
    return true;
}

public void putFirst(E e) throws InterruptedException {
    if (e == null) throw new NullPointerException();
    Node<E> node = new Node<E>(e);
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        // 如果隊列已滿,等待
        while (!linkFirst(node))
            notFull.await();
    } finally {
        lock.unlock();
    }
}

public void putLast(E e) throws InterruptedException {
    if (e == null) throw new NullPointerException();
    Node<E> node = new Node<E>(e);
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        // 如果隊列已滿,等待
        while (!linkLast(node))
            notFull.await();
    } finally {
        lock.unlock();
    }
}

public boolean offerFirst(E e, long timeout, TimeUnit unit)
    throws InterruptedException {
    if (e == null) throw new NullPointerException();
    Node<E> node = new Node<E>(e);
    // 計算逾時時間
    long nanos = unit.toNanos(timeout);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        // 如果隊列已滿,逾時等待
        while (!linkFirst(node)) {
            if (nanos <= 0L)
                return false;
            nanos = notFull.awaitNanos(nanos);
        }
        return true;
    } finally {
        lock.unlock();
    }
}

public boolean offerLast(E e, long timeout, TimeUnit unit)
    throws InterruptedException {
    if (e == null) throw new NullPointerException();
    Node<E> node = new Node<E>(e);
    long nanos = unit.toNanos(timeout);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (!linkLast(node)) {
            if (nanos <= 0L)
                return false;
            nanos = notFull.awaitNanos(nanos);
        }
        return true;
    } finally {
        lock.unlock();
    }
}

           

删除操作

public E removeFirst() {
    // 複用poll操作
    E x = pollFirst();
    if (x == null) throw new NoSuchElementException();
    return x;
}

public E removeLast() {
    E x = pollLast();
    if (x == null) throw new NoSuchElementException();
    return x;
}

public E pollFirst() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        // 擷取頭節點的值,并删除它
        return unlinkFirst();
    } finally {
        lock.unlock();
    }
}

private E unlinkFirst() {
    // assert lock.isHeldByCurrentThread();

    // 如果隊列為空,傳回null
    Node<E> f = first;
    if (f == null)
        return null;
    // 重置頭節點
    Node<E> n = f.next;
    E item = f.item;
    f.item = null;
    f.next = f; // help GC
    first = n;
    if (n == null)
        last = null;
    else
        n.prev = null;
    --count;
    // 喚醒等待插入的線程
    notFull.signal();
    return item;
}

public E pollLast() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        return unlinkLast();
    } finally {
        lock.unlock();
    }
}

private E unlinkLast() {
    // assert lock.isHeldByCurrentThread();
    Node<E> l = last;
    // 隊列為空,傳回null
    if (l == null)
        return null;
    // 更新尾節點
    Node<E> p = l.prev;
    E item = l.item;
    l.item = null;
    l.prev = l; // help GC
    last = p;
    if (p == null)
        first = null;
    else
        p.next = null;
    --count;
    notFull.signal();
    return item;
}

public E takeFirst() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        E x;
        // 如果隊列為空,等待
        while ( (x = unlinkFirst()) == null)
            notEmpty.await();
        return x;
    } finally {
        lock.unlock();
    }
}

public E takeLast() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        E x;
        // 如果隊列為空,等待
        while ( (x = unlinkLast()) == null)
            notEmpty.await();
        return x;
    } finally {
        lock.unlock();
    }
}

public E pollFirst(long timeout, TimeUnit unit)
    throws InterruptedException {
    long nanos = unit.toNanos(timeout);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        E x;
        while ( (x = unlinkFirst()) == null) {
            if (nanos <= 0L)
                return null;
            nanos = notEmpty.awaitNanos(nanos);
        }
        return x;
    } finally {
        lock.unlock();
    }
}

public E pollLast(long timeout, TimeUnit unit)
    throws InterruptedException {
    long nanos = unit.toNanos(timeout);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        E x;
        while ( (x = unlinkLast()) == null) {
            if (nanos <= 0L)
                return null;
            nanos = notEmpty.awaitNanos(nanos);
        }
        return x;
    } finally {
        lock.unlock();
    }
}

           

通路操作

public E getFirst() {
    // 複用peek方法
    E x = peekFirst();
    if (x == null) throw new NoSuchElementException();
    return x;
}

public E getLast() {
    E x = peekLast();
    if (x == null) throw new NoSuchElementException();
    return x;
}

public E peekFirst() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        // 如果隊列不為空,傳回頭元素
        return (first == null) ? null : first.item;
    } finally {
        lock.unlock();
    }
}

public E peekLast() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        // 如果隊列不為空,傳回尾元素
        return (last == null) ? null : last.item;
    } finally {
        lock.unlock();
    }
}

           

BlockingQueue

由于

BlockingDeque

繼承自

BlockingQueue

接口,是以需要實作

BlockingQueue

中的方法,具體隻需要複用前面提到的方法即可。

public boolean add(E e) {
    addLast(e);
    return true;
}

public boolean offer(E e) {
    return offerLast(e);
}

public void put(E e) throws InterruptedException {
    putLast(e);
}

public boolean offer(E e, long timeout, TimeUnit unit)
    throws InterruptedException {
    return offerLast(e, timeout, unit);
}

public E remove() {
    return removeFirst();
}

public E poll() {
    return pollFirst();
}

public E take() throws InterruptedException {
    return takeFirst();
}

public E poll(long timeout, TimeUnit unit) throws InterruptedException {
    return pollFirst(timeout, unit);
}

public E element() {
    return getFirst();
}

public E peek() {
    return peekFirst();
}

           

核心要點

  • LinkedBlockingDeque 是基于連結清單的雙端阻塞隊列,線程安全,元素不允許為 null
  • 内部使用一個雙向連結清單
  • 可以在連結清單兩頭同時進行put和take操作,隻能使用一個鎖
  • 插入線程在執行完操作後如果隊列未滿會喚醒其他等待插入的線程,同時隊列非空還會喚醒等待擷取元素的線程;take線程同理。
  • 疊代器與内部的雙向連結清單保持弱一緻性,調用

    remove(T)

    方法删除一個元素後,不會解除其對下一個結點的next引用,否則疊代器将無法工作。
  • 疊代器的forEachRemaining(Consumer<? super E> action)以64個元素為一批進行操作
  • forEach(Consumer<? super E> action),removeIf,removeAll,retainAll都是64個元素為一批進行操作

實戰

因為

LinkedBlockingDeque

取出是阻塞的,是以可以做一個

生産-消費

模型

package zyj;

import java.util.concurrent.BlockingDeque;
import java.util.concurrent.LinkedBlockingDeque;

public class Product {

    //指定隊列最大值為100
    BlockingDeque<Apple> deque = new LinkedBlockingDeque(100);

    //生産,如果隊列滿了,則抛出 IllegalStateException
    public void produce(Apple apple) {
        deque.push(apple);
    }

    //消費,如果隊列為空,則線程阻塞
    public Apple consume() {
        try {
            return deque.take();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return null;
    }
}

           

繼續閱讀