文章目錄
- 1.AQS 内部體系架構
- 2.擷取資源
- 2.1 擷取資源 tryAcquire()e
- 2.2 加入隊列 addWaiter()
- 2.3 排隊擷取資源 acquireQueued()
- 2.4 阻塞檢查 shouldParkAfterFailedAcquire()
- 3.釋放資源
- 3.1 喚醒後繼節點 unparkSuccessor()
- 4.擷取&釋放資源總流程
- 5.其他擷取資源的方法
- 5.1 響應中斷 acquireInterruptibly()
- 5.2 逾時參數 tryAcquireNanos()
1.AQS 内部體系架構
- FairSync: 公平鎖
- NoFairSync: 非公平鎖
- Shared: 共享模式
- Exclusive: 排他模式
2.擷取資源
AbstractQueueSynchronized.acquire()
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
- 嘗試擷取資源,成功則結束
- 擷取資源失敗,建立節點插入隊列
- 在隊列裡排隊并擷取資源
-
如果等待過程中出現線程中斷,則中斷自身
(1) 首先就是一次資源擷取的嘗試,具體嘗試邏輯由各實作類實作,失敗再加入隊列
(2) 該方法不響應中斷,如果在等待過程線程被中斷,線程不會響應,隻會在等待結束後中斷重新自身
2.1 擷取資源 tryAcquire()e
FairSync:公平
NoFairSync: 非公平
差別為 hasQueuedPredecessors()方法
public final boolean hasQueuedPredecessors() {
Node t = tail; // Read fields in reverse initialization order
Node h = head;
Node s;
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}
這個方法可以使得公平和不公平, 去判斷有沒有别的線程排在了目前線程的前面。
先是tail, 後是head, 如果這二者隻有一個為null(另一個不為null), 隻可能為tail = null, head 不為null
2.2 加入隊列 addWaiter()
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
enq()
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
初始化一個排斥模式的節點加入到隊列尾部,tail指向該節點。本方法先進行一次快速嘗試,失敗後在一個循環中自旋嘗試。
2.3 排隊擷取資源 acquireQueued()
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
本方法是核心邏輯,實作了一個節點在隊列中排隊、阻塞到擷取資源成功的一個邏輯,具有傳回值,傳回排隊過程是否線程被中斷過。
最後考慮到排隊過程中抛出異常的場景,加了一個finally的處理,對這類廠家取消排隊擷取資源。
2.4 阻塞檢查 shouldParkAfterFailedAcquire()
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
//唯一符合的情況
if (ws == Node.SIGNAL)
return true;
//不符合怎麼辦
if (ws > 0) {
//ws>0說明節點無效,調整排隊位置,向前找到有效的前驅節點
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
//修改前驅節點狀态
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
- 符合阻塞的條件:前驅節點狀态為singal
- 不符合怎麼辦:調整隊列,找到有效的前驅節點後,修改前驅節點狀态
parkAndCheckInterrupt(): 阻塞方法
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
隻有被喚醒或者線程中斷才能退出阻塞, 最後一句檢查并清理中斷标志,是以acquire方法中如果被中斷過,需要進行一次自我中斷。
3.釋放資源
嘗試釋放資源tryRelease也是交給實作類去實作。
Sync.tryRelease()
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
3.1 喚醒後繼節點 unparkSuccessor()
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
- 修改目前節點的狀态标志
- 找到下一個節點,如果存在則喚醒線程
4.擷取&釋放資源總流程
5.其他擷取資源的方法
5.1 響應中斷 acquireInterruptibly()
public final void acquireInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (!tryAcquire(arg))
doAcquireInterruptibly(arg);
}
private void doAcquireInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
- 嘗試擷取資源時進行一次中斷檢查
- 阻塞時被中斷抛出異常
5.2 逾時參數 tryAcquireNanos()
public final boolean tryAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
return tryAcquire(arg) ||
doAcquireNanos(arg, nanosTimeout);
}
private boolean doAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (nanosTimeout <= 0L)
return false;
final long deadline = System.nanoTime() + nanosTimeout;
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return true;
}
nanosTimeout = deadline - System.nanoTime();
if (nanosTimeout <= 0L)
return false;
if (shouldParkAfterFailedAcquire(p, node) &&
nanosTimeout > spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
if (Thread.interrupted())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
- 排隊擷取資源時先判斷是否已經逾時
- 每次判斷是否需要阻塞時都先判斷是否逾時
- 阻塞時增加檢查,時間不足隻進行自旋,減少阻塞的性能消耗
- 阻塞時阻塞指定時間