天天看点

并发编程理论 - AQS的设计意图和扩展用法

    AbstractQueuedSynchronized是所以juc包的锁的实现本质,内部维护了一个虚拟的双向链表(链表的节点就是内部类Node),虚拟双向链表通过对 volatile state + CAS处理【之前分析atomic时,知道其实现了原子性、可见性和有序性】,对外表现为共享和排他两种模型。排他模式是,可以将当前线程设置到AQS父类AbstractOwnableSynchronizer属性(Thread exclusiveOwnerThread)中,提供set、get方法,当然一重入线程进入时可以判断是否该独享线程是自己。

    我们知道Java的synchronized是MESA管程模型的实现,所以AQS中提供了Condition接口【实现类为ConditionObject】,一个单向队列,本质上就是管程模型的条件队列,只是这个允许创建多个条件队列。AQS本身并不是一个完整的工具而是一个抽象模板,只是实现了重要的共享、排他的获取和释放方法等,运行子类对其扩展【并且Doug Lea推荐子类使用final的内部类对其进行扩展,比如:ReentrantLock static final class FairSync extends Sync{ }】。管程模型对比如下:

并发编程理论 - AQS的设计意图和扩展用法

    至于AQS到底是个什么东西,我觉得还是让并发大师Daug Lea自己怎么说,我用很烂的英语和翻译软件,试着去看完了AQS类的注释,很多设计意图和用法在这里得到了答案:

/**
 * Provides a framework for implementing blocking locks and related
 * synchronizers (semaphores, events, etc) that rely on
 * first-in-first-out (FIFO) wait queues.  This class is designed to
 * be a useful basis for most kinds of synchronizers that rely on a
 * single atomic {@code int} value to represent state. Subclasses
 * must define the protected methods that change this state, and which
 * define what that state means in terms of this object being acquired
 * or released.  Given these, the other methods in this class carry
 * out all queuing and blocking mechanics. Subclasses can maintain
 * other state fields, but only the atomically updated {@code int}
 * value manipulated using methods {@link #getState}, {@link
 * #setState} and {@link #compareAndSetState} is tracked with respect
 * to synchronization.
 *
 * <p>Subclasses should be defined as non-public internal helper
 * classes that are used to implement the synchronization properties
 * of their enclosing class.  Class
 * {@code AbstractQueuedSynchronizer} does not implement any
 * synchronization interface.  Instead it defines methods such as
 * {@link #acquireInterruptibly} that can be invoked as
 * appropriate by concrete locks and related synchronizers to
 * implement their public methods.
 *
 * <p>This class supports either or both a default <em>exclusive</em>
 * mode and a <em>shared</em> mode. When acquired in exclusive mode,
 * attempted acquires by other threads cannot succeed. Shared mode
 * acquires by multiple threads may (but need not) succeed. This class
 * does not &quot;understand&quot; these differences except in the
 * mechanical sense that when a shared mode acquire succeeds, the next
 * waiting thread (if one exists) must also determine whether it can
 * acquire as well. Threads waiting in the different modes share the
 * same FIFO queue. Usually, implementation subclasses support only
 * one of these modes, but both can come into play for example in a
 * {@link ReadWriteLock}. Subclasses that support only exclusive or
 * only shared modes need not define the methods supporting the unused mode.
 *
 * <p>This class defines a nested {@link ConditionObject} class that
 * can be used as a {@link Condition} implementation by subclasses
 * supporting exclusive mode for which method {@link
 * #isHeldExclusively} reports whether synchronization is exclusively
 * held with respect to the current thread, method {@link #release}
 * invoked with the current {@link #getState} value fully releases
 * this object, and {@link #acquire}, given this saved state value,
 * eventually restores this object to its previous acquired state.  No
 * {@code AbstractQueuedSynchronizer} method otherwise creates such a
 * condition, so if this constraint cannot be met, do not use it.  The
 * behavior of {@link ConditionObject} depends of course on the
 * semantics of its synchronizer implementation.
 *
 * <p>This class provides inspection, instrumentation, and monitoring
 * methods for the internal queue, as well as similar methods for
 * condition objects. These can be exported as desired into classes
 * using an {@code AbstractQueuedSynchronizer} for their
 * synchronization mechanics.
 *
 * <p>Serialization of this class stores only the underlying atomic
 * integer maintaining state, so deserialized objects have empty
 * thread queues. Typical subclasses requiring serializability will
 * define a {@code readObject} method that restores this to a known
 * initial state upon deserialization.
 *
 * <h3>Usage</h3>
 *
 * <p>To use this class as the basis of a synchronizer, redefine the
 * following methods, as applicable, by inspecting and/or modifying
 * the synchronization state using {@link #getState}, {@link
 * #setState} and/or {@link #compareAndSetState}:
 *
 * <ul>
 * <li> {@link #tryAcquire}
 * <li> {@link #tryRelease}
 * <li> {@link #tryAcquireShared}
 * <li> {@link #tryReleaseShared}
 * <li> {@link #isHeldExclusively}
 * </ul>
 *
 * Each of these methods by default throws {@link
 * UnsupportedOperationException}.  Implementations of these methods
 * must be internally thread-safe, and should in general be short and
 * not block. Defining these methods is the <em>only</em> supported
 * means of using this class. All other methods are declared
 * {@code final} because they cannot be independently varied.
 *
 * <p>You may also find the inherited methods from {@link
 * AbstractOwnableSynchronizer} useful to keep track of the thread
 * owning an exclusive synchronizer.  You are encouraged to use them
 * -- this enables monitoring and diagnostic tools to assist users in
 * determining which threads hold locks.
 *
 * <p>Even though this class is based on an internal FIFO queue, it
 * does not automatically enforce FIFO acquisition policies.  The core
 * of exclusive synchronization takes the form:
 *
 * <pre>
 * Acquire:
 *     while (!tryAcquire(arg)) {
 *        <em>enqueue thread if it is not already queued</em>;
 *        <em>possibly block current thread</em>;
 *     }
 *
 * Release:
 *     if (tryRelease(arg))
 *        <em>unblock the first queued thread</em>;
 * </pre>
 *
 * (Shared mode is similar but may involve cascading signals.)
 *
 * <p id="barging">Because checks in acquire are invoked before
 * enqueuing, a newly acquiring thread may <em>barge</em> ahead of
 * others that are blocked and queued.  However, you can, if desired,
 * define {@code tryAcquire} and/or {@code tryAcquireShared} to
 * disable barging by internally invoking one or more of the inspection
 * methods, thereby providing a <em>fair</em> FIFO acquisition order.
 * In particular, most fair synchronizers can define {@code tryAcquire}
 * to return {@code false} if {@link #hasQueuedPredecessors} (a method
 * specifically designed to be used by fair synchronizers) returns
 * {@code true}.  Other variations are possible.
 *
 * <p>Throughput and scalability are generally highest for the
 * default barging (also known as <em>greedy</em>,
 * <em>renouncement</em>, and <em>convoy-avoidance</em>) strategy.
 * While this is not guaranteed to be fair or starvation-free, earlier
 * queued threads are allowed to recontend before later queued
 * threads, and each recontention has an unbiased chance to succeed
 * against incoming threads.  Also, while acquires do not
 * &quot;spin&quot; in the usual sense, they may perform multiple
 * invocations of {@code tryAcquire} interspersed with other
 * computations before blocking.  This gives most of the benefits of
 * spins when exclusive synchronization is only briefly held, without
 * most of the liabilities when it isn't. If so desired, you can
 * augment this by preceding calls to acquire methods with
 * "fast-path" checks, possibly prechecking {@link #hasContended}
 * and/or {@link #hasQueuedThreads} to only do so if the synchronizer
 * is likely not to be contended.
 *
 * <p>This class provides an efficient and scalable basis for
 * synchronization in part by specializing its range of use to
 * synchronizers that can rely on {@code int} state, acquire, and
 * release parameters, and an internal FIFO wait queue. When this does
 * not suffice, you can build synchronizers from a lower level using
 * {@link java.util.concurrent.atomic atomic} classes, your own custom
 * {@link java.util.Queue} classes, and {@link LockSupport} blocking
 * support.
 *
 * <h3>Usage Examples</h3>
 *
 * <p>Here is a non-reentrant mutual exclusion lock class that uses
 * the value zero to represent the unlocked state, and one to
 * represent the locked state. While a non-reentrant lock
 * does not strictly require recording of the current owner
 * thread, this class does so anyway to make usage easier to monitor.
 * It also supports conditions and exposes
 * one of the instrumentation methods:
 *
 *  <pre> {@code
 * class Mutex implements Lock, java.io.Serializable {
 *
 *   // Our internal helper class
 *   private static class Sync extends AbstractQueuedSynchronizer {
 *     // Reports whether in locked state
 *     protected boolean isHeldExclusively() {
 *       return getState() == 1;
 *     }
 *
 *     // Acquires the lock if state is zero
 *     public boolean tryAcquire(int acquires) {
 *       assert acquires == 1; // Otherwise unused
 *       if (compareAndSetState(0, 1)) {
 *         setExclusiveOwnerThread(Thread.currentThread());
 *         return true;
 *       }
 *       return false;
 *     }
 *
 *     // Releases the lock by setting state to zero
 *     protected boolean tryRelease(int releases) {
 *       assert releases == 1; // Otherwise unused
 *       if (getState() == 0) throw new IllegalMonitorStateException();
 *       setExclusiveOwnerThread(null);
 *       setState(0);
 *       return true;
 *     }
 *
 *     // Provides a Condition
 *     Condition newCondition() { return new ConditionObject(); }
 *
 *     // Deserializes properly
 *     private void readObject(ObjectInputStream s)
 *         throws IOException, ClassNotFoundException {
 *       s.defaultReadObject();
 *       setState(0); // reset to unlocked state
 *     }
 *   }
 *
 *   // The sync object does all the hard work. We just forward to it.
 *   private final Sync sync = new Sync();
 *
 *   public void lock()                { sync.acquire(1); }
 *   public boolean tryLock()          { return sync.tryAcquire(1); }
 *   public void unlock()              { sync.release(1); }
 *   public Condition newCondition()   { return sync.newCondition(); }
 *   public boolean isLocked()         { return sync.isHeldExclusively(); }
 *   public boolean hasQueuedThreads() { return sync.hasQueuedThreads(); }
 *   public void lockInterruptibly() throws InterruptedException {
 *     sync.acquireInterruptibly(1);
 *   }
 *   public boolean tryLock(long timeout, TimeUnit unit)
 *       throws InterruptedException {
 *     return sync.tryAcquireNanos(1, unit.toNanos(timeout));
 *   }
 * }}</pre>
 *
 * <p>Here is a latch class that is like a
 * {@link java.util.concurrent.CountDownLatch CountDownLatch}
 * except that it only requires a single {@code signal} to
 * fire. Because a latch is non-exclusive, it uses the {@code shared}
 * acquire and release methods.
 *
 *  <pre> {@code
 * class BooleanLatch {
 *
 *   private static class Sync extends AbstractQueuedSynchronizer {
 *     boolean isSignalled() { return getState() != 0; }
 *
 *     protected int tryAcquireShared(int ignore) {
 *       return isSignalled() ? 1 : -1;
 *     }
 *
 *     protected boolean tryReleaseShared(int ignore) {
 *       setState(1);
 *       return true;
 *     }
 *   }
 *
 *   private final Sync sync = new Sync();
 *   public boolean isSignalled() { return sync.isSignalled(); }
 *   public void signal()         { sync.releaseShared(1); }
 *   public void await() throws InterruptedException {
 *     sync.acquireSharedInterruptibly(1);
 *   }
 * }}</pre>
 *
 * @since 1.5
 * @author Doug Lea
 */
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer
    implements java.io.Serializable {
}
           

 理解翻译如下:

<p>提供了一个实现阻塞和相关同步(如:信号量【juc包中提供了Semaphore】、事件等)的框架,这些依赖于一个先进先出(FIFO)的等待队列。AQS类的设计的大部分场景是基于一个单个原子{@code int}值,这里就是指state属性。模板方法的子类必须使用protected的方法在内部改变state属性的值,虽然AQS中定义了一套自己的state表示的含义,但是子类可以自己根据自身业务定义一套【个人理解也学习到了,这个一个非常好的开发方式,自己定义自己的一套状态含义值,也可以理解为生命周期,Daug Lea在AQS中定义了state,FutureTask中定义的state也规定了生命周期走向,ThreadPoolExecutor中的ctl,高3位表示状态,低27位表示最大线程数】。给定这些值后,这个类中的其他方法执行都需要排队和进入阻塞机制,即state 控制这双向队列。自己的子类也可以维护其他类似state的状态字段,但是只有原子更新使用AQS中定义的state的对应方法{@link #getState}、{@link #setState}、{@link #compareAndSetState}操作才会跟踪到同步(即才对双向队列产生影响)。

<p>子类必须定义非public的子辅助类,来修改其封装的同步属性;AQS类本身没有实现任何的同步接口,相反它定义了{@link #acquireInterruptibly}这样的方法,具体的锁和同步器可以调用这些方法来实现其公共的对外方法。

        所以,理解了为什么ReentrantLock【的Sync子类FairSync、NonfairSync】、CountDownLatch、Semaphore、ThreadPoolExecutor都使用final类型内部类实现AbstractQueuedSynchronizer

并发编程理论 - AQS的设计意图和扩展用法
并发编程理论 - AQS的设计意图和扩展用法

<p> AQS支持独占和共享两种模式(可以值实现一种【并且实现独占模式,则可以不实现共享模式的对应方法】,也可以同时实现两种模式【比如ReentrantLock等】)。当一个线程获取到独占模式的锁后,其他试图获取该锁的线程都不会成功,共享模式则运行过个线程都获取成功。AQS类不知道共享模式时,子类获取到获取成功之后意味着什么,则需要自己去控制(比如:限流器,可以控制同时多少个线程能获取到)。

<p> AQS内部定义了{@link Condition} (对应的实现类为{@link ConditionObject})接口,可以以独占的方式调用{@link #isHeldExclusively}来判断是否自己持有锁,方法{@link #release}和{@link #acquire}来获取和释放锁,配合{#link #getState}来处理任务,并将双向队列恢复到处理前的状态。当创建了Condition时,需要满足条件才能进入进入,所以设置条件队列是需要谨慎,否则就不要使用。{@link ConditionObject}底层同样是依赖于同步器的实现语义,即依赖双向队列实现。

<p> AQS序列化只存储底层原子整数维护状态(即只序列化state属性),因此反序列化的双向队列为空。如果需要处理,则可以重写{@code readobject}方法,该方法在反序列化时将其恢复到已知的初始状态【将反序列化的数据存储回双向队列】。

<h3>使用<h3> 

<p> 如果要使用AQS类作为一个基准的同步器,在适当的情况下,可以使用{@link getState}、{@link #setState}、{@link #compareAndSetState}方法【即,自己要开发其他组件的时候,可以使用它】,配合下面的方法:

<ul>
    <li> {@link #tryAcquire}
    <li> {@link #tryRelease}
    <li> {@link #tryAcquireShared}
    <li> {@link #tryReleaseShared}
    <li> {@link #isHeldExclusively}
</ul>
           

这些方法默认都可能会抛出UnsupportedOperationException异常,这些方法的实现需要自己去保证内部是线程安全的,并且通过执行时间会非常短并且是非阻塞的。定义这些方法的调用,仅仅是在该类的方法中调用,否则可能不可控【比如:ReentrantLock的实现是UnfairSync,那么只能在ReentrantLock的方法才能调用UnfairSync中的调用该部分方法,否则其他类调用时完全不可控】。并且这些方法都应该定义成final类型的,因为他们是不能独立变化的。比如,我们看看ReentrantLock中的tryRelease方法:

并发编程理论 - AQS的设计意图和扩展用法

<p> 还可以使用{@link AbstractOmableSynchronizer}继承的方法对于跟踪拥有独占同步器的线程很有用。鼓励使用它们——这使得监视和诊断工具能够帮助用户确定哪些线程持有锁。

<p> 虽然AQS基于内部的一个FIFO队列,但是他不会自动的执行FIFO获取策略,即需要我们实现类编码。代码的推荐示例如下(基本是标准写法,所以实现的子类会看到很多下面代码的模式):

<pre>
    获取:
        while (!tryAcquire(arg)) {
            // <em>enqueue thread if it is not already queued</em>;
            // <em>possibly block current thread</em>;
        }

    释放:
        if (tryRelease(arg)) {
            // <em>unblock the first queued thread</em>;
        }
    </pre>
           

<p id="barging "> 因为上面的模型,acquire【获取】是在enqueuing【入队】操作之前被调用,一个新的获取线程可能在其他被阻塞队列之前。但是如果需要,可以通过内部调用一个或者多个检查使用{@code tryAcquire} and/or {@code tryAcquireShared}之后得到一个完全的FIFO队列。特别是使用为专门同步器设计的方法{@link #hasQueuedPredecessors}返回true,而大多数公平锁时{@link tryAcquire}会返回false【公平锁性能较低】。

<p> 相比较于renouncement和convoy-avoidance策略,使用默认的barging模式的吞吐性和并发量是最高的。虽然并不能保证公平和无饥饿,但是允许较早排队的线程在较晚排队的线程之前重新竞争(实现相对的FIFO机会)。当独占同步的任务执行时间非常短时,自旋能带来效果。如果需要,您可以通过在调用之前使用“快速路径”检查(可能是预先检查{@link #hasContended)来增强这一点和或{@link #hasQueuedThreads}只在同步器可能不竞争时这样做。

<p> AQS作为模板方法的根类,为子类的扩展提供了基础。还可以通过AQS的state【状态】属性来获取和释放,而得到一个FIFO的等待队列。如果这些还不够,我们也可以使用juc atomic包下的类 + 自己的Queue子类 + LockSupport的阻塞唤醒机制,来实现自己的同步器。

<h3>使用示例【自己的扩展,对于自己如果做架构扩展应该是受益匪浅】</h3>

扩展示例一:下面是一个不可重入锁(juc下面我们熟知的都是Reentrant* 相关的可重入锁),使用0表示解锁状态,1表示锁定状态。而非重入锁是否严格要求记录当前线程的持有者

class Mutex implements Lock, java.io.Serializable {
	// Our internal helper class
	private static class Sync extends AbstractQueuedSynchronizer {

		// Reports whether in locked state
		protected boolean isHeldExclusively() {
			return getState() == 1;
		}
	
		// Acquires the lock if state is zero
		public boolean tryAcquire(int acquires) {
			assert acquires == 1; // Otherwise unused
			if (compareAndSetState(0, 1)) {
				setExclusiveOwnerThread(Thread.currentThread());
				return true;
			}
			return false;
		}
	
		// Releases the lock by setting state to zero
		protected boolean tryRelease(int releases) {
			assert releases == 1; // Otherwise unused
			if (getState() == 0) {
				throw new IllegalMonitorStateException();
			}
			setExclusiveOwnerThread(null);
			setState(0);
			return true;
		}
		
		// Provides a Condition
		Condition newCondition() { 
			return new ConditionObject(); 
		}
		// Deserializes properly
		private void readObject(ObjectInputStream s) throws IOException, ClassNotFoundException {
			s.defaultReadObject();
			setState(0); // reset to unlocked state
		}
	}
	
	// The sync object does all the hard work. We just forward to it.
	private final Sync sync = new Sync();
	public void lock()                { sync.acquire(1); }
	public boolean tryLock()          { return sync.tryAcquire(1); }
	public void unlock()              { sync.release(1); }
	public Condition newCondition()   { return sync.newCondition(); }
	public boolean isLocked()         { return sync.isHeldExclusively(); }
	public boolean hasQueuedThreads() { return sync.hasQueuedThreads(); }
	public void lockInterruptibly() throws InterruptedException {
    	sync.acquireInterruptibly(1);
  	}
	public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException {
		return sync.tryAcquireNanos(1, unit.toNanos(timeout));
	}
}
           

扩展示例二:这是一个类似{@link CountDownLatch}的门闩模型,只是它只需要一个{@code signal}就开始。因为闩锁是非独占的,所以它使用{@code shared}获取和释放方法。

class BooleanLatch {

    private static class Sync extends AbstractQueuedSynchronizer {

        boolean isSignalled() {
            return getState() != 0;
        }

        protected int tryAcquireShared(int ignore) {
            return isSignalled() ? 1 : -1;
        }

        protected boolean tryReleaseShared(int ignore) {
            setState(1);
            return true;
        }
    }

    private final Sync sync = new Sync();
    
    public boolean isSignalled() { 
        return sync.isSignalled(); 
    }
    public void signal() { 
        sync.releaseShared(1); 
    }

    public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }
}
           

继续阅读