天天看點

Java并發系列源碼分析(七)--LinkedBlockingQueue

作者:Java解白

簡介

LinkedBlockingQueue是一個阻塞的有界隊列,底層是通過一個個的Node節點形成的連結清單實作的,連結清單隊列中的頭節點是一個空的Node節點,在多線程下操作時會使用ReentrantLock鎖來保證資料的安全性,并使用ReentrantLock下的Condition對象來阻塞以及喚醒線程。

常量

/**
 * 連結清單中的節點類
 */
static class Node<E> {
    //節點中的元素
    E item;

    //下一個節點
    Node<E> next;
​
    Node(E x) { item = x; }
}
​
/** 連結清單隊列的容量大小,如果沒有指定則使用Integer最大值 */
private final int capacity;
​
/** 記錄連結清單中的節點的數量的原子類 */
private final AtomicInteger count = new AtomicInteger();
​
/**連結清單的頭節點
 */
transient Node<E> head;
​
/**
 * 連結清單的尾節點
 */
private transient Node<E> last;
​
/** 從連結清單隊列中擷取節點時防止多個線程同時操作所産生資料安全問題時所加的鎖 */
private final ReentrantLock takeLock = new ReentrantLock();
​
/** Wait queue for waiting takes */
private final Condition notEmpty = takeLock.newCondition();
​
/** 添加節點到連結清單隊列中防止多個線程同時操作所産生資料安全問題時所加的鎖 */
private final ReentrantLock putLock = new ReentrantLock();
​
/** Wait queue for waiting puts */
private final Condition notFull = putLock.newCondition();
複制代碼           
  • Node:連結清單隊列中的節點,用于存放元素。
  • capacity:連結清單隊列中最多能存放的節點數量,如果在建立LinkedBlockingQueue的時候沒有指定.則預設最多存放的節點的數量為Integer的最大值。
  • head:連結清單隊列中的頭節點,一般來說頭節點都是一個沒有元素的空節點。
  • last:連結清單隊列中的尾節點。
  • takeLock:在擷取連結清單隊列中的節點的時候所加的鎖。
  • putLock:在添加連結清單隊列中的節點的時候所加的鎖。
  • Condition:當線程需要進行等待或者喚醒的時候則會調用該對象下的方法。

構造方法

/**
 * 建立預設容量大小的連結清單隊列
 */
public LinkedBlockingQueue() {
    this(Integer.MAX_VALUE);
}
​
/**
 * 建立指定容量大小的連結清單隊列
 */
public LinkedBlockingQueue(int capacity) {
    if (capacity <= 0) throw new IllegalArgumentException();
    this.capacity = capacity;
    //建立一個空節點,并将該節點設定為頭尾節點
    last = head = new Node<E>(null);
}
​
/**
 * 根據指定集合中的元素建立一個預設容量大小的連結清單隊列
 */
public LinkedBlockingQueue(Collection<? extends E> c) {
    //建立預設容量大小的連結清單隊列
    this(Integer.MAX_VALUE);
    //擷取添加元素節點的鎖
    final ReentrantLock putLock = this.putLock;
    //加鎖
    putLock.lock();
    try {
        //連結清單中節點的數量
        int n = 0;
        //周遊集合中的元素
        for (E e : c) {
            if (e == null)
                throw new NullPointerException();
            if (n == capacity)
                throw new IllegalStateException("Queue full");
            //為元素建立一個節點,并将節點添加到連結清單的尾部,并設定節點為尾節點
            enqueue(new Node<E>(e));
            //連結清單中節點的數量自增
            ++n;
        }
        //記錄連結清單中節點的數量
        count.set(n);
    } finally {
        //釋放鎖
        putLock.unlock();
    }
}
複制代碼           

第一個和第三個構造方法中都會調用第二個構造方法,而在第二個構造方法中會設定連結清單隊列中容納節點的數量以及建立一個空的頭節點來填充,再看第三個構造方法中的代碼,首先會擷取putLock鎖,代表目前是一個需要添加節點的線程,再将指定集合中的元素封裝成一個Node節點,并依次将封裝的節點追加到連結清單隊列中的尾部,并使用AtomicInteger來記錄連結清單隊列中節點的數量。

put

public void put(E e) throws InterruptedException {
    if (e == null) throw new NullPointerException();
    int c = -1;
    //為指定元素建立節點
    Node<E> node = new Node<E>(e);
    //擷取添加元素節點的鎖
    final ReentrantLock putLock = this.putLock;
    //擷取記錄連結清單節點數量的原子類
    final AtomicInteger count = this.count;
    //加鎖,如果加鎖的線程被中斷了則抛出異常
    putLock.lockInterruptibly();
    try {
        //校驗連結清單中的節點數量是否到達了指定的容量
        //如果到達了指定的容量就進行阻塞等待
        //如果線程被喚醒了,但是連結清單中的節點數量還是未改變,則繼續阻塞等待
        //隻有當頭節點出隊,新的節點才能繼續添加
        while (count.get() == capacity) {
            notFull.await();
        }
        //将新節點添加到連結清單的尾部并設定為尾節點
        enqueue(node);
        //擷取沒有添加目前節點時連結清單中的節點數量
        //并更新連結清單中的節點數量
        c = count.getAndIncrement();
        if (c + 1 < capacity)
            //喚醒等待添加節點的線程
            //可能目前線程在等待隊列中等待的時候
            //有新的線程要執行添加節點的操作
            //但是連結清單的容量已經到達最大,是以新的線程也會進行等待
            //目前線程被喚醒了并且連結清單的容量沒有到達最大則嘗試去喚醒等待的線程
            notFull.signal();
    } finally {
        //釋放鎖
        putLock.unlock();
    }
    if (c == 0)
        //c等于0說明添加目前節點的時候連結清單中沒有節點
        //可能有線程在擷取節點,但是連結清單中沒有節點
        //進而一直進行等待,當添加了節點的時候就需要喚醒擷取節點的線程
        signalNotEmpty();
}
複制代碼           

LinkedBlockingQueue中的代碼都比較簡單,主要是ReentrantLock下的Condition中的方法比較複雜,我們先整體的了解一下put方法,首先通過new Node為将指定元素封裝成一個節點,再擷取putLock鎖,當連結清單隊列中的節點數量已經到達了capacity大小,那目前線程就需要調用Condition下的await方法進行等待将線程阻塞,直到有節點出隊或者說有節點被删除或者目前線程被中斷了,目前線程被中斷了則會直接退出目前put方法并抛出異常,如果節點出隊了或者節點被删除了,那目前線程被喚醒了則會繼續執行添加節點的操作。

enqueue方法則會将封裝的節點追加到連結清單隊列中的尾部,通過getAndIncrement方法先擷取沒有添加目前節點時連結清單隊列中節點的數量,然後更新添加了目前節點之後連結清單隊列中節點的數量,c則是沒有添加目前節點時連結清單隊列中節點的數量,c+1則是添加目前節點後連結清單隊列中節點的數量,如果說c+1小于capacity則說明線程在添加節點的時候,連結清單隊列中的節點數量已經到達了最大值,後續添加節點的線程都需要進行阻塞,當有節點被删除或出隊的時候,最開始阻塞的線程被喚醒,被喚醒的線程則會去執行添加節點的操作,當添加完節點之後連結清單隊列中的節點數量沒有到達最大值則會去喚醒後續被阻塞的線程執行添加節點的操作。

c等于0說明在添加目前節點之前,可能有線程在擷取連結清單隊列中的節點,但是連結清單隊列中沒有節點,導緻擷取節點的線程處于阻塞狀态,當添加完節點之後,連結清單隊列中有了節點,此時就需要喚醒阻塞的線程去擷取節點。

添加元素的方法分為put和offer,差別在于阻塞與非阻塞,當連結清單隊列中的節點數量已經到達最大值,put方法則會阻塞,而offer方法不會阻塞則是直接傳回。

擷取元素的方法分為take、poll、peek,take方法與put方法相似,隻不過一個是入隊,一個是出隊,poll與peek都是非阻塞的,但是差別在于poll擷取了節點之後,該節點會從連結清單隊列中移除,而peek不會移除節點。

await

public final void await() throws InterruptedException {
    if (Thread.interrupted())
        //線程被中斷抛出異常
        throw new InterruptedException();
    //為目前線程建立一個等待模式的節點并入隊,并将等待隊列中已經取消等待的節點移除掉
    Node node = addConditionWaiter();
    //釋放目前線程的鎖,防止目前線程加了鎖,導緻其它在等待的線程被喚醒之後不能擷取到鎖進而導緻一直阻塞
    int savedState = fullyRelease(node);
    int interruptMode = 0;
    //如果指定節點還在等待隊列中等待則挂起
    //如果指定節點被中斷了則會将指定節點添加到同步等待隊列中
    //如果指定節點被喚醒了則會将指定節點添加到同步等待隊列中
    while (!isOnSyncQueue(node)) {
        //節點在等待隊列中則挂起
        LockSupport.park(this);
        //線程在等待隊列中被中斷則會添加到同步等待隊列中
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    //acquireQueued 指定節點中的線程被中斷了或者被喚醒了則會嘗試去擷取鎖
    //如果還未到指定節點中的線程擷取鎖的時候則會繼續挂起
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null)
        //指定節點的線程已經擷取到了鎖并且節點關聯的下一個節點不為空
        //此時就需要将已經擷取到鎖的節點從等待隊列中移除
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}
複制代碼           

首先通過addConditionWaiter方法将目前線程封裝成一個等待模式的節點,并将節點添加到等待隊列中以及會将等待隊列中已經取消等待的線程節點從隊列中移除,再通過fullyRelease方法釋放掉目前線程加的所有的鎖,之是以釋放鎖是防止其它線程擷取不到鎖進而一直阻塞,再看isOnSyncQueue方法,該方法是校驗目前線程節點是否在等待隊列中,如果在等待隊列中那就将節點中的線程挂起等待。

isOnSyncQueue

final boolean isOnSyncQueue(Node node) {
    if (node.waitStatus == Node.CONDITION || node.prev == null)
        //指定節點還在等待隊列中此時就需要繼續等待
        return false;
    if (node.next != null)
        //指定節點已經不在等待隊列中了
        return true;
    //從等待隊列中的尾節點開始向頭節點周遊,校驗指定的節點是否在其中
    return findNodeFromTail(node);
}
複制代碼           

當節點的狀态為CONDITION時,則說明該節點還在等待隊列中,node.prev等于null為什麼說也是在等待隊列中呢?因為等待隊列中的節點是沒有prev指針和next指針的,如果prev指針和next指針指向的節點不為空,那就說明該節點是在同步等待隊列中的,如果在同步等待隊列中的話,那節點中的線程就可以嘗試去擷取鎖并執行後續的操作。

當等待隊列中的線程節點被喚醒和中斷則會添加到同步等待隊列中,如果是被中斷的話則會通過checkInterruptWhileWaiting方法添加一個中斷辨別,再通過acquireQueued方法來擷取鎖,如果擷取鎖失敗則繼續等待,當擷取鎖成功之後則會該節點從等待隊列中移除,如果說你是一個被中斷的線程,最後會通過reportInterruptAfterWait方法抛出中斷異常。

signal

public final void signal() {
    if (!isHeldExclusively())
        //加鎖的線程不是目前線程則抛出異常
        throw new IllegalMonitorStateException();
    //頭節點
    Node first = firstWaiter;
    if (first != null)
        //喚醒頭節點
        doSignal(first);
}
​
/**
 * 喚醒等待隊列中的頭節點
 * 如果等待隊列中的頭節點被取消等待或已經被喚醒了
 * 此時就需要喚醒頭節點的後續的一個節點
 * 直到成功的喚醒一個節點中的線程
 */
private void doSignal(Node first) {
    do {
        if ( (firstWaiter = first.nextWaiter) == null)
            lastWaiter = null;
        first.nextWaiter = null;
    } while (!transferForSignal(first) && (first = firstWaiter) != null);
}
​
/**
* 将指定的節點添加到同步等待隊列中
* 并根據前一個節點的等待狀态來決定是否需要立刻喚醒指定節點
*/
final boolean transferForSignal(Node node) {
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
        //更改節點狀态失敗說明該節點已經被喚醒了
        return false;
    //将要喚醒的節點添加到同步等待隊列中
    //并傳回前一個節點
    Node p = enq(node);
    //前一個節點的等待狀态
    int ws = p.waitStatus;
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
        //如果前一個節點的等待狀态大于0則說明已經被取消加鎖,此時就需要喚醒後續的節點,就是目前節點
        //前一個節點的等待狀态不大于0但是更改前一個節點的等待狀态時失敗則說明前一個節點已經被喚醒了并更改了狀态
        //此時就需要嘗試将目前節點中的線程喚醒
        LockSupport.unpark(node.thread);
    return true;
}
複制代碼           

喚醒線程節點的方法主要還是看transferForSignal方法,首先會通過cas操作将需要喚醒的節點的狀态設定為0,如果更改節點狀态失敗則說明該節點已經被喚醒了,更新節點狀态成功則會通過enq方法将節點添加到同步等待隊列中,此時就需要根據前一個節點來決定是否需要立即喚醒目前節點中的線程。

從下面的圖檔中能看出來其實同步等待隊列和等待隊列中使用的節點是共用的節點,并不會建立新的節點,同步等待隊列中的節點使用next指針和prev指針來關聯節點,而等待隊列中則是使用nextWaiter指針來關聯節點的。

Java并發系列源碼分析(七)--LinkedBlockingQueue

繼續閱讀