天天看點

【大資料Java基礎- Java并發 14】J.U.C之阻塞隊列:LinkedBlockingDeque

前面的BlockingQueue都是單向的FIFO隊列,而LinkedBlockingDeque則是一個由連結清單組成的雙向阻塞隊列,雙向隊列就意味着可以從對頭、對尾兩端插入和移除元素,同樣意味着LinkedBlockingDeque支援FIFO、FILO兩種操作方式。

LinkedBlockingDeque是可選容量的,在初始化時可以設定容量防止其過度膨脹,如果不設定,預設容量大小為Integer.MAX_VALUE。

LinkedBlockingDeque

LinkedBlockingDeque 繼承AbstractQueue,實作接口BlockingDeque,而BlockingDeque又繼承接口BlockingQueue,BlockingDeque是支援兩個附加操作的 Queue,這兩個操作是:擷取元素時等待雙端隊列變為非空;存儲元素時等待雙端隊列中的空間變得可用。這兩類操作就為LinkedBlockingDeque 的雙向操作Queue提供了可能。BlockingDeque接口提供了一系列的以First和Last結尾的方法,如addFirst、addLast、peekFirst、peekLast。

public class LinkedBlockingDeque<E>
    extends AbstractQueue<E>
    implements BlockingDeque<E>, java.io.Serializable {

	// 雙向連結清單的表頭
	transient Node<E> first;

	// 雙向連結清單的表尾
	transient Node<E> last;

	// 大小,雙向連結清單中目前節點個數
	private transient int count;

	// 容量,在建立LinkedBlockingDeque時指定的
	private final int capacity;

	final ReentrantLock lock = new ReentrantLock();

	private final Condition notEmpty = lock.newCondition();

	private final Condition notFull = lock.newCondition();

}           

通過上面的Lock可以看出,LinkedBlockingDeque底層實作機制與LinkedBlockingQueue一樣,依然是通過互斥鎖ReentrantLock 來實作,notEmpty 、notFull 兩個Condition做協調生産者、消費者問題。

與其他BlockingQueue一樣,節點還是使用内部類Node:

static final class Node<E> {
        E item;

        Node<E> prev;

        Node<E> next;

        Node(E x) {
            item = x;
        }
    }           

雙向嘛,節點肯定得要有前驅prev、後繼next咯。

基礎方法

LinkedBlockingDeque 的add、put、offer、take、peek、poll系列方法都是通過調用XXXFirst,XXXLast方法。是以這裡就僅以putFirst、putLast、pollFirst、pollLast分析下。

putFirst

putFirst(E e) :将指定的元素插入此雙端隊列的開頭,必要時将一直等待可用空間。

public void putFirst(E e) throws InterruptedException {
        // check null
        if (e == null) throw new NullPointerException();
        Node<E> node = new Node<E>(e);
        // 擷取鎖
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            while (!linkFirst(node))
                // 在notFull條件上等待,直到被喚醒或中斷
                notFull.await();
        } finally {
            // 釋放鎖
            lock.unlock();
        }
    }           

先擷取鎖,然後調用linkFirst方法入列,最後釋放鎖。如果隊列是滿的則在notFull上面等待。linkFirst設定Node為對頭:

private boolean linkFirst(Node<E> node) {
        // 超出容量
        if (count >= capacity)
            return false;

        // 首節點
        Node<E> f = first;
        // 新節點的next指向原first
        node.next = f;
        // 設定node為新的first
        first = node;

        // 沒有尾節點,設定node為尾節點
        if (last == null)
            last = node;
        // 有尾節點,那就将之前first的pre指向新增node
        else
            f.prev = node;
        ++count;
        // 喚醒notEmpty
        notEmpty.signal();
        return true;
    }           

linkFirst主要是設定node節點隊列的列頭節點,成功傳回true,如果隊列滿了傳回false。整個過程還是比較簡單的。

putLast

putLast(E e) :将指定的元素插入此雙端隊列的末尾,必要時将一直等待可用空間。

public void putLast(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        Node<E> node = new Node<E>(e);
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            while (!linkLast(node))
                notFull.await();
        } finally {
            lock.unlock();
        }
    }           

調用linkLast将節點Node連結到隊列尾部:

private boolean linkLast(Node<E> node) {
        if (count >= capacity)
            return false;
        // 尾節點
        Node<E> l = last;

        // 将Node的前驅指向原本的last
        node.prev = l;

        // 将node設定為last
        last = node;
        // 首節點為null,則設定node為first
        if (first == null)
            first = node;
        else
        //非null,說明之前的last有值,就将之前的last的next指向node
            l.next = node;
        ++count;
        notEmpty.signal();
        return true;
    }           

pollFirst

pollFirst():擷取并移除此雙端隊列的第一個元素;如果此雙端隊列為空,則傳回 null。

public E pollFirst() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return unlinkFirst();
        } finally {
            lock.unlock();
        }
    }           

調用unlinkFirst移除隊列首元素:

private E unlinkFirst() {
        // 首節點
        Node<E> f = first;

        // 空隊列,直接傳回null
        if (f == null)
            return null;

        // first.next
        Node<E> n = f.next;

        // 節點item
        E item = f.item;

        // 移除掉first ==> first = first.next
        f.item = null;
        f.next = f; // help GC
        first = n;

        // 移除後為空隊列,僅有一個節點
        if (n == null)
            last = null;
        else
        // n的pre原來指向之前的first,現在n變為first了,pre指向null
            n.prev = null;
        --count;
        notFull.signal();
        return item;
    }           

pollLast

pollLast():擷取并移除此雙端隊列的最後一個元素;如果此雙端隊列為空,則傳回 null。

public E pollLast() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return unlinkLast();
        } finally {
            lock.unlock();
        }
    }           
private E unlinkLast() {
        // assert lock.isHeldByCurrentThread();
        Node<E> l = last;
        if (l == null)
            return null;
        Node<E> p = l.prev;
        E item = l.item;
        l.item = null;
        l.prev = l; // help GC
        last = p;
        if (p == null)
            first = null;
        else
            p.next = null;
        --count;
        notFull.signal();
        return item;
    }