介紹
在Java中,除了synchronized關鍵字具備鎖的語義,還有juc包中的Lock接口。在觀察Lock接口的多個不同實作後,不難發現,其内部鎖的語義的實作,基本都是仰仗着AbstractQueuedSynchronizer,簡稱AQS。
内部狀态
Node
既然要實作鎖的語義,則必須處理擷取競争鎖失敗,線程等待的情況。在AQS線程擷取鎖失敗,就會構造一個Node接口,放入内部維護的同步隊列中。
字段
- int waitStatus:等待狀态。有以下幾個值。
- 1:目前線程被取消了
- -1:辨別目前節點的後繼節點中的線程目前是被阻塞的,是以目前節點在出隊時應該喚醒後繼節點
- -2:目前節點在某個Condition的等待隊列中
- -3:僅頭節點可能設定為這個辨別,代表doReleaseShared方法應該傳播下去
- 0:差別以上的狀态值
- Node prev:前驅節點
- Node next:後繼節點
- nextWaiter:
- Thread thread:目前節點裡面的線程
state
AQS内部有個int類型的變量state。需要注意:state是volatile變量。在AQS中,鎖的擷取和釋放就表現在state值的變化。
實作
既然AQS的主要作用是用來輔助各種鎖的實作類的。那它自研必然也會有相關的鎖的申請和釋放的方法。而鎖又分為共享鎖和排它鎖,是以,AQS内部也有4個對應的方法:
排它鎖:
- acquire(int arg):排它鎖的申請
- release(int arg):排它鎖的釋放
共享鎖:
- acquireShared(int arg):共享鎖的申請
- releaseShared(int arg):共享鎖的釋放
下面逐個分析每個方法。
排它鎖的申請:acquire(int arg)方法
acquire(int arg)是一個模闆方法,嘗試擷取鎖的方法tryAcquire()是由子類負責實作。
public final void acquire(int arg) {
//1.
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
1.嘗試擷取鎖:tryAcquire(int arg)方法
這是一個抽象方法。子類的實作一般都是以CAS的方式去嘗試改變state的值。如果CAS設定值成功,則為成功擷取到了鎖,方法直接退出;如果沒有擷取到鎖,則會執行後面的方法
2.如果擷取鎖失敗,則建立一個節點:addWaiter(Node node)方法
對于擷取排它鎖而言,這裡構造Node時,傳入的waiter是Node.EXCLUSIVE(其實值為null,僅僅是标記是排它鎖)
private Node addWaiter(Node mode) {
//建立一個Node,waiter是Node.EXCLUSIVE(其實就是null)
Node node = new Node(Thread.currentThread(), mode);
Node pred = tail;
if (pred != null) {
//把原來的尾節點設定為這次構造的Node的前驅節點
node.prev = pred;
//以cas的方式将尾節點設定為目前節點。如果設定成功,則傳回,方法結束
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
//如果前面cas設定尾結點失敗,則用循環CAS的方式将Node加入到隊列中
enq(node);
return node;
}
3.自旋,嘗試擷取鎖:acquireQueued()方法
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
//擷取目前節點的前驅節點
final Node p = node.predecessor();
//如果前驅節點為頭節點,則嘗試擷取鎖
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
//如果擷取鎖失敗
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
3.1.擷取鎖失敗後,判斷是否要挂起目前node裡的線程
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
//SIGNAL狀态表示目前Node的後繼節點需要被notify,是以如果是這個狀态,則可以阻塞node
return true;
if (ws > 0) {
//waitStatus>0的值隻有一個 = 1,表示前驅節點以及被取消,則将目前節點的前驅節點設定為更前面的一個非取消節點
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
3.2. 如果需要阻塞目前線程,則阻塞,并傳回線程是否被中斷
排它鎖的釋放:release(int arg)方法
release()也是個模闆方法,嘗試釋放鎖的tryRelease()方法也需要子類自己實作
public final boolean release(int arg) {
//嘗試釋放鎖
if (tryRelease(arg)) {
//釋放成功
Node h = head;
if (h != null && h.waitStatus != 0)
//喚醒後繼節點
unparkSuccessor(h);
return true;
}
//釋放失敗
return false;
}
共享鎖的擷取:acquireShared(int arg)方法
public final void acquireShared(int arg) {
//tryAcquireShared方法由子類負責重寫
if (tryAcquireShared(arg) < 0)
//擷取失敗
doAcquireShared(arg);
}
asd
private void doAcquireShared(int arg) {
//構造節點,waiter是Node.SHARED,并以CAS的方式加入到同步隊列尾部
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
//擷取前驅節點
final Node p = node.predecessor();
//前驅節點是頭結點
if (p == head) {
//嘗試擷取共享鎖
int r = tryAcquireShared(arg);
//擷取共享鎖成功
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
共享鎖的釋放:releaseShared(int arg)
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
主要邏輯在doReleaseShared()方法:
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue;
//喚醒後繼節點
unparkSuccessor(h);
} else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))//如果waitStatus=0且cas設定h的狀态失敗在繼續下一次循環
continue;
}
if (h == head) // loop if head changed
break;
}
}