天天看點

java并發程式設計之核心:AQS進階

如果你想深入研究Java并發的話,那麼AQS一定是繞不開的一塊知識點,Java并發包很多的同步工具類底層都是基于AQS來實作的,比如我們工作中經常用的Lock工具ReentrantLock、栅欄CountDownLatch、信号量Semaphore等,而且關于AQS的知識點也是面試中經常考察的内容,是以,無論是為了更好的使用還是為了應付面試,深入學習AQS都很有必要。

CAS

CAS是樂觀鎖的一種思想,它假設線程對資源的通路是沒有沖突的,同時所有的線程執行都不需要等待,可以持續執行。如果有沖突的話,就用比較+交換的方式來檢測沖突,有沖突就不斷重試。

CAS的全稱是Compare-and-Swap,也就是比較并交換,它包含了三個參數:V,A,B,V表示要讀寫的記憶體位置,A表示舊的預期值,B表示新值,當執行CAS時,隻有當V的值等于預期值A時,才會把V的值改為B,這樣的方式可以讓多個線程同時去修改,但也會因為線程操作失敗而不斷重試,對CPU有一定程式上的開銷。

java并發程式設計之核心:AQS進階

AQS簡介

本文主角正式登場。

AQS,全名AbstractQueuedSynchronizer,是一個抽象類的隊列式同步器,它的内部通過維護一個狀态volatile int state(共享資源),一個FIFO線程等待隊列來實作同步功能。

state用關鍵字volatile修飾,代表着該共享資源的狀态一更改就能被所有線程可見,而AQS的加鎖方式本質上就是多個線程在競争state,當state為0時代表線程可以競争鎖,不為0時代表目前對象鎖已經被占有,其他線程來加鎖時則會失敗,加鎖失敗的線程會被放入一個FIFO的等待隊列中,這些線程會被

UNSAFE.park()

操作挂起,等待其他擷取鎖的線程釋放鎖才能夠被喚醒。

而這個等待隊列其實就相當于一個CLH隊列,用一張原理圖來表示大緻如下:

java并發程式設計之核心:AQS進階

基礎定義

AQS支援兩種資源分享的方式:Exclusive(獨占,隻有一個線程能執行,如ReentrantLock)和Share(共享,多個線程可同時執行,如Semaphore/CountDownLatch)。

自定義的同步器繼承AQS後,隻需要實作共享資源state的擷取和釋放方式即可,其他如線程隊列的維護(如擷取資源失敗入隊/喚醒出隊等)等操作,AQS在頂層已經實作了,

AQS代碼内部提供了一系列操作鎖和線程隊列的方法,主要操作鎖的方法包含以下幾個:

  • compareAndSetState():利用CAS的操作來設定state的值
  • tryAcquire(int):獨占方式擷取鎖。成功則傳回true,失敗則傳回false。
  • tryRelease(int):獨占方式釋放鎖。成功則傳回true,失敗則傳回false。
  • tryAcquireShared(int):共享方式釋放鎖。負數表示失敗;0表示成功,但沒有剩餘可用資源;正數表示成功,且有剩餘資源。
  • tryReleaseShared(int):共享方式釋放鎖。如果釋放後允許喚醒後續等待結點傳回true,否則傳回false。

像ReentrantLock就是實作了自定義的tryAcquire-tryRelease,進而操作state的值來實作同步效果。

除此之外,AQS内部還定義了一個靜态類Node,表示CLH隊列的每一個結點,該結點的作用是對每一個等待擷取資源做了封裝,包含了需要同步的線程本身、線程等待狀态.....

我們可以看下該類的一些重點變量:

static final class Node {
        /** 表示共享模式下等待的Node */
        static final Node SHARED = new Node();
        /** 表示獨占模式下等待的mode */
        static final Node EXCLUSIVE = null;

        /** 下面幾個為waitStatus的具體值 */
        static final int CANCELLED =  1;
        static final int SIGNAL    = -1;
        static final int CONDITION = -2;
        static final int PROPAGATE = -3;
    
        volatile int waitStatus;
        
         /** 表示前面的結點 */
        volatile Node prev;
         /** 表示後面的結點 */
        volatile Node next;
         /**目前結點裝載的線程,初始化時被建立,使用後會置空*/
        volatile Thread thread;
         /**連結到下一個節點的等待條件,用到Condition的時候會使用到*/
        Node nextWaiter;
    
    }
           

代碼裡面定義了一個表示目前Node結點等待狀态的字段

waitStatus

,該字段的取值包含了CANCELLED(1)、SIGNAL(-1)、CONDITION(-2)、PROPAGATE(-3)、0,這五個值代表了不同的特定場景:

  • CANCELLED:表示目前結點已取消排程。當timeout或被中斷(響應中斷的情況下),會觸發變更為此狀态,進入該狀态後的結點将不會再變化。
  • SIGNAL:表示後繼結點在等待目前結點喚醒。後繼結點入隊時,會将前繼結點的狀态更新為SIGNAL(記住這個-1的值,因為後面我們講的時候經常會提到)
  • CONDITION:表示結點等待在Condition上,當其他線程調用了Condition的signal()方法後,CONDITION狀态的結點将從等待隊列轉移到同步隊列中,等待擷取同步鎖。(注:Condition是AQS的一個元件,後面會細說)
  • PROPAGATE:共享模式下,前繼結點不僅會喚醒其後繼結點,同時也可能會喚醒後繼的後繼結點。
  • 0:新結點入隊時的預設狀态。

也就是說,當waitStatus為負值表示結點處于有效等待狀态,為正值的時候表示結點已被取消。

在AQS内部中還維護了兩個Node對象

head

tail

,一開始預設都為null

private transient volatile Node head;
private transient volatile Node tail;   
           

講完了AQS的一些基礎定義,我們就可以開始學習同步的具體運作機制了,為了更好的示範,我們用ReentrantLock作為使用入口,一步步跟進源碼探究AQS底層是如何運作的,這裡說明一下,因為ReentrantLock底層調用的AQS是獨占模式,是以下文講解的AQS源碼也是針對獨占模式的操作

好了,熱身正式結束,來吧。

獨占模式

加鎖過程

我們都知道,ReentrantLock的加鎖和解鎖方法分别為lock()和unLock(),我們先來看擷取鎖的方法,

final void lock() {
 if (compareAndSetState(0, 1))
  setExclusiveOwnerThread(Thread.currentThread());
 else
  acquire(1);
}
           

邏輯很簡單,線程進來後直接利用

CAS

嘗試搶占鎖,如果搶占成功

state

值回被改為1,且設定對象獨占鎖線程為目前線程,否則就調用

acquire(1)

再次嘗試擷取鎖。

我們假定有兩個線程A和B同時競争鎖,A進來先搶占到鎖,此時的AQS模型圖就類似這樣:

java并發程式設計之核心:AQS進階

繼續走下面的方法,

public final void acquire(int arg) {
 if (!tryAcquire(arg) &&
  acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
  selfInterrupt();
}
           

acquire

包含了幾個函數的調用,

tryAcquire:嘗試直接擷取鎖,如果成功就直接傳回;

addWaiter:将該線程加入等待隊列FIFO的尾部,并标記為獨占模式;

acquireQueued:線程阻塞在等待隊列中擷取鎖,一直擷取到資源後才傳回。如果在整個等待過程中被中斷過,則傳回true,否則傳回false。

selfInterrupt:自我中斷,就是既拿不到鎖,又在等待時被中斷了,線程就會進行自我中斷selfInterrupt(),将中斷補上。

我們一個個來看源碼,并結合上面的兩個線程來做場景分析。

tryAcquire

不用多說,就是為了再次嘗試擷取鎖

protected final boolean tryAcquire(int acquires) {
 return nonfairTryAcquire(acquires);
}

final boolean nonfairTryAcquire(int acquires) {
 final Thread current = Thread.currentThread();
 int c = getState();
 if (c == 0) {
  if (compareAndSetState(0, acquires)) {
   setExclusiveOwnerThread(current);
   return true;
  }
 }
 else if (current == getExclusiveOwnerThread()) {
  int nextc = c + acquires;
  if (nextc < 0) // overflow
   throw new Error("Maximum lock count exceeded");
  setState(nextc);
  return true;
 }
 return false;
}
           

當線程B進來後,nonfairTryAcquire方法首先會擷取state的值,如果為0,則正常擷取該鎖,不為0的話判斷是否是目前線程占用了,是的話就累加state的值,這裡的累加也是為了配合釋放鎖時候的次數,進而實作可重入鎖的效果。

當然,因為之前鎖已經被線程A占領了,是以這時候

tryAcquire

會傳回false,繼續下面的流程。

addWaiter

private Node addWaiter(Node mode) {
 Node node = new Node(Thread.currentThread(), mode);
 // Try the fast path of enq; backup to full enq on failure
 Node pred = tail;
 if (pred != null) {
  node.prev = pred;
  if (compareAndSetTail(pred, node)) {
   pred.next = node;
   return node;
  }
 }
 enq(node);
 return node;
}
           

這段代碼首先會建立一個和目前線程綁定的

Node

節點,

Node

為雙向連結清單。此時等待隊列中的

tail

指針為空,直接調用

enq(node)

方法将目前線程加入等待隊列尾部,然後傳回目前結點的前驅結點,

private Node enq(final Node node) {
 // CAS"自旋",直到成功加入隊尾
    for (;;) {
        Node t = tail;
        if (t == null) {
         // 隊列為空,初始化一個Node結點作為Head結點,并将tail結點也指向它
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
         // 把目前結點插入隊列尾部
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}
           

第一遍循環時,tail指針為空,初始化一個Node結點,并把head和tail結點都指向它,然後第二次循環進來之後,tail結點不為空了,就将目前的結點加入到tail結點後面,也就是這樣:

java并發程式設計之核心:AQS進階

todo 如果此時有另一個線程C進來的話,發現鎖已經被A拿走了,然後隊列裡已經有了線程B,那麼線程C就隻能乖乖排到線程B的後面去,

java并發程式設計之核心:AQS進階

acquireQueued

接着解讀方法,通過tryAcquire()和addWaiter(),我們的線程還是沒有拿到資源,并且還被排到了隊列的尾部,如果讓你來設計的話,這個時候你會怎麼處理線程呢?其實答案也很簡單,能做的事無非兩個:

1、循環讓線程再搶資源。但仔細一推敲就知道不合理,因為如果有多個線程都參與的話,你搶我也搶隻會降低系統性能

2、進入等待狀态休息,直到其他線程徹底釋放資源後喚醒自己,自己再拿到資源

毫無疑問,選擇2更加靠譜,acquireQueued方法做的也是這樣的處理:

final boolean acquireQueued(final Node node, int arg) {
 boolean failed = true;
 try {
  // 标記是否會被中斷
  boolean interrupted = false;
  // CAS自旋
  for (;;) {
   // 擷取目前結點的前結點
   final Node p = node.predecessor();
   if (p == head && tryAcquire(arg)) {
    setHead(node);
    p.next = null; // help GC
    failed = false;
    return interrupted;
   }
   if (shouldParkAfterFailedAcquire(p, node) &&
    parkAndCheckInterrupt())
    interrupted = true;
  }
 } finally {
  if (failed)
   // 擷取鎖失敗,則将此線程對應的node的waitStatus改為CANCEL
   cancelAcquire(node);
 }
}
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
 int ws = pred.waitStatus;
 if (ws == Node.SIGNAL)
  // 前驅結點等待狀态為"SIGNAL",那麼自己就可以安心等待被喚醒了
  return true;
 if (ws > 0) {
  /*
   * 前驅結點被取消了,通過循環一直往前找,直到找到等待狀态有效的結點(等待狀态值小于等于0) ,
   * 然後排在他們的後邊,至于那些被目前Node強制"靠後"的結點,因為已經被取消了,也沒有引用鍊,
   * 就等着被GC了
   */
  do {
   node.prev = pred = pred.prev;
  } while (pred.waitStatus > 0);
  pred.next = node;
 } else {
  // 如果前驅正常,那就把前驅的狀态設定成SIGNAL
  compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
 }
 return false;
}
private final boolean parkAndCheckInterrupt() {
 LockSupport.park(this);
 return Thread.interrupted();
}
           

acquireQueued

方法的流程是這樣的:

1、CAS自旋,先判斷目前傳入的Node的前結點是否為head結點,是的話就嘗試擷取鎖,擷取鎖成功的話就把目前結點置為head,之前的head置為null(友善GC),然後傳回

2、如果前驅結點不是head或者加鎖失敗的話,就調用

shouldParkAfterFailedAcquire

,将前驅節點的waitStatus變為了SIGNAL=-1,最後執行

parkAndChecknIterrupt

方法,調用

LockSupport.park()

挂起目前線程,

parkAndCheckInterrupt

在挂起線程後會判斷線程是否被中斷,如果被中斷的話,就會重新跑

acquireQueued

方法的CAS自旋操作,直到擷取資源。

ps:LockSupport.park方法會讓目前線程進入waitting狀态,在這種狀态下,線程被喚醒的情況有兩種,一是被unpark(),二是被interrupt(),是以,如果是第二種情況的話,需要傳回被中斷的标志,然後在

acquire

頂層方法的視窗那裡自我中斷補上

此時,因為線程A還未釋放鎖,是以線程B狀态都是被挂起的,

java并發程式設計之核心:AQS進階

到這裡,加鎖的流程就分析完了,其實整體來說也并不複雜,而且當你了解了獨占模式加鎖的過程,後面釋放鎖和共享模式的運作機制也沒什麼難懂的了,是以整個加鎖的過程還是有必要多消化下的,也是AQS的重中之重。

為了友善你們更加清晰了解,我加多一張流程圖吧(這個作者也太暖了吧,哈哈)

java并發程式設計之核心:AQS進階

釋放鎖

說完了加鎖,我們來看看釋放鎖是怎麼做的,AQS中釋放鎖的方法是

release()

,當調用該方法時會釋放指定量的資源 (也就是鎖) ,如果徹底釋放了(即state=0),它會喚醒等待隊列裡的其他線程來擷取資源。

還是一步步看源碼吧,

public final boolean release(int arg) {
 if (tryRelease(arg)) {
  Node h = head;
  if (h != null && h.waitStatus != 0)
   unparkSuccessor(h);
  return true;
 }
 return false;
}
           

tryRelease

代碼上可以看出,核心的邏輯都在

tryRelease

方法中,該方法的作用是釋放資源,AQS裡該方法沒有具體的實作,需要由自定義的同步器去實作,我們看下ReentrantLock代碼中對應方法的源碼:

protected final boolean tryRelease(int releases) {
 int c = getState() - releases;
 if (Thread.currentThread() != getExclusiveOwnerThread())
  throw new IllegalMonitorStateException();
 boolean free = false;
 if (c == 0) {
  free = true;
  setExclusiveOwnerThread(null);
 }
 setState(c);
 return free;
}
           

tryRelease

方法會減去state對應的值,如果state為0,也就是已經徹底釋放資源,就傳回true,并且把獨占的線程置為null,否則傳回false。

此時AQS中的資料就會變成這樣:

java并發程式設計之核心:AQS進階

完全釋放資源後,目前線程要做的就是喚醒CLH隊列中第一個在等待資源的線程,也就是head結點後面的線程,此時調用的方法是

unparkSuccessor()

private void unparkSuccessor(Node node) {
    int ws = node.waitStatus;
    if (ws < 0)
     //将head結點的狀态置為0
        compareAndSetWaitStatus(node, ws, 0);
 //找到下一個需要喚醒的結點s
    Node s = node.next;
    //如果為空或已取消
    if (s == null || s.waitStatus > 0) {
        s = null;
        // 從後向前,直到找到等待狀态小于0的結點,前面說了,結點waitStatus小于0時才有效
        for (Node t = tail; t != null && t != node; t = t.prev) 
            if (t.waitStatus <= 0)
                s = t;
    }
    // 找到有效的結點,直接喚醒
    if (s != null)
        LockSupport.unpark(s.thread);//喚醒
}
           

方法的邏輯很簡單,就是先将head的結點狀态置為0,避免下面找結點的時候再找到head,然後找到隊列中最前面的有效結點,然後喚醒,我們假設這個時候線程A已經釋放鎖,那麼此時隊列中排最前邊競争鎖的線程B就會被喚醒。然後被喚醒的線程B就會嘗試用CAS擷取鎖,回到

acquireQueued

方法的邏輯,

for (;;) {
 // 擷取目前結點的前結點
 final Node p = node.predecessor();
 if (p == head && tryAcquire(arg)) {
  setHead(node);
  p.next = null; // help GC
  failed = false;
  return interrupted;
 }
 if (shouldParkAfterFailedAcquire(p, node) &&
  parkAndCheckInterrupt())
  interrupted = true;
}
           

當線程B擷取鎖之後,會把目前結點指派給head,然後原先的前驅結點 (也就是原來的head結點) 去掉引用鍊,友善回收,這樣一來,線程B擷取鎖的整個過程就完成了,此時AQS的資料就會變成這樣:

java并發程式設計之核心:AQS進階

到這裡,我們已經分析完了AQS獨占模式下加鎖和釋放鎖的過程,也就是tryAccquire->tryRelease這一鍊條的邏輯,除此之外,AQS中還支援共享模式的同步,這種模式下關于鎖的操作核心其實就是tryAcquireShared->tryReleaseShared這兩個方法,我們可以簡單看下

共享模式

擷取鎖

AQS中,共享模式擷取鎖的頂層入口方法是

acquireShared

,該方法會擷取指定數量的資源,成功的話就直接傳回,失敗的話就進入等待隊列,直到擷取資源,

public final void acquireShared(int arg) {
 if (tryAcquireShared(arg) < 0)
  doAcquireShared(arg);
}
           

該方法裡包含了兩個方法的調用,

tryAcquireShared:嘗試擷取一定資源的鎖,傳回的值代表擷取鎖的狀态。

doAcquireShared:進入等待隊列,并循環嘗試擷取鎖,直到成功。

tryAcquireShared

tryAcquireShared

在AQS裡沒有實作,同樣由自定義的同步器去完成具體的邏輯,像一些較為常見的并發工具Semaphore、CountDownLatch裡就有對該方法的自定義實作,雖然實作的邏輯不同,但方法的作用是一樣的,就是擷取一定資源的資源,然後根據傳回值判斷是否還有剩餘資源,進而決定下一步的操作。

傳回值有三種定義:

  • 負值代表擷取失敗;
  • 0代表擷取成功,但沒有剩餘的資源,也就是state已經為0;
  • 正值代表擷取成功,而且state還有剩餘,其他線程可以繼續領取

當傳回值小于0時,證明此次擷取一定數量的鎖失敗了,然後就會走

doAcquireShared

方法

doAcquireShared

此方法的作用是将目前線程加入等待隊列尾部休息,直到其他線程釋放資源喚醒自己,自己成功拿到相應量的資源後才傳回,這是它的源碼:

private void doAcquireShared(int arg) {
 // 加入隊列尾部
 final Node node = addWaiter(Node.SHARED);
 boolean failed = true;
 try {
  boolean interrupted = false;
  // CAS自旋
  for (;;) {
   final Node p = node.predecessor();
   // 判斷前驅結點是否是head
   if (p == head) {
    // 嘗試擷取一定數量的鎖
    int r = tryAcquireShared(arg);
    if (r >= 0) {
     // 擷取鎖成功,而且還有剩餘資源,就設定目前結點為head,并繼續喚醒下一個線程
     setHeadAndPropagate(node, r);
     // 讓前驅結點去掉引用鍊,友善被GC
     p.next = null; // help GC
     if (interrupted)
      selfInterrupt();
     failed = false;
     return;
    }
   }
   // 跟獨占模式一樣,改前驅結點waitStatus為-1,并且目前線程挂起,等待被喚醒
   if (shouldParkAfterFailedAcquire(p, node) &&
    parkAndCheckInterrupt())
    interrupted = true;
  }
 } finally {
  if (failed)
   cancelAcquire(node);
 }
}

private void setHeadAndPropagate(Node node, int propagate) {
    Node h = head;
    // head指向自己
    setHead(node);
     // 如果還有剩餘量,繼續喚醒下一個鄰居線程
    if (propagate > 0 || h == null || h.waitStatus < 0) {
        Node s = node.next;
        if (s == null || s.isShared())
            doReleaseShared();
    }
}
           

看到這裡,你會不會一點熟悉的感覺,這個方法的邏輯怎麼跟上面那個

acquireQueued()

 那麼類似啊?對的,其實兩個流程并沒有太大的差别。隻是

doAcquireShared()

比起獨占模式下的擷取鎖上多了一步喚醒後繼線程的操作,當擷取完一定的資源後,發現還有剩餘的資源,就繼續喚醒下一個鄰居線程,這才符合"共享"的思想嘛。

這裡我們可以提出一個疑問,共享模式下,目前線程釋放了一定數量的資源,但這部分資源滿足不了下一個等待結點的需要的話,那麼會怎麼樣?

按照正常的思維,共享模式是可以多個線程同時執行的才對,是以,多個線程的情況下,如果老大釋放完資源,但這部分資源滿足不了老二,但能滿足老三,那麼老三就可以拿到資源。可事實是,從源碼設計中可以看出,如果真的發生了這種情況,老三是拿不到資源的,因為等待隊列是按順序排列的,老二的資源需求量大,會把後面量小的老三以及老四、老五等都給卡住。從這一個角度來看,雖然AQS嚴格保證了順序,但也降低了并發能力

接着往下說吧,喚醒下一個鄰居線程的邏輯在

doReleaseShared()

中,我們放到下面的釋放鎖來解析。

釋放鎖

共享模式釋放鎖的頂層方法是

releaseShared

,它會釋放指定量的資源,如果成功釋放且允許喚醒等待線程,它會喚醒等待隊列裡的其他線程來擷取資源。下面是releaseShared()的源碼:

public final boolean releaseShared(int arg) {
 if (tryReleaseShared(arg)) {
  doReleaseShared();
  return true;
 }
 return false;
}
           

該方法同樣包含兩部分的邏輯:

tryReleaseShared:釋放資源。

doAcquireShared:喚醒後繼結點。

tryAcquireShared

方法一樣,

tryReleaseShared

在AQS中沒有具體的實作,由子同步器自己去定義,但功能都一樣,就是釋放一定數量的資源。

釋放完資源後,線程不會馬上就收工,而是喚醒等待隊列裡最前排的等待結點。

doAcquireShared

喚醒後繼結點的工作在

doReleaseShared()

方法中完成,我們可以看下它的源碼:

private void doReleaseShared() {
 for (;;) {
  // 擷取等待隊列中的head結點
  Node h = head;
  if (h != null && h != tail) {
   int ws = h.waitStatus;
   // head結點waitStatus = -1,喚醒下一個結點對應的線程
   if (ws == Node.SIGNAL) {
    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
     continue;            // loop to recheck cases
    // 喚醒後繼結點
    unparkSuccessor(h);
   }
   else if (ws == 0 &&
      !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
    continue;                // loop on failed CAS
  }
  if (h == head)                   // loop if head changed
   break;
 }
}
           

代碼沒什麼特别的,就是如果等待隊列head結點的waitStatus為-1的話,就直接喚醒後繼結點,喚醒的方法

unparkSuccessor()

在上面已經講過了,這裡也沒必要再複述。

總的來看,AQS共享模式的運作流程和獨占模式很相似,隻要掌握了獨占模式的流程運轉,共享模式什麼的不就那樣嗎,沒難度。這也是我為什麼共享模式講解中不畫流程圖的原因,沒必要嘛。

Condition

介紹完了AQS的核心功能,我們再擴充一個知識點,在AQS中,除了提供獨占/共享模式的加鎖/解鎖功能,它還對外提供了關于Condition的一些操作方法。

Condition是個接口,在jdk1.5版本後設計的,基本的方法就是

await()

signal()

方法,功能大概就對應Object的

wait()

notify()

,Condition必須要配合鎖一起使用,因為對共享狀态變量的通路發生在多線程環境下。一個Condition的執行個體必須與一個Lock綁定,是以Condition一般都是作為Lock的内部實作 ,AQS中就定義了一個類ConditionObject來實作了這個接口,

java并發程式設計之核心:AQS進階

那麼它應該怎麼用呢?我們可以簡單寫個demo來看下效果

public class ConditionDemo {

    public static void main(String[] args) {
        ReentrantLock lock = new ReentrantLock();
        Condition condition = lock.newCondition();
        Thread tA = new Thread(() -> {
            lock.lock();
            try {
                System.out.println("線程A加鎖成功");
                System.out.println("線程A執行await被挂起");
                condition.await();
                System.out.println("線程A被喚醒成功");
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
                System.out.println("線程A釋放鎖成功");
            }
        });

        Thread tB = new Thread(() -> {
            lock.lock();
            try {
                System.out.println("線程B加鎖成功");
                condition.signal();
                System.out.println("線程B喚醒線程A");
            } finally {
                lock.unlock();
                System.out.println("線程B釋放鎖成功");
            }
        });
        tA.start();
        tB.start();
    }
}
           

執行main函數後結果輸出為:

線程A加鎖成功
線程A執行await被挂起
線程B加鎖成功
線程B喚醒線程A
線程B釋放鎖成功
線程A被喚醒成功
線程A釋放鎖成功
           

代碼執行的結果很容易了解,線程A先擷取鎖,然後調用

await()

方法挂起目前線程并釋放鎖,線程B這時候拿到鎖,然後調用

signal

喚醒線程A。

毫無疑問,這兩個方法讓線程的狀态發生了變化,我們仔細來研究一下,

翻看AQS的源碼,我們會發現Condition中定義了兩個屬性

firstWaiter

lastWaiter

,前面說了,AQS中包含了一個FIFO的CLH等待隊列,每個Conditon對象就包含這樣一個等待隊列,而這兩個屬性分别表示的是等待隊列中的首尾結點,

/** First node of condition queue. */
private transient Node firstWaiter;
/** Last node of condition queue. */
private transient Node lastWaiter;
           

注意:Condition當中的等待隊列和AQS主體的同步等待隊列是分開的,兩個隊列雖然結構體相同,但是作用域是分開的

await

先看

await()

的源碼:

public final void await() throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    // 将目前線程加入到等待隊列中
    Node node = addConditionWaiter();
    // 完全釋放占有的資源,并傳回資源數
    int savedState = fullyRelease(node);
    int interruptMode = 0;
    // 循環判斷目前結點是不是在Condition的隊列中,是的話挂起
    while (!isOnSyncQueue(node)) {
        LockSupport.park(this);
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null) // clean up if cancelled
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}
           

當一個線程調用Condition.await()方法,将會以目前線程構造結點,這個結點的

waitStatus

指派為Node.CONDITION,也就是-2,并将結點從尾部加入等待隊列,然後尾部結點就會指向這個新增的結點,

private Node addConditionWaiter() {
    Node t = lastWaiter;
    // If lastWaiter is cancelled, clean out.
    if (t != null && t.waitStatus != Node.CONDITION) {
        unlinkCancelledWaiters();
        t = lastWaiter;
    }
    Node node = new Node(Thread.currentThread(), Node.CONDITION);
    if (t == null)
        firstWaiter = node;
    else
        t.nextWaiter = node;
    lastWaiter = node;
    return node;
}
           

我們依然用上面的demo來示範,此時,線程A擷取鎖并調用Condition.await()方法後,AQS内部的資料結構會變成這樣:

java并發程式設計之核心:AQS進階

在Condition隊列中插入對應的結點後,線程A會釋放所持有的資源,走到while循環那層邏輯,

while (!isOnSyncQueue(node)) {
 LockSupport.park(this);
 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
  break;
}
           

isOnSyncQueue

方法的會判斷目前的線程節點是不是在同步隊列中,這個時候此結點還在Condition隊列中,是以該方法傳回false,這樣的話循環會一直持續下去,線程被挂起,等待被喚醒,此時,線程A的流程暫時停止了。

當線程A調用

await()

方法挂起的時候,線程B擷取到了線程A釋放的資源,然後執行

signal()

方法:

signal

public final void signal() {
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    if (first != null)
        doSignal(first);
}
           

先判斷目前線程是否為擷取鎖的線程,如果不是則直接抛出異常。接着調用

doSignal()

方法來喚醒線程。

private void doSignal(Node first) {
 // 循環,從隊列一直往後找不為空的首結點
    do {
        if ( (firstWaiter = first.nextWaiter) == null)
            lastWaiter = null;
        first.nextWaiter = null;
    } while (!transferForSignal(first) &&
             (first = firstWaiter) != null);
}

final boolean transferForSignal(Node node) {
 // CAS循環,将結點的waitStatus改為0
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
        return false;
 // 上面已經分析過,此方法會把目前結點加入到等待隊列中,并傳回前驅結點
    Node p = enq(node);
    int ws = p.waitStatus;
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
        LockSupport.unpark(node.thread);
    return true;
}

           

doSignal

的代碼中可以看出,這時候程式尋找的是Condition等待隊列中首結點firstWaiter的結點,此時該結點指向的是線程A的結點,是以之後的流程作用的都是線程A的結點。

這裡分析下

transferForSignal

方法,先通過CAS自旋将結點waitStatus改為0,然後就把結點放入到同步隊列 (此隊列不是Condition的等待隊列) 中,然後再用CAS将同步隊列中該結點的前驅結點waitStatus改為Node.SIGNAL,也就是-1,此時AQS的資料結構大概如下(額.....少畫了個箭頭,大家就當head結點是線程A結點的前驅結點就好):

java并發程式設計之核心:AQS進階

回到

await()

方法,當線程A的結點被加入同步隊列中時,

isOnSyncQueue()

會傳回true,跳出循環,

while (!isOnSyncQueue(node)) {
        LockSupport.park(this);
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null) // clean up if cancelled
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
           

接着執行

acquireQueued()

方法,這裡就不用多說了吧,嘗試重新擷取鎖,如果擷取鎖失敗繼續會被挂起,直到另外線程釋放鎖才被喚醒。

是以,當線程B釋放完鎖後,線程A被喚醒,繼續嘗試擷取鎖,至此流程結束。

對于這整個通信過程,我們可以畫一張流程圖展示下:

java并發程式設計之核心:AQS進階

總結

說完了Condition的使用和底層運作機制,我們再來總結下它跟普通 wait/notify 的比較,一般這也是問的比較多的,Condition大概有以下兩點優勢:

  • Condition 需要結合 Lock 進行控制,使用的時候要注意一定要對應的unlock(),可以對多個不同條件進行控制,隻要new 多個 Condition對象就可以為多個線程控制通信,wait/notify 隻能和 synchronized 關鍵字一起使用,并且隻能喚醒一個或者全部的等待隊列;
  • Condition 有類似于 await 的機制,是以不會産生加鎖方式而産生的死鎖出現,同時底層實作的是 park/unpark 的機制,是以也不會産生先喚醒再挂起的死鎖,一句話就是不會産生死鎖,但是 wait/notify 會産生先喚醒再挂起的死鎖。

回顧本篇文章,我們不難發現,無論是獨占還是共享模式,或者結合是Condition工具使用,AQS本質上的同步功能都是通過對鎖和隊列中結點的操作來實作的,從設計上講,AQS的組成結構并不算複雜,底層的運轉機制也不會很繞,是以,大家如果看源碼的時候覺得有些困難的話也不用灰心,多看幾遍,順便畫個圖之類的,理清下流程還是沒什麼問題的。

繼續閱讀