定義
AQS定義了一套多線程通路共享資源的同步器架構,許多同步類實作都依賴于它,如常用的ReentrantLock/Semaphore/CountDownLatch。
資料結構
它維護了一個state,代表共享資源,一個等待隊列,多線程調用資源被阻塞時候就會進入此隊列
AQS使用了模闆方法模式,自定義的同步器在在實作時隻需要實作共享資源state的擷取與釋放即可,值域具體線程等待隊列的維護,AQS已經在頂層實作好了。
模闆方法主要有:
獨占式擷取
- accquire
- acquireInterruptibly
- tryAcquireNanos
共享式擷取
- acquireShared
- acquireSharedInterruptibly
- tryAcquireSharedNanos
獨占式釋放鎖
- release
共享式釋放鎖
- releaseShared
自定義同步器主要是實作以下方法
- isHeldExclusively():該線程是否正在獨占資源。隻有用到condition才需要去實作它。
- tryAcquire(int):獨占方式。嘗試擷取資源,成功則傳回true,失敗則傳回false。
- tryRelease(int):獨占方式。嘗試釋放資源,成功則傳回true,失敗則傳回false。
- tryAcquireShared(int):共享方式。嘗試擷取資源。負數表示失敗;0表示成功,但沒有剩餘可用資源;正數表示成功,且有剩餘資源。
- tryReleaseShared(int):共享方式。嘗試釋放資源,如果釋放後允許喚醒後續等待結點傳回true,否則傳回false
源碼解析
先介紹一下它的變量及内部類Node節點。
/** 隊列中的頭節點 */
private transient volatile Node head;
/** 隊列中的尾節點 */
private transient volatile Node tail;
/** 同步狀态、鎖數量 */
private volatile int state;
/** 擷取同步狀态 */
protected final int getState() {
return state;
}
/** 設定同步狀态 */
protected final void setState(int newState) {
state = newState;
}
/** 通過CAS設定同步狀态 */
protected final boolean compareAndSetState(int expect, int update) {
// See below for intrinsics setup to support this
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
static final class Node {
static final Node SHARED = new Node();
/** 表示在獨占模式下等待 */
static final Node EXCLUSIVE = null;
/** 表示已經被舍棄 */
static final int CANCELLED = 1;
/** 表示後續節點需要被喚醒 */
static final int SIGNAL = -1;
/** 節點處于等待隊列中 */
static final int CONDITION = -2;
/**
* 代表後續結點會傳播喚醒的操作,共享模式下起作用
*/
static final int PROPAGATE = -3;
/** 節點狀态 */
volatile int waitStatus;
/**上一節點*/
volatile Node prev;
/**下一節點*/
volatile Node next;
/** node所屬的線程 */
volatile Thread thread;
/** 下一個等待的節點 */
Node nextWaiter;
/** 是否共享 */
final boolean isShared() {
return nextWaiter == SHARED;
}
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
}
獨占鎖
我重點介紹的是獨占鎖,因為共享鎖用到的少。
- 擷取獨占鎖
擷取獨占鎖流程
// 擷取鎖
public final void acquire(int arg) {
// tryAcquire擷取鎖,如果state==0,或者持有鎖的線程再次擷取鎖,那麼就得到鎖并修改state,可重入鎖也是在這裡提現出來的,由子類具體實作,
// Node.EXCLUSIVE表示建立獨占模式節點
// 如果擷取不到鎖就将目前線程加入到隊列
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
沒有擷取到鎖,是以将節點加入到隊列尾部
// 将節點插入等待隊列尾部
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// 如果尾節點不為空,則快速将node插入到隊列尾部
Node pred = tail;
if (pred != null) {
node.prev = pred;
// CAS設定目前節點為尾節點
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
// 否則,通過自旋方式直到成功
enq(node);
return node;
}
// 自旋将節點插入等待隊列尾部
private Node enq(final Node node) {
for (;;) {
Node t = tail;
// 如果尾節點為空,說明隊列中沒有節點,那麼就通過CAS初始化隊列,并且頭、尾節點指向同一個
if (t == null) {
if (compareAndSetHead(new Node()))
tail = head;
} else {
// 否則将本節點插入隊列末尾
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
嘗試去擷取鎖,如果擷取不到,那麼就進入休眠
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
// 中斷标志
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor(); // 拿到前驅節點
// 如果前驅節點是頭結點,那麼就嘗試擷取鎖
if (p == head && tryAcquire(arg)) {
// 擷取到鎖
setHead(node); // 将頭節點設定為該節點
p.next = null; // 将之前的頭結點的next設為null,表示從隊列中脫離
failed = false;
return interrupted;
}
// 沒擷取到鎖,那麼就進入waiting狀态,直到被unpark()
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
// 線程被中斷,那麼就改變狀态位
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
// 由方法名就可以看出是 擷取鎖失敗後應該進入睡眠狀态
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus; // 擷取前驅節點的狀态
if (ws == Node.SIGNAL)
// 如果狀态為-1即下一節點需要被喚醒,那麼傳回true
return true;
if (ws > 0) {
// 如果狀态>0即該節點已經被廢棄,那麼往前周遊,直到找到正常等待的節點,并排在它後面
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
//如果前驅正常,那就把前驅的狀态設定成SIGNAL,告訴它下一節點需要被喚醒
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
// 線程進入睡眠
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this); // 進入睡眠,等待被喚醒
return Thread.interrupted(); //如果被喚醒,檢視自己是不是被中斷的
}
/**
* 如果線程發現線程被中斷,那麼就中斷目前線程
*/
static void selfInterrupt() {
Thread.currentThread().interrupt();
}
- 逾時等待擷取鎖
與擷取鎖邏輯一樣,隻不過多了時間判斷
public final boolean tryAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
return tryAcquire(arg) ||
doAcquireNanos(arg, nanosTimeout);
}
private boolean doAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (nanosTimeout <= 0L)
return false;
// 計算截止時間
final long deadline = System.nanoTime() + nanosTimeout;
// 将node添加到隊列
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
// 擷取前驅節點,判斷是否可以擷取鎖
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return true;
}
// 計算是否逾時
nanosTimeout = deadline - System.nanoTime();
if (nanosTimeout <= 0L)
return false;
// 進入有逾時時間的睡眠
if (shouldParkAfterFailedAcquire(p, node) &&
nanosTimeout > spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
if (Thread.interrupted())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
- 釋放鎖
釋放鎖邏輯比較簡單,大緻就是将state-1,喚醒下一個等待線程。
public final boolean release(int arg) {
// tryRelease()将state -1,如果state變為0,則将擁有鎖的線程exclusiveOwnerThread置為null,由子類具體實作
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
// 如果頭節點不為空并且狀态不是0,那麼就去喚醒下一個節點
unparkSuccessor(h);
return true;
}
return false;
}
喚醒下一個節點
// 喚醒下一個沒有被廢棄的節點
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
// 如果線程等待狀态<0,那麼就CAS變為0
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
// 如果下一個節點為空或者已經被廢棄,則從尾部向前擷取到狀為<=0的節點
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
// 喚醒這個節點的線程,然後去擷取鎖
if (s != null)
LockSupport.unpark(s.thread);
}
共享鎖
讀寫鎖中的讀鎖,就是實作了共享鎖。
- 擷取共享鎖
public final void acquireShared(int arg) {
// tryAcquireShared嘗試擷取鎖,傳回小于0,則說明擷取鎖失敗,由自類實作
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
tryAcquireShared()方法實作可以看 https://blog.csdn.net/zgsxhdzxl/article/details/106071399
沒有擷取到鎖,則
private void doAcquireShared(int arg) {
// 将節點加入到尾部
final Node node = addWaiter(Node.SHARED);
boolean interrupted = false;
try {
for (;;) {
final Node p = node.predecessor();
// 前驅節點是頭結點,則去擷取鎖
if (p == head) {
int r = tryAcquireShared(arg);
// 擷取鎖成功
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
return;
}
}
if (shouldParkAfterFailedAcquire(p, node))
interrupted |= parkAndCheckInterrupt();
}
} catch (Throwable t) {
cancelAcquire(node);
throw t;
} finally {
if (interrupted)
selfInterrupt();
}
}
擷取鎖成功之後,還會去喚醒後續的共享節點setHeadAndPropagate()
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
// 将node設定為新的首節點
setHead(node);
// 判斷後續節點是否需要被喚醒
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
// 下個節點是共享方式,則喚醒後續為共享方式的節點
if (s == null || s.isShared())
doReleaseShared();
}
}
這裡一直沒搞明白,為什麼會喚醒後續的共享節點,因為感覺循環隻會喚醒一個節點。
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
// 隻有結點狀态為SIGNAL才嘗試喚醒後繼結點
if (ws == Node.SIGNAL) {
// 将waitStatus設定為0
if (!h.compareAndSetWaitStatus(Node.SIGNAL, 0))
continue;
// 喚醒後續的節點
unparkSuccessor(h);
}
// 如果狀态為0則更新狀态為PROPAGATE
else if (ws == 0 &&
!h.compareAndSetWaitStatus(0, Node.PROPAGATE))
continue;
}
if (h == head)
break;
}
}
- 釋放鎖
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
tryReleaseShared可以看 https://blog.csdn.net/zgsxhdzxl/article/details/106071399