AQS 全稱是 AbstractQueuedSynchronizer (抽象隊列同步器),是阻塞式鎖和相關同步器工具的架構。阻塞式鎖(悲觀鎖,Synchronize),非阻塞式鎖(樂觀鎖,cas)。
AQS 幾個重要元件
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer{
// 表示鎖的的擷取和釋放。
private volatile int state;
// 指向線程隊列的隊頭
private transient volatile Node head;
// 執行線程線程隊列的隊尾。
private transient volatile Node tail;
// 目前是哪個線程拿到了鎖。
private transient Thread exclusiveOwnerThread;
// 多個線程争奪一把鎖時,競争失敗的線程包裝成 Node 節點儲存的 AQS 的線程隊列中等待着。
static final class Node {...}
// 擷取鎖。
public final void acquire(int arg){...}
// 釋放鎖
public final boolean release(int arg) {...}
}
AQS 怎麼使用
AQS 是抽象方法,具體怎麼使用要看它的子類
public class ReentrantLock implements Lock, java.io.Serializable {
//同步器。
private final Sync sync;
abstract static class Sync extends AbstractQueuedSynchronizer {...}
static final class FairSync extends Sync {...}
static final class NonfairSync extends Sync {...}
}
ReentrantLock 實作了鎖的接口,裡面有維護了繼承自 AQS 的同步器 Sync,它有兩個實作 FairSync 和 NonfairSync。
// 非公平鎖
public ReentrantLock() {
sync = new NonfairSync();
}
// 公平鎖,fair = 1
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
公平鎖和非公平鎖實際上指的是 ReentrantLock 中的同步器是不是公平的。常用的是非公平鎖。
非公平鎖的加鎖過程
ReentrantLock reentrantLock = new ReentrantLock();
reentrantLock.lock()
底層:
public void lock() {
// 調用 NonfairSync 的 lock 方法。
sync.lock();
}
加鎖成功
當沒有線程競争時:
final void lock() {
// CAS 設定 state=1
if (compareAndSetState(0, 1))
// 設定 exclusiveOwnerThread = 目前線程
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
加鎖成功。
加鎖失敗
出現競争時,假設上面的線程 T0 已經加鎖成功,且未釋放鎖,T1又來加鎖。
final void lock() {
// 此時 state == 1,CAS失敗。
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
// 執行這個方法。
acquire(1);
}
// AbstractQueuedSynchronizer.java
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
- tryAcquire(arg) 再次嘗試加鎖(可能 T0 會将鎖釋放),嘗試失敗傳回 false,取反,就能執行 && 後面的邏輯了。
- addWaiter(Node.EXCLUSIVE):将 T1 線程包裝成 Node 對象,并加入到 AQS 的等待隊列中。
- acquireQueued(node, args) :如果 T1 是等待隊列中的第二個節點,那就再嘗試拿鎖(最後的掙紮),如果還拿不到,那就 park。
看下細節
tryAcquire(arg)
幾層調用後,最終會執行到:
// ReectrantLock.java
// acquires=1
final boolean nonfairTryAcquire(int acquires) {
// 擷取目前線程
final Thread current = Thread.currentThread();
// 擷取 state, 此時 c=1。
int c = getState();
// 如果沒有加鎖
if (c == 0) {
// CAS 修改 state。
if (compareAndSetState(0, acquires)) {
// 将目前線程設定成鎖持有者。
setExclusiveOwnerThread(current);
// 拿鎖成功,傳回。(T1 從這裡傳回了)
return true;
}
}
// (從這裡可以看出,ReectrantLock支援鎖重入的)
// 如果已經加鎖了,再判斷鎖的持有者是不是目前線程。
else if (current == getExclusiveOwnerThread()) {
// state 值增加。
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
// 寫回 state
setState(nextc);
// 拿鎖成功,傳回。
return true;
}
// 拿鎖失敗,傳回。
return false;
}
addWaiter(Node.EXCLUSIVE)
private Node addWaiter(Node mode) {
// 将目前線程包裝成 node。
Node node = new Node(Thread.currentThread(), mode);
// 拿出隊尾指針。
Node pred = tail;
// 隊尾指針非空,表示同步器的線程隊列中已經有線程等待了。
// 隻需要将目前線程追加到隊尾就可以了。
if (pred != null) {
// 目前 node 前驅指針指向隊尾元素。
node.prev = pred;
// CAS 将 tail 從指向原隊尾 修改成 指向 node。
if (compareAndSetTail(pred, node)) {
// 原隊尾的後繼指針指向 node。
pred.next = node;
// node 入隊成功,傳回 node。
return node;
}
}
// 如果 隊列非空 或者 CAS入隊失敗,(T1屬于隊列非空的情況)
// 執行 enq 重新入隊。
enq(node);
// 入隊成功,傳回 node。
return node;
}
(1)隊列為空,重新入隊。會執行兩次循環,第一次想隊列中添加一個不帶線程的 node 節點做隊頭。第二次在将之前入隊失敗的 node 追加到線程隊列中,直到追加成功才會跳出死循環。
為什麼要加一個不帶線程的 node 作為線程隊列的頭呢?
因為等待隊列的線程最終是要被喚醒的,設計的喚醒方式是:隊列中的前一個節點喚醒它後面的節點,是以必須有個不帶線程的 node 作為頭,啟動喚醒。
// 第一次
private Node enq(final Node node) {
for (;;) {
// 因為隊列是空,是以 t = tail = null
Node t = tail;
if (t == null) { // Must initialize
// CAS 設定 head 指向 new Node()
if (compareAndSetHead(new Node()))
// 隻有一個 node,隊頭、隊尾都指向這個 node。
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
// 第二次
private Node enq(final Node node) {
for (;;) {
// 隊列非空,t = tail != null。
Node t = tail;
if (t == null) {
if (compareAndSetHead(new Node()))
tail = head;
} else {
// 将 node 追加到隊尾。
node.prev = t;
// CAS 更新隊尾到指向 node。如果失敗,一直死循環直到成功。
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
(2)隊列非空,重新入隊。跟(1)中的第二次執行入隊是一樣的流程。為什麼隊列非空還要重新執行入隊呢?當多個線程同時沒有搶到鎖需要入隊等待時,入隊也會發生競争,CAS 當初值變了,再更新就會失敗。死循環重新入隊,最慘的情況就是目前線程最後一個入隊。
final boolean acquireQueued(final Node node, int arg) {
// 拿鎖失敗?預設是。
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
//擷取線程節點的前驅節點。
final Node p = node.predecessor();
// 如果前驅節點是頭節點,說明 node 是第二個節點。
// 那就再嘗試拿鎖。
if (p == head && tryAcquire(arg)) {
// 這是拿鎖成功的情況
setHead(node); // 将 node 變成不帶線程的頭節點。
p.next = null; // help GC
failed = false; //拿鎖失敗,修改成否。
return interrupted;
}
// 因為T0一直沒有釋放鎖,是以 T1 在上面拿鎖還是失敗。
// 此時要判斷 T1 是不是該 park。
// 第一次不 park,會再執行一次循環,第二次再park。(這裡也叫自旋一次)
if (shouldParkAfterFailedAcquire(p, node) &&
// 執行目前 node 的 park。
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
為什麼第一次不 park,要自旋一次,第二次再 park?
第一,park 就是啟動了重量級鎖,重量級鎖就涉及到作業系統底層了,開銷很大,是以再自旋一次,盡量在 jvm層面解決加鎖。
第二,node 執行park 後,是要被喚醒的。它得先把前驅節點的 waitStatus 字段标記成 -1。這樣前驅結點在執行完畢釋放鎖時會将 node喚醒,node 再去競争鎖。
第三,線程隊列中有在 node 之前已經有好多個節點了,其中有些節點(線程)已經被取消了,其 waitStatus =1,取消的節點是不會喚醒它後面的節點的。是以 在第一次的 park 中将線程隊列中的所有被取消節點全部給清除。
shouldParkAfterFailedAcquire(Node pred, Node node)
// node 是目前節點,pred 是目前節點的前驅節點。
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
// 擷取前驅的 ws
int ws = pred.waitStatus;
// Node.SIGNAL == -1
if (ws == Node.SIGNAL)
return true; // 目前節點park。
// ws > 0, 其實是判斷 ws == 1 是不是成立。
if (ws > 0) {
// 如果前驅是的 ws == 1,那表示前驅線程被取消了。
// do{}while{} 循環一直往前找,直到找到沒有被取消的線程。
// 修改節點的前後引用的指向,将目前節點追加到沒有被取消的節點的後面。
// 也是就是說,那些“被取消的節點”被從隊列中清理出去了。
// if{} 執行完會直接跳到最後的 return false。
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
// 如果 ws == 0,将前驅的節點的 ws 設為 -1,後面就是 return false。
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
// 目前節點自旋。
return false;
}
以為 T0 一直沒有釋放鎖,是以 T1 最終會執行 park。
private final boolean parkAndCheckInterrupt() {
// 直接 park T1。
LockSupport.park(this);
return Thread.interrupted();
}
最終的結果
最終的結果長這個樣子。灰色表示 park,T0 在哪裡,T0直接拿到了鎖,執行邏輯,根本沒有入隊。