天天看點

Java并發之AQS原理淺析

 鎖是最常用的同步方法之一,在高并發的環境下激烈的鎖競争會導緻程式的性能下降,是以我們自然有必要深入的學習一下鎖的相關知識。 

      在介紹Lock之前,我們需要先熟悉一個非常重要的基礎元件,JUC包下的核心基礎元件。也是實作大部分同步需求的基礎。學習該元件是學習JUC繞不開的一塊内容。該元件就是AQS。

AQS簡介

  • AQS:AbstractQueuedSynchronizer,即隊列同步器。它是建構鎖或者其他同步元件的基礎架構(如ReentrantLock、ReentrantReadWriteLock、Semaphore等)。
  • AQS解決了子類實作同步器時涉及當的大量細節問題,例如擷取同步狀态、FIFO同步隊列。基于AQS來建構同步器可以帶來很多好處。自定義同步器在實作時隻需要實作共享資源state的擷取與釋放方式即可,至于具體線程等待隊列的維護(如擷取資源失敗入隊/喚醒出隊等),AQS已經在頂層實作好了,是以使用AQS不僅能夠極大地減少實作工作,而且也不必處理在多個位置上發生的競争問題。
  • 在基于AQS建構的同步器中,隻能在一個時刻發生阻塞,進而降低上下文切換的開銷,提高了吞吐量。同時在設計AQS時充分考慮了可伸縮行,是以J.U.C中所有基于AQS建構的同步器均可以獲得這個優勢。
  • AQS的主要使用方式是繼承,子類通過繼承同步器并實作它的抽象方法來管理同步狀态。
  • AQS使用一個int類型的成員變量state來表示同步狀态,當state>0時表示已經擷取了鎖,當state = 0時表示釋放了鎖。它提供了三個方法(getState()、setState(int newState)、compareAndSetState(int expect,int update))來對同步狀态state進行操作,當然AQS可以確定對state的操作是安全的。
  • AQS通過内置的FIFO同步隊列來完成資源擷取線程的排隊工作,如果目前線程擷取同步狀态失敗(鎖)時,AQS則會将目前線程以及等待狀态等資訊構造成一個節點(Node)并将其加入同步隊列,同時會阻塞目前線程,當同步狀态釋放時,則會把節點中的線程喚醒,使其再次嘗試擷取同步狀态。

AQS常用方法

    關于state的方法主要有一下三種

  • getState():傳回同步狀态的目前值;
  • setState(int newState):設定目前同步狀态;
  • compareAndSetState(int expect, int update):使用CAS設定目前狀态,該方法能夠保證狀态設定的原子性;

   自定義同步器實作時主要實作以下幾種方法

  • tryAcquire(int arg):獨占式擷取同步狀态,擷取同步狀态成功後,其他線程需要等待該線程釋放同步狀态才能擷取同步狀态
  • tryRelease(int arg):獨占式釋放同步狀态;
  • tryAcquireShared(int arg):共享式擷取同步狀态,傳回值大于等于0則表示擷取成功,否則擷取失敗;
  • tryReleaseShared(int arg):共享式釋放同步狀态;
  • isHeldExclusively():目前同步器是否在獨占式模式下被線程占用,一般該方法表示是否被目前線程所獨占;

其餘方法

  • acquire(int arg):獨占式擷取同步狀态,如果目前線程擷取同步狀态成功,則由該方法傳回,否則,将會進入同步隊列等待,該方法将會調用可重寫的tryAcquire(int arg)方法;
  • acquireInterruptibly(int arg):與acquire(int arg)相同,但是該方法響應中斷,目前線程為擷取到同步狀态而進入到同步隊列中,如果目前線程被中斷,則該方法會抛出InterruptedException異常并傳回;
  • tryAcquireNanos(int arg,long nanos):逾時擷取同步狀态,如果目前線程在nanos時間内沒有擷取到同步狀态,那麼将會傳回false,已經擷取則傳回true;
  • acquireShared(int arg):共享式擷取同步狀态,如果目前線程未擷取到同步狀态,将會進入同步隊列等待,與獨占式的主要差別是在同一時刻可以有多個線程擷取到同步狀态;
  • acquireSharedInterruptibly(int arg):共享式擷取同步狀态,響應中斷;
  • tryAcquireSharedNanos(int arg, long nanosTimeout):共享式擷取同步狀态,增加逾時限制;
  • release(int arg):獨占式釋放同步狀态,該方法會在釋放同步狀态之後,将同步隊列中第一個節點包含的線程喚醒;
  • releaseShared(int arg):共享式釋放同步狀态;

CLH

       CLH同步隊列是一個FIFO雙向隊列,AQS依賴它來完成同步狀态的管理,目前線程如果擷取同步狀态失敗時,AQS則會将目前線程已經等待狀态等資訊構造成一個節點(Node)并将其加入到CLH同步隊列,同時會阻塞目前線程,當同步狀态釋放時,會把首節點喚醒(公平鎖),使其再次嘗試擷取同步狀态。

在CLH同步隊列中,一個節點表示一個線程,它儲存着線程的引用(thread)、狀态(waitStatus)、前驅節點(prev)、後繼節點(next),其資料結構如下

Java并發之AQS原理淺析

   其實就是個雙端雙向連結清單。

   資料定義如下

Java并發之AQS原理淺析
static final class Node {
    /** 共享 */
    static final Node SHARED = new Node();
    /** 獨占 */
    static final Node EXCLUSIVE = null;
    /**
     * 因為逾時或者中斷,節點會被設定為取消狀态,被取消的節點時不會參與到競争中的,他會一直保持取消狀态不會轉變為其他狀态;
     */
    static final int CANCELLED =  1;
    /**
     * 後繼節點的線程處于等待狀态,而目前節點的線程如果釋放了同步狀态或者被取消,将會通知後繼節點,使後繼節點的線程得以運作
     */
    static final int SIGNAL    = -1;
    /**
     * 節點在等待隊列中,節點線程等待在Condition上,當其他線程對Condition調用了signal()後,改節點将會從等待隊列中轉移到同步隊列中,加入到同步狀态的擷取中
     */
    static final int CONDITION = -2;
    /**
     * 表示下一次共享式同步狀态擷取将會無條件地傳播下去
     */
    static final int PROPAGATE = -3;
    /** 等待狀态 */
    volatile int waitStatus;
    /** 前驅節點 */
    volatile Node prev;
    /** 後繼節點 */
    volatile Node next;
    /** 擷取同步狀态的線程 */
    volatile Thread thread;
    Node nextWaiter;
    final boolean isShared() {
        return nextWaiter == SHARED;
    }
    final Node predecessor() throws NullPointerException {
        Node p = prev;
        if (p == null)
            throw new NullPointerException();
        else
            return p;
    }
    Node() {
    }
    Node(Thread thread, Node mode) {
        this.nextWaiter = mode;
        this.thread = thread;
    }
    Node(Thread thread, int waitStatus) {
        this.waitStatus = waitStatus;
        this.thread = thread;
    }
}      
Java并發之AQS原理淺析

    可以看到AQS支援兩種同步模式,分别是Exclusive(獨占,隻有一個線程能執行,如ReentrantLock)和Share(共享,多個線程可同時執行,如Semaphore/CountDownLatch)。這樣友善使用者實作不同類型的同步元件。簡而言之,AQS為使用者提供了多樣的底層支撐,具體如何組裝實作,使用者可以自由發揮。

   入列

   CLH這種連結清單式結構入列,無非就是tail指向新節點、新節點的前驅節點指向目前最後的節點,目前最後一個節點的next指向目前節點,直接看源碼相關操作在addWaiter(Node node)方法裡。此方法用于将目前線程加入到等待隊列的隊尾,并傳回目前線程所在的結點  

Java并發之AQS原理淺析
private Node addWaiter(Node mode) {
        //根據給定的模式(獨占或者共享)建立Node
        Node node = new Node(Thread.currentThread(), mode);
        //快速嘗試添加尾節點
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            //CAS設定尾節點
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        //多次嘗試
        enq(node);
        return node;
    }      
Java并發之AQS原理淺析

    addWaiter(Node node)先通過快速嘗試設定尾節點,如果失敗,則調用enq(Node node)方法設定尾節點

Java并發之AQS原理淺析
private Node enq(final Node node) {
        //多次嘗試,直到成功為止
        for (;;) {
            Node t = tail;
            //tail不存在,設定為首節點
            if (t == null) {
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                //設定為尾節點
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }      
Java并發之AQS原理淺析

        此方法用于将node加入隊尾,該方法核心就是通過CAS自旋的方式來設定尾節點,知道獲得預期的結果即添加節點成功,目前線程才會傳回。(這種方式很經典AtomicInteger.getAndIncrement()方法也是這樣做的)

     出列

     CLH同步隊列遵循FIFO(先進先出),首節點的線程釋放同步狀态後,将會喚醒它的後繼節點(next),而後繼節點将會在擷取同步狀态成功時将自己設定為首節點,這個過程非常簡單,head執行該節點并斷開原首節點的next和目前節點的prev即可,注意在這個過程是不需要使用CAS來保證的,因為隻有一個線程能夠成功擷取到同步狀态。

同步狀态的擷取與釋放

       AQS的設計模式采用的模闆方法模式,子類通過繼承的方式,實作它的抽象方法來管理同步狀态,對于子類而言它并沒有太多的活要做,AQS提供了大量的模闆方法來實作同步,主要是分為三類:獨占式擷取和釋放同步狀态、共享式擷取和釋放同步狀态、查詢同步隊列中的等待線程情況。自定義子類使用AQS提供的模闆方法就可以實作自己的同步語義。

    獨占式同步狀态擷取

    此方法是獨占模式下線程擷取共享資源的頂層入口。如果擷取到資源,線程直接傳回,否則進入等待隊列,直到擷取到資源為止,且整個過程忽略中斷的影響。這也正是lock()的語義,當然不僅僅隻限于lock()。也就是說由于線程擷取同步狀态失敗加入到CLH同步隊列中,後續對線程進行中斷操作時,線程不會從同步隊列中移除擷取到資源後。下面是acquire()的源碼:

public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }      
  • tryAcquire:去嘗試擷取鎖,擷取成功則設定鎖狀态并傳回true,否則傳回false。該方法由自定義同步元件自己實作(通過state的get/set/CAS),該方法必須要保證線程安全的擷取同步狀态。
  • addWaiter:如果tryAcquire傳回FALSE(擷取同步狀态失敗),則調用該方法将目前線程加入到CLH同步隊列尾部,并标記為獨占模式。
  • acquireQueued:目前線程會根據公平性原則來進行阻塞等待(自旋),直到擷取鎖為止;如果在整個等待過程中被中斷過,則傳回true,否則傳回false。
  • selfInterrupt:如果線程在等待過程中被中斷過,它是不響應的。隻是擷取資源後才再進行自我中斷selfInterrupt(),将中斷補上。

 tryAcquire(int)

protected boolean tryAcquire(int arg) {
        throw new UnsupportedOperationException();
    }      

      該方法直接抛出異常,具體實作交自定義同步器類實作。這裡之是以沒有定義成abstract,是因為獨占模式下隻用實作tryAcquire-tryRelease,而共享模式下隻用實作tryAcquireShared-tryReleaseShared。如果都定義成abstract,那麼每個模式也要去實作另一模式下的接口。

 acquireQueued

   在執行到此方法時已經說明一點:該線程擷取資源失敗,已經被放入等待隊列尾部了。是以 acquireQueued方法就是讓線程進入等待狀态休息,直到其他線程徹底釋放資源後喚醒該線程,擷取所需資源,然後執行該線程所需執行的任務。

   acquireQueued方法為一個自旋的過程,也就是說目前線程(Node)進入同步隊列後,就會進入一個自旋的過程,每個節點都會自我觀察,當條件滿足,擷取到同步狀态後,就可以從這個自旋過程中退出,否則會一直執行下去。

Java并發之AQS原理淺析
final boolean acquireQueued(final Node node, int arg) {
       /* 标記是否成功拿到資源 */
       boolean failed = true;
        try {
            /* 中斷标志*/
            boolean interrupted = false;
            /*  自旋,一個死循環 */
            for (;;) {
                /* 擷取前線程的前驅節點*/
                final Node p = node.predecessor();
                /*目前線程的前驅節點是頭結點,即該節點是第二個節點,且同步狀态成功*/
                if (p == head && tryAcquire(arg)) {
                    /*将head指向該節點*/
                    setHead(node);
                   /* 友善GC回收垃圾 */
                    p.next = null; 
                    failed = false;
                   /*傳回等待過程中是否被中斷過*/
                    return interrupted;
                }
                /*擷取失敗,線程就進入waiting狀态,直到被unpark()*/
                if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())
                    /*如果等待過程中被中斷過一次,就标記為true*/
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }      
Java并發之AQS原理淺析

       從上面代碼中可以看到,目前線程會一直嘗試擷取同步狀态,當然前提是隻有其前驅節點為頭結點才能夠嘗試擷取同步狀态,理由:

  • 保持FIFO同步隊列原則。
  • 頭節點釋放同步狀态後,将會喚醒其後繼節點,後繼節點被喚醒後需要檢查自己是否為頭節點。

shouldParkAfterFailedAcquire(Node, Node)

    此方法主要用于檢查狀态,檢視目前節點是否進入waiting狀态

Java并發之AQS原理淺析
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;//拿到前驅節點的狀态
    if (ws == Node.SIGNAL)
        //狀态為SIGNAL,如果前驅節點處于等待狀态,直接傳回true
        return true;
    if (ws > 0) {
        /*
         * 如果前驅節點放棄了,那就一直往前找,直到找到最近一個正常等待的狀态,并排在它的後邊。
         * 注意:那些放棄的結點,由于被自己“加塞”到它們前邊,它們相當于形成一個無引用鍊,稍後就會被GC回收
         */
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
         //如果前驅節點正常,那就把前驅的狀态通過CAS的方式設定成SIGNAL
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}      
Java并發之AQS原理淺析

 這段代碼主要檢查目前線程是否需要被阻塞,具體規則如下:

  1. 如果目前線程的前驅節點狀态為SINNAL,則表明目前線程需要被阻塞,調用unpark()方法喚醒,直接傳回true,目前線程阻塞
  2. 如果目前線程的前驅節點狀态為CANCELLED(ws > 0),則表明該線程的前驅節點已經等待逾時或者被中斷了,則需要從CLH隊列中将該前驅節點删除掉,直到回溯到前驅節點狀态 <= 0 ,傳回false
  3. 如果前驅節點非SINNAL,非CANCELLED,則通過CAS的方式将其前驅節點設定為SINNAL,傳回false

      整個流程中,如果前驅結點的狀态不是SIGNAL,那麼自己就不能被阻塞,需要去找個安心的休息點(前驅節點狀态 <= 0 ),同時可以再嘗試下看有沒有機會去擷取資源。

     如果 shouldParkAfterFailedAcquire(Node pred, Node node) 方法傳回true,則調用parkAndCheckInterrupt()方法阻塞目前線程: 

Java并發之AQS原理淺析
private final boolean parkAndCheckInterrupt() {
        //調用park()使線程進入waiting狀态
          LockSupport.park(this); 
          //如果被喚醒,檢視自己是不是被中斷的
          return Thread.interrupted();
 }      
Java并發之AQS原理淺析

    parkAndCheckInterrupt() 方法主要是把目前線程挂起,進而阻塞住線程的調用棧,同時傳回目前線程的中斷狀态。

深入了解:

談到并發,不得不談

ReentrantLock

;而談到

ReentrantLock

,不得不談AbstractQueuedSynchronized(AQS)!,類如其名,抽象的隊列式的同步器,AQS定義了一套多線程通路共享資源的同步器架構,許多同步類實作都依賴于它,如常用的ReentrantLock/Semaphore/CountDownLatch...。我們以ReentrantLock作為講解切入點。

1. ReentrantLock的調用過程

ReentrantLock把所有Lock接口的操作都委派到一個Sync類上,該類繼承了AbstractQueuedSynchronizer:

static abstract class Sync extends AbstractQueuedSynchronizer  
           

Sync又有兩個子類:

final static class NonfairSync extends Sync  
final static class FairSync extends Sync  
           

顯然是為了支援公平鎖和非公平鎖而定義,預設情況下為非公平鎖。

先理一下Reentrant.lock()方法的調用過程(預設非公平鎖):

Java并發之AQS原理淺析

0_13119022769n5R.gif

2. 鎖實作(加鎖)

簡單說來,AbstractQueuedSynchronizer會把所有的請求線程構成一個CLH隊列,當一個線程執行完畢(lock.unlock())時會激活自己的後繼節點,但正在執行的線程并不在隊列中,而那些等待執行的線程全部處于阻塞狀态.

線程的顯式阻塞是通過調用LockSupport.park()完成,而LockSupport.park()則調用sun.misc.Unsafe.park()本地方法,再進一步,HotSpot在Linux中中通過調用pthread_mutex_lock函數把線程交給系統核心進行阻塞。該隊列如圖:

Java并發之AQS原理淺析

0_1311847049xnXb.gif

與synchronized相同的是,這也是一個虛拟隊列,不存在隊列執行個體,僅存在節點之間的前後關系。令人疑惑的是為什麼采用CLH隊列呢?原生的CLH隊列是用于自旋鎖,但Doug Lea把其改造為阻塞鎖。

當有線程競争鎖時,該線程會首先嘗試獲得鎖,這對于那些已經在隊列中排隊的線程來說顯得不公平,這是非公平鎖的由來之一,與synchronized實作類似,這樣會極大提高吞吐量。

如果已經存在Running線程,則新的競争線程會被追加到隊尾,具體是采用基于CAS的Lock-Free算法,因為線程并發對Tail調用CAS可能會導緻其他線程CAS失敗,解決辦法是循環CAS直至成功。

AbstractQueuedSynchronizer的實作非常精巧,令人歎為觀止,不入細節難以完全領會其精髓,下面詳細說明實作過程:

2.1 Sync.nonfairTryAcquire

nonfairTryAcquire方法将是lock方法間接調用的第一個方法,每次請求鎖時都會首先調用該方法。

final boolean nonfairTryAcquire(int acquires) {  
    final Thread current = Thread.currentThread();  
    int c = getState();  
    if (c == 0) {  
        if (compareAndSetState(0, acquires)) {  
            setExclusiveOwnerThread(current);  
            return true;  
        }  
    }  
    else if (current == getExclusiveOwnerThread()) {  
        int nextc = c + acquires;  
        if (nextc < 0) // overflow  
            throw new Error("Maximum lock count exceeded");  
        setState(nextc);  
        return true;  
    }  
    return false;  
}  
           

1.該方法會首先判斷目前狀态,如果c==0說明沒有線程正在競争該鎖,如果不c !=0 說明有線程正擁有了該鎖。

2.如果發現c==0,則通過CAS設定該狀态值為acquires,acquires的初始調用值為1,每次線程重入該鎖都會+1,每次unlock都會-1,但為0時釋放鎖。如果CAS設定成功,則可以預計其他任何線程調用CAS都不會再成功,也就認為目前線程得到了該鎖,也作為Running線程,很顯然這個Running線程并未進入等待隊列。

3.如果c !=0 但發現自己已經擁有鎖,隻是簡單地++acquires,并修改status值,但因為沒有競争,是以通過setStatus修改,而非CAS,也就是說這段代碼實作了偏向鎖的功能,并且實作的非常漂亮。

2.2 AbstractQueuedSynchronizer.addWaiter

addWaiter方法負責把目前無法獲得鎖的線程包裝為一個Node添加到隊尾:

private Node addWaiter(Node mode) {  
    Node node = new Node(Thread.currentThread(), mode);  
    // Try the fast path of enq; backup to full enq on failure  
    Node pred = tail;  
    if (pred != null) {  
        node.prev = pred;  
        if (compareAndSetTail(pred, node)) {  
            pred.next = node;  
            return node;  
        }  
    }  
    enq(node);  
    return node;  
}  
           

其中參數mode是獨占鎖還是共享鎖,預設為null,獨占鎖。追加到隊尾的動作分兩步:

1.如果目前隊尾已經存在(tail!=null),則使用CAS把目前線程更新為Tail

2.如果目前Tail為null或則線程調用CAS設定隊尾失敗,則通過enq方法繼續設定Tail

下面是enq方法:

private Node enq(final Node node) {  
    for (;;) {  
        Node t = tail;  
        if (t == null) { // Must initialize  
            Node h = new Node(); // Dummy header  
            h.next = node;  
            node.prev = h;  
            if (compareAndSetHead(h)) {  
                tail = node;  
                return h;  
            }  
        }  
        else {  
            node.prev = t;  
            if (compareAndSetTail(t, node)) {  
                t.next = node;  
                return t;  
            }  
        }  
    }  
}  
           

該方法就是循環調用CAS,即使有高并發的場景,無限循環将會最終成功把目前線程追加到隊尾(或設定隊頭)。總而言之,addWaiter的目的就是通過CAS把目前現在追加到隊尾,并傳回包裝後的Node執行個體。

把線程要包裝為Node對象的主要原因,除了用Node構造供虛拟隊列外,還用Node包裝了各種線程狀态,這些狀态被精心設計為一些數字值:

  • SIGNAL(-1) :線程的後繼線程正/已被阻塞,當該線程release或cancel時要重新這個後繼線程(unpark)
  • CANCELLED(1):因為逾時或中斷,該線程已經被取消
  • CONDITION(-2):表明該線程被處于條件隊列,就是因為調用了>- Condition.await而被阻塞
  • PROPAGATE(-3):傳播共享鎖
  • 0:0代表無狀态

2.3 AbstractQueuedSynchronizer.acquireQueued

acquireQueued的主要作用是把已經追加到隊列的線程節點(addWaiter方法傳回值)進行阻塞,但阻塞前又通過tryAccquire重試是否能獲得鎖,如果重試成功能則無需阻塞,這裡是非公平鎖的由來之二

final boolean acquireQueued(final Node node, int arg) {  
    try {  
        boolean interrupted = false;  
        for (;;) {  
            final Node p = node.predecessor();  
            if (p == head && tryAcquire(arg)) {  
                setHead(node);  
                p.next = null; // help GC  
                return interrupted;  
            }  
            if (shouldParkAfterFailedAcquire(p, node) &&  
                parkAndCheckInterrupt())  
                interrupted = true;  
        }  
    } catch (RuntimeException ex) {  
        cancelAcquire(node);  
        throw ex;  
    }  
}  
           

仔細看看這個方法是個無限循環,感覺如果p == head && tryAcquire(arg)條件不滿足循環将永遠無法結束,當然不會出現死循環,奧秘在于第12行的parkAndCheckInterrupt會把目前線程挂起,進而阻塞住線程的調用棧。

private final boolean parkAndCheckInterrupt() {  
    LockSupport.park(this);  
    return Thread.interrupted();  
}  
           

如前面所述,LockSupport.park最終把線程交給系統(Linux)核心進行阻塞。當然也不是馬上把請求不到鎖的線程進行阻塞,還要檢查該線程的狀态,比如如果該線程處于Cancel狀态則沒有必要,具體的檢查在shouldParkAfterFailedAcquire中:

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {  
      int ws = pred.waitStatus;  
      if (ws == Node.SIGNAL)  
          /* 
           * This node has already set status asking a release 
           * to signal it, so it can safely park 
           */  
          return true;  
      if (ws > 0) {  
          /* 
           * Predecessor was cancelled. Skip over predecessors and 
           * indicate retry. 
           */  
   do {  
node.prev = pred = pred.prev;  
   } while (pred.waitStatus > 0);  
   pred.next = node;  
      } else {  
          /* 
           * waitStatus must be 0 or PROPAGATE. Indicate that we 
           * need a signal, but don't park yet. Caller will need to 
           * retry to make sure it cannot acquire before parking.  
           */  
          compareAndSetWaitStatus(pred, ws, Node.SIGNAL);  
      }   
      return false;  
  }  
           

檢查原則在于:

規則1:如果前繼的節點狀态為SIGNAL,表明目前節點需要unpark,則傳回成功,此時acquireQueued方法的第12行(parkAndCheckInterrupt)将導緻線程阻塞

規則2:如果前繼節點狀态為CANCELLED(ws>0),說明前置節點已經被放棄,則回溯到一個非取消的前繼節點,傳回false,acquireQueued方法的無限循環将遞歸調用該方法,直至規則1傳回true,導緻線程阻塞

規則3:如果前繼節點狀态為非SIGNAL、非CANCELLED,則設定前繼的狀态為SIGNAL,傳回false後進入acquireQueued的無限循環,與規則2同

總體看來,shouldParkAfterFailedAcquire就是靠前繼節點判斷目前線程是否應該被阻塞,如果前繼節點處于CANCELLED狀态,則順便删除這些節點重新構造隊列。

至此,鎖住線程的邏輯已經完成,下面讨論解鎖的過程。

3. 解鎖

請求鎖不成功的線程會被挂起在acquireQueued方法的第12行,12行以後的代碼必須等線程被解鎖鎖才能執行,假如被阻塞的線程得到解鎖,則執行第13行,即設定interrupted = true,之後又進入無限循環。

從無限循環的代碼可以看出,并不是得到解鎖的線程一定能獲得鎖,必須在第6行中調用tryAccquire重新競争,因為鎖是非公平的,有可能被新加入的線程獲得,進而導緻剛被喚醒的線程再次被阻塞,這個細節充分展現了“非公平”的精髓。通過之後将要介紹的解鎖機制會看到,第一個被解鎖的線程就是Head,是以p == head的判斷基本都會成功。

至此可以看到,把tryAcquire方法延遲到子類中實作的做法非常精妙并具有極強的可擴充性,令人歎為觀止!當然精妙的不是這個Templae設計模式,而是Doug Lea對鎖結構的精心布局。

解鎖代碼相對簡單,主要展現在AbstractQueuedSynchronizer.release和Sync.tryRelease方法中:

class AbstractQueuedSynchronizer

public final boolean release(int arg) {  
    if (tryRelease(arg)) {  
        Node h = head;  
        if (h != null && h.waitStatus != 0)  
            unparkSuccessor(h);  
        return true;  
    }  
    return false;  
}  
           

class Sync

protected final boolean tryRelease(int releases) {  
    int c = getState() - releases;  
    if (Thread.currentThread() != getExclusiveOwnerThread())  
        throw new IllegalMonitorStateException();  
    boolean free = false;  
    if (c == 0) {  
        free = true;  
        setExclusiveOwnerThread(null);  
    }  
    setState(c);  
    return free;  
}  
           
tryRelease與tryAcquire語義相同,把如何釋放的邏輯延遲到子類中。tryRelease語義很明确:如果線程多次鎖定,則進行多次釋放,直至status==0則真正釋放鎖,所謂釋放鎖即設定status為0,因為無競争是以沒有使用CAS。

release的語義在于:如果可以釋放鎖,則喚醒隊列第一個線程(Head),具體喚醒代碼如下:

private void unparkSuccessor(Node node) {  
    /* 
     * If status is negative (i.e., possibly needing signal) try 
     * to clear in anticipation of signalling. It is OK if this 
     * fails or if status is changed by waiting thread. 
     */  
    int ws = node.waitStatus;  
    if (ws < 0)  
        compareAndSetWaitStatus(node, ws, 0);   
  
    /* 
     * Thread to unpark is held in successor, which is normally 
     * just the next node.  But if cancelled or apparently null, 
     * traverse backwards from tail to find the actual 
     * non-cancelled successor. 
     */  
    Node s = node.next;  
    if (s == null || s.waitStatus > 0) {  
        s = null;  
        for (Node t = tail; t != null && t != node; t = t.prev)  
            if (t.waitStatus <= 0)  
                s = t;  
    }  
    if (s != null)  
        LockSupport.unpark(s.thread);  
}  
           
這段代碼的意思在于找出第一個可以unpark的線程,一般說來head.next == head,Head就是第一個線程,但Head.next可能被取消或被置為null,是以比較穩妥的辦法是從後往前找第一個可用線程。貌似回溯會導緻性能降低,其實這個發生的幾率很小,是以不會有性能影響。之後便是通知系統核心繼續該線程,在Linux下是通過pthread_mutex_unlock完成。之後,被解鎖的線程進入上面所說的重新競争狀态。

4. Lock VS Synchronized

AbstractQueuedSynchronizer通過構造一個基于阻塞的CLH隊列容納所有的阻塞線程,而對該隊列的操作均通過Lock-Free(CAS)操作,但對已經獲得鎖的線程而言,ReentrantLock實作了偏向鎖的功能。

synchronized的底層也是一個基于CAS操作的等待隊列,但JVM實作的更精細,把等待隊列分為ContentionList和EntryList,目的是為了降低線程的出列速度;當然也實作了偏向鎖,從資料結構來說二者設計沒有本質差別。但synchronized還實作了自旋鎖,并針對不同的系統和硬體體系進行了優化,而Lock則完全依靠系統阻塞挂起等待線程。

當然Lock比synchronized更适合在應用層擴充,可以繼承AbstractQueuedSynchronizer定義各種實作,比如實作讀寫鎖(ReadWriteLock),公平或不公平鎖;同時,Lock對應的Condition也比wait/notify要友善的多、靈活的多。