天天看點

JDK源碼解析之AbstractQueuedSynchronizer(上)1 整體感覺2 用法3 使用案例

AbstractQueuedSynchronizer 抽象同步隊列簡稱 AQS ,它是實作同步器的基礎元件,

并發包中鎖的底層就是使用 AQS 實作的.

大多數開發者可能永遠不會直接使用AQS ,但是知道其原理對于架構設計還是很有幫助的,而且要了解ReentrantLock、CountDownLatch等進階鎖我們必須搞懂 AQS.

1 整體感覺

1.1 架構圖

JDK源碼解析之AbstractQueuedSynchronizer(上)1 整體感覺2 用法3 使用案例

AQS架構大緻分為五層,自上而下由淺入深,從AQS對外暴露的API到底層基礎資料.

當有自定義同步器接入時,隻需重寫第一層所需要的部分方法即可,不需要關注底層具體的實作流程。當自定義同步器進行加鎖或者解鎖操作時,先經過第一層的API進入AQS内部方法,然後經過第二層進行鎖的擷取,接着對于擷取鎖失敗的流程,進入第三層和第四層的等待隊列處理,而這些處理方式均依賴于第五層的基礎資料提供層。

AQS 本身就是一套鎖的架構,它定義了獲得鎖和釋放鎖的代碼結構,是以如果要建立鎖,隻要繼承 AQS,并實作相應方法即可。

1.2 類設計

該類提供了一種架構,用于實作依賴于先進先出(FIFO)等待隊列的阻塞鎖和相關的同步器(信号量,事件等)。此類的設計旨在為大多數依賴單個原子int值表示 state 的同步器提供切實有用的基礎。子類必須定義更改此 state 的 protected 方法,并定義該 state 對于 acquired 或 released 此對象而言意味着什麼。鑒于這些,此類中的其他方法将執行全局的排隊和阻塞機制。子類可以維護其他狀态字段,但是就同步而言,僅跟蹤使用方法 getState,setState 和 compareAndSetState 操作的原子更新的int值。

子類應定義為用于實作其所在類的同步屬性的非公共内部幫助器類。

子類應定義為用于實作其所在類的同步屬性的非 public 内部輔助類。類AbstractQueuedSynchronizer不實作任何同步接口。 相反,它定義了諸如acquireInterruptible之類的方法,可以通過具體的鎖和相關的同步器适當地調用這些方法來實作其 public 方法。

此類支援預設的排他模式和共享模式:

當以獨占方式進行擷取時,其他線程嘗試進行的擷取将無法成功

由多個線程擷取的共享模式可能(但不一定)成功

該類不了解這些差異,隻是從機制的意義上說,當共享模式擷取成功時,下一個等待線程(如果存在)也必須确定它是否也可以擷取。在不同模式下等待的線程們共享相同的FIFO隊列。 通常,實作的子類僅支援這些模式之一,但也可以同時出現,比如在ReadWriteLock.僅支援排他模式或共享模式的子類無需定義支援未使用模式的方法.

此類定義了一個内嵌的 ConditionObject 類,可由支援獨占模式的子類用作Condition 的實作,該子類的 isHeldExclusively 方法報告相對于目前線程是否獨占同步,使用目前 getState 值調用的方法 release 會完全釋放此對象 ,并獲得給定的此儲存狀态值,最終将該對象恢複為其先前的擷取狀态。否則,沒有AbstractQueuedSynchronizer方***建立這樣的條件,是以,如果無法滿足此限制,請不要使用它。ConditionObject的行為當然取決于其同步器實作的語義。

此類提供了内部隊列的檢查,檢測和監視方法,以及條件對象的類似方法。 可以根據需要使用 AQS 将它們導出到類中以實作其同步機制。

此類的序列化僅存儲基礎原子整數維護狀态,是以反序列化的對象具有空線程隊列。 需要序列化性的典型子類将定義一個readObject方法,該方法在反序列化時将其恢複為已知的初始狀态。

2 用法

要将此類用作同步器的基礎,使用getState setState和/或compareAndSetState檢查和/或修改同步狀态,以重新定義以下方法(如适用)

tryAcquire

tryRelease

tryAcquireShared

tryReleaseShared

isHeldExclusively

預設情況下,這些方法中的每一個都會抛 UnsupportedOperationException。

這些方法的實作必須在内部是線程安全的,并且通常應簡短且不阻塞。 定義這些方法是使用此類的唯一受支援的方法。 所有其他方法都被聲明為final,因為它們不能獨立變化。

從 AQS 繼承的方法對跟蹤擁有排他同步器的線程很有用。 鼓勵使用它們-這将啟用監視和診斷工具,以幫助使用者确定哪些線程持有鎖。

雖然此類基于内部的FIFO隊列,它也不會自動執行FIFO擷取政策。 獨占同步的核心采用以下形式:

  • Acquire
while (!tryAcquire(arg)) {
     如果線程尚未入隊,則将其加入隊列;
     可能阻塞目前線程;
}      
  • Release
if (tryRelease(arg))
    取消阻塞第一個入隊的線程;      

共享模式與此相似,但可能涉及級聯的signal。

acquire 中的檢查是在入隊前被調用,是以新擷取的線程可能會在被阻塞和排隊的其他線程之前插入。但若需要,可以定義tryAcquire、tryAcquireShared以通過内部調用一或多種檢查方法來禁用插入,進而提供公平的FIFO擷取順序。

特别是,若 hasQueuedPredecessors()(公平同步器專門設計的一種方法)傳回true,則大多數公平同步器都可以定義tryAcquire傳回false.

公平與否取決于如下一行代碼:

if (c == 0) {
    if (!hasQueuedPredecessors() &&
        compareAndSetState(0, acquires)) {
        setExclusiveOwnerThread(current);
        return true;
    }
}      

hasQueuedPredecessors

public final boolean hasQueuedPredecessors() {
    // The correctness of this depends on head being initialized
    // before tail and on head.next being accurate if the current
    // thread is first in queue.
    Node t = tail; // Read fields in reverse initialization order
    Node h = head;
    // s代表等待隊列的第一個節點
    Node s;
    // (s = h.next) == null 說明此時有另一個線程正在嘗試成為頭節點,詳見AQS的acquireQueued方法
    // s.thread != Thread.currentThread():此線程不是等待的頭節點
    return h != t &&
        ((s = h.next) == null || s.thread != Thread.currentThread());
}      

對于預設的插入(也稱為貪婪,放棄和convoey-avoidance)政策,吞吐量和可伸縮性通常最高。 盡管不能保證這是公平的或避免饑餓,但允許較早排隊的線程在較晚排隊的線程之前進行重新競争,并且每個重新争用都有一次機會可以毫無偏向地成功競争過進入的線程。

同樣,盡管擷取通常無需自旋,但在阻塞前,它們可能會執行tryAcquire的多次調用,并插入其他任務。 如果僅短暫地保持排他同步,則這将帶來自旋的大部分好處,而如果不進行排他同步,則不會帶來很多負擔。 如果需要的話,可以通過在調用之前使用“fast-path”檢查來擷取方法來增強此功能,并可能預先檢查hasContended()和/或hasQueuedThreads(),以便僅在同步器可能不存在争用的情況下這樣做。

此類為同步提供了有效且可擴充的基礎,部分是通過将其使用範圍規範化到可以依賴于int狀态,acquire 和 release 參數以及内部的FIFO等待隊列的同步器。 當這還不夠時,可以使用原子類、自定義隊列類和鎖支援阻塞支援從較低級别建構同步器。

3 使用案例

這裡是一個不可重入的排他鎖,它使用值0表示解鎖狀态,使用值1表示鎖定狀态。雖然不可重入鎖并不嚴格要求記錄目前所有者線程,但是這個類這樣做是為了更容易監視使用情況。它還支援條件,并暴露其中一個檢測方法:

class Mutex implements Lock, java.io.Serializable {

  // 我們内部的輔助類
  private static class Sync extends AbstractQueuedSynchronizer {
    // 報告是否處于鎖定狀态
    protected boolean isHeldExclusively() {
      return getState() == 1;
    }

    // 如果 state 是 0,擷取鎖
    public boolean tryAcquire(int acquires) {
      assert acquires == 1; // Otherwise unused
      if (compareAndSetState(0, 1)) {
        setExclusiveOwnerThread(Thread.currentThread());
        return true;
      }
      return false;
    }

    // 通過将 state 置 0 來釋放鎖
    protected boolean tryRelease(int releases) {
      assert releases == 1; // Otherwise unused
      if (getState() == 0) throw new IllegalMonitorStateException();
      setExclusiveOwnerThread(null);
      setState(0);
      return true;
    }

    //  提供一個 Condition
    Condition newCondition() { return new ConditionObject(); }

    // 反序列化屬性
    private void readObject(ObjectInputStream s)
        throws IOException, ClassNotFoundException {
      s.defaultReadObject();
      setState(0); // 重置到解鎖狀态
    }
  }

  // 同步對象完成所有的工作。我們隻是期待它.
  private final Sync sync = new Sync();

  public void lock()                { sync.acquire(1); }
  public boolean tryLock()          { return sync.tryAcquire(1); }
  public void unlock()              { sync.release(1); }
  public Condition newCondition()   { return sync.newCondition(); }
  public boolean isLocked()         { return sync.isHeldExclusively(); }
  public boolean hasQueuedThreads() { return sync.hasQueuedThreads(); }
  public void lockInterruptibly() throws InterruptedException {
    sync.acquireInterruptibly(1);
  }
  public boolean tryLock(long timeout, TimeUnit unit)
      throws InterruptedException {
    return sync.tryAcquireNanos(1, unit.toNanos(timeout));
  }
}      

這是一個闩鎖類,它類似于CountDownLatch,隻是它隻需要一個單信号就可以觸發。因為鎖存器是非獨占的,是以它使用共享的擷取和釋放方法。

class BooleanLatch {

   private static class Sync extends AbstractQueuedSynchronizer {
     boolean isSignalled() { return getState() != 0; }

     protected int tryAcquireShared(int ignore) {
       return isSignalled() ? 1 : -1;
     }

     protected boolean tryReleaseShared(int ignore) {
       setState(1);
       return true;
     }
   }

   private final Sync sync = new Sync();
   public boolean isSignalled() { return sync.isSignalled(); }
   public void signal()         { sync.releaseShared(1); }
   public void await() throws InterruptedException {
     sync.acquireSharedInterruptibly(1);
   }
 }