天天看点

线程安全(十)AQS(AbstractQueuedSynchronizer)系列文章目录0.前言1.大致整体流程2.Node3.AQS中队列的节点变换4.源码解析

系列文章目录

线程安全(一)java对象头分析以及锁状态

线程安全(二)java中的CAS机制

线程安全(三)实现方法sychronized与ReentrantLock(阻塞同步)

线程安全(四)Java内存模型与volatile关键字

线程安全(五)线程状态和线程创建

线程安全(六)线程池

线程安全(七)ThreadLocal和java的四种引用

线程安全(八)Semaphore

线程安全(九)CyclicBarrier

线程安全(十)AQS(AbstractQueuedSynchronizer)

0.前言

本文想从三个角度来看下AbstractQueuedSynchronizer,

第一、加锁整体流程与解锁整体流程。

第二、Node节点。

第三、以Reentrantlock分析具体调用

1.大致整体流程

AQS全称为AbstractQueuedSynchronizer,它提供了一个FIFO队列,可以看成是一个用来实现同步锁以及其他涉及到同步功能的核心组件

AQS是一个抽象类,主要是通过继承的方式来使用,它本身没有实现任何的同步接口,仅仅是定义了同步状态的获取以及释放的方法来提供自定义的同步组件。

AQS继承AbstractOwnableSynchronizer,AbstractOwnableSynchronizer实现了两个方法,一个是setExclusiveOwnerThread设置当前拥有独占访问权限的线程,一个是返回上次设置的0线程。

这两个方法显而易见是用来记录哪个线程拥有当前访问线程的锁的(所有权)。

以Reentrantlock为例:

1.1.lock方法

尝试获取所有权,成功-》结束;失败-》加入队列,然后每个节点循环判断是否是pre指向head,且尝试获取所有权成功,并把当前节点赋值为head。

lock时:

1.会尝试获取所有权:

先判断当前对象状态state,

1.1如果为0(即未被所有),尝试用cas的更改state状态,然后调用setsetExclusiveOwnerThread,把当前线程设置成拥有独占访问权限的线程。获取成功。

1.2如果不为0(已被拥有),判断当前线程是不是拥有独占访问权限的线程,是,state+1(可重入锁通过计数来实现),获取成功。

1.3都不是,则获取失败,加入队列,并且循环判断,自己是否pre指向head节点,且再次尝试获取锁并成功,才可以。如果中间线程状态有被中断过,则还需要调用interrupt方法。

返回true,当前节点赋值为head。

1.2.unlock方法

1.1.当前状态减1,状态如果为0,则setExclusiveOwnerThread为null,没有对应拥有线程,返回true,

1.2.状态不为0,则返回false

返回ture,会更新waitStatus,替换head节点的next指向下一个waitStatus小于0的节点

2.Node

AQS的实现依赖内部的同步队列,也就是FIFO的双向队列,如果当前线程竞争锁失败, 那么加入队列,同时阻塞该线程。当获取锁的线程释放锁之后,会从队列唤醒一个阻塞的节点(线程)。

AQS队列内部维护的是一个FIFO的双向链表,每个Node由线程封装,当线程强锁失败,会封装成Node加入AQS队列,包括一个head节点和一个tail节点。

static final class Node {
        static final Node SHARED = new Node();
        static final Node EXCLUSIVE = null;
        static final int CANCELLED =  1;
        static final int SIGNAL    = -1;
        static final int CONDITION = -2;
        static final int PROPAGATE = -3;
        volatile int waitStatus;
        // 前驱节点
        volatile Node prev;
        // 后继节点
        volatile Node next;
        volatile Thread thread;
        Node nextWaiter;
        final boolean isShared() {
            return nextWaiter == SHARED;
        }
        final Node predecessor() throws NullPointerException {
            Node p = prev;
            if (p == null)
                throw new NullPointerException();
            else
                return p;
        }

        Node() {    // Used to establish initial head or SHARED marker
        }

        Node(Thread thread, Node mode) {     // Used by addWaiter
            this.nextWaiter = mode;
            this.thread = thread;
        }

        Node(Thread thread, int waitStatus) { // Used by Condition
            this.waitStatus = waitStatus;
            this.thread = thread;
        }
    }
           

3.AQS中队列的节点变换

线程安全(十)AQS(AbstractQueuedSynchronizer)系列文章目录0.前言1.大致整体流程2.Node3.AQS中队列的节点变换4.源码解析

分别说明一下,新增第一个节点nodeA时,包含head与tail节点初始化的过程,和nodeA替换tail的的过程。

增加第二个节点nodeB时,包含nodeB替代tail,以及nodeB的pre节点指向nodeA(原tail),nodeA的next节点指向nodeB。

释放时,head节点的next需要指向nodeB

4.源码解析

4.1.lock

final void lock() {
            acquire(1);
        }
        
public final void acquire(int arg) {
		// tryAcquire(arg)返回true,则获取所有权成功,不用执行另一个条件,加锁失败则需要执行acquireQueued(),因为这个方法里面是自循环,只有符合条件获取锁才能返回结果。addWaiter方法的作用时把node,加入队列,并且返回处理后的node。
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }
// 尝试获取所有权
protected final boolean tryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
            // state状态为0,因为是公平锁,必须要判断下有没有队列,没有队列才可以获取所有权,设置当前线程为拥有权线程,并把状态改为1,返回true
                if (!hasQueuedPredecessors() &&
                    compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            // 当前线程就拥有拥有权,state+1,返回true
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0)
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }
 // 循环判断当前node是否是pre指向head以及尝试获取所有权
    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

// 创建节点
private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // 有尾节点,直接和enq方法的else分支一样,没有,就调用enq方法
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }

// 创建节点
private Node enq(final Node node) {
		// 如果尾节点为nul,必须初始化,把头结点赋值为new Node(),然后让node的前驱节点变为头结点。其他情况,让node的前驱节点变为位结点。变化过程为,传入的node节点的pre指向之前的tail节点,传入的node节点的值赋值为tail,之前的tail节点的next指向前tail。(因为tail节点没有next) 
		附:AQS有两个节点属性,有个是head节点,一个是tali节点,head节点的next节点,tail节点的
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

           

4.2.unlock

public void unlock() {
        sync.release(1);
    }
public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }
// 设置拥有线程为null,设置状态为0
protected final boolean tryRelease(int releases) {
            int c = getState() - releases;
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
            if (c == 0) {
                free = true;
                setExclusiveOwnerThread(null);
            }
            setState(c);
            return free;
        }
 private void unparkSuccessor(Node node) {
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);
		// 头节点的next节点置为null,然后指向下下个节点
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);
    }
           

继续阅读