AbstractQueuedSynchronizer 抽象同步隊列,它是個模闆類提供了許多以鎖相關的操作,常說的AQS指的就是它。AQS繼承了
AbstractOwnableSynchronizer
類,AOS用于儲存線程對象,儲存什麼線程對象呢?儲存鎖被獨占的線程對象。抽象同步隊列AQS除了實作序列化标記接口,并沒有實作任何的同步接口,該類提供了許多同步狀态擷取和釋放的方法給自定義同步器使用,如ReentrantLock的内部類Sync。抽象同步隊列支援獨占式或共享式的的擷取同步狀态,友善實作不同類型的自定義同步器。一般方法名帶有
Shared
的為共享式,比如,嘗試以共享式的擷取鎖的方法
int tryAcquireShared(int)
,而獨占式擷取鎖方法為
boolean tryAcquire(int)
。AQS是抽象同步隊列,其重點就是
同步隊列
及
如何操作同步隊列
。
同步隊列
雙向同步隊列,采用尾插法新增節點,從頭部的下一個節點擷取操作節點,節點自旋擷取同步鎖,實作FIFO(先進先出)原則。
了解節點中的屬性值作用
- prev:前驅節點;即目前節點的前一個節點,之是以叫前驅節點,是因為前一個節點在使用完鎖之後會解除後一個節點的阻塞狀态;
- next:後繼節點;即目前節點的後一個節點,之是以叫後繼節點,是因為“後繼有人”了,表示有“下一代”節點承接這個獨有的鎖🔒;
- nextWaiter:表示指向下一個
Node.CONDITION
- 狀态的節點(本文不講述Condition隊列,在此可以忽略它);
- thread:節點對象中儲存的線程對象,節點都是配角,線程才是主角;
- waitStatus:目前節點在隊列中的等待狀态;waitStatus = CANCELLED = 1,表示線程已經取消(該狀态下的節點為廢棄節點,将從隊列中斷開);
- waitStatus = SIGNAL = -1,表示線程處于請求釋放的狀态,後繼線程需要阻塞等待(該狀态下的節點線程處于阻塞等待狀态或擷取鎖未釋放狀态);
- waitStatus = CONDITION = -2,表示線程正在等待;
- waitStatus = PROPAGATE = -3,在共享情況下,表示下一個被請求的shared應該無條件傳播;
- waitStatus = 0,表示節點初始化時的預設值(int類型成員變量的預設值)。
注意1:節點對象中的prev、next和nextWaiter都是一個完整的Node節點對象,也就是說每個節點都儲存了前後節點的對象,如果沒有則為null。
注意2:head節點是個虛節點(prev=null、thread=null),但head本身是一個實際存在的節點對象,起到标記隊列的開頭;尾節點tail節點的next=null,等待新節點插入。
頭節點head為什麼是虛節點
重點:必須明确知道頭節點head為什麼是虛節點!!!這很重要。
原因是,目前節點在擷取到鎖🔒之後,它這個線程對象就會被儲存到AOS(AbstractOwnableSynchronizer)中的
exclusiveOwnerThread
// 初始化隊列的方法private Node enq(final Node node) {
// 死循環
for (;;) {
Node t = tail;
// 沒頭沒尾時
if (t == null) { // Must initialize
// 生成頭節點(尾節點)
if (compareAndSetHead(new Node()))
tail = head;
} else {
// 有頭有尾後,才把需要等待的線程節點加入隊列中
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
在獨占線程釋放鎖時,判斷head是否為null,即可知道同步隊列是否存在,如果同步隊列不存在,那麼無需執行嘗試喚醒後繼節點那些操作了。
在同個時間節點中,單個線程隻需要操作AOS的Thread對象和AQS的state狀态即可實作同步鎖和鎖的可重入性。
線程加入同步隊列的過程
在鎖被占用時,擷取鎖失敗後,目前線程被封裝成Node節點并加入到隊列尾部。
即
tryAcquire(arg)
傳回false時,執行
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
操作,而
addWaiter(Node.EXCLUSIVE)
是以,新增節點分為首次生成同步隊列同時新增節點和在原有同步隊列中插入新增節點。
首次生成同步隊列新增節點:通過
enq(final Node node)
方法,先初始化頭節點(虛節點),再通過原子操作
compareAndSetTail
方法從隊列尾部插入新節點。在原有同步隊列中新增節點:通過原子操作
compareAndSetTail
方法從隊列尾部插入新節點。
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
private Node addWaiter(Node mode) {
// 使用目前線程生成新節點
Node node = new Node(Thread.currentThread(), mode);
// 擷取同步器的尾節點
Node pred = tail;
if (pred != null) {
// 第一步:新節點的prev節點指向尾部節點(pred=tail)
node.prev = pred;
// 第二部:CAS比較尾節點,相等就讓tail=node
if (compareAndSetTail(pred, node)) {
// 第三步:pred=舊的tail,即舊的尾節點的next節點指向新節點
pred.next = node;
return node;
}
}
// 當tail=null 時執行,邏輯相類似的;enq初始化的head節點為虛節點
enq(node);
return node;
}
// 将節點插入隊列,必要時初始化(即tail=null時,也即是同步隊列沒有節點時初始化)。private Node enq(final Node node) {
// 死循環
for (;;) {
// 第一次進來tail=null,第二次進來tail=head=new Node()
Node t = tail;
if (t == null) {
// 建立一個空節點作為head節點
if (compareAndSetHead(new Node()))
tail = head;
} else { // 下面就是正常的尾插法新增節點
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
配合源碼和動圖了解:新增隊列節點過程(三部曲)
使用尾插法新增同步隊列節點
- 第一步:新增節點的prev節點指向尾節點tail;
- 第二步:尾節點tail 和新節點做CAS操作,即compareAndSetTail(pred,node) ,即同步器的tail節點指向新節點;
- 第三步:舊的尾節點的next節點指向新節點(此時的新節點=尾節點tail)
最終結果圖
新增節點加入隊列之後,在同步隊列中線程怎麼等待?線程怎麼擷取鎖呢?
節點線程擷取鎖
看代碼前必須明确知道哪個節點是要擷取鎖的。頭節點為虛節點,标記隊列的開頭,真正要擷取鎖的是頭節點的後繼節點。
擷取鎖過程
鎖在釋放時調用的關鍵流程:
ReentrantLock#lock() -> Sync#lock() -> AQS#acquire(1) -> NonfairSync#tryAcquire(1) 【或FairSync#tryAcquire(1)】-> AQS#addWaiter(node) -> AQS#acquireQueued(node,1) -> AQS#selfInterrupt()
關鍵代碼👇
// java.util.concurrent.locks.AbstractQueuedSynchronizer/**
* 以獨占不可中斷模式擷取已在隊列中的線程。
*/final boolean acquireQueued(final Node node, int arg) {
// 異常标志狀态
boolean failed = true;
try {
// 是否發生過中斷的标志
boolean interrupted = false;
// 自旋鎖>>>死循環:每個node都獨立執行着這個死循環,直至線程被阻塞
for (;;) {
// 擷取前驅節點(目前節點的前一個節點)
final Node p = node.predecessor();
// 目前節點的前驅節點是否等于頭節點(虛節點),等于就會執行嘗試擷取鎖 tryAcquire(arg)
if (p == head && tryAcquire(arg)) {
setHead(node); // 設定目前節點為頭節點,在擷取鎖成功時,thread對象已經儲存到AQS中的exclusiveOwnerThread了
p.next = null; // 前驅節點的next指向null,斷開前驅節點(舊的頭節點)
failed = false; // 隻要目前節點node正常擷取到鎖,就不會執行finally的cancelAcquire(node)
return interrupted;
}
// 前驅節點的waitStatus=-1時,目前node會被阻塞,防止無限循環浪費資源
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
/* 正常情況:隻有return時,才會執行finally代碼,而隻要return,failed都等于false,
* 是以,failed 是為了避免節點發生異常時,node沒有被釋放。
* 比如:node.predecessor() 可能産生空指針異常。
*/
if (failed)
cancelAcquire(node);
}
}
// java.util.concurrent.locks.AbstractQueuedSynchronizerprivate void setHead(Node node) {
head = node;
node.thread = null;
node.prev = null;
}
為什麼在嘗試擷取鎖前要判斷前驅節點是否為頭節點?
因為除了阻塞被釋放會讓死循環繼續執行的情況外,還有中斷指令也會使線程從阻塞狀态中被釋放,是以存在任意節點提前重新執行“死循環”嘗試擷取鎖的情況,如果不判斷擷取鎖節點的前驅節點是否為頭節點,那就會出現提前嘗試擷取鎖,進而破壞了同步隊列的先進先出(FIFO)原則,說白了,就是被插隊了。
目前節點不是頭節點時,執行以下代碼
// java.util.concurrent.locks.AbstractQueuedSynchronizer
/**
* 檢查和更新擷取鎖失敗的節點的狀态。如果線程需要等待,則傳回true,使其執行阻塞操作。
*/
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
// 擷取前驅節點的等待狀态
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
// 因為前驅節點處于請求釋放的狀态,是以目前節點需要阻塞等待,會傳回true,進而執行後續方法進入阻塞狀态
return true;
if (ws > 0) {
// 前驅節點被标上取消标志了,需要跳過前驅節點并不斷重試
do {
// 循環向前查找取消節點,把取消節點從隊列中移除
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus必須為0或等于PROPAGATE=-3
* 表示需要設定前驅節點等待狀态為SIGNAL,
* 将會在外層循環再次嘗試擷取鎖,如果再次擷取鎖失敗,那麼就會阻塞目前線程
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
當
shouldParkAfterFailedAcquire(p, node)
傳回true時,将會執行阻塞操作
parkAndCheckInterrupt())
,其通過線程阻塞工具類方法
LockSupport.park(this)
private final boolean parkAndCheckInterrupt() {
// 阻塞目前線程線程排程
LockSupport.park(this);
// 清除目前線程的中斷狀态,并傳回上一次的中斷狀态
return Thread.interrupted();
}
注意:
LockSupport.park(this)
阻塞後,需要喚醒阻塞才會執行後續操作,可通過解除阻塞
LockSupport.unpark(thread)
或 中斷
thread.interrupt()
隻有
shouldParkAfterFailedAcquire
和
parkAndCheckInterrupt
都傳回true時,才會執行interrupted = true,即隻有是中斷導緻阻塞結束時,才傳回true,此時
selfInterrupt()
重新執行一次中斷操作。
// java.util.concurrent.locks.AbstractQueuedSynchronizer
final boolean acquireQueued(final Node node, int arg) {
......
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
// 隻有是中斷導緻阻塞結束時,才傳回true
interrupted = true;
......
}
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
// 中斷目前線程
selfInterrupt();
}
static void selfInterrupt() {
// 中斷目前線程
Thread.currentThread().interrupt();
}
為什麼需要再一次執行中斷呢?
因為存在中斷
thread.interrupt()
喚醒自旋鎖阻塞的情況,而
Thread.interrupted()
擷取中斷狀态并清除目前線程的中斷狀态,是以需要重新執行一次中斷操作
selfInterrupt()
,将中斷标志置為true。這種擷取鎖的方式是
非中斷鎖
,就是無法通過中斷的方式結束鎖的擷取,差別于
中斷鎖
,是以該方式在擷取鎖的過程中,不會進行中斷,隻是記錄中斷狀态,Thread.interrupted() 擷取中斷狀态後清除中斷狀态,是以需要重新設定中斷标志為true。
如果你想要進行中斷的情況,那我們可以在acquireQueued(addWaiter(Node.EXCLUSIVE), arg) 傳回true 的時候去處理。比如:抛出中斷異常。
如果你需要線上程發生中斷時結束擷取鎖,那麼可以考慮使用
lockInterruptibly()
來擷取鎖。
兩種方式擷取鎖的差別
lock()
方式擷取鎖:自旋鎖隻會在正常擷取到鎖或發生異常時結束自旋鎖(死循環)。
void lockInterruptibly()
方式擷取鎖:會在發生中斷的情況下,抛出中斷異常
throw new InterruptedException();
擷取鎖的
lock()
和
lockInterruptibly()
的主要差別是:中斷是否會結束鎖的擷取。
看看源碼怎麼實作中斷結束鎖的擷取
private void doAcquireInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
// 隻有是中斷導緻阻塞結束時,才抛出中斷異常
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
中斷異常抛出後,将會執行finally 代碼塊,取消正在進行嘗試擷取鎖的節點。
到此為止,lock()擷取鎖的概要過程為
先嘗試擷取鎖、失敗就将線程封裝成節點并加入到隊列尾部、進入自旋鎖(第一次嘗試擷取鎖失敗,将前驅節點的waitStatus改為-1;第二次嘗試擷取鎖失敗,因為前驅節點的waitStatus=-1,是以執行阻塞目前線程操作避免死循環耗費資源)、等待頭節點線程釋放同步狀态之後,将發起解除阻塞指令或阻塞線程被中斷後,後繼節點再次嘗試擷取鎖。
取消異常節點
前面提到,在AQS#shouldParkAfterFailedAcquire(pred, node) 中談到,當節點waitStatus>0 時,也即是對帶有取消狀态的節點進行移除。那麼節點在什麼時候被改為CANCELLED 的呢?
在AQS#acquireQueued(node, arg)中,正常擷取到鎖時,failed都等于false,隻有當發生異常時,failed 才等于true,進而執行到AQS#cancelAcquire(node)。也就是說cancelAcquire() 是用于處理擷取鎖過程中,對發生異常節點的進行移除。
final boolean acquireQueued(final Node node, int arg) {
// 是否發生異常的标志
boolean failed = true;
try {
......
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
......
failed = false; // 隻要目前節點node正常擷取到鎖,就不會執行finally的cancelAcquire(node)
return interrupted;
}
......
}
} finally {
/* 正常情況:隻有return時,才會執行finally代碼,而隻要return,failed都等于false,
* 是以,failed 是為了避免節點發生異常時,node沒有被移除。
*/
if (failed)
cancelAcquire(node);
}
}
詳細看下AQS#cancelAcquire(node) 是怎麼處理的
// java.util.concurrent.locks.AbstractQueuedSynchronizer
/**
* 取消正在進行嘗試擷取鎖的節點
*/
private void cancelAcquire(Node node) {
// 如果節點不存在,則忽略
if (node == null)
return;
// 使目前節點變成虛節點
node.thread = null;
// 跳過帶有“取消狀态”的前驅節點
Node pred = node.prev;
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
// predNext節點
Node predNext = pred.next;
// 修改目前節點的waitStatus為CANCELLED=1
node.waitStatus = Node.CANCELLED;
// 如果目前節點為尾節點tail,則隻需要移除自己即可。
if (node == tail && compareAndSetTail(node, pred)) {
// pred節點變成了尾節點tail,是以 pred.next=null
compareAndSetNext(pred, predNext, null);
} else {
int ws;
/* 目前節點的前驅節點不為頭節點,則true;
* 前驅節點的ws狀态為SIGNAL,則true;ws不為SIGNAL,但ws<=0時(即不是取消狀态),則CAS操作改為SIGNAL,改成功則為true;
* 前驅節點不是虛接點,則true
*/
if (pred != head
&& ((ws = pred.waitStatus) == Node.SIGNAL || (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL)))
&& pred.thread != null) {
/* 如果上述都滿足,則将“目前節點的前驅節點的後繼節點”指向“目前節點的後繼節點”
* 說白了,節點的next指向就是由 A->B->C 改為 A->C;B為目前節點。
* 關于節點的prev指向就沒有變 A<-B<-C 還是 A<-B<-C,也就是說B節點還沒真正斷開。
* 節點的prev指向的修改需要判斷ws狀态是否為CANCELLED後,才做修改。
*/
Node next = node.next;
if (next != null && next.waitStatus <= 0)
compareAndSetNext(pred, predNext, next);
} else {
// 喚醒目前節點的後繼節點的阻塞線程
unparkSuccessor(node);
}
// 目前節點的後繼節點指向自己
node.next = node;
}
}
以上都是對next節點的指向做修改,關于節點的prev指向的修改需要判斷ws狀态是否為CANCELLED後,才做修改,循環向前查找取消節點,把取消節點從隊列中剔除。其對應源碼如下
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
// 擷取前驅節點的等待狀态
int ws = pred.waitStatus;
......
if (ws > 0) {
// 前驅節點被标上取消标志了,需要跳過前驅節點并不斷重試
do {
// 循環向前查找取消節點,把取消節點從隊列中剔除
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
......
}
return false;
}
在AQS#unparkSuccessor() 通過線程阻塞工具類方法
LockSupport.unpark(thread)
來喚醒後繼節點的阻塞線程。
AQS#unparkSuccessor()
除了取消異常節點時用到外,還在鎖的釋放時調用,實作功能都是--喚醒目前節點的後繼節點的阻塞線程,後繼節點就會繼續執行自旋鎖來嘗試擷取鎖。
// java.util.concurrent.locks.AbstractQueuedSynchronizer
/**
* 喚醒目前節點的後繼節點的阻塞線程(如果存在)
*/
private void unparkSuccessor(Node node) {
/*
* 如果waitStatus<0,則将waitStatus 置為預設值0
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* 但如果為null 或為取消狀态,則從tail向前周遊以查找到實際未取消的後繼節點。
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
// 對後繼節點的線程釋放阻塞
if (s != null)
LockSupport.unpark(s.thread);
}
節點線程釋放鎖
擷取鎖搞懂後,釋放鎖就是很簡單了
處理流程
鎖在釋放時調用的關鍵流程:ReentrantLock#unlock() -> Sync#release(1) -> AQS#tryRelease(1) -> LockSupport#unparkSuccessor(head)
unparkSuccessor(head)方法在前面已經講述過不再贅述,前三個方法源碼如下👇
// java.util.concurrent.locks.ReentrantLock
public void unlock() {
// ReentrantLock API 交由同步隊列模闆方法實作
sync.release(1);
}
// java.util.concurrent.locks.AbstractQueuedSynchronizer
public final boolean release(int arg) {
// 嘗試釋放鎖,成功則喚醒後繼節點
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
// 喚醒目前節點的後繼節點的阻塞線程
unparkSuccessor(h);
return true;
}
return false;
}
// java.util.concurrent.locks.ReentrantLock.Sync
protected final boolean tryRelease(int releases) {
// state:每釋放1次鎖就會-1(相反:重入性,每擷取1次鎖就會+1)
int c = getState() - releases;
// 目前線程是否是獨占鎖線程,不是就抛出異常
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
// 當state=0時,說明擷取鎖的次數已經釋放完,可以解除獨占鎖線程
if (c == 0) {
// 鎖釋放成功
free = true;
// 獨占鎖線程置為null
setExclusiveOwnerThread(null);
}
// 記錄每次state的變化
setState(c);
return free;
}
最後附上以 ReentrantLock 的
lock()
為例,裡面幾乎畫出了擷取鎖的所有代碼的執行過程
更多優質文章,請關注WX公衆号:Java全棧布道師