天天看点

AQS 加锁过程AQS 几个重要组件AQS 怎么使用非公平锁的加锁过程

AQS 全称是 AbstractQueuedSynchronizer (抽象队列同步器),是阻塞式锁和相关同步器工具的框架。阻塞式锁(悲观锁,Synchronize),非阻塞式锁(乐观锁,cas)。

AQS 几个重要组件

public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer{
    // 表示锁的的获取和释放。
    private volatile int state;
    // 指向线程队列的队头
    private transient volatile Node head;
    // 执行线程线程队列的队尾。
    private transient volatile Node tail;
    // 当前是哪个线程拿到了锁。
    private transient Thread exclusiveOwnerThread;
    // 多个线程争夺一把锁时,竞争失败的线程包装成 Node 节点保存的 AQS 的线程队列中等待着。
    static final class Node {...}
    
    // 获取锁。
    public final void acquire(int arg){...}
    // 释放锁
    public final boolean release(int arg) {...}
}
           

AQS 怎么使用

AQS 是抽象方法,具体怎么使用要看它的子类

public class ReentrantLock implements Lock, java.io.Serializable {
    //同步器。
    private final Sync sync;
    
    abstract static class Sync extends AbstractQueuedSynchronizer {...}
    static final class FairSync extends Sync {...}
    static final class NonfairSync extends Sync {...}
    
}
           

ReentrantLock 实现了锁的接口,里面有维护了继承自 AQS 的同步器 Sync,它有两个实现 FairSync 和 NonfairSync。

// 非公平锁
public ReentrantLock() {
    sync = new NonfairSync();
}
// 公平锁,fair = 1 
public ReentrantLock(boolean fair) {
    sync = fair ? new FairSync() : new NonfairSync();
}
           

公平锁和非公平锁实际上指的是 ReentrantLock 中的同步器是不是公平的。常用的是非公平锁。

非公平锁的加锁过程

ReentrantLock reentrantLock = new ReentrantLock();
reentrantLock.lock()
           

底层:

public void lock() {
    // 调用 NonfairSync 的 lock 方法。
    sync.lock();
}
           

加锁成功

当没有线程竞争时:

final void lock() {
    // CAS 设置 state=1
    if (compareAndSetState(0, 1))
        //  设置 exclusiveOwnerThread = 当前线程
        setExclusiveOwnerThread(Thread.currentThread());
    else
        acquire(1);
}
           

加锁成功。

加锁失败

出现竞争时,假设上面的线程 T0 已经加锁成功,且未释放锁,T1又来加锁。

final void lock() {
    // 此时 state == 1,CAS失败。
    if (compareAndSetState(0, 1))
        setExclusiveOwnerThread(Thread.currentThread());
    else
        // 执行这个方法。
        acquire(1);
}
           
// AbstractQueuedSynchronizer.java

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}
           
  1. tryAcquire(arg) 再次尝试加锁(可能 T0 会将锁释放),尝试失败返回 false,取反,就能执行 && 后面的逻辑了。
  2. addWaiter(Node.EXCLUSIVE):将 T1 线程包装成 Node 对象,并加入到 AQS 的等待队列中。
  3. acquireQueued(node, args) :如果 T1 是等待队列中的第二个节点,那就再尝试拿锁(最后的挣扎),如果还拿不到,那就 park。

看下细节

tryAcquire(arg)

几层调用后,最终会执行到:

// ReectrantLock.java

// acquires=1
final boolean nonfairTryAcquire(int acquires) {
    // 获取当前线程
    final Thread current = Thread.currentThread();
    // 获取 state, 此时 c=1。
    int c = getState();
    // 如果没有加锁
    if (c == 0) {
        //  CAS 修改 state。
        if (compareAndSetState(0, acquires)) {
            // 将当前线程设置成锁持有者。
            setExclusiveOwnerThread(current);
            // 拿锁成功,返回。(T1 从这里返回了)
            return true;
        }
    }
    // (从这里可以看出,ReectrantLock支持锁重入的)
    // 如果已经加锁了,再判断锁的持有者是不是当前线程。
    else if (current == getExclusiveOwnerThread()) {
        // state 值增加。
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        // 写回 state
        setState(nextc);
        // 拿锁成功,返回。
        return true;
    }
    // 拿锁失败,返回。
    return false;
}
           

addWaiter(Node.EXCLUSIVE)

private Node addWaiter(Node mode) {
    // 将当前线程包装成 node。
    Node node = new Node(Thread.currentThread(), mode);
    // 拿出队尾指针。
    Node pred = tail;
    // 队尾指针非空,表示同步器的线程队列中已经有线程等待了。
    // 只需要将当前线程追加到队尾就可以了。
    if (pred != null) {
        // 当前 node 前驱指针指向队尾元素。
        node.prev = pred;
        // CAS 将 tail 从指向原队尾 修改成 指向 node。
        if (compareAndSetTail(pred, node)) {
            // 原队尾的后继指针指向 node。
            pred.next = node;
            // node 入队成功,返回 node。
            return node;
        }
    }
    // 如果 队列非空 或者 CAS入队失败,(T1属于队列非空的情况)
    // 执行 enq 重新入队。
    enq(node);
    // 入队成功,返回 node。
    return node;
}
           

(1)队列为空,重新入队。会执行两次循环,第一次想队列中添加一个不带线程的 node 节点做队头。第二次在将之前入队失败的 node 追加到线程队列中,直到追加成功才会跳出死循环。

为什么要加一个不带线程的 node 作为线程队列的头呢?

因为等待队列的线程最终是要被唤醒的,设计的唤醒方式是:队列中的前一个节点唤醒它后面的节点,所以必须有个不带线程的 node 作为头,启动唤醒。

// 第一次
private Node enq(final Node node) {
    for (;;) {
        // 因为队列是空,所以 t = tail = null
        Node t = tail;
        if (t == null) { // Must initialize
            // CAS 设置 head 指向 new Node()
            if (compareAndSetHead(new Node()))
                // 只有一个 node,队头、队尾都指向这个 node。
                tail = head;
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

// 第二次
private Node enq(final Node node) {
    for (;;) {
        // 队列非空,t = tail != null。
        Node t = tail;
        if (t == null) { 
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            // 将 node 追加到队尾。
            node.prev = t;
            // CAS 更新队尾到指向 node。如果失败,一直死循环直到成功。
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}
           

(2)队列非空,重新入队。跟(1)中的第二次执行入队是一样的流程。为什么队列非空还要重新执行入队呢?当多个线程同时没有抢到锁需要入队等待时,入队也会发生竞争,CAS 当初值变了,再更新就会失败。死循环重新入队,最惨的情况就是当前线程最后一个入队。

final boolean acquireQueued(final Node node, int arg) {
    // 拿锁失败?默认是。
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            //获取线程节点的前驱节点。
            final Node p = node.predecessor();
            // 如果前驱节点是头节点,说明 node 是第二个节点。
            // 那就再尝试拿锁。
            if (p == head && tryAcquire(arg)) {
                // 这是拿锁成功的情况
                setHead(node);  // 将 node 变成不带线程的头节点。
                p.next = null; // help GC
                failed = false; //拿锁失败,修改成否。
                return interrupted;
            }
            // 因为T0一直没有释放锁,所以 T1 在上面拿锁还是失败。
            // 此时要判断 T1 是不是该 park。
            // 第一次不 park,会再执行一次循环,第二次再park。(这里也叫自旋一次)
            if (shouldParkAfterFailedAcquire(p, node) &&
                // 执行当前 node 的 park。
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
           

为什么第一次不 park,要自旋一次,第二次再 park?

第一,park 就是启动了重量级锁,重量级锁就涉及到操作系统底层了,开销很大,所以再自旋一次,尽量在 jvm层面解决加锁。

第二,node 执行park 后,是要被唤醒的。它得先把前驱节点的 waitStatus 字段标记成 -1。这样前驱结点在执行完毕释放锁时会将 node唤醒,node 再去竞争锁。

第三,线程队列中有在 node 之前已经有好多个节点了,其中有些节点(线程)已经被取消了,其 waitStatus =1,取消的节点是不会唤醒它后面的节点的。所以 在第一次的 park 中将线程队列中的所有被取消节点全部给清除。

shouldParkAfterFailedAcquire(Node pred, Node node)

// node 是当前节点,pred 是当前节点的前驱节点。
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    // 获取前驱的 ws
    int ws = pred.waitStatus;
    // Node.SIGNAL == -1
    if (ws == Node.SIGNAL)
        return true; // 当前节点park。
    // ws > 0, 其实是判断 ws == 1 是不是成立。
    if (ws > 0) {
        // 如果前驱是的 ws == 1,那表示前驱线程被取消了。
        // do{}while{} 循环一直往前找,直到找到没有被取消的线程。
        // 修改节点的前后引用的指向,将当前节点追加到没有被取消的节点的后面。
        // 也是就是说,那些“被取消的节点”被从队列中清理出去了。
        // if{} 执行完会直接跳到最后的 return false。
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        // 如果 ws == 0,将前驱的节点的 ws 设为 -1,后面就是 return false。
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    // 当前节点自旋。
    return false;
}
           

以为 T0 一直没有释放锁,所以 T1 最终会执行 park。

private final boolean parkAndCheckInterrupt() {
    // 直接 park T1。
    LockSupport.park(this);
    return Thread.interrupted();
}
           

最终的结果

最终的结果长这个样子。灰色表示 park,T0 在哪里,T0直接拿到了锁,执行逻辑,根本没有入队。

AQS 加锁过程AQS 几个重要组件AQS 怎么使用非公平锁的加锁过程

继续阅读