1、前言
通過上篇并發程式設計專題(五)抽象隊列同步器AQS應用Lock詳解
我們已經知道AQS的功能分為獨占和共享,底層資料結構是一個雙向連結清單
獨占鎖,每次隻能有一個線程持有鎖,比如 ReentrantLock 就是以獨占方式實作的互斥鎖
共享鎖,允許多個線程同時擷取鎖,并發通路共享資源 ,比如ReentrantReadWriteLock
2、ReentrantLock源碼分析
以 ReentrantLock 作為切入點,來看看在ReentrantLock是如何使用 AQS 來實作線程的同步的
2.1、入口
public void lock() {
sync.lock();
}
這個是 reentrantLock 擷取鎖的入口
sync 實際上是一個抽象的靜态内部類,它繼承了 AQS 來實作重入鎖的邏輯,我們知道AQS 是一個同步隊列,它能夠實作線程的阻塞以及喚醒,但它并不具備業務功能,是以在不同的同步場景中,會繼承 AQS 來實作對應場景的功能
Sync 有兩個具體的實作類,分别是:
NofairSync:非公平,表示可以存在搶占鎖的功能,也就是說不管目前隊列上是否存在其他線程等待,新線程都有機會搶占鎖
FailSync:公平,表示所有線程嚴格按照 FIFO 來擷取鎖
2.2、NofairSync.lock加鎖方法詳解
以非公平鎖為例,看看lock中的實作
- 非公平鎖和公平鎖最大的差別在于,在非公平鎖中我搶占鎖的邏輯是,不管有沒有線程排隊,我先上來 cas 去搶占一下
- CAS 成功,就表示成功獲得了鎖
- CAS 失敗,調用 acquire(1)走鎖競争邏輯
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
cas的實作原理
protected final boolean compareAndSetState(int expect, int update) {
// See below for intrinsics setup to support this
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
通過 cas 樂觀鎖的方式來做比較并替換,這段代碼的意思是,如果目前記憶體中的state 的值和預期值 expect 相等,則替換為 update。更新成功傳回 true,否則傳回 false.
這個操作是原子的,不會出現線程安全問題,這裡面涉及到Unsafe這個類的操作,以及涉及到 state 這個屬性的意義。
state 是 AQS 中的一個屬性,它在不同的實作中所表達的含義不一樣,對于重入鎖的實作來說,表示一個同步狀态。它有兩個含義的表示
1. 當 state=0 時,表示無鎖狀态
2. 當 state>0 時,表示已經有線程獲得了鎖,也就是 state=1,但是因為
ReentrantLock 允許重入,是以同一個線程多次獲得同步鎖的時候,state 會遞增,比如重入 5 次,那麼 state=5。而在釋放鎖的時候,同樣需要釋放 5 次直到 state=0,其他線程才有資格獲得鎖
(對于unsafe類會在後續講解)
AQS.acquire
acquire 是 AQS 中的方法,如果 CAS 操作未能成功,說明 state 已經不為 0,此時繼續 acquire(1)操作
這個方法的主要邏輯是:
- 通過 tryAcquire 嘗試擷取獨占鎖,如果成功傳回 true,失敗傳回 false
- 如果 tryAcquire 失敗,則會通過 addWaiter 方法将目前線程封裝成 Node 添加到 AQS 隊列尾部
- acquireQueued,将 Node 作為參數,通過自旋去嘗試擷取鎖。
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
NonfairSync.tryAcquire
這個方法的作用是嘗試擷取鎖,如果成功傳回 true,不成功傳回 false
它是重寫 AQS 類中的 tryAcquire 方法,并且大家仔細看一下 AQS 中 tryAcquire方法的定義,并沒有實作,而是抛出異常。按照一般的思維模式,既然是一個不實作的模版方法,那應該定義成 abstract,讓子類來實作呀?這裡這麼做是因為獨占鎖和共享鎖的調用方法不同,子類隻需要實作所需要的方法
ReentrantLock.nofairTryAcquire
- 擷取目前線程,判斷目前的鎖的狀态
- 如果 state=0 表示目前是無鎖狀态,通過 cas 更新 state 狀态的值
- 目前線程是屬于重入,則增加重入次數
final boolean nonfairTryAcquire(int acquires) {
// 擷取目前執行的線程
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {//表示無鎖狀态
//cas 替換 state 的值,cas 成功表示擷取鎖成功
if (compareAndSetState(0, acquires)) {
//儲存目前獲得鎖的線程,下次再來的時候不要再嘗試競争鎖
setExclusiveOwnerThread(current);
return true;
}
}
// 如果同一個線程來獲得鎖,直接增加重入次數
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
AQS.addWaiter
當 tryAcquire 方法擷取鎖失敗以後,則會先調用 addWaiter 将目前線程封裝成Node.
入參 mode 表示目前節點的狀态,傳遞的參數是 Node.EXCLUSIVE,表示獨占狀态。意味着重入鎖用到了 AQS 的獨占鎖功能
- 将目前線程封裝成 Node
- 目前連結清單中的 tail 節點是否為空,如果不為空,則通過 cas 操作把目前線程的node 添加到 AQS 隊列
- 如果為空或者 cas 失敗,調用 enq 将節點添加到 AQS 隊列
private Node addWaiter(Node mode) {
//把目前線程封裝為 Node
Node node = new Node(Thread.currentThread(), mode);
//tail是AQS中表示同步隊列隊尾的屬性,預設是null
Node pred = tail;
//tail 不為空的情況下,說明隊列中存在節點
if (pred != null) {
//把目前線程的 Node 的 prev 指向 tail
node.prev = pred;
//通過cas把node加入到AQS隊尾,也就是設定為tail
if (compareAndSetTail(pred, node)) {
////設定成功以後,把原tail節點的next指向目前 node
pred.next = node;
return node;
}
}
//tail=null,把 node 添加到同步隊列
enq(node);
return node;
}
enq
通過自旋把目前節點加入隊列
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
圖解分析:
假設 3 個線程來争搶鎖,那麼截止到 enq 方法運作結束之後,或者調用 addwaiter
方法結束後,AQS 中的連結清單結構圖
AQS.acquireQueued
通過 addWaiter 方法把線程添加到連結清單後,會接着把 Node 作為參數傳遞給acquireQueued 方法,去競争鎖
- 擷取目前節點的 prev 節點
- 如果 prev 節點為 head 節點,那麼它就有資格去争搶鎖,調用 tryAcquire 搶占鎖
- 搶占鎖成功以後,把獲得鎖的節點設定為 head,并且移除原來的初始化 head節點
- 如果獲得鎖失敗,則根據 waitStatus 決定是否需要挂起線程
- 最後,通過 cancelAcquire 取消獲得鎖的操作
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
//擷取目前節點的 prev 節點
final Node p = node.predecessor();
//如果是head節點,說明有資格去争搶鎖
if (p == head && tryAcquire(arg)) {
//擷取鎖成功,也就是ThreadA已經釋放了鎖,然後設定 head為ThreadB獲得執行權限
setHead(node);
//把原head節點從連結清單中移除
p.next = null; // help GC
failed = false;
return interrupted;
}
//ThreadA 可能還沒釋放鎖,使得ThreadB在執行tryAcquire時會傳回false
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
////并且傳回目前線程在等待過程中有沒有中斷過
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
NofairSync.tryAcquire
這個方法在前面分析過,就是通過 state 的狀态來判斷是否處于無鎖狀态,然後在通過 cas 進行競争鎖操作。成功表示獲得鎖,失敗表示獲得鎖失敗
shouldParkAfterFailedAcquire
如果 ThreadA 的鎖還沒有釋放的情況下,ThreadB 和 ThreadC 來争搶鎖肯定是會失敗,那麼失敗以後會調用 shouldParkAfterFailedAcquire 方法
Node 有 5 種狀态,分别是:CANCELLED(1),SIGNAL(-1)、CONDITION(-2)、PROPAGATE(-3)、預設狀态(0)
- CANCELLED: 在同步隊列中等待的線程等待逾時或被中斷,需要從同步隊列中取消該 Node 的結點, 其結點的 waitStatus 為 CANCELLED,即結束狀态,進入該狀态後的結點将不會再變化
- SIGNAL: 隻要前置節點釋放鎖,就會通知辨別為 SIGNAL 狀态的後續節點的線程
- CONDITION:和 Condition 有關系,後續會講解
- PROPAGATE:共享模式下,PROPAGATE 狀态的線程處于可運作狀态
- 0:初始狀态
這個方法的主要作用是,通過 Node 的狀态來判斷,ThreadA 競争鎖失敗以後是否應該被挂起。
- 如果 ThreadA 的 pred 節點狀态為 SIGNAL,那就表示可以放心挂起目前線程
- 通過循環掃描連結清單把 CANCELLED 狀态的節點移除
- 修改 pred 節點的狀态為 SIGNAL,傳回 false.
傳回 false 時,也就是不需要挂起,傳回 true,則需要調用 parkAndCheckInterrupt挂起目前線程
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
//前置節點的waitStatus
int ws = pred.waitStatus;
//如果前置節點為SIGNAL,意味着隻需要等待其他前置節點的線程被釋放,
if (ws == Node.SIGNAL)
//傳回true,意味着可以直接放心的挂起了
return true;
//ws大于0,意味着prev節點取消了排隊,直接移除這個節點就行
if (ws > 0) {
do {
node.prev = pred = pred.prev;
// //這裡采用循環,移除CANCELLED的節點
} while (pred.waitStatus > 0);
pred.next = node;
} else {
//利用cas設定prev節點的狀态為SIGNAL(-1)
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
parkAndCheckInterrupt
使用 LockSupport.park 挂起目前線程程式設計 WATING 狀态
Thread.interrupted,傳回目前線程是否被其他線程觸發過中斷請求,也就是
thread.interrupt(); 如果有觸發過中斷請求,那麼這個方法會傳回目前的中斷辨別true,并且對中斷辨別進行複位辨別已經響應過了中斷請求。如果傳回 true,意味着在 acquire 方法中會執行 selfInterrupt()。
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
2.3、鎖的釋放流程
ReentrantLock.unlock
public final boolean release(int arg) {
if (tryRelease(arg)) {
// 拿到head節點
Node h = head;
//如果head節點不為空并且狀态!=0.調用unparkSuccessor(h)
// 喚醒後續節點
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
ReentrantLock.tryRelease
這個方法可以認為是一個設定鎖狀态的操作,通過将 state 狀态減掉傳入的參數值(參數是 1),如果結果狀态為 0,就将排它鎖的 Owner 設定為 null,以使得其它的線程有機會進行執行。
在排它鎖中,加鎖的時候狀态會增加 1(當然可以自己修改這個值),在解鎖的時候減掉 1,同一個鎖,在可以重入後,可能會被疊加為 2、3、4 這些值,隻有 unlock()的次數與 lock()的次數對應才會将 Owner 線程設定為空,而且也隻有這種情況下才會傳回 true。
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
unparkSuccessor
private void unparkSuccessor(Node node) {
//獲得head節點狀态
int ws = node.waitStatus;
if (ws < 0)
// 設定head節點狀态為0
compareAndSetWaitStatus(node, ws, 0);
// 拿到head的下一節點
Node s = node.next;
if (s == null || s.waitStatus > 0) {
//如果下一個節點為 null 或者 status>0 表示 cancelled 狀态.
//通過從尾部節點開始掃描,找到距離head最近的一個waitStatus<=0 的節點
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
//next 節點不為空,直接喚醒這個線程即可
if (s != null)
LockSupport.unpark(s.thread);
}
為什麼在釋放鎖的時候是從 tail 進行掃描
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
因為在之前enq添加節點方法中,在 cas 操作之後,t.next=node 操作之前。存在其他線程調用 unlock 方法從 head開始往後周遊,由于 t.next=node 還沒執行意味着連結清單的關系還沒有建立完整。就會導緻周遊到 t 節點的時候被中斷。是以從後往前周遊,一定不會存在這個問題。
原本挂起的線程繼續執行
通過 ReentrantLock.unlock,原本挂起的線程被喚醒以後繼續執行,應該從哪裡執行呢。原來被挂起的線程是在 acquireQueued 方法中,是以被喚醒以後繼續從這個方法開始執行
AQS.acquireQueued
這個方法前面已經完整分析過了,我們隻關注一下 ThreadB 被喚醒以後的執行流程。由于 ThreadB 的 prev 節點指向的是 head,并且 ThreadA 已經釋放了鎖。是以這個時候調用 tryAcquire 方法時,就可以順利搶占到鎖
2.4、公平鎖和非公平鎖的差別
鎖的公平性是相對于擷取鎖的順序而言的,如果是一個公平鎖,那麼鎖的擷取順序就應該符合請求的絕對時間順序,也就是 FIFO。隻要CAS 設定同步狀态成功,則表示目前線程擷取了鎖,而公平鎖則不一樣,差異點有兩個
FairSync.tryAcquire
非公平鎖在擷取鎖的時候,會先通過 CAS 進行搶占,而公平鎖則不會
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
這個方法與 nonfairTryAcquire(int acquires)比較,不同的地方在于判斷條件多了hasQueuedPredecessors()方法,也就是加入了[同步隊列中目前節點是否有前驅節點]的判斷,如果該方法傳回 true,則表示有線程比目前線程更早地請求擷取鎖,是以需要等待前驅線程擷取并釋放鎖之後才能繼續擷取鎖。