天天看點

Java隊列Queue

上個星期總結了一下synchronized相關的知識,這次将Queue相關的知識總結一下,和朋友們分享。 

在Java多線程應用中,隊列的使用率很高,多數生産消費模型的首選資料結構就是隊列。Java提供的線程安全的Queue可以分為阻塞隊列和非阻塞隊列,其中阻塞隊列的典型例子是BlockingQueue,非阻塞隊列的典型例子是ConcurrentLinkedQueue,在實際應用中要根據實際需要選用阻塞隊列或者非阻塞隊列。 

注:什麼叫線程安全?這個首先要明确。線程安全的類 ,指的是類内共享的全局變量的通路必須保證是不受多線程形式影響的。如果由于多線程的通路(比如修改、周遊、檢視)而使這些變量結構被破壞或者針對這些變量操作的原子性被破壞,則這個類就不是線程安全的。 

今天就聊聊這兩種Queue,本文分為以下兩個部分,用分割線分開: 

BlockingQueue

ConcurrentLinkedQueue,非阻塞算法

首先來看看BlockingQueue: 

Queue是什麼就不需要多說了吧,一句話:隊列是先進先出。相對的,棧是後進先出。如果不熟悉的話先找本基礎的資料結構的書看看吧。 

BlockingQueue,顧名思義,“阻塞隊列”:可以提供阻塞功能的隊列。 

首先,看看BlockingQueue提供的常用方法: 

<col>

可能報異常

傳回布爾值

可能阻塞      

設定等待時間

入隊 

add(e)         

offer(e)    

put(e)

offer(e, timeout, unit)

出隊 

remove()      

poll()      

take()

poll(timeout, unit)

檢視

element()      

peek()      

從上表可以很明顯看出每個方法的作用,這個不用多說。我想說的是: 

add(e) remove() element() 方法不會阻塞線程。當不滿足限制條件時,會抛出IllegalStateException 異常。例如:當隊列被元素填滿後,再調用add(e),則會抛出異常。

offer(e) poll() peek() 方法即不會阻塞線程,也不會抛出異常。例如:當隊列被元素填滿後,再調用offer(e),則不會插入元素,函數傳回false。

要想要實作阻塞功能,需要調用put(e) take() 方法。當不滿足限制條件時,會阻塞線程。

好,上點源碼你就更明白了。以ArrayBlockingQueue類為例: 

對于第一類方法,很明顯如果操作不成功就抛異常。而且可以看到其實調用的是第二類的方法,為什麼?因為第二類方法傳回boolean啊。 

Java代碼  

Java隊列Queue

public boolean add(E e) {  

     if (offer(e))  

         return true;  

     else  

         throw new IllegalStateException("Queue full");//隊列已滿,抛異常  

}  

public E remove() {  

    E x = poll();  

    if (x != null)  

        return x;  

    else  

        throw new NoSuchElementException();//隊列為空,抛異常  

對于第二類方法,很标準的ReentrantLock使用方式(不熟悉的朋友看一下我上一篇文章吧​​http://hellosure.iteye.com/blog/1121157​​),另外對于insert和extract的實作沒啥好說的。 

注:先不看阻塞與否,這ReentrantLock的使用方式就能說明這個類是線程安全類。 

Java隊列Queue

public boolean offer(E e) {  

        if (e == null) throw new NullPointerException();  

        final ReentrantLock lock = this.lock;  

        lock.lock();  

        try {  

            if (count == items.length)//隊列已滿,傳回false  

                return false;  

            else {  

                insert(e);//insert方法中發出了notEmpty.signal();  

                return true;  

            }  

        } finally {  

            lock.unlock();  

        }  

    }  

public E poll() {  

            if (count == 0)//隊列為空,傳回false  

                return null;  

            E x = extract();//extract方法中發出了notFull.signal();  

            return x;  

對于第三類方法,這裡面涉及到Condition類,簡要提一下, 

await方法指:造成目前線程在接到信号或被中斷之前一直處于等待狀态。 

signal方法指:喚醒一個等待線程。 

Java隊列Queue

public void put(E e) throws InterruptedException {  

        final E[] items = this.items;  

        lock.lockInterruptibly();  

            try {  

                while (count == items.length)//如果隊列已滿,等待notFull這個條件,這時目前線程被阻塞  

                    notFull.await();  

            } catch (InterruptedException ie) {  

                notFull.signal(); //喚醒受notFull阻塞的目前線程  

                throw ie;  

            insert(e);  

public E take() throws InterruptedException {  

                while (count == 0)//如果隊列為空,等待notEmpty這個條件,這時目前線程被阻塞  

                    notEmpty.await();  

                notEmpty.signal();//喚醒受notEmpty阻塞的目前線程  

            E x = extract();  

第四類方法就是指在有必要時等待指定時間,就不詳細說了。 

再來看看BlockingQueue接口的具體實作類吧: 

ArrayBlockingQueue,其構造函數必須帶一個int參數來指明其大小

LinkedBlockingQueue,若其構造函數帶一個規定大小的參數,生成的BlockingQueue有大小限制,若不帶大小參數,所生成的BlockingQueue的大小由Integer.MAX_VALUE來決定

PriorityBlockingQueue,其所含對象的排序不是FIFO,而是依據對象的自然排序順序或者是構造函數的Comparator決定的順序

上面是用ArrayBlockingQueue舉得例子,下面看看LinkedBlockingQueue: 

首先,既然是連結清單,就應該有Node節點,它是一個内部靜态類: 

Java隊列Queue

static class Node&lt;E&gt; {    

        /** The item, volatile to ensure barrier separating write and read */    

        volatile E item;    

        Node&lt;E&gt; next;    

        Node(E x) { item = x; }    

    }    

然後,對于連結清單來說,肯定需要兩個變量來标示頭和尾: 

Java隊列Queue

/** 頭指針 */    

private transient Node&lt;E&gt; head;  //head.next是隊列的頭元素  

/** 尾指針 */    

private transient Node&lt;E&gt; last;  //last.next是null  

那麼,對于入隊和出隊就很自然能了解了: 

Java隊列Queue

private void enqueue(E x) {    

    last = last.next = new Node&lt;E&gt;(x);  //入隊是為last再找個下家  

}    

private E dequeue() {    

    Node&lt;E&gt; first = head.next;  //出隊是把head.next取出來,然後将head向後移一位  

    head = first;    

    E x = first.item;    

    first.item = null;    

    return x;    

另外,LinkedBlockingQueue相對于ArrayBlockingQueue還有不同是,有兩個ReentrantLock,且隊列現有元素的大小由一個AtomicInteger對象标示。 

注:AtomicInteger類是以原子的方式操作整型變量。 

Java隊列Queue

private final AtomicInteger count = new AtomicInteger(0);   

/** 用于讀取的獨占鎖*/    

private final ReentrantLock takeLock = new ReentrantLock();    

/** 隊列是否為空的條件 */    

private final Condition notEmpty = takeLock.newCondition();    

/** 用于寫入的獨占鎖 */    

private final ReentrantLock putLock = new ReentrantLock();    

/** 隊列是否已滿的條件 */    

private final Condition notFull = putLock.newCondition();  

有兩個Condition很好了解,在ArrayBlockingQueue也是這樣做的。但是為什麼需要兩個ReentrantLock呢?下面會慢慢道來。 

讓我們來看看offer和poll方法的代碼: 

Java隊列Queue

 public boolean offer(E e) {  

     if (e == null) throw new NullPointerException();  

     final AtomicInteger count = this.count;  

     if (count.get() == capacity)  

         return false;  

     int c = -1;  

     final ReentrantLock putLock = this.putLock;//入隊當然用putLock   

     putLock.lock();  

     try {  

         if (count.get() &lt; capacity) {  

             enqueue(e); //入隊  

             c = count.getAndIncrement(); //隊長度+1  

             if (c + 1 &lt; capacity)  

                 notFull.signal(); //隊列沒滿,當然可以解鎖了  

         }  

     } finally {  

         putLock.unlock();  

     }  

     if (c == 0)  

         signalNotEmpty();//這個方法裡發出了notEmpty.signal();  

     return c &gt;= 0;  

 }  

     if (count.get() == 0)  

         return null;  

     E x = null;  

     final ReentrantLock takeLock = this.takeLock;出隊當然用takeLock   

     takeLock.lock();  

         if (count.get() &gt; 0) {  

             x = dequeue();//出隊  

             c = count.getAndDecrement();//隊長度-1  

             if (c &gt; 1)  

                 notEmpty.signal();//隊列沒空,解鎖  

         takeLock.unlock();  

     if (c == capacity)  

         signalNotFull();//這個方法裡發出了notFull.signal();  

     return x;  

看看源代碼發現和上面ArrayBlockingQueue的很類似,關鍵的問題在于:為什麼要用兩個ReentrantLockputLock和takeLock? 

我們仔細想一下,入隊操作其實操作的隻有隊尾引用last,并且沒有牽涉到head。而出隊操作其實隻針對head,和last沒有關系。那麼就是說入隊和出隊的操作完全不需要公用一把鎖,是以就設計了兩個鎖,這樣就實作了多個不同任務的線程入隊的同時可以進行出隊的操作,另一方面由于兩個操作所共同使用的count是AtomicInteger類型的,是以完全不用考慮計數器遞增遞減的問題。 

另外,還有一點需要說明一下:await()和singal()這兩個方法執行時都會檢查目前線程是否是獨占鎖的目前線程,如果不是則抛出java.lang.IllegalMonitorStateException異常。是以可以看到在源碼中這兩個方法都出現在Lock的保護塊中。 

-------------------------------我是分割線-------------------------------------- 

下面再來說說ConcurrentLinkedQueue,它是一個無鎖的并發線程安全的隊列。 

以下部分的内容參照了這個文章​​http://yanxuxin.iteye.com/blog/586943​​ 

對比鎖機制的實作,使用無鎖機制的難點在于要充分考慮線程間的協調。簡單的說就是多個線程對内部資料結構進行通路時,如果其中一個線程執行的中途因為一些原因出現故障,其他的線程能夠檢測并幫助完成剩下的操作。這就需要把對資料結構的操作過程精細的劃分成多個狀态或階段,考慮每個階段或狀态多線程通路會出現的情況。 

ConcurrentLinkedQueue有兩個volatile的線程共享變量:head,tail。要保證這個隊列的線程安全就是保證對這兩個Node的引用的通路(更新,檢視)的原子性和可見性,由于volatile本身能夠保證可見性,是以就是對其修改的原子性要被保證。 

下面通過offer方法的實作來看看在無鎖情況下如何保證原子性: 

Java隊列Queue

public boolean offer(E e) {    

    if (e == null) throw new NullPointerException();    

    Node&lt;E&gt; n = new Node&lt;E&gt;(e, null);    

    for (;;) {    

        Node&lt;E&gt; t = tail;    

        Node&lt;E&gt; s = t.getNext();    

        if (t == tail) { //------------------------------a    

            if (s == null) { //---------------------------b    

                if (t.casNext(s, n)) { //-------------------c    

                    casTail(t, n); //------------------------d    

                    return true;    

                }    

            } else {    

                casTail(t, s); //----------------------------e    

            }    

        }    

此方法的循環内首先獲得尾指針和其next指向的對象,由于tail和Node的next均是volatile的,是以保證了獲得的分别都是最新的值。 

    代碼a:t==tail是最上層的協調,如果其他線程改變了tail的引用,則說明現在獲得不是最新的尾指針需要重新循環獲得最新的值。 

    代碼b:s==null的判斷。靜止狀态下tail的next一定是指向null的,但是多線程下的另一個狀态就是中間态:tail的指向沒有改變,但是其next已經指向新的結點,即完成tail引用改變前的狀态,這時候s!=null。這裡就是協調的典型應用,直接進入代碼e去協調參與中間态的線程去完成最後的更新,然後重新循環獲得新的tail開始自己的新一次的入隊嘗試。另外值得注意的是a,b之間,其他的線程可能會改變tail的指向,使得協調的操作失敗。從這個步驟可以看到無鎖實作的複雜性。 

    代碼c:t.casNext(s, n)是入隊的第一步,因為入隊需要兩步:更新Node的next,改變tail的指向。代碼c之前可能發生tail引用指向的改變或者進入更新的中間态,這兩種情況均會使得t指向的元素的next屬性被原子的改變,不再指向null。這時代碼c操作失敗,重新進入循環。 

    代碼d:這是完成更新的最後一步了,就是更新tail的指向,最有意思的協調在這兒又有了展現。從代碼看casTail(t, n)不管是否成功都會接着傳回true标志着更新的成功。首先如果成功則表明本線程完成了兩步的更新,傳回true是理所當然的;如果 casTail(t, n)不成功呢?要清楚的是完成代碼c則代表着更新進入了中間态,代碼d不成功則是tail的指向被其他線程改變。意味着對于其他的線程而言:它們得到的是中間态的更新,s!=null,進入代碼e幫助本線程執行最後一步并且先于本線程成功。這樣本線程雖然代碼d失敗了,但是是由于别的線程的協助先完成了,是以傳回true也就理所當然了。 

    通過分析這個入隊的操作,可以清晰的看到無鎖實作的每個步驟和狀态下多線程之間的協調和工作。 

注:上面這大段文字看起來很累,先能看懂多少看懂多少,現在看不懂先不急,下面還會提到這個算法,并且用示意圖說明,就易懂很多了。 

在使用ConcurrentLinkedQueue時要注意,如果直接使用它提供的函數,比如add或者poll方法,這樣我們自己不需要做任何同步。 

但如果是非原子操作,比如: 

Java隊列Queue

if(!queue.isEmpty()) {    

   queue.poll(obj);    

我們很難保證,在調用了isEmpty()之後,poll()之前,這個queue沒有被其他線程修改。是以對于這種情況,我們還是需要自己同步: 

Java隊列Queue

synchronized(queue) {    

    if(!queue.isEmpty()) {    

       queue.poll(obj);    

注:這種需要進行自己同步的情況要視情況而定,不是任何情況下都需要這樣做。 

另外還說一下,ConcurrentLinkedQueue的size()是要周遊一遍集合的,是以盡量要避免用size而改用isEmpty(),以免性能過慢。 

好,最後想說點什麼呢,阻塞算法其實很好了解,簡單點了解就是加鎖,比如在BlockingQueue中看到的那樣,再往前推點,那就是synchronized。相比而言,非阻塞算法的設計和實作都很困難,要通過低級的原子性來支援并發。下面就簡要的介紹一下非阻塞算法,以下部分的内容參照了一篇很經典的文章​​http://www.ibm.com/developerworks/cn/java/j-jtp04186/​​ 

注:我覺得可以這樣了解,阻塞對應同步,非阻塞對應并發。也可以說:同步是阻塞模式,異步是非阻塞模式 

舉個例子來說明什麼是非阻塞算法:非阻塞的計數器 

首先,使用同步的線程安全的計數器代碼如下 

Java隊列Queue

public final class Counter {  

    private long value = 0;  

    public synchronized long getValue() {  

        return value;  

    public synchronized long increment() {  

        return ++value;  

下面的代碼顯示了一種最簡單的非阻塞算法:使用 AtomicInteger的compareAndSet()(CAS方法)的計數器。compareAndSet()方法規定“将這個變量更新為新值,但是如果從我上次看到這個變量之後其他線程修改了它的值,那麼更新就失敗” 

Java隊列Queue

public class NonblockingCounter {  

    private AtomicInteger value;//前面提到過,AtomicInteger類是以原子的方式操作整型變量。  

    public int getValue() {  

        return value.get();  

    public int increment() {  

        int v;  

        do {  

            v = value.get();  

        while (!value.compareAndSet(v, v + 1));  

        return v + 1;  

非阻塞版本相對于基于鎖的版本有幾個性能優勢。首先,它用硬體的原生形态代替 JVM 的鎖定代碼路徑,進而在更細的粒度層次上(獨立的記憶體位置)進行同步,失敗的線程也可以立即重試,而不會被挂起後重新排程。更細的粒度降低了争用的機會,不用重新排程就能重試的能力也降低了争用的成本。即使有少量失敗的 CAS 操作,這種方法仍然會比由于鎖争用造成的重新排程快得多。 

NonblockingCounter 這個示例可能簡單了些,但是它示範了所有非阻塞算法的一個基本特征——有些算法步驟的執行是要冒險的,因為知道如果 CAS 不成功可能不得不重做。非阻塞算法通常叫作樂觀算法,因為它們繼續操作的假設是不會有幹擾。如果發現幹擾,就會回退并重試。在計數器的示例中,冒險的步驟是遞增——它檢索舊值并在舊值上加一,希望在計算更新期間值不會變化。如果它的希望落空,就會再次檢索值,并重做遞增計算。 

再來一個例子,Michael-Scott 非阻塞隊列算法的插入操作,ConcurrentLinkedQueue 就是用這個算法實作的,現在來結合示意圖分析一下,很明朗: 

Java隊列Queue

public class LinkedQueue &lt;E&gt; {  

    private static class Node &lt;E&gt; {  

        final E item;  

        final AtomicReference&lt;Node&lt;E&gt;&gt; next;  

        Node(E item, Node&lt;E&gt; next) {  

            this.item = item;  

            this.next = new AtomicReference&lt;Node&lt;E&gt;&gt;(next);  

    private AtomicReference&lt;Node&lt;E&gt;&gt; head  

        = new AtomicReference&lt;Node&lt;E&gt;&gt;(new Node&lt;E&gt;(null, null));  

    private AtomicReference&lt;Node&lt;E&gt;&gt; tail = head;  

    public boolean put(E item) {  

        Node&lt;E&gt; newNode = new Node&lt;E&gt;(item, null);  

        while (true) {  

            Node&lt;E&gt; curTail = tail.get();  

            Node&lt;E&gt; residue = curTail.next.get();  

            if (curTail == tail.get()) {  

                if (residue == null) /* A */ {  

                    if (curTail.next.compareAndSet(null, newNode)) /* C */ {  

                        tail.compareAndSet(curTail, newNode) /* D */ ;  

                        return true;  

                    }  

                } else {  

                    tail.compareAndSet(curTail, residue) /* B */;  

                }  

看看這代碼完全就是ConcurrentLinkedQueue 源碼啊。 

插入一個元素涉及頭指針和尾指針兩個指針更新,這兩個更新都是通過 CAS 進行的:從隊列目前的最後節點(C)連結到新節點,并把尾指針移動到新的最後一個節點(D)。如果第一步失敗,那麼隊列的狀态不變,插入線程會繼續重試,直到成功。一旦操作成功,插入被當成生效,其他線程就可以看到修改。還需要把尾指針移動到新節點的位置上,但是這項工作可以看成是 “清理工作”,因為任何處在這種情況下的線程都可以判斷出是否需要這種清理,也知道如何進行清理。 

隊列總是處于兩種狀态之一:正常狀态(或稱靜止狀态,圖 1 和 圖 3)或中間狀态(圖 2)。在插入操作之前和第二個 CAS(D)成功之後,隊列處在靜止狀态;在第一個 CAS(C)成功之後,隊列處在中間狀态。在靜止狀态時,尾指針指向的連結節點的 next 字段總為 null,而在中間狀态時,這個字段為非 null。任何線程通過比較 tail.next 是否為 null,就可以判斷出隊列的狀态,這是讓線程可以幫助其他線程 “完成” 操作的關鍵。 

Java隊列Queue

上圖顯示的是:有兩個元素,處在靜止狀态的隊列 

插入操作在插入新元素(A)之前,先檢查隊列是否處在中間狀态。如果是在中間狀态,那麼肯定有其他線程已經處在元素插入的中途,在步驟(C)和(D)之間。不必等候其他線程完成,目前線程就可以 “幫助” 它完成操作,把尾指針向前移動(B)。如果有必要,它還會繼續檢查尾指針并向前移動指針,直到隊列處于靜止狀态,這時它就可以開始自己的插入了。 

第一個 CAS(C)可能因為兩個線程競争通路隊列目前的最後一個元素而失敗;在這種情況下,沒有發生修改,失去 CAS 的線程會重新裝入尾指針并再次嘗試。如果第二個 CAS(D)失敗,插入線程不需要重試 —— 因為其他線程已經在步驟(B)中替它完成了這個操作! 

上圖顯示的是:處在插入中間狀态的隊列,在新元素插入之後,尾指針更新之前 

Java隊列Queue

上圖顯示的是:在尾指針更新後,隊列重新處在靜止狀态

參考連結:http://hellosure.iteye.com/blog/1126541

繼續閱讀