什麼是AQS
AQS(AbstractQueuedSynchronizer),AQS是JDK下提供的一套用于實作基于FIFO等待隊列的阻塞鎖和相關的同步器的一個同步架構。這個抽象類被設計為作為一些可用原子int值來表示狀态的同步器的基類。如果你有看過類似 CountDownLatch 類的源碼實作,會發現其内部有一個繼承了 AbstractQueuedSynchronizer 的内部類 Sync 。可見 CountDownLatch 是基于AQS架構來實作的一個同步器.類似的同步器在JUC下還有不少。(eg. Semaphore )
AQS用法
如上所述,AQS管理一個關于狀态資訊的單一整數,該整數可以表現任何狀态。比如, Semaphore 用它來表現剩餘的許可數,ReentrantLock 用它來表現擁有它的線程已經請求了多少次鎖;FutureTask 用它來表現任務的狀态(尚未開始、運作、完成和取消)
如JDK的文檔中所說,使用AQS來實作一個同步器需要覆寫實作如下幾個方法,并且使用
getState
,
setState
,
compareAndSetState
這幾個方法來設定擷取狀态
-
boolean tryAcquire(int arg)
-
boolean tryRelease(int arg)
-
int tryAcquireShared(int arg)
-
boolean tryReleaseShared(int arg)
-
以上方法不需要全部實作,根據擷取的鎖的種類可以選擇實作不同的方法boolean isHeldExclusively()
J.U.C是基于AQS(
AbstractQueuedSynchronizer
)實作的,AQS是一個同步器,設計模式是模闆模式。
核心資料結構:雙向連結清單 + state(鎖狀态)
底層操作:CAS
首先,我們根據API的方法功能,由我們前面階段學習的知識進行一個自己定義的AQS,來加深印象。
// 抽象隊列同步器
// state, owner, waiters
public class kfAqs {
// acquire、 acquireShared : 定義了資源争用的邏輯,如果沒拿到,則等待。
// tryAcquire、 tryAcquireShared : 實際執行占用資源的操作,如何判定一個由使用者具體去實作。
// release、 releaseShared : 定義釋放資源的邏輯,釋放之後,通知後續節點進行争搶。
// tryRelease、 tryReleaseShared: 實際執行資源釋放的操作,具體的AQS使用者去實作。
// 1、 如何判斷一個資源的擁有者
public volatile AtomicReference<Thread> owner = new AtomicReference<>();
// 2、 儲存正在等待的線程
public volatile LinkedBlockingQueue<Thread> waiters = new LinkedBlockingQueue<>();
// 3、 記錄資源的狀态
public volatile AtomicInteger state = new AtomicInteger(0);
// 共享資源占用的邏輯,傳回資源的占用情況
public int tryAcquireShared(){
throw new UnsupportedOperationException();
}
public void acquireShared(){
boolean addQ = true;
while(tryAcquireShared() < 0) {
if (addQ) {
// 沒拿到鎖,加入到等待集合
waiters.offer(Thread.currentThread());
addQ = false;
} else {
// 阻塞 挂起目前的線程,不要繼續往下跑了
LockSupport.park(); // 僞喚醒,就是非unpark喚醒的
}
}
waiters.remove(Thread.currentThread()); // 把線程移除
}
// 共享資源釋放的邏輯,傳回資源是否已釋放
public boolean tryReleaseShared(){
throw new UnsupportedOperationException();
}
public void releaseShared(){
if (tryReleaseShared()) {
// 通知等待者
Iterator<Thread> iterator = waiters.iterator();
while (iterator.hasNext()) {
Thread next = iterator.next();
LockSupport.unpark(next); // 喚醒
}
}
}
// 獨占資源相關的代碼
public boolean tryAcquire() { // 交給使用者去實作。 模闆方法設計模式
throw new UnsupportedOperationException();
}
public void acquire() {
boolean addQ = true;
while (!tryAcquire()) {
if (addQ) {
// 沒拿到鎖,加入到等待集合
waiters.offer(Thread.currentThread());
addQ = false;
} else {
// 阻塞 挂起目前的線程,不要繼續往下跑了
LockSupport.park(); // 僞喚醒,就是非unpark喚醒的
}
}
waiters.remove(Thread.currentThread()); // 把線程移除
}
public boolean tryRelease() {
throw new UnsupportedOperationException();
}
public void release() { // 定義了 釋放資源之後要做的操作
if (tryRelease()) {
// 通知等待者
Iterator<Thread> iterator = waiters.iterator();
while (iterator.hasNext()) {
Thread next = iterator.next();
LockSupport.unpark(next); // 喚醒
}
}
}
public AtomicInteger getState() {
return state;
}
public void setState(AtomicInteger state) {
this.state = state;
}
}
資源占用流程圖
源碼解析
static final class Node {
/** Marker to indicate a node is waiting in shared mode */
static final Node SHARED = new Node();
/** Marker to indicate a node is waiting in exclusive mode */
static final Node EXCLUSIVE = null;
/** waitStatus value to indicate thread has cancelled */
static final int CANCELLED = 1;// 等待逾時或被中斷
/** waitStatus value to indicate successor's thread needs unparking */
static final int SIGNAL = -1;// 釋放鎖之後,是否通知後一個節點
/** waitStatus value to indicate thread is waiting on condition */
static final int CONDITION = -2;// 處于等待隊列中,結點的線程等待在Condition上
/**
* waitStatus value to indicate the next acquireShared should
* unconditionally propagate
*/
static final int PROPAGATE = -3;// 共享模式中使用,線程處于可運作狀态
//核心資料結構:雙向連結清單 + state(鎖狀态)
//資源争用的邏輯
public final void acquire(int arg) {
if (!tryAcquire(arg) && // 判斷是否拿到鎖
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
static void selfInterrupt() {
Thread.currentThread().interrupt();
}
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false; // 目前線程釋放中斷的标志位
for (;;) {// 不斷嘗試
final Node p = node.predecessor(); // 擷取前一個節點
if (p == head && tryAcquire(arg)) { // 如果前一個節點是head,嘗試搶一次鎖
setHead(node); // 更換head
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&// 檢查狀态,是否需要挂起線程
parkAndCheckInterrupt())// 如果需要挂起,則通過Park進入停車場挂起
interrupted = true; // 如果出現中斷,則修改标記
}
} finally {
if (failed)
cancelAcquire(node);
}
}
//資源釋放的邏輯
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head; // 從頭開始找
if (h != null && h.waitStatus != 0)
unparkSuccessor(h); // 喚醒下一個線程
return true;
}
return false;
}
protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}
/** 喚醒等待者
* Wakes up node's successor, if one exists.
*
* @param node the node
*/
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus; // 正在釋放鎖的線程節點狀态
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0); // 修改目前節點狀态
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next; // 找下一個節點
if (s == null || s.waitStatus > 0) { // 如果不存在或者被取消了,繼續尋找合适的下一個節點
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null) // 如果找到了合适的節點,就喚醒它
LockSupport.unpark(s.thread);
}