天天看點

Java并發同步器AQS一、什麼是同步器二、AQS架構如何建構同步器三、AQS在各同步器内的Sync與State實作

AQS是AbstractQueuedSynchronizer的簡寫,中文名應該叫抽象隊列同步器(我給的名字,哈哈),出生于Java 1.5。

一、什麼是同步器

多線程并發的執行,之間通過某種 共享 狀态來同步,隻有當狀态滿足 xxxx 條件,才能觸發線程執行 xxxx 。

這個共同的語義可以稱之為同步器。可以認為以上所有的鎖機制都可以基于同步器定制來實作的。

而juc(java.util.concurrent)裡的思想是 将這些場景抽象出來的語義通過統一的同步架構來支援。

juc 裡所有的這些鎖機制都是基于 AQS ( AbstractQueuedSynchronizer )架構上建構的。下面簡單介紹下 AQS( AbstractQueuedSynchronizer )。 可以參考Doug Lea的論文

The java.util.concurrent Synchronizer Framework

我們來看下java.util.concurrent.locks大緻結構

Java并發同步器AQS一、什麼是同步器二、AQS架構如何建構同步器三、AQS在各同步器内的Sync與State實作

上圖中,LOCK的實作類其實都是建構在AbstractQueuedSynchronizer上,為何圖中沒有用UML線表示呢,這是每個Lock實作類都持有自己内部類Sync的執行個體,而這個Sync就是繼承AbstractQueuedSynchronizer(AQS)。為何要實作不同的Sync呢?這和每種Lock用途相關。另外還有AQS的State機制。下文會舉例說明不同同步器内的Sync與state實作。

二、AQS架構如何建構同步器

1、同步器的基本功能

一個同步器至少需要包含兩個功能:

  1. 擷取同步狀态

    如果允許,則擷取鎖,如果不允許就阻塞線程,直到同步狀态允許擷取。

  2. 釋放同步狀态

    修改同步狀态,并且喚醒等待線程。

aqs 同步機制同時考慮了如下需求:

  1. 獨占鎖和共享鎖兩種機制。
  2. 線程阻塞後,如果需要取消,需要支援中斷。
  3. 線程阻塞後,如果有逾時要求,應該支援逾時後中斷的機制。

2、同步狀态的擷取與釋放

AQS實作了一個同步器的基本結構,下面以獨占鎖與共享鎖分開讨論,來說明AQS怎樣實作擷取、釋放同步狀态。

2.1、獨占模式

獨占擷取: tryAcquire 本身不會阻塞線程,如果傳回 true 成功就繼續,如果傳回 false 那麼就阻塞線程并加入阻塞隊列。

public final void acquire(int arg) {  
  
        if (!tryAcquire(arg) &&  
  
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))//擷取失敗,則加入等待隊列  
  
            selfInterrupt();  
  
}
           

獨占且可中斷模式擷取:支援中斷取消

public final void acquireInterruptibly(int arg) throws InterruptedException {  
  
        if (Thread.interrupted())  
            throw new InterruptedException();  
        if (!tryAcquire(arg))  
  
            doAcquireInterruptibly(arg);  
  
    } 
           

獨占且支援逾時模式擷取: 帶有逾時時間,如果經過逾時時間則會退出。

public final boolean tryAcquireNanos(int arg, long nanosTimeout) throws InterruptedException {  
  
     if (Thread.interrupted())  
  
         throw new InterruptedException();  
  
     return tryAcquire(arg) ||  
  
         doAcquireNanos(arg, nanosTimeout);  
           

獨占模式釋放:釋放成功會喚醒後續節點

public final boolean release(int arg) {  
    if (tryRelease(arg)) {  
        Node h = head;  
        if (h != null && h.waitStatus != 0)  
            unparkSuccessor(h);  
        return true;  
    }  
    return false;  
}
           

2.2、共享模式

共享模式擷取

public final void acquireShared(int arg) {  
  
    if (tryAcquireShared(arg) < 0)  
  
        doAcquireShared(arg); 
           

可中斷模式共享擷取

public final void acquireSharedInterruptibly(int arg) throws InterruptedException {  
        if (Thread.interrupted())  
            throw new InterruptedException();  
        if (tryAcquireShared(arg) < 0)  
            doAcquireSharedInterruptibly(arg);  
    }  
           

共享模式帶定時擷取

public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout) throws InterruptedException {  
     if (Thread.interrupted())  
         throw new InterruptedException();  
     return tryAcquireShared(arg) >= 0 ||  
         doAcquireSharedNanos(arg, nanosTimeout);  
}  
           

共享鎖釋放

public final boolean releaseShared(int arg) {  
        if (tryReleaseShared(arg)) {  
            doReleaseShared();  
            return true;  
        }  
        return false;  
    }  
           

注意以上架構隻定義了一個同步器的基本結構架構,的基本方法裡依賴的 tryAcquire 、 tryRelease 、tryAcquireShared 、 tryReleaseShared 四個方法在 AQS 裡沒有實作,這四個方法不會涉及線程阻塞,而是由各自不同的使用場景根據情況來定制:

protected boolean tryAcquire(int arg) {  
    throw new UnsupportedOperationException();  
}  
protected boolean tryRelease(int arg) {  
    throw new UnsupportedOperationException();  
}  
protected int tryAcquireShared(int arg) {  
    throw new UnsupportedOperationException();  
  
}  
protected boolean tryReleaseShared(int arg) {  
    throw new UnsupportedOperationException();  
}
           

從以上源碼可以看出AQS實作基本的功能:

AQS雖然實作了acquire,和release方法是可能阻塞的,但是裡面調用的tryAcquire和tryRelease是由子類來定制的且是不阻塞的可。以認為同步狀态的維護、擷取、釋放動作是由子類實作的功能,而動作成功與否的後續行為時有AQS架構來實作。

3、狀态擷取、釋放成功或失敗的後續行為:線程的阻塞、喚醒機制

有别于wait和notiry。這裡利用 jdk1.5 開始提供的 LockSupport.park() 和 LockSupport.unpark() 的本地方法實作,實作線程的阻塞和喚醒。

得到鎖的線程禁用(park)和喚醒(unpark),也是直接native實作(這幾個native方法的實作代碼在hotspot\src\share\vm\prims\unsafe.cpp檔案中,但是關鍵代碼park的最終實作是和作業系統相關的,比如windows下實作是在os_windows.cpp中,有興趣的同學可以下載下傳jdk源碼檢視)。喚醒一個被park()線程主要手段包括以下幾種

  1. 其他線程調用以被park()線程為參數的unpark(Thread thread).
  2. 其他線程中斷被park()線程,如waiters.peek().interrupt();waiters為存儲線程對象的隊列.
  3. 不知原因的傳回。

park()方法傳回并不會報告到底是上訴哪種傳回,是以傳回好最好檢查下線程狀态,如

LockSupport.park();  //禁用目前線程
if(Thread.interrupted){
//doSomething
}
           

AbstractQueuedSynchronizer(AQS)對于這點實作得相當巧妙,如下所示

private void doAcquireSharedInterruptibly(int arg)throwsInterruptedException {
    final Node node = addWaiter(Node.SHARED);
    try {
         for (;;) {
             final Node p = node.predecessor();
             if (p == head) {
                 int r = tryAcquireShared(arg);
                 if (r >= 0) {
                     setHeadAndPropagate(node, r);
                     p.next = null; // help GC
                     return;
                 }
            }
             //parkAndCheckInterrupt()會傳回park住的線程在被unpark後的線程狀态,如果線程中斷,跳出循環。
             if (shouldParkAfterFailedAcquire(p, node) &&
                 parkAndCheckInterrupt())
                 break;
      }
     } catch (RuntimeException ex) {
          cancelAcquire(node);
          throw ex;
 }
 
     // 隻有線程被interrupt後才會走到這裡
     cancelAcquire(node);
     throw new InterruptedException();
}
 
//在park()住的線程被unpark()後,第一時間傳回目前線程是否被打斷
private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this);
    return Thread.interrupted();
}
           

4、線程阻塞隊列的維護

阻塞線程節點隊列 CHL Node queue 。

根據論文裡描述, AQS 裡将阻塞線程封裝到一個内部類 Node 裡。并維護一個 CHL Node FIFO 隊列。 CHL隊列是一個非阻塞的 FIFO 隊列,也就是說往裡面插入或移除一個節點的時候,在并發條件下不會阻塞,而是通過自旋鎖和 CAS 保證節點插入和移除的原子性。實作無鎖且快速的插入。關于非阻塞算法可以參考

Java 理論與實踐: 非阻塞算法簡介

。CHL隊列對應代碼如下:

/** 
 * CHL頭節點 
 */   
rivate transient volatile Node head;  
/** 
 * CHL尾節點 
 */  
private transient volatile Node tail; 
           

Node節點是對Thread的一個封裝,結構大概如下:

tatic final class Node {  
    /** 代表線程已經被取消*/  
    static final int CANCELLED =  1;  
    /** 代表後續節點需要喚醒 */  
    static final int SIGNAL    = -1;  
    /** 代表線程在等待某一條件/ 
    static final int CONDITION = -2; 
    /** 标記是共享模式*/  
    static final Node SHARED = new Node();  
    /** 标記是獨占模式*/  
    static final Node EXCLUSIVE = null;  
  
    /** 
     * 狀态位 ,分别可以使CANCELLED、SINGNAL、CONDITION、0 
     */  
    volatile int waitStatus;  
  
    /** 
     * 前置節點 
     */  
    volatile Node prev;  
  
    /** 
     * 後續節點 
     */  
    volatile Node next;  
  
    /** 
     * 節點代表的線程 
     */  
    volatile Thread thread;  
  
    /** 
     *連接配接到等待condition的下一個節點 
     */  
    Node nextWaiter;  
  
}  

           

5、小結

從源碼可以看出AQS實作基本的功能:

1.同步器基本範式、結構

2.線程的阻塞、喚醒機制

3.線程阻塞隊列的維護

AQS雖然實作了acquire,和release方法,但是裡面調用的tryAcquire和tryRelease是由子類來定制的。可以認為同步狀态的維護、擷取、釋放動作是由子類實作的功能,而動作成功與否的後續行為時有AQS架構來實作

還有以下一些私有方法,用于輔助完成以上的功能:

final boolean acquireQueued(final Node node, int arg) :申請隊列

private Node enq(final Node node) : 入隊

private Node addWaiter(Node mode) :以mode建立建立節點,并加入到隊列

private void unparkSuccessor(Node node) : 喚醒節點的後續節點,如果存在的話。

private void doReleaseShared() :釋放共享鎖

private void setHeadAndPropagate(Node node, int propagate):設定頭,并且如果是共享模式且propagate大于0,則喚醒後續節點。

private void cancelAcquire(Node node) :取消正在擷取的節點

private static void selfInterrupt() :自我中斷

private final boolean parkAndCheckInterrupt() : park 并判斷線程是否中斷

三、AQS在各同步器内的Sync與State實作

1、什麼是state機制:

提供 volatile 變量 state; 用于同步線程之間的共享狀态。通過 CAS 和 volatile 保證其原子性和可見性。對應源碼裡的定義:

/** 
 * 同步狀态 
 */  
private volatile int state;  
  
/** 
 *cas 
 */  
protected final boolean compareAndSetState(int expect, int update) {  
    // See below for intrinsics setup to support this  
    return unsafe.compareAndSwapInt(this, stateOffset, expect, update);  
} 
           

2、不同實作類的Sync與State:

基于AQS建構的Synchronizer包括ReentrantLock,Semaphore,CountDownLatch, ReetrantRead WriteLock,FutureTask等,這些Synchronizer實際上最基本的東西就是原子狀态的擷取和釋放,隻是條件不一樣而已。

2.1、ReentrantLock

需要記錄目前線程擷取原子狀态的次數,如果次數為零,那麼就說明這個線程放棄了鎖(也有可能其他線程占據着鎖進而需要等待),如果次數大于1,也就是獲得了重進入的效果,而其他線程隻能被park住,直到這個線程重進入鎖次數變成0而釋放原子狀态。以下為ReetranLock的FairSync的tryAcquire實作代碼解析。

//公平擷取鎖
protected final boolean tryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    //如果目前重進入數為0,說明有機會取得鎖
    if (c == 0) {
        //如果是第一個等待者,并且設定重進入數成功,那麼目前線程獲得鎖
        if (isFirst(current) &&
            compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
     }
 }
    //如果目前線程本身就持有鎖,那麼疊加重進入數,并且繼續獲得鎖
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0)
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
 }
     //以上條件都不滿足,那麼線程進入等待隊列。
     return false;
}
           

2.2、Semaphore

則是要記錄目前還有多少次許可可以使用,到0,就需要等待,也就實作并發量的控制,Semaphore一開始設定許可數為1,實際上就是一把互斥鎖。以下為Semaphore的FairSync實作

protected int tryAcquireShared(int acquires) {
    Thread current = Thread.currentThread();
    for (;;) {
         Thread first = getFirstQueuedThread();
         //如果目前等待隊列的第一個線程不是目前線程,那麼就傳回-1表示目前線程需要等待
         if (first != null && first != current)
              return -1;
         //如果目前隊列沒有等待者,或者目前線程就是等待隊列第一個等待者,那麼先取得semaphore還有幾個許可證,并且減去目前線程需要的許可證得到剩下的值
         int available = getState();
         int remaining = available - acquires;
         //如果remining<0,那麼回報給AQS目前線程需要等待,如果remaining>0,并且設定availble成功設定成剩餘數,那麼傳回剩餘值(>0),也就告知AQS目前線程拿到許可,可以繼續執行。
         if (remaining < 0 ||compareAndSetState(available, remaining))
             return remaining;
 }
}
           

2.3、CountDownLatch

閉鎖則要保持其狀态,在這個狀态到達終止态之前,所有線程都會被park住,閉鎖可以設定初始值,這個值的含義就是這個閉鎖需要被countDown()幾次,因為每次CountDown是sync.releaseShared(1),而一開始初始值為10的話,那麼這個閉鎖需要被countDown()十次,才能夠将這個初始值減到0,進而釋放原子狀态,讓等待的所有線程通過。

//await時候執行,隻檢視目前需要countDown數量減為0了,如果為0,說明可以繼續執行,否則需要park住,等待countDown次數足夠,并且unpark所有等待線程
public int tryAcquireShared(int acquires) {
     return getState() == 0? 1 : -1;
}
 
//countDown 時候執行,如果目前countDown數量為0,說明沒有線程await,直接傳回false而不需要喚醒park住線程,如果不為0,得到剩下需要 countDown的數量并且compareAndSet,最終傳回剩下的countDown數量是否為0,供AQS判定是否釋放所有await線程。
public boolean tryReleaseShared(int releases) {
    for (;;) {
         int c = getState();
         if (c == 0)
             return false;
         int nextc = c-1;
         if (compareAndSetState(c, nextc))
             return nextc == 0;
 }
}
           

2.4、FutureTask

需要記錄任務的執行狀态,當調用其執行個體的get方法時,内部類Sync會去調用AQS的acquireSharedInterruptibly()方法,而這個方法會反向調用Sync實作的tryAcquireShared()方法,即讓具體實作類決定是否讓目前線程繼續還是park,而FutureTask的tryAcquireShared方法所做的唯一事情就是檢查狀态,如果是RUNNING狀态那麼讓目前線程park。而跑任務的線程會在任務結束時調用FutureTask 執行個體的set方法(與等待線程持相同的執行個體),設定執行結果,并且通過unpark喚醒正在等待的線程,傳回結果。

//get時待用,隻檢查目前任務是否完成或者被Cancel,如果未完成并且沒有被cancel,那麼告訴AQS目前線程需要進入等待隊列并且park住
protected int tryAcquireShared(int ignore) {
     return innerIsDone()? 1 : -1;
}
 
//判定任務是否完成或者被Cancel
boolean innerIsDone() {
    return ranOrCancelled(getState()) &&    runner == null;
}
 
//get時調用,對于CANCEL與其他異常進行抛錯
V innerGet(long nanosTimeout) throws InterruptedException, ExecutionException, TimeoutException {
    if (!tryAcquireSharedNanos(0,nanosTimeout))
        throw new TimeoutException();
    if (getState() == CANCELLED)
        throw new CancellationException();
    if (exception != null)
        throw new ExecutionException(exception);
    return result;
}
 
//任務的執行線程執行完畢調用(set(V v))
void innerSet(V v) {
     for (;;) {
        int s = getState();
        //如果線程任務已經執行完畢,那麼直接傳回(多線程執行任務?)
        if (s == RAN)
            return;
        //如果被CANCEL了,那麼釋放等待線程,并且會抛錯
        if (s == CANCELLED) {
            releaseShared(0);
            return;
     }
        //如果成功設定任務狀态為已完成,那麼設定結果,unpark等待線程(調用get()方法而阻塞的線程),以及後續清理工作(一般由FutrueTask的子類實作)
        if (compareAndSetState(s, RAN)) {
            result = v;
            releaseShared(0);
            done();
            return;
     }
 }
}

           

以上4個AQS的使用是比較典型,然而有個問題就是這些狀态存在哪裡呢?并且是可以計數的。從以上4個example,我們可以很快得到答案,AQS提供給了子類一個int state屬性。并且暴露給子類getState()和setState()兩個方法(protected)。這樣就為上述狀态解決了存儲問題,RetrantLock可以将這個state用于存儲目前線程的重進入次數,Semaphore可以用這個state存儲許可數,CountDownLatch則可以存儲需要被countDown的次數,而Future則可以存儲目前任務的執行狀态(RUNING,RAN,CANCELL)。其他的Synchronizer存儲他們的一些狀态。

AQS留給實作者的方法主要有5個方法,其中tryAcquire,tryRelease和isHeldExclusively三個方法為需要獨占形式擷取的synchronizer實作的,比如線程獨占ReetranLock的Sync,而tryAcquireShared和tryReleasedShared為需要共享形式擷取的synchronizer實作。

ReentrantLock内部Sync類實作的是tryAcquire,tryRelease, isHeldExclusively三個方法(因為擷取鎖的公平性問題,tryAcquire由繼承該Sync類的内部類FairSync和NonfairSync實作)Semaphore内部類Sync則實作了tryAcquireShared和tryReleasedShared(與CountDownLatch相似,因為公平性問題,tryAcquireShared由其内部類FairSync和NonfairSync實作)。CountDownLatch内部類Sync實作了tryAcquireShared和tryReleasedShared。FutureTask内部類Sync也實作了tryAcquireShared和tryReleasedShared。