AQS重生之路
AbstractQueuedSynchronizer抽象同步隊列簡稱AQS,它是實作同步器的基礎元件,并發包中鎖的底層就是使用AQS實作的。AQS是一個FIFO的雙向隊列,其内部通過節點head和tail記錄隊首和隊尾元素,隊列元素的類型為Node。

1.1 Node分析
static final class Node {
//标記該線程是擷取共享資源時被阻塞挂起後放入AQS隊列
static final Node SHARED = new Node();
//标記線程是擷取獨占資源時被挂起後放入AQS隊列的
static final Node EXCLUSIVE = null;
static final int CANCELLED = 1;
static final int PROPAGATE = -3;
//記錄目前線程等待狀态,可以為
//CANCELLED(線程被取消了)
//SIGNAL(線程需要被喚醒)
//CONDITION(線程在條件隊列裡面等待)
//PROPAGATE(釋放共享資源時需要通知其他節點)
volatile int waitStatus;
volatile Node prev;
volatile Node next;
//存放進入AQS隊列裡面的線程
volatile Thread thread;
Node nextWaiter;
}
/**
頭尾節點
*/
private transient volatile Node head;
private transient volatile Node tail;
private volatile int state;
上述state變量擴充:對于ReentrantLock的實作來說,state可以用來表示目前線程擷取鎖的可重入次數;對于讀寫鎖ReentrantReadWriteLock來說,state的高16位表示讀狀态,也就是擷取該讀鎖的次數,低16位表示擷取到寫鎖的線程的可重入次數;對于semaphore來說,state用來表示目前可用信号的個數;對于CountDownlatch來說,state用來表示計數器目前的值。
1.2 ConditionObject
**字段:**ConditionObject内部類隻有兩個變量分别存儲條件隊列的頭、尾元素。
方法:
1.3 變量
方法操作
對于AQS來說,線程同步的關鍵是對狀态值state進行操作。根據state是否屬于一個線程,操作state的方式分為獨占方式和共享方式。
獨占方式
注意:阻塞的方式進行資源擷取,try相關的方法都需要子類去實作。
- void acquire(intarg)
//會首先使用tryAcquire方法嘗試擷取資源,具體是設定狀态變量state的值,成功則直接傳回 //失敗則将目前線程封裝為類型為Node.EXCLUSIVE的Node節點後插入到AQS阻塞隊列的尾部,并調用LockSupport.park(this)方法挂起自己。 public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
- void acquireInterruptibly(int arg)
- boolean release(int arg)
1.嘗試使用tryRelease操作釋放資源,這裡是設定狀态變量state的值,然後調用LockSupport.unpark(thread)方法激活AQS隊列裡面被阻塞的一個線程 public final boolean release(int arg) { if (tryRelease(arg)) { Node h = ; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; }
共享方式
當多個線程去請求資源時通過CAS方式競争擷取資源,當一個線程擷取到了資源後,另外一個線程再次去擷取時如果目前資源還能滿足它的需要,則目前線程隻需要使用CAS方式進行擷取即可。比如Semaphore信号量,當一個線程通過acquire()方法擷取信号量時,會首先看目前信号量個數是否滿足需要,不滿足則把目前線程放入阻塞隊列,如果滿足則通過自旋CAS擷取信号量。
- void acquireShared(int arg)
會首先使用tryAcquireShared嘗試擷取資源,具體是設定狀态變量state的值,成功則直接傳回,失敗則将目前線程封裝為類型為Node.SHARED的Node節點後插入到AQS阻塞隊列的尾部,并使用LockSupport.park(this)方法挂起自己。 public final void acquireShared(int arg) { if (tryAcquireShared(arg) < 0) doAcquireShared(arg); }
- void acquireSharedInterruptibly(int arg)
- boolean releaseShared(int arg)
1.4 維護AQS隊列
入隊操作
簡述:當一個線程擷取鎖失敗後該線程會被轉換為Node節點,然後就會使用enq(final Node node)方法将該節點插入到AQS的阻塞隊列。
1.當隊列為空,首先設定一個head的哨兵節點,并讓尾部也指向head節點
2.第二次循環,将目前節點連結到尾部,并将tail指向尾節點
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
1.5 AQS—條件變量的支援
主要就是通過
Lock.newCondition()
來擷取條件變量,然後通過
condition.await()
和
condition.signal()
來控制線程通信。
案例如下
package demo;
import java.util.HashMap;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
public class Test {
private static ReentrantLock lock = new ReentrantLock();
private static Condition condition = lock.newCondition();
public static void main(String[] args) throws InterruptedException {
new Thread(Test::await).start();
new Thread(Test::signal).start();
}
private static void await(){
lock.lock();
try {
System.out.println("begin wait");
condition.await();
System.out.println("end wait");
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
private static void signal(){
lock.lock();
try {
System.out.println("begin signal");
condition.signal();
System.out.println("end signal");
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
a.源碼分析
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
//建立新的node節點,并插入到條件隊列末尾
Node node = addConditionWaiter();
//釋放目前線程擷取的鎖
int savedState = fullyRelease(node);
int interruptMode = 0;
//調用park()方法阻塞挂起目前線程
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
...
}
public final void signal() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
//将條件隊列頭元素移動到AQS隊列
doSignal(first);
}
b.總體流程
1.當多個線程同時調用lock.lock()方法擷取鎖時,隻有一個線程擷取到了鎖,其他線程會被轉換為Node節點插入到lock鎖對應的AQS阻塞隊列裡面,并做自旋CAS嘗試擷取鎖。
2.如果擷取到鎖的線程又調用了對應的條件變量的await()方法,則該線程會釋放擷取到的鎖,并被轉換為Node節點插入到條件變量對應的條件隊列裡面。
3.這時候因為調用lock.lock()方法被阻塞到AQS隊列裡面的一個線程會擷取到被釋放的鎖,如果該線程也調用了條件變量的await()方法則該線程也會被放入條件變量的條件隊列裡面。
4.當另外一個線程調用條件變量的signal()或者signalAll()方法時,會把條件隊列裡面的一個或者全部Node節點移動到AQS的阻塞隊列裡面,等待時機擷取鎖。
最後使用一個圖總結如下:一個鎖對應一個AQS阻塞隊列,對應多個條件變量,每個條件變量有自己的一個條件隊列。