天天看點

并發程式設計理論 - AQS的設計意圖和擴充用法

    AbstractQueuedSynchronized是是以juc包的鎖的實作本質,内部維護了一個虛拟的雙向連結清單(連結清單的節點就是内部類Node),虛拟雙向連結清單通過對 volatile state + CAS處理【之前分析atomic時,知道其實作了原子性、可見性和有序性】,對外表現為共享和排他兩種模型。排他模式是,可以将目前線程設定到AQS父類AbstractOwnableSynchronizer屬性(Thread exclusiveOwnerThread)中,提供set、get方法,當然一重入線程進入時可以判斷是否該獨享線程是自己。

    我們知道Java的synchronized是MESA管程模型的實作,是以AQS中提供了Condition接口【實作類為ConditionObject】,一個單向隊列,本質上就是管程模型的條件隊列,隻是這個允許建立多個條件隊列。AQS本身并不是一個完整的工具而是一個抽象模闆,隻是實作了重要的共享、排他的擷取和釋放方法等,運作子類對其擴充【并且Doug Lea推薦子類使用final的内部類對其進行擴充,比如:ReentrantLock static final class FairSync extends Sync{ }】。管程模型對比如下:

并發程式設計理論 - AQS的設計意圖和擴充用法

    至于AQS到底是個什麼東西,我覺得還是讓并發大師Daug Lea自己怎麼說,我用很爛的英語和翻譯軟體,試着去看完了AQS類的注釋,很多設計意圖和用法在這裡得到了答案:

/**
 * Provides a framework for implementing blocking locks and related
 * synchronizers (semaphores, events, etc) that rely on
 * first-in-first-out (FIFO) wait queues.  This class is designed to
 * be a useful basis for most kinds of synchronizers that rely on a
 * single atomic {@code int} value to represent state. Subclasses
 * must define the protected methods that change this state, and which
 * define what that state means in terms of this object being acquired
 * or released.  Given these, the other methods in this class carry
 * out all queuing and blocking mechanics. Subclasses can maintain
 * other state fields, but only the atomically updated {@code int}
 * value manipulated using methods {@link #getState}, {@link
 * #setState} and {@link #compareAndSetState} is tracked with respect
 * to synchronization.
 *
 * <p>Subclasses should be defined as non-public internal helper
 * classes that are used to implement the synchronization properties
 * of their enclosing class.  Class
 * {@code AbstractQueuedSynchronizer} does not implement any
 * synchronization interface.  Instead it defines methods such as
 * {@link #acquireInterruptibly} that can be invoked as
 * appropriate by concrete locks and related synchronizers to
 * implement their public methods.
 *
 * <p>This class supports either or both a default <em>exclusive</em>
 * mode and a <em>shared</em> mode. When acquired in exclusive mode,
 * attempted acquires by other threads cannot succeed. Shared mode
 * acquires by multiple threads may (but need not) succeed. This class
 * does not &quot;understand&quot; these differences except in the
 * mechanical sense that when a shared mode acquire succeeds, the next
 * waiting thread (if one exists) must also determine whether it can
 * acquire as well. Threads waiting in the different modes share the
 * same FIFO queue. Usually, implementation subclasses support only
 * one of these modes, but both can come into play for example in a
 * {@link ReadWriteLock}. Subclasses that support only exclusive or
 * only shared modes need not define the methods supporting the unused mode.
 *
 * <p>This class defines a nested {@link ConditionObject} class that
 * can be used as a {@link Condition} implementation by subclasses
 * supporting exclusive mode for which method {@link
 * #isHeldExclusively} reports whether synchronization is exclusively
 * held with respect to the current thread, method {@link #release}
 * invoked with the current {@link #getState} value fully releases
 * this object, and {@link #acquire}, given this saved state value,
 * eventually restores this object to its previous acquired state.  No
 * {@code AbstractQueuedSynchronizer} method otherwise creates such a
 * condition, so if this constraint cannot be met, do not use it.  The
 * behavior of {@link ConditionObject} depends of course on the
 * semantics of its synchronizer implementation.
 *
 * <p>This class provides inspection, instrumentation, and monitoring
 * methods for the internal queue, as well as similar methods for
 * condition objects. These can be exported as desired into classes
 * using an {@code AbstractQueuedSynchronizer} for their
 * synchronization mechanics.
 *
 * <p>Serialization of this class stores only the underlying atomic
 * integer maintaining state, so deserialized objects have empty
 * thread queues. Typical subclasses requiring serializability will
 * define a {@code readObject} method that restores this to a known
 * initial state upon deserialization.
 *
 * <h3>Usage</h3>
 *
 * <p>To use this class as the basis of a synchronizer, redefine the
 * following methods, as applicable, by inspecting and/or modifying
 * the synchronization state using {@link #getState}, {@link
 * #setState} and/or {@link #compareAndSetState}:
 *
 * <ul>
 * <li> {@link #tryAcquire}
 * <li> {@link #tryRelease}
 * <li> {@link #tryAcquireShared}
 * <li> {@link #tryReleaseShared}
 * <li> {@link #isHeldExclusively}
 * </ul>
 *
 * Each of these methods by default throws {@link
 * UnsupportedOperationException}.  Implementations of these methods
 * must be internally thread-safe, and should in general be short and
 * not block. Defining these methods is the <em>only</em> supported
 * means of using this class. All other methods are declared
 * {@code final} because they cannot be independently varied.
 *
 * <p>You may also find the inherited methods from {@link
 * AbstractOwnableSynchronizer} useful to keep track of the thread
 * owning an exclusive synchronizer.  You are encouraged to use them
 * -- this enables monitoring and diagnostic tools to assist users in
 * determining which threads hold locks.
 *
 * <p>Even though this class is based on an internal FIFO queue, it
 * does not automatically enforce FIFO acquisition policies.  The core
 * of exclusive synchronization takes the form:
 *
 * <pre>
 * Acquire:
 *     while (!tryAcquire(arg)) {
 *        <em>enqueue thread if it is not already queued</em>;
 *        <em>possibly block current thread</em>;
 *     }
 *
 * Release:
 *     if (tryRelease(arg))
 *        <em>unblock the first queued thread</em>;
 * </pre>
 *
 * (Shared mode is similar but may involve cascading signals.)
 *
 * <p id="barging">Because checks in acquire are invoked before
 * enqueuing, a newly acquiring thread may <em>barge</em> ahead of
 * others that are blocked and queued.  However, you can, if desired,
 * define {@code tryAcquire} and/or {@code tryAcquireShared} to
 * disable barging by internally invoking one or more of the inspection
 * methods, thereby providing a <em>fair</em> FIFO acquisition order.
 * In particular, most fair synchronizers can define {@code tryAcquire}
 * to return {@code false} if {@link #hasQueuedPredecessors} (a method
 * specifically designed to be used by fair synchronizers) returns
 * {@code true}.  Other variations are possible.
 *
 * <p>Throughput and scalability are generally highest for the
 * default barging (also known as <em>greedy</em>,
 * <em>renouncement</em>, and <em>convoy-avoidance</em>) strategy.
 * While this is not guaranteed to be fair or starvation-free, earlier
 * queued threads are allowed to recontend before later queued
 * threads, and each recontention has an unbiased chance to succeed
 * against incoming threads.  Also, while acquires do not
 * &quot;spin&quot; in the usual sense, they may perform multiple
 * invocations of {@code tryAcquire} interspersed with other
 * computations before blocking.  This gives most of the benefits of
 * spins when exclusive synchronization is only briefly held, without
 * most of the liabilities when it isn't. If so desired, you can
 * augment this by preceding calls to acquire methods with
 * "fast-path" checks, possibly prechecking {@link #hasContended}
 * and/or {@link #hasQueuedThreads} to only do so if the synchronizer
 * is likely not to be contended.
 *
 * <p>This class provides an efficient and scalable basis for
 * synchronization in part by specializing its range of use to
 * synchronizers that can rely on {@code int} state, acquire, and
 * release parameters, and an internal FIFO wait queue. When this does
 * not suffice, you can build synchronizers from a lower level using
 * {@link java.util.concurrent.atomic atomic} classes, your own custom
 * {@link java.util.Queue} classes, and {@link LockSupport} blocking
 * support.
 *
 * <h3>Usage Examples</h3>
 *
 * <p>Here is a non-reentrant mutual exclusion lock class that uses
 * the value zero to represent the unlocked state, and one to
 * represent the locked state. While a non-reentrant lock
 * does not strictly require recording of the current owner
 * thread, this class does so anyway to make usage easier to monitor.
 * It also supports conditions and exposes
 * one of the instrumentation methods:
 *
 *  <pre> {@code
 * class Mutex implements Lock, java.io.Serializable {
 *
 *   // Our internal helper class
 *   private static class Sync extends AbstractQueuedSynchronizer {
 *     // Reports whether in locked state
 *     protected boolean isHeldExclusively() {
 *       return getState() == 1;
 *     }
 *
 *     // Acquires the lock if state is zero
 *     public boolean tryAcquire(int acquires) {
 *       assert acquires == 1; // Otherwise unused
 *       if (compareAndSetState(0, 1)) {
 *         setExclusiveOwnerThread(Thread.currentThread());
 *         return true;
 *       }
 *       return false;
 *     }
 *
 *     // Releases the lock by setting state to zero
 *     protected boolean tryRelease(int releases) {
 *       assert releases == 1; // Otherwise unused
 *       if (getState() == 0) throw new IllegalMonitorStateException();
 *       setExclusiveOwnerThread(null);
 *       setState(0);
 *       return true;
 *     }
 *
 *     // Provides a Condition
 *     Condition newCondition() { return new ConditionObject(); }
 *
 *     // Deserializes properly
 *     private void readObject(ObjectInputStream s)
 *         throws IOException, ClassNotFoundException {
 *       s.defaultReadObject();
 *       setState(0); // reset to unlocked state
 *     }
 *   }
 *
 *   // The sync object does all the hard work. We just forward to it.
 *   private final Sync sync = new Sync();
 *
 *   public void lock()                { sync.acquire(1); }
 *   public boolean tryLock()          { return sync.tryAcquire(1); }
 *   public void unlock()              { sync.release(1); }
 *   public Condition newCondition()   { return sync.newCondition(); }
 *   public boolean isLocked()         { return sync.isHeldExclusively(); }
 *   public boolean hasQueuedThreads() { return sync.hasQueuedThreads(); }
 *   public void lockInterruptibly() throws InterruptedException {
 *     sync.acquireInterruptibly(1);
 *   }
 *   public boolean tryLock(long timeout, TimeUnit unit)
 *       throws InterruptedException {
 *     return sync.tryAcquireNanos(1, unit.toNanos(timeout));
 *   }
 * }}</pre>
 *
 * <p>Here is a latch class that is like a
 * {@link java.util.concurrent.CountDownLatch CountDownLatch}
 * except that it only requires a single {@code signal} to
 * fire. Because a latch is non-exclusive, it uses the {@code shared}
 * acquire and release methods.
 *
 *  <pre> {@code
 * class BooleanLatch {
 *
 *   private static class Sync extends AbstractQueuedSynchronizer {
 *     boolean isSignalled() { return getState() != 0; }
 *
 *     protected int tryAcquireShared(int ignore) {
 *       return isSignalled() ? 1 : -1;
 *     }
 *
 *     protected boolean tryReleaseShared(int ignore) {
 *       setState(1);
 *       return true;
 *     }
 *   }
 *
 *   private final Sync sync = new Sync();
 *   public boolean isSignalled() { return sync.isSignalled(); }
 *   public void signal()         { sync.releaseShared(1); }
 *   public void await() throws InterruptedException {
 *     sync.acquireSharedInterruptibly(1);
 *   }
 * }}</pre>
 *
 * @since 1.5
 * @author Doug Lea
 */
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer
    implements java.io.Serializable {
}
           

 了解翻譯如下:

<p>提供了一個實作阻塞和相關同步(如:信号量【juc包中提供了Semaphore】、事件等)的架構,這些依賴于一個先進先出(FIFO)的等待隊列。AQS類的設計的大部分場景是基于一個單個原子{@code int}值,這裡就是指state屬性。模闆方法的子類必須使用protected的方法在内部改變state屬性的值,雖然AQS中定義了一套自己的state表示的含義,但是子類可以自己根據自身業務定義一套【個人了解也學習到了,這個一個非常好的開發方式,自己定義自己的一套狀态含義值,也可以了解為生命周期,Daug Lea在AQS中定義了state,FutureTask中定義的state也規定了生命周期走向,ThreadPoolExecutor中的ctl,高3位表示狀态,低27位表示最大線程數】。給定這些值後,這個類中的其他方法執行都需要排隊和進入阻塞機制,即state 控制這雙向隊列。自己的子類也可以維護其他類似state的狀态字段,但是隻有原子更新使用AQS中定義的state的對應方法{@link #getState}、{@link #setState}、{@link #compareAndSetState}操作才會跟蹤到同步(即才對雙向隊列産生影響)。

<p>子類必須定義非public的子輔助類,來修改其封裝的同步屬性;AQS類本身沒有實作任何的同步接口,相反它定義了{@link #acquireInterruptibly}這樣的方法,具體的鎖和同步器可以調用這些方法來實作其公共的對外方法。

        是以,了解了為什麼ReentrantLock【的Sync子類FairSync、NonfairSync】、CountDownLatch、Semaphore、ThreadPoolExecutor都使用final類型内部類實作AbstractQueuedSynchronizer

并發程式設計理論 - AQS的設計意圖和擴充用法
并發程式設計理論 - AQS的設計意圖和擴充用法

<p> AQS支援獨占和共享兩種模式(可以值實作一種【并且實作獨占模式,則可以不實作共享模式的對應方法】,也可以同時實作兩種模式【比如ReentrantLock等】)。當一個線程擷取到獨占模式的鎖後,其他試圖擷取該鎖的線程都不會成功,共享模式則運作過個線程都擷取成功。AQS類不知道共享模式時,子類擷取到擷取成功之後意味着什麼,則需要自己去控制(比如:限流器,可以控制同時多少個線程能擷取到)。

<p> AQS内部定義了{@link Condition} (對應的實作類為{@link ConditionObject})接口,可以以獨占的方式調用{@link #isHeldExclusively}來判斷是否自己持有鎖,方法{@link #release}和{@link #acquire}來擷取和釋放鎖,配合{#link #getState}來處理任務,并将雙向隊列恢複到處理前的狀态。當建立了Condition時,需要滿足條件才能進入進入,是以設定條件隊列是需要謹慎,否則就不要使用。{@link ConditionObject}底層同樣是依賴于同步器的實作語義,即依賴雙向隊列實作。

<p> AQS序列化隻存儲底層原子整數維護狀态(即隻序列化state屬性),是以反序列化的雙向隊列為空。如果需要處理,則可以重寫{@code readobject}方法,該方法在反序列化時将其恢複到已知的初始狀态【将反序列化的資料存儲回雙向隊列】。

<h3>使用<h3> 

<p> 如果要使用AQS類作為一個基準的同步器,在适當的情況下,可以使用{@link getState}、{@link #setState}、{@link #compareAndSetState}方法【即,自己要開發其他元件的時候,可以使用它】,配合下面的方法:

<ul>
    <li> {@link #tryAcquire}
    <li> {@link #tryRelease}
    <li> {@link #tryAcquireShared}
    <li> {@link #tryReleaseShared}
    <li> {@link #isHeldExclusively}
</ul>
           

這些方法預設都可能會抛出UnsupportedOperationException異常,這些方法的實作需要自己去保證内部是線程安全的,并且通過執行時間會非常短并且是非阻塞的。定義這些方法的調用,僅僅是在該類的方法中調用,否則可能不可控【比如:ReentrantLock的實作是UnfairSync,那麼隻能在ReentrantLock的方法才能調用UnfairSync中的調用該部分方法,否則其他類調用時完全不可控】。并且這些方法都應該定義成final類型的,因為他們是不能獨立變化的。比如,我們看看ReentrantLock中的tryRelease方法:

并發程式設計理論 - AQS的設計意圖和擴充用法

<p> 還可以使用{@link AbstractOmableSynchronizer}繼承的方法對于跟蹤擁有獨占同步器的線程很有用。鼓勵使用它們——這使得監視和診斷工具能夠幫助使用者确定哪些線程持有鎖。

<p> 雖然AQS基于内部的一個FIFO隊列,但是他不會自動的執行FIFO擷取政策,即需要我們實作類編碼。代碼的推薦示例如下(基本是标準寫法,是以實作的子類會看到很多下面代碼的模式):

<pre>
    擷取:
        while (!tryAcquire(arg)) {
            // <em>enqueue thread if it is not already queued</em>;
            // <em>possibly block current thread</em>;
        }

    釋放:
        if (tryRelease(arg)) {
            // <em>unblock the first queued thread</em>;
        }
    </pre>
           

<p id="barging "> 因為上面的模型,acquire【擷取】是在enqueuing【入隊】操作之前被調用,一個新的擷取線程可能在其他被阻塞隊列之前。但是如果需要,可以通過内部調用一個或者多個檢查使用{@code tryAcquire} and/or {@code tryAcquireShared}之後得到一個完全的FIFO隊列。特别是使用為專門同步器設計的方法{@link #hasQueuedPredecessors}傳回true,而大多數公平鎖時{@link tryAcquire}會傳回false【公平鎖性能較低】。

<p> 相比較于renouncement和convoy-avoidance政策,使用預設的barging模式的吞吐性和并發量是最高的。雖然并不能保證公平和無饑餓,但是允許較早排隊的線程在較晚排隊的線程之前重新競争(實作相對的FIFO機會)。當獨占同步的任務執行時間非常短時,自旋能帶來效果。如果需要,您可以通過在調用之前使用“快速路徑”檢查(可能是預先檢查{@link #hasContended)來增強這一點和或{@link #hasQueuedThreads}隻在同步器可能不競争時這樣做。

<p> AQS作為模闆方法的根類,為子類的擴充提供了基礎。還可以通過AQS的state【狀态】屬性來擷取和釋放,而得到一個FIFO的等待隊列。如果這些還不夠,我們也可以使用juc atomic包下的類 + 自己的Queue子類 + LockSupport的阻塞喚醒機制,來實作自己的同步器。

<h3>使用示例【自己的擴充,對于自己如果做架構擴充應該是受益匪淺】</h3>

擴充示例一:下面是一個不可重入鎖(juc下面我們熟知的都是Reentrant* 相關的可重入鎖),使用0表示解鎖狀态,1表示鎖定狀态。而非重入鎖是否嚴格要求記錄目前線程的持有者

class Mutex implements Lock, java.io.Serializable {
	// Our internal helper class
	private static class Sync extends AbstractQueuedSynchronizer {

		// Reports whether in locked state
		protected boolean isHeldExclusively() {
			return getState() == 1;
		}
	
		// Acquires the lock if state is zero
		public boolean tryAcquire(int acquires) {
			assert acquires == 1; // Otherwise unused
			if (compareAndSetState(0, 1)) {
				setExclusiveOwnerThread(Thread.currentThread());
				return true;
			}
			return false;
		}
	
		// Releases the lock by setting state to zero
		protected boolean tryRelease(int releases) {
			assert releases == 1; // Otherwise unused
			if (getState() == 0) {
				throw new IllegalMonitorStateException();
			}
			setExclusiveOwnerThread(null);
			setState(0);
			return true;
		}
		
		// Provides a Condition
		Condition newCondition() { 
			return new ConditionObject(); 
		}
		// Deserializes properly
		private void readObject(ObjectInputStream s) throws IOException, ClassNotFoundException {
			s.defaultReadObject();
			setState(0); // reset to unlocked state
		}
	}
	
	// The sync object does all the hard work. We just forward to it.
	private final Sync sync = new Sync();
	public void lock()                { sync.acquire(1); }
	public boolean tryLock()          { return sync.tryAcquire(1); }
	public void unlock()              { sync.release(1); }
	public Condition newCondition()   { return sync.newCondition(); }
	public boolean isLocked()         { return sync.isHeldExclusively(); }
	public boolean hasQueuedThreads() { return sync.hasQueuedThreads(); }
	public void lockInterruptibly() throws InterruptedException {
    	sync.acquireInterruptibly(1);
  	}
	public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException {
		return sync.tryAcquireNanos(1, unit.toNanos(timeout));
	}
}
           

擴充示例二:這是一個類似{@link CountDownLatch}的門闩模型,隻是它隻需要一個{@code signal}就開始。因為闩鎖是非獨占的,是以它使用{@code shared}擷取和釋放方法。

class BooleanLatch {

    private static class Sync extends AbstractQueuedSynchronizer {

        boolean isSignalled() {
            return getState() != 0;
        }

        protected int tryAcquireShared(int ignore) {
            return isSignalled() ? 1 : -1;
        }

        protected boolean tryReleaseShared(int ignore) {
            setState(1);
            return true;
        }
    }

    private final Sync sync = new Sync();
    
    public boolean isSignalled() { 
        return sync.isSignalled(); 
    }
    public void signal() { 
        sync.releaseShared(1); 
    }

    public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }
}
           

繼續閱讀