AQS底層原理
概念
AQS是一個同步隊列,全名為AbstractQueuedSynchronizer,它是一個同步工具,是Lock用來實作線程同步的核心元件。
AQS的功能分為兩種:獨占和共享
獨占鎖:每次隻能有一個線程擷取鎖,比如ReentrantLock是以獨占的方式實作互斥
共享鎖:允許多個線程同時擷取鎖,并發的通路共享資源,比如ReentrantReadWriteLock
内部實作
AQS隊列内部維護的是一個FIFO的雙向連結清單。每個Node由線程封裝,當線程争搶鎖失敗後會封裝成Node加入
AQS隊列中去。當擷取鎖的線程釋放鎖以後,會從隊列中喚醒一個阻塞的節點
Node的組成
static final class Node {
//共享模式下的等待标記
static final Node SHARED = new Node();
//獨占模式下的等待标記
static final Node EXCLUSIVE = null;
//線程的等待狀态,表示線程已經被取消
static final int CANCELLED = 1;
//線程的等待狀态,表示後繼線程需要被喚醒
static final int SIGNAL = -1;
//線程的等待狀态,表示線程在Condition上
static final int CONDITION = -2;
//表示下一個acquireShared需要無條件傳播
static final int PROPAGATE = -3;
volatile int waitStatus;
volatile Node prev;
volatile Node next;
//目前節點的線程,初始化後使用,在使用後失效
volatile Thread thread;
/**
* 連結到等在等待條件上的下一個節點,或特殊的值SHARED,因為條件隊列隻有在獨占模式時才能被通路,
* 是以我們隻需要一個簡單的連接配接隊列在等待的時候儲存節點,然後把它們轉移到隊列中重新擷取
* 因為條件隻能是獨占性的,我們通過使用特殊的值來表示共享模式
*/
Node nextWaiter;
final boolean isShared() {
return nextWaiter == SHARED;
}
/**
* 傳回目前節點的前驅節點,如果為空,直接抛出空指針異常
*/
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
Node() { // 用來建立初始化的head 或 SHARED的标記
}
//将線程構造成一個節點,添加隊列
Node(Thread thread, Node mode) {
this.nextWaiter = mode;
this.thread = thread;
}
Node(Thread thread, int waitStatus) { //指定線程和節點狀态的構造方法 // Used by Condition
this.waitStatus = waitStatus;
this.thread = thread;
}
}
釋放鎖以及添加線程對于隊列的變化
添加節點的場景
添加節點會涉及到兩個變化
1:新的線程封裝為Node節點追加到同步隊列中,設定prev節點,以及修改目前節點的前置節點的next節點指向自己(連結清單插入操作)
2:通過CAS将tail重新指向新的尾部節點
head節點表示擷取鎖成功的節點,當頭結點在釋放同步狀态時,會喚醒後繼節點,如果後繼節點獲得鎖成功,會把自己設定為頭結點,節點變化過程如下。
這個過程也是涉及到兩個變化
1:修改head為下一個擷取鎖的節點
2:新的擷取鎖的節點,将pre的指針指向null
設定head節點不需要使用CAS,因為設定head節點是由擷取鎖的線程來完成的,而同步鎖隻能由一個線程獲得,是以不需要CAS保證
ReentrantLock源碼分析
ReentrantLock的時序圖
調用ReentrantLock中的lock()方法,源碼調用過程如下
ReentrantLock.lock()擷取鎖的入口
public void lock() {
sync.lock();
}
sync是一個抽象的靜态内部類,繼承了AQS來實作重入鎖的邏輯,AQS是一個同步隊列,能夠實作線程的阻塞以及喚醒,但是并不具備業務功能,是以在不同場景中,會繼承AQS來實作對應場景的功能,Sync有兩個具體的實作類,分别是:
NofairSync:表示可以存在搶占鎖的功能,允許搶占。
FairSync:表示所有線程嚴格按照FIFO來擷取鎖
NofairSync
以非公平鎖為例,看看lock的具體實作
1:CAS成功,就表示成功擷取到了鎖
2:CAS失敗,調用acquire(1)進行鎖競争
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
CAS的實作原理
//CAS改變狀态
protected final boolean compareAndSetState(int expect, int update) {
// See below for intrinsics setup to support this
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
通過cas樂觀鎖的方式進行比較并替換,這段代碼意思是,如果目前記憶體的state和預期值expect一樣,則替換為
update,更新成功傳回true,否則傳回false,這個操作是原子性的
state是AQS中的一個屬性,對于重入鎖的實作來說,表示有一個同步狀态,有兩個含義表示:
1:當state=0時,表示無鎖狀态
2:當state>0時,表示已經有線程擷取了鎖,也就是state=1,因為ReentrantLock允許重入,是以在同一線程多次擷取同步鎖時,state會遞增。
Unsafe 可認為是 Java 中留下的後門,提供了一些低層次操作,如直接記憶體通路、線程的挂起和恢複、CAS、線程同步、記憶體屏障
AQS.acquire
如果CAS操作未能成功,說明state已經不為0,此時繼續acquire(1)操作
1:通過tryAcquire 嘗試擷取獨占鎖,如果成功則傳回true,失敗傳回false
2:如果tryAcquire 失敗,則會通過addWaiter方法将目前線程封裝為Node添加到AQS隊列的尾部
3:acquireQueued,将Node作為參數,通過自旋去嘗試擷取鎖
if (!tryAcquire(arg) &&acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
//中斷
selfInterrupt();
NonfairSync.tryAcquire
嘗試擷取鎖,如果成功傳回 true,不成功傳回 false,它是重寫 AQS 類中的 tryAcquire 方法
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
ReentrantLock.nonfairTryAcquire
1:擷取目前線程,判斷鎖的狀态
2:如果state=0表示目前是無鎖狀态,通過cas更新state狀态的值
3:目前線程是可重入的,則增加重入次數
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
//如果是0的話,則嘗試去原子擷取這個鎖
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
//比較與目前線程是否是一個線程,如果是的話
else if (current == getExclusiveOwnerThread()) {
//增加狀态變量的值,是以是可重入操作
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
AQS.addWaiter
當tryAcquire方法擷取鎖失敗後,則會先調用addWaiter将目前線程封裝為Node。形參mode表示目前節點的狀
态,傳遞的參數是Node.EXCLUSIVE,表示獨占狀态。意味着重入鎖用到了AQS的獨占功能
1:将目前線程封裝為Node
2:目前連結清單中的tail節點是否為空,如果不為空,則通過CAS操作把目前線程的node添加到AQS隊列
3:如果為空或CAS失敗,調用enq将節點添加到AQS隊列
private Node addWaiter(Node mode) {
//将目前線程封裝為Node
Node node = new Node(Thread.currentThread(), mode);
Node pred = tail;
//pred不為空的情況下,說明隊列中存在節點
if (pred != null) {
node.prev = pred;//将目前線程的Node的prev指向tail
//通過 cas 把 node加入到 AQS 隊列,也就是設定為 tail
if (compareAndSetTail(pred, node)) {
//設定成功以後,把原 tail 節點的 next指向目前 node
pred.next = node;
return node;
}
}
//tail=null或者compareAndSetTail(pred, node)=false,把 node 添加到同步隊列
enq(node);
return node;
}
enq
private Node enq(final Node node) {
//自璇操作,入隊
for (;;) {
//尾部
Node t = tail;
//尾部為空
if (t == null) { // Must initialize
//建立頭部節點,設定初始化節點
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
圖解分析:
假設有三個線程來争搶鎖
AQS.acquireQueued
通過addWaiter方法把線程添加到連結清單後,會接着把Node作為參數傳遞給acquireQueued方法,去競争鎖
1:擷取目前節點的prev節點
2:如果prev節點為head節點,那麼它就有資格去争搶鎖,調用tryAcquire搶占鎖
3:搶占鎖成功以後,把擷取鎖的節點設定為head,并且移除原來初始化的head節點
4:如果擷取鎖失敗,則根據waitStatus決定是否需要挂起線程
5:最後,通過cance|Acquire取消獲得鎖的操作
//通過自旋方式擷取同步狀态
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;//預設線程沒有被中斷過
for (;;) {
final Node p = node.predecessor();//擷取該節點的前驅節點p
if (p == head && tryAcquire(arg)) { // 如果p是頭節點并且能擷取到同步狀态
setHead(node); // 把目前節點設為頭節點
p.next = null; // 把p的next設為null,便于GC
failed = false; // 标志--表示成功擷取同步狀态,預設是true,表示失敗
return interrupted; // 傳回該線程在擷取到同步狀态的過程中有沒有被中斷過
}
//ThreadA 可能還沒釋放鎖,使得 ThreadB 在執行 tryAcquire 時會傳回 false
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
//并且傳回目前線程在等待過程中有沒有中斷過。
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
shouldParkAfterFailedAcquire
如果線程A的鎖還沒有釋放的情況下,B和C來争搶鎖肯定是會失敗的,那麼失敗之後,調用
shouldParkAfterFailedAcquire方法
Node中有五種狀态,分别是CANCELLED(1)、SIGNAL(-1)、CONDITION(-2)、PROPAGATE(-3)、預設狀态(0)
CANCELLED:在同步隊列中等待的線程逾時或者被打斷,需要從同步隊列中取消該Node的結點,其結點的
waitStatus為CANCELLED,即結束狀态,進入該狀态後的結點将不會再變化
SIGNAL:隻要前置節點釋放鎖,就會通知辨別為SIGNAL狀态的後續節點的線程
CONDITION:線程的等待狀态,表示線程在Condition上
PROPAGATE:共享模式下,PROPAGATE 狀态的線程處于可運作狀态 0:初始狀态,
這個方法主要作用是,通過Node的狀态判斷,A線程競争鎖失敗以後是否應該被挂起
1:如果線程A的pred節點狀态為SIGNAL,那就表示可以放心挂起目前線程
2:通過循環掃描連結清單把CANCELLED狀态的節點移除
3:修改pred節點的狀态為SIGNAL,傳回false
傳回 false 時,也就是不需要挂起,傳回 true,則需要調用 parkAndCheckInterrupt挂起目前線程
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;//前置節點的waitStatus
if (ws == Node.SIGNAL) //如果前置節點為SIGNAL,意味着隻需要等待其他前置節點的線程被釋放
return true; //可以放心挂起
if (ws > 0) { //ws 大于 0,意味着 prev 節點取消了排隊,直接移除這個節點就行
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);//這裡采用循環,從雙向清單中移除 CANCELLED 的節點
pred.next = node;
} else {//利用 cas 設定 prev 節點的狀态為 SIGNAL(-1)
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
parkAndCheckInterrupt
使用 LockSupport.park 挂起目前線程WATING 狀态
Thread.interrupted,傳回目前線程是否被其他線程觸發過中斷請求,也就是thread.interrupt(); 如果有觸發過中
斷請求,那麼這個方法會傳回目前的中斷辨別true,并且對中斷辨別進行複位辨別已經響應過了中斷請求。如果返
回 true,意味着在 acquire 方法中會執行 selfInterrupt()。
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
圖解分析
通過 acquireQueued 方法來競争鎖,如果 ThreadA 還在執行中沒有釋放鎖的話,意味着 ThreadB 和 ThreadC
隻能挂起了。
鎖的釋放流程
ReentrantLock.unlock
public final boolean release(int arg) {
if (tryRelease(arg)) { //釋放鎖成功
Node h = head; //擷取到AQS中的head節點
if (h != null && h.waitStatus != 0) //如果不為空,并且狀态不為0,則喚醒後續節點
unparkSuccessor(h);
return true;
}
return false;
}
ReentrantLock.tryRelease
這個方法是一個設定鎖的操作,通過将state狀态減掉傳入的參數值(1),如果結果狀态為0,就将它的排它鎖的
Owner設定為null
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
unparkSuccessor
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;//獲得頭結點的狀态
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;//擷取頭結點的下一個節點
if (s == null || s.waitStatus > 0) {
//如果下一個節點為 null 或者 status>0 表示 cancelled 狀态.
//通過從尾部節點開始掃描,找到距離 head 最近的一個waitStatus<=0 的節點
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
//next 節點不為空,直接喚醒這個線程即可
if (s != null)
LockSupport.unpark(s.thread);
}
為什麼釋放節點時候是從tail掃描的
再回到 enq那個方法。8-11行來看一個新的節點是如何加入到連結清單中的
1:将新的節點的prev指向tail
2:通過cas将tail設定為新的節點
3:t.next = node;
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
在 cas 操作之後,t.next=node 操作之前。存在其他線程調用 unlock 方法從 head開始往後周遊,由于
t.next=node 還沒執行意味着連結清單的關系還沒有建立完整。就會導緻周遊到 t 節點的時候被中斷。是以從後往前遍
曆,一定不會存在這個問題。
圖解分析
通過鎖的釋放,原本的結構就發生了一些變化。head 節點的 waitStatus 變成了 0,ThreadB 被喚醒
原本挂起的線程繼續執行
通過ReentrantLock.unlock,原本挂起的線程被喚醒後繼續執行原來被挂起的線程是在 acquireQueued 方法中,
是以被喚醒以後繼續從這個方法開始執行
具體見上面的AQS.acquireQueued ,關鍵代碼如下:
if (p == head && tryAcquire(arg)) {
setHead(node);//設定線程B為頭結點
p.next = null; // help GC
failed = false;//把原 head 節點的 next 節點指向為 null
return interrupted;
}
圖解分析
- 設定新 head 節點的 prev=null
- 設定原 head 節點的 next 節點為 null
公平鎖和非公平鎖的差別
鎖的公平性是相對于擷取鎖的順序而言的,如果是一個公平鎖,那麼鎖的擷取順序就應該符合請求的絕對時間順
序,也就是 FIFO。 在上面分析的例子來說,隻要CAS 設定同步狀态成功,則表示目前線程擷取了鎖,而公平鎖則
不一樣,差異點有兩個
lock
final void lock() {
acquire(1);
}
非公平鎖在擷取鎖的時候,會先通過 CAS 進行搶占,而公平鎖則不會
FairSync.tryAcquire
protected final boolean tryAcquire(int acquires) {
//取到目前線程
final Thread current = Thread.currentThread();
//擷取目前的state(上鎖次數)
int c = getState();
//如果當時沒有鎖
if (c == 0) {
//則去隊列,取隊列中的并嘗試擷取鎖
//通過CAS修改狀态,
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
//如果成功,則把獨占鎖設定為目前這個線程
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
這個方法與 nonfairTryAcquire(int acquires)比較,不同的地方在于判斷條件多了hasQueuedPredecessors()方
法,也就是加入了[同步隊列中目前節點是否有前驅節點]的判斷,如果該方法傳回 true,則表示有線程比目前線程
更早地請求擷取鎖,是以需要等待前驅線程擷取并釋放鎖之後才能繼續擷取鎖。