天天看點

AQS 加鎖過程AQS 幾個重要元件AQS 怎麼使用非公平鎖的加鎖過程

AQS 全稱是 AbstractQueuedSynchronizer (抽象隊列同步器),是阻塞式鎖和相關同步器工具的架構。阻塞式鎖(悲觀鎖,Synchronize),非阻塞式鎖(樂觀鎖,cas)。

AQS 幾個重要元件

public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer{
    // 表示鎖的的擷取和釋放。
    private volatile int state;
    // 指向線程隊列的隊頭
    private transient volatile Node head;
    // 執行線程線程隊列的隊尾。
    private transient volatile Node tail;
    // 目前是哪個線程拿到了鎖。
    private transient Thread exclusiveOwnerThread;
    // 多個線程争奪一把鎖時,競争失敗的線程包裝成 Node 節點儲存的 AQS 的線程隊列中等待着。
    static final class Node {...}
    
    // 擷取鎖。
    public final void acquire(int arg){...}
    // 釋放鎖
    public final boolean release(int arg) {...}
}
           

AQS 怎麼使用

AQS 是抽象方法,具體怎麼使用要看它的子類

public class ReentrantLock implements Lock, java.io.Serializable {
    //同步器。
    private final Sync sync;
    
    abstract static class Sync extends AbstractQueuedSynchronizer {...}
    static final class FairSync extends Sync {...}
    static final class NonfairSync extends Sync {...}
    
}
           

ReentrantLock 實作了鎖的接口,裡面有維護了繼承自 AQS 的同步器 Sync,它有兩個實作 FairSync 和 NonfairSync。

// 非公平鎖
public ReentrantLock() {
    sync = new NonfairSync();
}
// 公平鎖,fair = 1 
public ReentrantLock(boolean fair) {
    sync = fair ? new FairSync() : new NonfairSync();
}
           

公平鎖和非公平鎖實際上指的是 ReentrantLock 中的同步器是不是公平的。常用的是非公平鎖。

非公平鎖的加鎖過程

ReentrantLock reentrantLock = new ReentrantLock();
reentrantLock.lock()
           

底層:

public void lock() {
    // 調用 NonfairSync 的 lock 方法。
    sync.lock();
}
           

加鎖成功

當沒有線程競争時:

final void lock() {
    // CAS 設定 state=1
    if (compareAndSetState(0, 1))
        //  設定 exclusiveOwnerThread = 目前線程
        setExclusiveOwnerThread(Thread.currentThread());
    else
        acquire(1);
}
           

加鎖成功。

加鎖失敗

出現競争時,假設上面的線程 T0 已經加鎖成功,且未釋放鎖,T1又來加鎖。

final void lock() {
    // 此時 state == 1,CAS失敗。
    if (compareAndSetState(0, 1))
        setExclusiveOwnerThread(Thread.currentThread());
    else
        // 執行這個方法。
        acquire(1);
}
           
// AbstractQueuedSynchronizer.java

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}
           
  1. tryAcquire(arg) 再次嘗試加鎖(可能 T0 會将鎖釋放),嘗試失敗傳回 false,取反,就能執行 && 後面的邏輯了。
  2. addWaiter(Node.EXCLUSIVE):将 T1 線程包裝成 Node 對象,并加入到 AQS 的等待隊列中。
  3. acquireQueued(node, args) :如果 T1 是等待隊列中的第二個節點,那就再嘗試拿鎖(最後的掙紮),如果還拿不到,那就 park。

看下細節

tryAcquire(arg)

幾層調用後,最終會執行到:

// ReectrantLock.java

// acquires=1
final boolean nonfairTryAcquire(int acquires) {
    // 擷取目前線程
    final Thread current = Thread.currentThread();
    // 擷取 state, 此時 c=1。
    int c = getState();
    // 如果沒有加鎖
    if (c == 0) {
        //  CAS 修改 state。
        if (compareAndSetState(0, acquires)) {
            // 将目前線程設定成鎖持有者。
            setExclusiveOwnerThread(current);
            // 拿鎖成功,傳回。(T1 從這裡傳回了)
            return true;
        }
    }
    // (從這裡可以看出,ReectrantLock支援鎖重入的)
    // 如果已經加鎖了,再判斷鎖的持有者是不是目前線程。
    else if (current == getExclusiveOwnerThread()) {
        // state 值增加。
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        // 寫回 state
        setState(nextc);
        // 拿鎖成功,傳回。
        return true;
    }
    // 拿鎖失敗,傳回。
    return false;
}
           

addWaiter(Node.EXCLUSIVE)

private Node addWaiter(Node mode) {
    // 将目前線程包裝成 node。
    Node node = new Node(Thread.currentThread(), mode);
    // 拿出隊尾指針。
    Node pred = tail;
    // 隊尾指針非空,表示同步器的線程隊列中已經有線程等待了。
    // 隻需要将目前線程追加到隊尾就可以了。
    if (pred != null) {
        // 目前 node 前驅指針指向隊尾元素。
        node.prev = pred;
        // CAS 将 tail 從指向原隊尾 修改成 指向 node。
        if (compareAndSetTail(pred, node)) {
            // 原隊尾的後繼指針指向 node。
            pred.next = node;
            // node 入隊成功,傳回 node。
            return node;
        }
    }
    // 如果 隊列非空 或者 CAS入隊失敗,(T1屬于隊列非空的情況)
    // 執行 enq 重新入隊。
    enq(node);
    // 入隊成功,傳回 node。
    return node;
}
           

(1)隊列為空,重新入隊。會執行兩次循環,第一次想隊列中添加一個不帶線程的 node 節點做隊頭。第二次在将之前入隊失敗的 node 追加到線程隊列中,直到追加成功才會跳出死循環。

為什麼要加一個不帶線程的 node 作為線程隊列的頭呢?

因為等待隊列的線程最終是要被喚醒的,設計的喚醒方式是:隊列中的前一個節點喚醒它後面的節點,是以必須有個不帶線程的 node 作為頭,啟動喚醒。

// 第一次
private Node enq(final Node node) {
    for (;;) {
        // 因為隊列是空,是以 t = tail = null
        Node t = tail;
        if (t == null) { // Must initialize
            // CAS 設定 head 指向 new Node()
            if (compareAndSetHead(new Node()))
                // 隻有一個 node,隊頭、隊尾都指向這個 node。
                tail = head;
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

// 第二次
private Node enq(final Node node) {
    for (;;) {
        // 隊列非空,t = tail != null。
        Node t = tail;
        if (t == null) { 
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            // 将 node 追加到隊尾。
            node.prev = t;
            // CAS 更新隊尾到指向 node。如果失敗,一直死循環直到成功。
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}
           

(2)隊列非空,重新入隊。跟(1)中的第二次執行入隊是一樣的流程。為什麼隊列非空還要重新執行入隊呢?當多個線程同時沒有搶到鎖需要入隊等待時,入隊也會發生競争,CAS 當初值變了,再更新就會失敗。死循環重新入隊,最慘的情況就是目前線程最後一個入隊。

final boolean acquireQueued(final Node node, int arg) {
    // 拿鎖失敗?預設是。
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            //擷取線程節點的前驅節點。
            final Node p = node.predecessor();
            // 如果前驅節點是頭節點,說明 node 是第二個節點。
            // 那就再嘗試拿鎖。
            if (p == head && tryAcquire(arg)) {
                // 這是拿鎖成功的情況
                setHead(node);  // 将 node 變成不帶線程的頭節點。
                p.next = null; // help GC
                failed = false; //拿鎖失敗,修改成否。
                return interrupted;
            }
            // 因為T0一直沒有釋放鎖,是以 T1 在上面拿鎖還是失敗。
            // 此時要判斷 T1 是不是該 park。
            // 第一次不 park,會再執行一次循環,第二次再park。(這裡也叫自旋一次)
            if (shouldParkAfterFailedAcquire(p, node) &&
                // 執行目前 node 的 park。
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
           

為什麼第一次不 park,要自旋一次,第二次再 park?

第一,park 就是啟動了重量級鎖,重量級鎖就涉及到作業系統底層了,開銷很大,是以再自旋一次,盡量在 jvm層面解決加鎖。

第二,node 執行park 後,是要被喚醒的。它得先把前驅節點的 waitStatus 字段标記成 -1。這樣前驅結點在執行完畢釋放鎖時會将 node喚醒,node 再去競争鎖。

第三,線程隊列中有在 node 之前已經有好多個節點了,其中有些節點(線程)已經被取消了,其 waitStatus =1,取消的節點是不會喚醒它後面的節點的。是以 在第一次的 park 中将線程隊列中的所有被取消節點全部給清除。

shouldParkAfterFailedAcquire(Node pred, Node node)

// node 是目前節點,pred 是目前節點的前驅節點。
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    // 擷取前驅的 ws
    int ws = pred.waitStatus;
    // Node.SIGNAL == -1
    if (ws == Node.SIGNAL)
        return true; // 目前節點park。
    // ws > 0, 其實是判斷 ws == 1 是不是成立。
    if (ws > 0) {
        // 如果前驅是的 ws == 1,那表示前驅線程被取消了。
        // do{}while{} 循環一直往前找,直到找到沒有被取消的線程。
        // 修改節點的前後引用的指向,将目前節點追加到沒有被取消的節點的後面。
        // 也是就是說,那些“被取消的節點”被從隊列中清理出去了。
        // if{} 執行完會直接跳到最後的 return false。
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        // 如果 ws == 0,将前驅的節點的 ws 設為 -1,後面就是 return false。
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    // 目前節點自旋。
    return false;
}
           

以為 T0 一直沒有釋放鎖,是以 T1 最終會執行 park。

private final boolean parkAndCheckInterrupt() {
    // 直接 park T1。
    LockSupport.park(this);
    return Thread.interrupted();
}
           

最終的結果

最終的結果長這個樣子。灰色表示 park,T0 在哪裡,T0直接拿到了鎖,執行邏輯,根本沒有入隊。

AQS 加鎖過程AQS 幾個重要元件AQS 怎麼使用非公平鎖的加鎖過程

繼續閱讀