AbstractQueuedSynchronizer是并發程式設計包中最重要的類,是并發程式設計包的實作基層。簡單來說,AbstractQueuedSynchronizer提供了一個基于FIFO的隊列,用于存儲處于阻塞狀态的線程;提供了一個volatile修改的state變量,用于作為鎖擷取的辨別位,對于FIFO隊列和state的操作都是通過Unsafe這個類來實作CAS原子操作。
AQS的功能可以分為兩類:獨占功能和共享功能,它的所有子類中,要麼實作了并使用了它獨占鎖功能的API,要麼使用了共享鎖的功能,而不會同時使用兩套API,即便是它最有名的子類讀寫鎖ReentrantReadWriteLock也是通過兩個内部類:讀鎖和寫鎖,分别實作兩套API來實作的。
一、FIFO隊列:
//阻塞線程節點
static final class Node {
static final Node SHARED = new Node(); //共享鎖
static final Node EXCLUSIVE = null; //獨占鎖
static final int CANCELLED = 1; //用于表示目前線程被取消
static final int SIGNAL = -1; //表示目前線程處于阻塞狀态
static final int CONDITION = -2; //表示目前節點在等待Condition
static final int PROPAGATE = -3; //表示目前場景下後續的acquireShared能夠得以執行
volatile int waitStatus;
volatile Node prev;
volatile Node next;
volatile Thread thread;
Node nextWaiter;
}
當線程沒有競争到鎖資源時就會被阻塞并放到FIFO隊列中,當鎖資源釋放後再從FIFO由阻塞狀态變為可執行狀态來擷取鎖資源并執行。當線程擷取鎖資源時有公平鎖和非公平鎖可以參考
并發程式設計--公平鎖和非公平鎖;此外對于鎖的分類還有獨占鎖和共享鎖,簡單來說獨占鎖state就是目前鎖隻能有一個線程擷取,其他線程處于阻塞狀态,共享鎖是多個線程可以同時持有目前鎖狀态state,隻要符合某種要求所有線程可以并發執行,不用進行阻塞等待。
AQS FIFO隊列模型:
二、鎖狀态:
AbstractQueuedSynchronizer提供了一個變量state,用來作為目前鎖是否可以擷取的辨別,對于所有線程來說,目前state的值對于他們來說都是可見的,每個線程持有的state值都是相同的。
private volatile int state;
三、擷取鎖
1、成功擷取鎖
我們介紹一下一個線程是如何擷取鎖的,當一個獨占鎖被擷取之後,其他線程是該如何進行操作的。我們用ReentrantLock的接口lock來看一下鎖是如何擷取的。
lock接口:
final void lock() {
//但擷取鎖時,會将state設定為1,如果是單個線程多次擷取鎖則state++
acquire(1);
}
(1)tryAcquire是嘗試擷取鎖,如果擷取鎖則繼續執行
(2)如果tryAcquire傳回false,則調用acquireQueued将目前線程阻塞并添加的FIFO隊列中
//tryAcquire是嘗試擷取鎖,如果擷取鎖則繼續執行
//如果tryAcquire傳回false,則調用acquireQueued将目前線程阻塞并添加的FIFO隊列中
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
我們看看tryAcquire嘗試擷取鎖的機制,我們介紹一下公平鎖的擷取吧
(1)首先會擷取lock中的state值,如果state等于0表示目前鎖可擷取,接下來hasQueuedPredecessors判斷目前鎖是否在FIFO隊列中,當FIFO隊列為空時,目前線程可以擷取鎖,不然根據公平規則,需要讓FIFO中的線程優先擷取鎖,當可以擷取鎖是調用 compareAndSetState(0, acquires)将state值通過cas操作設定為acquires,目前線程擷取了鎖,state值為acquires值,其他線程來擷取鎖時state肯定就不為0了。
(2)當state值不為0時,首先要判斷一下目前擷取鎖的線程和申請鎖的線程是否是一個線程,如果時一個線程state++,目前線程擷取了多次鎖
(3)當以上條件都不滿足時,表示線程無法擷取鎖,傳回false。
protected final boolean tryAcquire(int acquires) {
//擷取目前線程
final Thread current = Thread.currentThread();
//擷取鎖辨別state
int c = getState();
//如果c等于0則表示目前線程可以擷取鎖
if (c == 0) {
//hasQueuedPredecessors判斷目前鎖是否在FIFO隊列中,當FIFO隊列為空時,目前線程可以擷取鎖,不然根據公平規則,需要讓FIFO中的線程優先擷取鎖
//當可以擷取鎖是調用 compareAndSetState(0, acquires)将state值通過cas操作設定為acquires
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
//将獨占線程設定為目前線程
setExclusiveOwnerThread(current);
return true;
}
}
//即使目前擷取不到鎖,如果擷取鎖的線程是目前線程則state++,線程擷取多個鎖
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
//擷取鎖失敗
return false;
}
}
總結:鎖的擷取是根據鎖狀态值state和判斷目前擷取鎖的線程是否是申請鎖的線程來判斷的
2、進入FIFO線程隊列
當擷取鎖失敗時傳回false,則執行acquireQueued(addWaiter(Node.EXCLUSIVE), arg)操作。
首先調用addWaiter(Node.EXCLUSIVE),将線程添加到FIFO隊列中,并阻塞。
private Node addWaiter(Node mode) {
//建立目前線程節點Node
Node node = new Node(Thread.currentThread(), mode);
Node pred = tail;
//當尾節點不為空時
if (pred != null) {
node.prev = pred;
//cas原子操作将目前線程添加到FIFO隊列中
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
//當尾節點為空時,初始化head和tail節點
enq(node);
//傳回node
return node;
}
接下來調用acquireQueued将線程進行阻塞
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
//擷取node節點的前一個節點
final Node p = node.predecessor();
//再嘗試擷取鎖
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
//shouldParkAfterFailedAcquire 用來設定node的waitstatus狀态為SIGNAL,值為-1,表示目前節點的後繼節點包含的線程需要運作,也就是unpark;
//parkAndCheckInterrupt() 操作是将目前線程park,阻塞,線程阻塞在這個地方
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
線程阻塞的操作是在parkAndCheckInterrupt中将目前線程阻塞。
private final boolean parkAndCheckInterrupt() {
//阻塞目前線程
LockSupport.park(this);
return Thread.interrupted();
}
總結:以上操作完成了線程的進FIFO隊列及目前線程的阻塞。
3、釋放鎖資源
鎖的釋放還是比較簡單的,簡單的來說就是将鎖辨別位state進行減一操作,然後将阻塞線程喚起讓他們重新競争
public void unlock() {
sync.release(1);
}
release中進行state恢複值并喚起阻塞線程。
public final boolean release(int arg) {
//tryRelease釋放鎖,将state進行減值,并設定目前可執行線程為null
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
//喚起阻塞線程
unparkSuccessor(h);
return true;
}
return false;
}
tryRelease中對state進行修改,如果state的值為0的話将目前執行線程設定為null。
protected final boolean tryRelease(int releases) {
//修改state值
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
//将目前執行線程設定為空
setExclusiveOwnerThread(null);
}
//修改state值
setState(c);
return free;
}
接下來的操作是調用unparkSuccessor将阻塞線程喚起
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
//喚起目前線程
if (s != null)
LockSupport.unpark(s.thread);
}