AQS封裝的加鎖和解鎖方法
AQS提供了獨占鎖和共享鎖兩種加鎖方式,每種方式都有響應中斷和不響應中斷的差別,是以AQS的鎖可以分為如下四類
- 不響應中斷的獨占鎖(acquire)
- 響應中斷的獨占鎖(acquireInterruptibly)
- 不響應中斷的共享鎖(acquireShared)
- 響應中斷的共享鎖(acquireSharedInterruptibly)
而釋放鎖的方式隻有兩種
- 獨占鎖的釋放(release)
- 共享鎖的釋放(releaseShared)
獨占鎖的加鎖
我們前面在寫入隊的邏輯的時候,直接将Thread放到Queue中就完事了。看看Doug Lea是如何寫的
AQS隊列,當隊列為空時,第一次生成兩個節點,第一個節點代表目前占有鎖的線程,第二個節點為搶鎖失敗的節點。不為空的時候,每次生成一個節點放入隊尾。
當把線程放入隊列中時,後續應該做哪些操作呢?
如果讓你寫是不是直接放入隊列中就完事了?但Doug Lea是這樣做的
- 如果目前線程是隊列中的第二個節點則再嘗試搶一下鎖(不是第二個節點就不用搶來,輪不到),這樣避免了頻繁的阻塞和喚醒線程,提高了效率
- 上鬧鐘,讓上一個線程來喚醒自己(後續會說到,即更改上一個節點的waitStatus)
- 阻塞
另外還得注意一下出隊的邏輯,當A線程釋放鎖,喚醒隊列中的B線程,A線程會從隊列中删除。那出隊這個事情由誰來做?是由被喚醒的線程來做,即B線程
從加鎖的模版方法開始分析
// 調用ReentrantLock.FairSync#lock方法其實就是調用acquire(1);
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))//擷取到鎖傳回false,否則傳回true
selfInterrupt();//目前線程将自己中斷
}
- 先嘗試擷取,如果擷取到直接退出,否則進入2
- 擷取鎖失敗,以獨占模式将線程包裝成Node放到隊列中
- 如果放入的節點是隊列的第二個節點,則再嘗試擷取鎖,因為此時鎖有可能釋放類,不是第二個節點就不用嘗試了,因為輪不到。如果擷取到鎖則将目前節點設為head節點,退出,否則進入4
- 設定好鬧鐘後将自己阻塞
- 線程被喚醒,重新競争鎖,擷取鎖成功,繼續執行。如果線程發生過中斷,則最後重置中斷标志位位true,即執行selfInterrupt()方法
從代碼層面詳細分析一波,走起
tryAcquire是讓子類實作的
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
這裡通過抛出異常來告訴子類要重寫這個方法,為什麼不将這個方法定義為abstract方法呢?因為AQS有2種功能,獨占和共享,如果用abstract修飾,則子類需要同時實作兩種功能的方法,對子類不友好
- 當隊列不為空,嘗試将新節點通過CAS的方式設定為尾節點,如果成功,傳回附加着目前線程的節點
- 當隊列為空,或者新節點通過CAS的方式設定為尾節點失敗,進入enq方法
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
- 當隊列不為空,一直CAS,直到把新節點放入隊尾
- 當隊列為空,先往對列中放入一個節點,在把傳入的節點CAS為尾節點
前面已經說過了哈,AQS隊列為空時,第一次會放入2個節點
private Node enq(final Node node) {
for (;;) {
Node t = tail;
// 隊列為空,進行初始化,
if (t == null) {
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
放入隊列後還要幹什麼?
- 如果是第二個節點再嘗試擷取一波鎖,因為此時有可能鎖已經釋放了,其他節點就不用了,因為還輪不到
- 上鬧鐘,讓别的線程喚醒自己
- 阻塞自己
// 自旋擷取鎖,直到擷取鎖成功,或者異常退出
// 但是并不是busy acquire,因為當擷取失敗後會被挂起,由前驅節點釋放鎖時将其喚醒
// 同時由于喚醒的時候可能有其他線程競争,是以還需要進行嘗試擷取鎖,展現的非公平鎖的精髓。
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
// 擷取前繼節點
final Node p = node.predecessor();
// node節點的前繼節點是head節點,嘗試擷取鎖,如果成功說明head節點已經釋放鎖了
// 将node設為head開始運作(head中不包含thread)
if (p == head && tryAcquire(arg)) {
setHead(node);
// 将第一個節點出隊
p.next = null; // help GC
failed = false;
return interrupted;
}
// 擷取鎖失敗後是否可以挂起
// 如果可以挂起,則阻塞目前線程(擷取鎖失敗的節點)
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
根據前繼節點的狀态,是否可以阻塞目前擷取鎖失敗的節點
一般情況會經曆如下2個過程
- 預設情況下上一個節點的waitStatus=0,是以會進入compareAndSetWaitStatus方法,通過cas将上一個節點的waitStatus設定為SIGNAL,然後return false
- shouldParkAfterFailedAcquire方法外面是一個死循環,當再次進入這個方法時,如果上一步cas成功,則會走第一個if,return true。接着執行parkAndCheckInterrupt,線程會阻塞
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
// 前繼節點釋放時會unpark後繼節點,可以挂起
if (ws == Node.SIGNAL)
return true;
if (ws > 0) {
//将CANCELLED狀态的線程清理出隊列
// 後面會提到為什麼會有CANCELLED的節點
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
// 将前繼節點的狀态設定為SIGNAL,代表釋放鎖時需要喚醒後面的線程
// cas更新可能失敗,是以不能直接傳回true
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
shouldParkAfterFailedAcquire表示上好鬧鐘了,可以阻塞線程了。後續當線程被喚醒的時候會從return語句出繼續執行,然後進入acquireQueued方法的死循環,重新搶鎖。至此,加鎖結束。
// 挂起線程,傳回是否被中斷過
private final boolean parkAndCheckInterrupt() {
// 阻塞線程
LockSupport.park(this);
// 傳回目前線程是否被調用過Thread#interrupt方法
return Thread.interrupted();
}
最後用一個流程圖來解釋不響應中斷的獨占鎖
入隊過程中有異常該怎麼辦?
這部分内容比較難,看着吃力的小夥伴跳過即可,對主流程影響不大
可以看到上面調用acquireQueued方法發生異常的時候,會調用cancelAcquire方法,我們就詳細分析一下這個cancelAcquire方法有哪些作用?
哪些地方執行發生異常會執行cancelAcquire?
可以看到調用cancelAcquire方法的有如下幾個部分
分析這些方法的調用,發現基本就是如下2個地方會發生異常
- 嘗試擷取鎖的方法如tryAcquire,這些一般是交給子類來實作的
- 當線程是被調用Thread#interrupt方法喚醒,如果要響應中斷,會抛出InterruptedException
//處理異常退出的node
private void cancelAcquire(Node node) {
if (node == null)
return;
// 設定該節點不再關聯任何線程
node.thread = null;
// 跳過CANCELLED節點,找到一個有效的前繼節點
Node pred = node.prev;
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
// 擷取過濾後的有效節點的後繼節點
Node predNext = pred.next;
// 設定狀态為取消
node.waitStatus = Node.CANCELLED;
// case 1
if (node == tail && compareAndSetTail(node, pred)) {
compareAndSetNext(pred, predNext, null);
} else {
// case 2
int ws;
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread != null) {
Node next = node.next;
if (next != null && next.waitStatus <= 0)
compareAndSetNext(pred, predNext, next);
} else {
// case3
unparkSuccessor(node);
}
node.next = node; // help GC
}
}
将node出隊有如下三種情況
- 目前節點是tail
- 目前節點不是head的後繼節點,也不是tail
- 目前節點是head的後繼節點
目前節點是tail
compareAndSetTail,将tail指向pred
compareAndSetNext,将pred的next指向null,也就是把目前節點移出隊列
目前節點不是head的後繼節點,也不是tail
這裡将node的前繼節點的next指向了node的後繼節點,即compareAndSetNext(pred, predNext, next),注意pred和node節點中間有可能有CANCELLED的節點,怕亂就沒畫出來
目前節點是head的後繼節點
沒有對隊列進行操作,隻是進行head後繼節點的喚醒操作(unparkSuccessor方法,後面會分析這個方法),因為此時他是head的後繼節點,還是有可能擷取到鎖的,是以喚醒它嘗試擷取一波鎖,當再次調用到shouldParkAfterFailedAcquire(判斷是否應該阻塞的方法時)會把CANCELLED狀态的節點從隊列中删除
獨占鎖的解鎖
獨占鎖是釋放其實就是利用cas将state-1,當state=0表示鎖被釋放,需要将阻塞隊列中的線程喚醒
public final boolean release(int arg) {
// 是否需要釋放鎖,傳回ture則需要釋放鎖
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
// 喚醒阻塞的線程
unparkSuccessor(h);
return true;
}
return false;
}
tryRelease即具體的解鎖邏輯,需要子類自己去實作
喚醒同步隊列中的線程,可以看到前面加了判斷h != null && h.waitStatus != 0
h = null,說明同步同步隊列中沒有資料,則不需要喚醒
h = null && waitStatus = 0,同步隊列是有了,但是沒有線程給自己上鬧鐘,不用喚醒
h != null && waitStatus < 0,說明頭節點被人上了鬧鐘,自己需要喚醒阻塞的線程
h != null && waitStatus > 0,頭節點因為發生異常被設定為取消,但還是得喚醒線程
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
// 頭結點的下一個節點
Node s = node.next;
// 為空或者被取消
if (s == null || s.waitStatus > 0) {
s = null;
// 從隊列尾部向前周遊找到最前面的一個waitStatus<=0的節點
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
// 喚醒節點,但并不表示它持有鎖,要從阻塞的地方開始運作
LockSupport.unpark(s.thread);
}
為什麼要從後向前找第一個非CANCELLED的節點呢?
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
// 線程在這裡挂起了
pred.next = node;
return node;
}
}
enq(node);
return node;
}
這其實和入隊的邏輯有關系,假如Node1在圖示位置挂起了,Node1後面又陸續增加了Node2和Node3,如果此時從前向後周遊會導緻元素丢失,不能正确喚醒線程
分析一下獨占鎖響應中斷和不響應中斷的差別
我們之前說過獨占鎖可以響應中斷,也可以不響應中斷,調用的方法如下?
- 不響應中斷的獨占鎖(acquire)
- 響應中斷的獨占鎖(acquireInterruptibly)
是以我們隻需要看這2個方法的差別在哪裡就可以,我下面隻列出有差別的部分哈。
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
public final void acquireInterruptibly(int arg)
throws InterruptedException {
// 判斷線程是否被中斷
if (Thread.interrupted())
throw new InterruptedException();
if (!tryAcquire(arg))
doAcquireInterruptibly(arg);
}
acquire在嘗試擷取鎖的時候完全不管線程有沒有被中斷,而acquireInterruptibly在嘗試擷取鎖之前會判斷線程是否被中斷,如果被中斷,則直接抛出異常。
tryAcquire方法一樣,是以我們隻需要對比acquireQueued方法和doAcquireInterruptibly方法的差別即可
執行acquireQueued方法當線程發生中斷時,隻是将interrupted設定為true,并且調用selfInterrupt方法将中斷标志位設定為true
而執行doAcquireInterruptibly方法,當線程發生中斷時,直接抛出異常。
最後看一下parkAndCheckInterrupt方法,這個方法中判斷線程是否中斷的邏輯特别巧!
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
Thread類提供了如下2個方法來判斷線程是否是中斷狀态
- isInterrupted
- interrupted
這裡為什麼用interrupted而不是isInterrupted的呢?(這部分内容比較難,看着吃力的小夥伴跳過即可,對主流程影響不大)
示範一下這2個方法的差別
@Test
public void testInterrupt() throws InterruptedException {
Thread thread = new Thread(() -> {
while (true) {}
});
thread.start();
TimeUnit.MICROSECONDS.sleep(100);
thread.interrupt();
// true
System.out.println(thread.isInterrupted());
// true
System.out.println(thread.isInterrupted());
// true
System.out.println(thread.isInterrupted());
}
@Test
public void testInterrupt2() {
Thread.currentThread().interrupt();
// true
System.out.println(Thread.interrupted());
// false
System.out.println(Thread.interrupted());
// false
System.out.println(Thread.interrupted());
}
isInterrupted和interrupted的方法差別如下
Thread#isInterrupted:測試線程是否是中斷狀态,執行後不更改狀态标志
Thread#interrupted:測試線程是否是中斷狀态,執行後将中斷标志更改為false
接着再寫2個例子
public static void main(String[] args) {
LockSupport.park();
// end被一直阻塞沒有輸出
System.out.println("end");
}
public static void main(String[] args) {
Thread.currentThread().interrupt();
LockSupport.park();
// 輸出end
System.out.println("end");
}
可以看到當線程被中斷時,調用park()方法并不會被阻塞
public static void main(String[] args) {
Thread.currentThread().interrupt();
LockSupport.park();
// 傳回中斷狀态,并且清除中斷狀态
Thread.interrupted();
// 輸出start
System.out.println("start");
LockSupport.park();
// end被阻塞,沒有輸出
System.out.println("end");
}
到這我們就能了解為什麼要進行中斷的複位了
- 如果目前線程是非中斷狀态,則在執行park時被阻塞,傳回中斷狀态false
- 如果目前線程是中斷狀态,則park方法不起作用,傳回中斷狀态true,interrupted将中斷複位,變為false
- 再次執行循環的時候,前一步已經線上程的中斷狀态進行了複位,則再次調用park方法時會阻塞
是以這裡要對中斷進行複位,是為了不讓循環一直執行,讓目前線程進入阻塞狀态,如果不進行複位,前一個線程在擷取鎖之後執行了很耗時的操作,那目前線程豈不是要一直執行死循環,造成CPU使用率飙升?
獨占鎖的擷取和釋放我們已經搞清楚了,來分析共享鎖的擷取和釋放
共享鎖的加鎖
加鎖成功則執行業務邏輯,否則入隊并阻塞
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
入隊的邏輯和獨占鎖的入隊邏輯差不多
private void doAcquireShared(int arg) {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
在獲鎖成功後,除了設定頭節點,還多了一步操作,就是喚醒同步隊列中的所有線程
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
setHead(node);
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared();
}
}
共享鎖的解鎖
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
喚醒同步隊列中的所有線程
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
獨占鎖和共享鎖的差別
- 獨占鎖:一個鎖能被多個線程擷取,加鎖失敗阻塞放入同步隊列,加鎖成功執行業務邏輯,解鎖喚醒同步隊列中的一個線程
- 共享鎖:一個鎖隻能被一個線程擷取,加鎖失敗阻塞放入同步隊列,加鎖成功會喚醒同步隊列中的所有線程,解鎖也會喚醒同步隊列中的所有線程