AbstractQueuedSynchronized是是以juc包的鎖的實作本質,内部維護了一個虛拟的雙向連結清單(連結清單的節點就是内部類Node),虛拟雙向連結清單通過對 volatile state + CAS處理【之前分析atomic時,知道其實作了原子性、可見性和有序性】,對外表現為共享和排他兩種模型。排他模式是,可以将目前線程設定到AQS父類AbstractOwnableSynchronizer屬性(Thread exclusiveOwnerThread)中,提供set、get方法,當然一重入線程進入時可以判斷是否該獨享線程是自己。
我們知道Java的synchronized是MESA管程模型的實作,是以AQS中提供了Condition接口【實作類為ConditionObject】,一個單向隊列,本質上就是管程模型的條件隊列,隻是這個允許建立多個條件隊列。AQS本身并不是一個完整的工具而是一個抽象模闆,隻是實作了重要的共享、排他的擷取和釋放方法等,運作子類對其擴充【并且Doug Lea推薦子類使用final的内部類對其進行擴充,比如:ReentrantLock static final class FairSync extends Sync{ }】。管程模型對比如下:

至于AQS到底是個什麼東西,我覺得還是讓并發大師Daug Lea自己怎麼說,我用很爛的英語和翻譯軟體,試着去看完了AQS類的注釋,很多設計意圖和用法在這裡得到了答案:
/**
* Provides a framework for implementing blocking locks and related
* synchronizers (semaphores, events, etc) that rely on
* first-in-first-out (FIFO) wait queues. This class is designed to
* be a useful basis for most kinds of synchronizers that rely on a
* single atomic {@code int} value to represent state. Subclasses
* must define the protected methods that change this state, and which
* define what that state means in terms of this object being acquired
* or released. Given these, the other methods in this class carry
* out all queuing and blocking mechanics. Subclasses can maintain
* other state fields, but only the atomically updated {@code int}
* value manipulated using methods {@link #getState}, {@link
* #setState} and {@link #compareAndSetState} is tracked with respect
* to synchronization.
*
* <p>Subclasses should be defined as non-public internal helper
* classes that are used to implement the synchronization properties
* of their enclosing class. Class
* {@code AbstractQueuedSynchronizer} does not implement any
* synchronization interface. Instead it defines methods such as
* {@link #acquireInterruptibly} that can be invoked as
* appropriate by concrete locks and related synchronizers to
* implement their public methods.
*
* <p>This class supports either or both a default <em>exclusive</em>
* mode and a <em>shared</em> mode. When acquired in exclusive mode,
* attempted acquires by other threads cannot succeed. Shared mode
* acquires by multiple threads may (but need not) succeed. This class
* does not "understand" these differences except in the
* mechanical sense that when a shared mode acquire succeeds, the next
* waiting thread (if one exists) must also determine whether it can
* acquire as well. Threads waiting in the different modes share the
* same FIFO queue. Usually, implementation subclasses support only
* one of these modes, but both can come into play for example in a
* {@link ReadWriteLock}. Subclasses that support only exclusive or
* only shared modes need not define the methods supporting the unused mode.
*
* <p>This class defines a nested {@link ConditionObject} class that
* can be used as a {@link Condition} implementation by subclasses
* supporting exclusive mode for which method {@link
* #isHeldExclusively} reports whether synchronization is exclusively
* held with respect to the current thread, method {@link #release}
* invoked with the current {@link #getState} value fully releases
* this object, and {@link #acquire}, given this saved state value,
* eventually restores this object to its previous acquired state. No
* {@code AbstractQueuedSynchronizer} method otherwise creates such a
* condition, so if this constraint cannot be met, do not use it. The
* behavior of {@link ConditionObject} depends of course on the
* semantics of its synchronizer implementation.
*
* <p>This class provides inspection, instrumentation, and monitoring
* methods for the internal queue, as well as similar methods for
* condition objects. These can be exported as desired into classes
* using an {@code AbstractQueuedSynchronizer} for their
* synchronization mechanics.
*
* <p>Serialization of this class stores only the underlying atomic
* integer maintaining state, so deserialized objects have empty
* thread queues. Typical subclasses requiring serializability will
* define a {@code readObject} method that restores this to a known
* initial state upon deserialization.
*
* <h3>Usage</h3>
*
* <p>To use this class as the basis of a synchronizer, redefine the
* following methods, as applicable, by inspecting and/or modifying
* the synchronization state using {@link #getState}, {@link
* #setState} and/or {@link #compareAndSetState}:
*
* <ul>
* <li> {@link #tryAcquire}
* <li> {@link #tryRelease}
* <li> {@link #tryAcquireShared}
* <li> {@link #tryReleaseShared}
* <li> {@link #isHeldExclusively}
* </ul>
*
* Each of these methods by default throws {@link
* UnsupportedOperationException}. Implementations of these methods
* must be internally thread-safe, and should in general be short and
* not block. Defining these methods is the <em>only</em> supported
* means of using this class. All other methods are declared
* {@code final} because they cannot be independently varied.
*
* <p>You may also find the inherited methods from {@link
* AbstractOwnableSynchronizer} useful to keep track of the thread
* owning an exclusive synchronizer. You are encouraged to use them
* -- this enables monitoring and diagnostic tools to assist users in
* determining which threads hold locks.
*
* <p>Even though this class is based on an internal FIFO queue, it
* does not automatically enforce FIFO acquisition policies. The core
* of exclusive synchronization takes the form:
*
* <pre>
* Acquire:
* while (!tryAcquire(arg)) {
* <em>enqueue thread if it is not already queued</em>;
* <em>possibly block current thread</em>;
* }
*
* Release:
* if (tryRelease(arg))
* <em>unblock the first queued thread</em>;
* </pre>
*
* (Shared mode is similar but may involve cascading signals.)
*
* <p id="barging">Because checks in acquire are invoked before
* enqueuing, a newly acquiring thread may <em>barge</em> ahead of
* others that are blocked and queued. However, you can, if desired,
* define {@code tryAcquire} and/or {@code tryAcquireShared} to
* disable barging by internally invoking one or more of the inspection
* methods, thereby providing a <em>fair</em> FIFO acquisition order.
* In particular, most fair synchronizers can define {@code tryAcquire}
* to return {@code false} if {@link #hasQueuedPredecessors} (a method
* specifically designed to be used by fair synchronizers) returns
* {@code true}. Other variations are possible.
*
* <p>Throughput and scalability are generally highest for the
* default barging (also known as <em>greedy</em>,
* <em>renouncement</em>, and <em>convoy-avoidance</em>) strategy.
* While this is not guaranteed to be fair or starvation-free, earlier
* queued threads are allowed to recontend before later queued
* threads, and each recontention has an unbiased chance to succeed
* against incoming threads. Also, while acquires do not
* "spin" in the usual sense, they may perform multiple
* invocations of {@code tryAcquire} interspersed with other
* computations before blocking. This gives most of the benefits of
* spins when exclusive synchronization is only briefly held, without
* most of the liabilities when it isn't. If so desired, you can
* augment this by preceding calls to acquire methods with
* "fast-path" checks, possibly prechecking {@link #hasContended}
* and/or {@link #hasQueuedThreads} to only do so if the synchronizer
* is likely not to be contended.
*
* <p>This class provides an efficient and scalable basis for
* synchronization in part by specializing its range of use to
* synchronizers that can rely on {@code int} state, acquire, and
* release parameters, and an internal FIFO wait queue. When this does
* not suffice, you can build synchronizers from a lower level using
* {@link java.util.concurrent.atomic atomic} classes, your own custom
* {@link java.util.Queue} classes, and {@link LockSupport} blocking
* support.
*
* <h3>Usage Examples</h3>
*
* <p>Here is a non-reentrant mutual exclusion lock class that uses
* the value zero to represent the unlocked state, and one to
* represent the locked state. While a non-reentrant lock
* does not strictly require recording of the current owner
* thread, this class does so anyway to make usage easier to monitor.
* It also supports conditions and exposes
* one of the instrumentation methods:
*
* <pre> {@code
* class Mutex implements Lock, java.io.Serializable {
*
* // Our internal helper class
* private static class Sync extends AbstractQueuedSynchronizer {
* // Reports whether in locked state
* protected boolean isHeldExclusively() {
* return getState() == 1;
* }
*
* // Acquires the lock if state is zero
* public boolean tryAcquire(int acquires) {
* assert acquires == 1; // Otherwise unused
* if (compareAndSetState(0, 1)) {
* setExclusiveOwnerThread(Thread.currentThread());
* return true;
* }
* return false;
* }
*
* // Releases the lock by setting state to zero
* protected boolean tryRelease(int releases) {
* assert releases == 1; // Otherwise unused
* if (getState() == 0) throw new IllegalMonitorStateException();
* setExclusiveOwnerThread(null);
* setState(0);
* return true;
* }
*
* // Provides a Condition
* Condition newCondition() { return new ConditionObject(); }
*
* // Deserializes properly
* private void readObject(ObjectInputStream s)
* throws IOException, ClassNotFoundException {
* s.defaultReadObject();
* setState(0); // reset to unlocked state
* }
* }
*
* // The sync object does all the hard work. We just forward to it.
* private final Sync sync = new Sync();
*
* public void lock() { sync.acquire(1); }
* public boolean tryLock() { return sync.tryAcquire(1); }
* public void unlock() { sync.release(1); }
* public Condition newCondition() { return sync.newCondition(); }
* public boolean isLocked() { return sync.isHeldExclusively(); }
* public boolean hasQueuedThreads() { return sync.hasQueuedThreads(); }
* public void lockInterruptibly() throws InterruptedException {
* sync.acquireInterruptibly(1);
* }
* public boolean tryLock(long timeout, TimeUnit unit)
* throws InterruptedException {
* return sync.tryAcquireNanos(1, unit.toNanos(timeout));
* }
* }}</pre>
*
* <p>Here is a latch class that is like a
* {@link java.util.concurrent.CountDownLatch CountDownLatch}
* except that it only requires a single {@code signal} to
* fire. Because a latch is non-exclusive, it uses the {@code shared}
* acquire and release methods.
*
* <pre> {@code
* class BooleanLatch {
*
* private static class Sync extends AbstractQueuedSynchronizer {
* boolean isSignalled() { return getState() != 0; }
*
* protected int tryAcquireShared(int ignore) {
* return isSignalled() ? 1 : -1;
* }
*
* protected boolean tryReleaseShared(int ignore) {
* setState(1);
* return true;
* }
* }
*
* private final Sync sync = new Sync();
* public boolean isSignalled() { return sync.isSignalled(); }
* public void signal() { sync.releaseShared(1); }
* public void await() throws InterruptedException {
* sync.acquireSharedInterruptibly(1);
* }
* }}</pre>
*
* @since 1.5
* @author Doug Lea
*/
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer
implements java.io.Serializable {
}
了解翻譯如下:
<p>提供了一個實作阻塞和相關同步(如:信号量【juc包中提供了Semaphore】、事件等)的架構,這些依賴于一個先進先出(FIFO)的等待隊列。AQS類的設計的大部分場景是基于一個單個原子{@code int}值,這裡就是指state屬性。模闆方法的子類必須使用protected的方法在内部改變state屬性的值,雖然AQS中定義了一套自己的state表示的含義,但是子類可以自己根據自身業務定義一套【個人了解也學習到了,這個一個非常好的開發方式,自己定義自己的一套狀态含義值,也可以了解為生命周期,Daug Lea在AQS中定義了state,FutureTask中定義的state也規定了生命周期走向,ThreadPoolExecutor中的ctl,高3位表示狀态,低27位表示最大線程數】。給定這些值後,這個類中的其他方法執行都需要排隊和進入阻塞機制,即state 控制這雙向隊列。自己的子類也可以維護其他類似state的狀态字段,但是隻有原子更新使用AQS中定義的state的對應方法{@link #getState}、{@link #setState}、{@link #compareAndSetState}操作才會跟蹤到同步(即才對雙向隊列産生影響)。
<p>子類必須定義非public的子輔助類,來修改其封裝的同步屬性;AQS類本身沒有實作任何的同步接口,相反它定義了{@link #acquireInterruptibly}這樣的方法,具體的鎖和同步器可以調用這些方法來實作其公共的對外方法。
是以,了解了為什麼ReentrantLock【的Sync子類FairSync、NonfairSync】、CountDownLatch、Semaphore、ThreadPoolExecutor都使用final類型内部類實作AbstractQueuedSynchronizer
<p> AQS支援獨占和共享兩種模式(可以值實作一種【并且實作獨占模式,則可以不實作共享模式的對應方法】,也可以同時實作兩種模式【比如ReentrantLock等】)。當一個線程擷取到獨占模式的鎖後,其他試圖擷取該鎖的線程都不會成功,共享模式則運作過個線程都擷取成功。AQS類不知道共享模式時,子類擷取到擷取成功之後意味着什麼,則需要自己去控制(比如:限流器,可以控制同時多少個線程能擷取到)。
<p> AQS内部定義了{@link Condition} (對應的實作類為{@link ConditionObject})接口,可以以獨占的方式調用{@link #isHeldExclusively}來判斷是否自己持有鎖,方法{@link #release}和{@link #acquire}來擷取和釋放鎖,配合{#link #getState}來處理任務,并将雙向隊列恢複到處理前的狀态。當建立了Condition時,需要滿足條件才能進入進入,是以設定條件隊列是需要謹慎,否則就不要使用。{@link ConditionObject}底層同樣是依賴于同步器的實作語義,即依賴雙向隊列實作。
<p> AQS序列化隻存儲底層原子整數維護狀态(即隻序列化state屬性),是以反序列化的雙向隊列為空。如果需要處理,則可以重寫{@code readobject}方法,該方法在反序列化時将其恢複到已知的初始狀态【将反序列化的資料存儲回雙向隊列】。
<h3>使用<h3>
<p> 如果要使用AQS類作為一個基準的同步器,在适當的情況下,可以使用{@link getState}、{@link #setState}、{@link #compareAndSetState}方法【即,自己要開發其他元件的時候,可以使用它】,配合下面的方法:
<ul>
<li> {@link #tryAcquire}
<li> {@link #tryRelease}
<li> {@link #tryAcquireShared}
<li> {@link #tryReleaseShared}
<li> {@link #isHeldExclusively}
</ul>
這些方法預設都可能會抛出UnsupportedOperationException異常,這些方法的實作需要自己去保證内部是線程安全的,并且通過執行時間會非常短并且是非阻塞的。定義這些方法的調用,僅僅是在該類的方法中調用,否則可能不可控【比如:ReentrantLock的實作是UnfairSync,那麼隻能在ReentrantLock的方法才能調用UnfairSync中的調用該部分方法,否則其他類調用時完全不可控】。并且這些方法都應該定義成final類型的,因為他們是不能獨立變化的。比如,我們看看ReentrantLock中的tryRelease方法:
<p> 還可以使用{@link AbstractOmableSynchronizer}繼承的方法對于跟蹤擁有獨占同步器的線程很有用。鼓勵使用它們——這使得監視和診斷工具能夠幫助使用者确定哪些線程持有鎖。
<p> 雖然AQS基于内部的一個FIFO隊列,但是他不會自動的執行FIFO擷取政策,即需要我們實作類編碼。代碼的推薦示例如下(基本是标準寫法,是以實作的子類會看到很多下面代碼的模式):
<pre>
擷取:
while (!tryAcquire(arg)) {
// <em>enqueue thread if it is not already queued</em>;
// <em>possibly block current thread</em>;
}
釋放:
if (tryRelease(arg)) {
// <em>unblock the first queued thread</em>;
}
</pre>
<p id="barging "> 因為上面的模型,acquire【擷取】是在enqueuing【入隊】操作之前被調用,一個新的擷取線程可能在其他被阻塞隊列之前。但是如果需要,可以通過内部調用一個或者多個檢查使用{@code tryAcquire} and/or {@code tryAcquireShared}之後得到一個完全的FIFO隊列。特别是使用為專門同步器設計的方法{@link #hasQueuedPredecessors}傳回true,而大多數公平鎖時{@link tryAcquire}會傳回false【公平鎖性能較低】。
<p> 相比較于renouncement和convoy-avoidance政策,使用預設的barging模式的吞吐性和并發量是最高的。雖然并不能保證公平和無饑餓,但是允許較早排隊的線程在較晚排隊的線程之前重新競争(實作相對的FIFO機會)。當獨占同步的任務執行時間非常短時,自旋能帶來效果。如果需要,您可以通過在調用之前使用“快速路徑”檢查(可能是預先檢查{@link #hasContended)來增強這一點和或{@link #hasQueuedThreads}隻在同步器可能不競争時這樣做。
<p> AQS作為模闆方法的根類,為子類的擴充提供了基礎。還可以通過AQS的state【狀态】屬性來擷取和釋放,而得到一個FIFO的等待隊列。如果這些還不夠,我們也可以使用juc atomic包下的類 + 自己的Queue子類 + LockSupport的阻塞喚醒機制,來實作自己的同步器。
<h3>使用示例【自己的擴充,對于自己如果做架構擴充應該是受益匪淺】</h3>
擴充示例一:下面是一個不可重入鎖(juc下面我們熟知的都是Reentrant* 相關的可重入鎖),使用0表示解鎖狀态,1表示鎖定狀态。而非重入鎖是否嚴格要求記錄目前線程的持有者
class Mutex implements Lock, java.io.Serializable {
// Our internal helper class
private static class Sync extends AbstractQueuedSynchronizer {
// Reports whether in locked state
protected boolean isHeldExclusively() {
return getState() == 1;
}
// Acquires the lock if state is zero
public boolean tryAcquire(int acquires) {
assert acquires == 1; // Otherwise unused
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
// Releases the lock by setting state to zero
protected boolean tryRelease(int releases) {
assert releases == 1; // Otherwise unused
if (getState() == 0) {
throw new IllegalMonitorStateException();
}
setExclusiveOwnerThread(null);
setState(0);
return true;
}
// Provides a Condition
Condition newCondition() {
return new ConditionObject();
}
// Deserializes properly
private void readObject(ObjectInputStream s) throws IOException, ClassNotFoundException {
s.defaultReadObject();
setState(0); // reset to unlocked state
}
}
// The sync object does all the hard work. We just forward to it.
private final Sync sync = new Sync();
public void lock() { sync.acquire(1); }
public boolean tryLock() { return sync.tryAcquire(1); }
public void unlock() { sync.release(1); }
public Condition newCondition() { return sync.newCondition(); }
public boolean isLocked() { return sync.isHeldExclusively(); }
public boolean hasQueuedThreads() { return sync.hasQueuedThreads(); }
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}
}
擴充示例二:這是一個類似{@link CountDownLatch}的門闩模型,隻是它隻需要一個{@code signal}就開始。因為闩鎖是非獨占的,是以它使用{@code shared}擷取和釋放方法。
class BooleanLatch {
private static class Sync extends AbstractQueuedSynchronizer {
boolean isSignalled() {
return getState() != 0;
}
protected int tryAcquireShared(int ignore) {
return isSignalled() ? 1 : -1;
}
protected boolean tryReleaseShared(int ignore) {
setState(1);
return true;
}
}
private final Sync sync = new Sync();
public boolean isSignalled() {
return sync.isSignalled();
}
public void signal() {
sync.releaseShared(1);
}
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
}