天天看點

Java 并發程式設計解析

作者:會寫代碼的猴子

寫在開頭

在Java領域中, 尤其是在并發程式設計領域,對于多線程并發執行一直有兩大核心問題:同步和互斥。其中:

  • 互斥(Mutual Exclusion):一個公共資源同一時刻隻能被一個程序或線程使用,多個程序或線程不能同時使用公共資源。即就是同一時刻隻允許一個線程通路共享資源的問題。
  • 同步(Synchronization):兩個或兩個以上的程序或線程在運作過程中協同步調,按預定的先後次序運作。即就是線程之間如何通信、協作的問題。

針對對于這兩大核心問題,利用管程是能夠解決和實作的,是以可以說,管程是并發程式設計的萬能鑰匙。

雖然,Java在基于文法層面(synchronized 關鍵字)實作了對管程技術,但是從使用方式和性能上來說,内置鎖(synchronized 關鍵字)的粒度相對過大,不支援逾時和中斷等問題。

為了彌補這些問題,從JDK層面對其“重複造輪子”,在JDK内部對其重新設計和定義,甚至實作了新的特性。

關健術語

Java 并發程式設計解析

本文用到的一些關鍵詞語以及常用術語,主要如下:

  • 信号量(Semaphore): 是在多線程環境下使用的一種設施,是可以用來保證兩個或多個關鍵代碼段不被并發調用,也是作系統用來解決并發中的互斥和同步問題的一種方法。
  • 信号量機制(Semaphores): 用來解決同步/互斥的問題的,它是1965年,荷蘭學者 Dijkstra提出了一種卓有成效的實作程序互斥與同步的方法。
  • 管程(Monitor) : 一般是指管理共享變量以及對共享變量的操作過程,讓它們支援并發的一種機制。

基本概述

在Java領域中,我們可以将鎖大緻分為基于Java文法層面(關鍵詞)實作的鎖和基于JDK層面實作的鎖。

Java 并發程式設計解析

在Java領域中,從JDK源碼分析來看,基于JDK層面實作的鎖大緻主要可以分為以下4種方式:

  • 基于Lock接口實作的鎖
  • 基于ReadWriteLock接口實作的鎖
  • 基于AQS基礎同步器實作的鎖
  • 基于自定義API操作實作的鎖

從閱讀源碼不難發現,在Java SDK 并發包主要通過AbstractQueuedSynchronizer(AQS)實作多線程同步機制的封裝與定義,而通過Lock 和 Condition 兩個接口來實作管程,其中 Lock 用于解決互斥問題,Condition 用于解決同步問題。

一. 基本理論

在并發程式設計領域,有兩大核心問題:一個是互斥,即同一時刻隻允許一個線程通路共享資源;另一個是同步,即線程之間如何通信、協作。

Java 并發程式設計解析

在作業系統中,一般有如果I/O操作時,對于阻塞和非阻塞是從函數調用角度來說的,其中:

  • 阻塞:如果讀寫操作沒有就緒或者完成,則函數一直等待。
  • 非阻塞: 函數立即調用,然後讓應用程式輪詢循環。

而同步和異步則是從“讀寫是主要是由誰完成”的角度來說的,其中:

  • 同步: 讀寫操作主要交給應用程式完成
  • 異步: 讀寫操作主要由作業系統完成,一般完成之後,回調函數和事件通知應用程式。

其中,信号量機制(Semaphores)是用來解決同步/互斥的問題的,但是信号量(Semaphore)的操作分散在各個程序或線程中,不友善進行管理,因每次需調用P/V(來自荷蘭語 proberen和 verhogen)操作,還可能導緻死鎖或破壞互斥請求的問題。

由于PV操作對于解決程序互斥/同步程式設計複雜,因而在此基礎上提出了與信号量等價的——“管程技術”。

其中,管程(Monitor)當中定義了共享資料結構隻能被管程内部定義的函數所修改,是以如果我們想修改管程内部的共享資料結構的話,隻能調用管程内部提供的函數來間接的修改這些資料結構。

一般來說,管程(Monitor)和信号量(Semaphore)是等價的,所謂等價指的是用管程能夠實作信号量,也能用信号量實作管程。

在管程的發展曆程上,先後出現過Hasen模型、Hoare模型和MESA模型等三種不同的管程模型,現在正在廣泛使用的是MESA模型。

在MESA模型中,管程中引入了條件變量(Conditional Variable)的概念,而且每個條件變量都對應有一個等待隊列(Wait Queue)。其中,條件變量和等待隊列的作用是解決線程之間的同步問題。

而對于解決線程之間的互斥問題,将共享變量(Shared Variable)及其對共享變量的操作統一封裝起來,一般主要是實作一個線程安全的阻塞隊列(Blocking Queue),将線程不安全的隊列封裝起來,對外提供線程安全的操作方法,例如入隊操作(Enqueue)和出隊操作(Dequeue)。

在Java領域中,對于Java文法層面實作的鎖(synchronized 關鍵字), 其實就是參考了 MESA 模型,并且對 MESA 模型進行了精簡,一般在MESA 模型中,條件變量可以有多個,Java 語言内置的管程(synchronized)裡隻有一個條件變量。

這就意味着,被synchronized 關鍵字修飾的代碼塊或者直接标記靜态方法以及執行個體方法,在編譯期會自動生成相關加鎖(lock)和解鎖(unlock)的代碼,即就是monitorenter和monitorexit指令。

對于synchronized 關鍵字來說,主要是在Java HotSpot(TM) VM 虛拟機通過Monitor(螢幕)來實作monitorenter和monitorexit指令的。

同時,在Java HotSpot(TM) VM 虛拟機中,每個對象都會有一個螢幕,螢幕和對象一起建立、銷毀。

螢幕相當于一個用來監視這些線程進入的特殊房間,其義務是保證(同一時間)隻有一個線程可以通路被保護的臨界區代碼塊。

本質上,螢幕是一種同步工具,也可以說是JVM對管程的同步機制的封裝實作,主要特點是:

  • 同步:螢幕所保護的臨界區代碼是互斥地執行的。一個螢幕是一個運作許可,任一線程進入臨界區代碼都需要獲得這個許可,離開時把許可歸還。
  • 協作:螢幕提供Signal機制,允許正持有許可的線程暫時放棄許可進入阻塞等待狀态,等待其他線程發送Signal去喚醒;其他擁有許可的線程可以發送Signal,喚醒正在阻塞等待的線程,讓它可以重新獲得許可并啟動執行。

在Hotspot虛拟機中,螢幕是由C++類ObjectMonitor實作的,ObjectMonitor類定義在ObjectMonitor.hpp檔案中,其中:

Java 并發程式設計解析
  • Owner: 指向的線程即為獲得鎖的線程
  • Cxq:競争隊列(Contention Queue),所有請求鎖的線程首先被放在這個競争隊列中
  • EntryList:對象實體清單,表示Cxq中那些有資格成為候選資源的線程被移動到EntryList中。
  • WaitSet:類似于等待隊列,某個擁有ObjectMonitor的線程在調用Object.wait()方法之後将被阻塞,然後該線程将被放置在WaitSet連結清單中。

同時,管程與Java中面向對象原則(Object Oriented Principle)也是非常契合的,主要展現在 java.lang.Object類中wait()、notify()、notifyAll() 這三個方法,其中:

Java 并發程式設計解析
  • wait()方法: 阻塞線程并且進入等待隊列
  • notify()方法:随機地通知等待隊列中的一個線程
  • notifyAll()方法: 通知等待隊列中的所有線程

不難發現,在Java中synchronized 關鍵字及 java.lang.Object類中wait()、notify()、notifyAll() 這三個方法都是管程的組成部分。

由此可見,我們可以得到一個比較通用的并發同步工具基礎模型,大緻包含如下幾個内容,其中:

Java 并發程式設計解析
  • 條件變量(Conditional Variable): 利用線程間共享的變量進行同步的一種工作機制
  • 共享變量((Shared Variable)):一般指對象實體對象的成員變量和屬性
  • 阻塞隊列(Blocking Queue):共享變量(Shared Variable)及其對共享變量的操作統一封裝
  • 等待隊列(Wait Queue):每個條件變量都對應有一個等待隊列(Wait Queue),内部需要實作入隊操作(Enqueue)和出隊操作(Dequeue)方法
  • 變量狀态描述機(Synchronization Status):描述條件變量和共享變量之間狀态變化,又可以稱其為同步狀态
  • 工作模式(Operation Mode): 線程資源具有排他性,是以定義獨占模式和共享模式兩種工作模式

綜上所述,條件變量和等待隊列的作用是解決線程之間的同步問題;共享變量與阻塞隊列的作用是解決線程之間的互斥問題。

二.AQS基礎同步器的設計與實作

在Java領域中,同步器是專門為多線程并發設計的同步機制,主要是多線程并發執行時線程之間通過某種共享狀态來實作同步,隻有當狀态滿足這種條件時線程才往下執行的一種同步機制。

Java 并發程式設計解析

對于多線程實作實作并發處理機制來說,一直以來,多線程都存在2個問題:

  • 線程之間記憶體共享,需要通過加鎖進行控制,但是加鎖會導緻性能下降,同時複雜的加鎖機制也會增加程式設計編碼難度
  • 過多線程造成線程之間的上下文切換,導緻效率低下

是以,在并發程式設計領域中,一直有一個很重要的設計原則: “ 不要通過記憶體共享來實作通信,而應該通過通信來實作記憶體共享。”

簡單來說,就是盡可能通過消息通信,而不是記憶體共享來實作程序或者線程之間的同步。

其中,同步器是專門為多線程并發設計的同步機制,主要是多線程并發執行時線程之間通過某種共享狀态來實作同步,隻有當狀态滿足這種條件時線程才往下執行的一種同步機制。

由于在不同的應用場景中,對于同步器的需求也會有所不同,一般在我們自己去實作和設計一種并發工具的時候,都需會考慮以下幾個問題:

  • 是否支援響應中斷? 如果阻塞狀态的線程能夠響應中斷信号,也就是說當我們給阻塞的線程發送中斷信号的時候,能夠喚醒它,那它就有機會釋放曾經持有的鎖。
  • 是否支援逾時?如果線程在一段時間之内沒有擷取到鎖,不是進入阻塞狀态,而是傳回一個錯誤,那這個線程也有機會釋放曾經持有的鎖。
  • 是否支援非阻塞地擷取鎖資源 ? 如果嘗試擷取鎖失敗,并不進入阻塞狀态,而是直接傳回,那這個線程也有機會釋放曾經持有的鎖。

從閱讀JDK源碼不難發現,主要是采用設計模式中模闆模式的原則,JDK将各種同步器中相同的部分抽象封裝成了一個統一的基礎同步器,然後基于基礎同步器為模闆通過繼承的方式來實作不同的同步器。

也就是說,在實際開發過程中,除了直接使用JDK實作的同步器,還可以基于這個基礎同步器我們也可以自己自定義實作符合我們業務需求的同步器。

在JDK源碼中,同步器位于java.util.concurrent.locks包下,其基本定義是AbstractQueuedSynchronizer類,即就是我們常說的AQS同步器。

1. 設計思想

一個标準的AQS同步器主要有同步狀态機制,等待隊列,條件隊列,獨占模式,共享模式等五大核心要素組成。

Java 并發程式設計解析

JDK的JUC(java.util.concurrent.)包中提供了各種并發工具,但是大部分同步工具的實作基于AbstractQueuedSynchronizer類實作,其内部結構主要如下:

  • 同步狀态機制(Synchronization Status):主要用于實作鎖(Lock)機制,是指同步狀态,其要求對于狀态的更新必須原子性的
  • 等待隊列(Wait Queue):主要用于存放等待線程擷取到的鎖資源,并且把線程維護到一個Node(節點)裡面和維護一個非阻塞的CHL Node FIFO(先進先出)隊列,主要是采用自旋鎖+CAS操作來保證節點插入和移除的原子性操作。
  • 條件隊列(Condition Queue):用于實作鎖的條件機制,一般主要是指替換“等待-通知”工作機制,主要是通過ConditionObject對象實作Condition接口提供的方法實作。
  • 獨占模式(Exclusive Mode):主要用于實作獨占鎖,主要是基于靜态内部類Node的常量标志EXCLUSIVE來辨別該節點是獨占模式
  • 共享模式(Shared Mode):主要用于實作共享鎖,主要是基于靜态内部類Node的常量标志SHARED來辨別該節點是共享模式

其中,對于AbstractQueuedSynchronizer類的實作原理,我們可以從如下幾個方面來看:

public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable {

    private static final long serialVersionUID = 7373984972572414691 L;


    protected AbstractQueuedSynchronizer() {}

    /**
     * 等待隊列: head-頭節點
     */
    private transient volatile Node head;

    /**
     * 等待隊列: tail-尾節點
     */
    private transient volatile Node tail;

    /**
     * 同步狀态:32位整數類型,更新同步狀态(state)時必須保證其是原子性的
     */
    private volatile int state;

    /**
     * 自旋鎖消耗逾時時間閥值(threshold): threshold < 1000ns時,表示競争時選擇自旋;threshold > 1000ns時,表示競争時選擇系統阻塞
     */
    static final long spinForTimeoutThreshold = 1000 L;

    /**
     * CAS原子性操作
     */
    private static final Unsafe unsafe = Unsafe.getUnsafe();

    /**
     * stateOffset
     */
    private static final long stateOffset;

    /**
     * headOffset
     */
    private static final long headOffset;

    /**
     * tailOffset
     */
    private static final long tailOffset;

    /**
     * waitStatusOffset
     */
    private static final long waitStatusOffset;

    /**
     * nextOffset
     */
    private static final long nextOffset;


    static {
        try {
            stateOffset = unsafe.objectFieldOffset(AbstractQueuedSynchronizer.class.getDeclaredField("state"));
            headOffset = unsafe.objectFieldOffset(AbstractQueuedSynchronizer.class.getDeclaredField("head"));
            tailOffset = unsafe.objectFieldOffset(AbstractQueuedSynchronizer.class.getDeclaredField("tail"));
            waitStatusOffset = unsafe.objectFieldOffset(Node.class.getDeclaredField("waitStatus"));
            nextOffset = unsafe.objectFieldOffset(Node.class.getDeclaredField("next"));

        } catch (Exception ex) {
            throw new Error(ex);
        }
    }
        
            private final boolean compareAndSetHead(Node update)  {
        return unsafe.compareAndSwapObject(this, headOffset, null, update);
    }
        
            private final boolean compareAndSetTail(Node expect, Node update) {
        return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
    }
        
            private static final boolean compareAndSetWaitStatus(Node node,
                                                         int expect,
                                                         int update) {
        return unsafe.compareAndSwapInt(node, waitStatusOffset,
                                        expect, update);
    }
        
            private static final boolean compareAndSetNext(Node node,
                                                   Node expect,
                                                   Node update) {
        return unsafe.compareAndSwapObject(node, nextOffset, expect, update);
    }
        
            protected final boolean compareAndSetState(int expect, int update) {
        return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
    }

}           

[1]. AbstractQueuedSynchronizer類的實作原理是繼承了基于AbstractOwnableSynchronizer類的抽象類,其中主要對AQS同步器的通用特性和方法進行抽象封裝定義,主要包括如下方法:

public abstract class AbstractOwnableSynchronizer
implements java.io.Serializable {


    private static final long serialVersionUID = 3737899427754241961 L;


    protected AbstractOwnableSynchronizer() {}

    /**
     *  同步器擁有者
     */
    private transient Thread exclusiveOwnerThread;

    /**
     * 設定同步器擁有者:把線程當作參數傳入,指定某個線程為獨享
     */
    protected final void setExclusiveOwnerThread(Thread thread) {
        exclusiveOwnerThread = thread;
    }

    /**
     * 擷取同步器擁有者:擷取指定的某個線程
     */
    protected final Thread getExclusiveOwnerThread() {
        return exclusiveOwnerThread;
    }
}           
  • setExclusiveOwnerThread(Thread thread)方法: 把某個線程作為參數傳入,進而設定AQS同步器的所有者,即就是我們設定的某個線程
  • getExclusiveOwnerThread()方法: 擷取目前AQS同步器的所有者,即就是我們指定的某個線程

[2]. 對于同步狀态(state),其類型是32位整數類型,并且是被volatile修飾的,表示在更新同步狀态(state)時必須保證其是原子性的。

[3]. 對于等待隊列的結構,主要是在Node定義了head和tail變量,其中head表示頭部節點,tail表示尾部節點

[4].對于等待隊列的結構提到的Node類來說,主要内容如下:

static final class Node {
      /** Marker to indicate a node is waiting in shared mode */
      static final Node SHARED = new Node();
            
      /** Marker to indicate a node is waiting in exclusive mode */
      static final Node EXCLUSIVE = null;

      /** waitStatus value to indicate thread has cancelled */
      static final int CANCELLED = 1;
            
      /** waitStatus value to indicate successor's thread needs unparking */
      static final int SIGNAL = -1;
            
      /** waitStatus value to indicate thread is waiting on condition */
      static final int CONDITION = -2;
            
      /**
       * waitStatus value to indicate the next acquireShared should
       * unconditionally propagate
       */
      static final int PROPAGATE = -3;

      /**
       * Status field, taking on only the values:
       *   SIGNAL:     The successor of this node is (or will soon be)
       *               blocked (via park), so the current node must
       *               unpark its successor when it releases or
       *               cancels. To avoid races, acquire methods must
       *               first indicate they need a signal,
       *               then retry the atomic acquire, and then,
       *               on failure, block.
       *   CANCELLED:  This node is cancelled due to timeout or interrupt.
       *               Nodes never leave this state. In particular,
       *               a thread with cancelled node never again blocks.
       *   CONDITION:  This node is currently on a condition queue.
       *               It will not be used as a sync queue node
       *               until transferred, at which time the status
       *               will be set to 0. (Use of this value here has
       *               nothing to do with the other uses of the
       *               field, but simplifies mechanics.)
       *   PROPAGATE:  A releaseShared should be propagated to other
       *               nodes. This is set (for head node only) in
       *               doReleaseShared to ensure propagation
       *               continues, even if other operations have
       *               since intervened.
       *   0:          None of the above
       *
       *
       * The field is initialized to 0 for normal sync nodes, and
       * CONDITION for condition nodes.  It is modified using CAS
       * (or when possible, unconditional volatile writes).
       */
      volatile int waitStatus;

      /**
       * Link to predecessor node that current node/thread relies on
       */
      volatile Node prev;

      /**
       * Link to the successor node that the current node/thread
       */
      volatile Node next;

      /**
       * The thread that enqueued this node.  Initialized on
       * construction and nulled out after use.
       */
      volatile Thread thread;

      /**
       * Link to next node waiting on condition, or the special
       */
      Node nextWaiter;

      /**
       * Returns true if node is waiting in shared mode.
       */
      final boolean isShared() {
          return nextWaiter == SHARED;
      }


      final Node predecessor() throws NullPointerException {
          Node p = prev;
          if (p == null)
              throw new NullPointerException();
          else
              return p;
      }

      Node() { // Used to establish initial head or SHARED marker
      }

      Node(Thread thread, Node mode) { // Used by addWaiter
          this.nextWaiter = mode;
          this.thread = thread;
      }

      Node(Thread thread, int waitStatus) { // Used by Condition
          this.waitStatus = waitStatus;
          this.thread = thread;
      }
  }           
  • 标記Node的工作模式常量标記:主要維護了SHARED和EXCLUSIVE等2個靜态字面常量,其中 SHARED 用于标記Node中是共享模式,EXCLUSIVE:用于标記Node中是獨享模式
  • 标記等待狀态的靜态字面常量标記: 主要維護了0(表示無狀态),SIGNAL(-1,表示後續節點中的線程通過park進入等待,目前節點在釋放和取消時,需要通過unpark解除後後續節點的等待),CANCELLED(1,表示目前節點中的線程因為逾時和中斷被取消),CONDITION(-2,表示目前節點在條件隊列中),PROPAGATE(-3,SHARED共享模式的頭節點描述狀态,表示無條件往下傳播)等5個靜态字面常量
  • 維護了一個等待狀态(waitStatus): 主要用于描述等待隊列中節點的狀态,其取值範圍為0(waitStatus=0,表示無狀态),SIGNAL(waitStatus=-1,表示等待信号狀态),CANCELLED(waitStatus=1,表示取消狀态),CONDITION(waitStatus=-2,表示條件狀态),PROPAGATE(waitStatus=-3,表示SHARED共享模式狀态)等5個靜态字面常量,CAS操作時寫入,預設值為0。
  • 維護了Node的2個結構節點變量: 主要是prev和next,其中,prev表示前驅節點,next表示後續節點,表示構成雙向向連結清單,構成了等待隊列的資料結構
  • 維護了一個狀态工作模式标記: 主要是維護了一個nextWaiter,用于表示在等待隊列中目前節點在是共享模式還是獨享模式,而對于條件隊列來說,用于組成單向連結清單結構
  • 維護了一個線程對象變量: 主要用于記錄目前節點中的線程thread

[5].對于自旋鎖消耗逾時時間閥值(spinForTimeoutThreshold),主要表示系統依據這個閥值來選擇自旋方式還是系統阻塞。一般假設這個threshold,當 threshold < 1000ns時,表示競争時選擇自旋;否則,當threshold > 1000ns時,表示競争時選擇系統阻塞

[6].對于帶有Offset 等變量對應各自的句柄,主要用于執行CAS操作。在JDK1.8版本之前,CAS操作主要通過Unsafe類來說實作;在JDK1.8版本之後,已經開始利用VarHandle來替代Unsafe類操作實作。

[7].對于CAS操作來說,主要提供了如下幾個方法:

  • compareAndSetState(int expect, int update)方法:CAS操作原子更新狀态
  • compareAndSetHead(Node update)方法:CAS操作原子更新頭部節點
  • compareAndSetTail(Node expect, Node update)方法:CAS操作原子更新尾部節點
  • compareAndSetWaitStatus(Node node, int expect,int update)方法:CAS操作原子更新等待狀态
  • compareAndSetNext(Node node,Node expect,Node update)方法:CAS操作原子更新後續節點

[8].對于條件隊列(ConditionObject)來說,主要内容如下:

public class ConditionObject implements Condition, java.io.Serializable {

      private static final long serialVersionUID = 1173984872572414699 L;

      /** First node of condition queue. */
      private transient Node firstWaiter;

      /** Last node of condition queue. */
      private transient Node lastWaiter;

      /** Mode meaning to reinterrupt on exit from wait */
      private static final int REINTERRUPT = 1;

      /** Mode meaning to throw InterruptedException on exit from wait */
      private static final int THROW_IE = -1;

      /**
       * Creates a new {@code ConditionObject} instance.
       */
      public ConditionObject() {}
            
  }           
  • 基于Condition的接口實作條件隊列,其核心主要是實作阻塞和喚醒的工作機制
  • 基于Node定義了firstWaiter和lastWaiter變量,其中,firstWaiter表示的是頭節點,lastWaiter是尾節點
  • 還定義了2個字面常量REINTERRUPT和THROW_IE,其中REINTERRUPT=1,描述的是當中斷是退出條件隊列,THROW_IE=-1表示的是發生異常時退出

[8].除此之外,在AQS基礎同步器中,一般可以通過構造方法直接将參數值賦給對應變量,也可以通過變量句柄進行指派操作:

  • isShared()方法: 用于判斷等待隊列是否為共享模式
  • predecessor()方法: 用于擷取目前節點對應的前驅節點,如果為空,則 throw new NullPointerException();

2. 基本實作

一個标準的AQS同步器最核心底層設計實作是一個非阻塞的CHL Node FIFO(先進先出)隊列資料結構,通過采用自旋鎖+CAS操作的方法來保證原子性操作。

Java 并發程式設計解析

總的來說,一個AQS基礎同步器,底層的資料結構采用的是一個非阻塞的CHL Node FIFO(先進先出)隊列資料結構,而實作的核心算法則是采用自旋鎖+CAS操作的方法。

首先,對于非阻塞的CHL Node FIFO(先進先出)隊列資料結構,一般來說,FIFO(First In First Out,先進先出)隊列是一個有序清單,屬于抽象型資料類型(Abstract Data Type,ADT),所有的插入和删除操作都發生在隊首(Front)和隊尾(Rear)兩端,具有先進先出的特性。

/**
     * 等待隊列: head-頭節點
     */
    private transient volatile Node head;

    /**
     * 等待隊列: tail-尾節點
     */
    private transient volatile Node tail;
        
    /**
     * Inserts node into queue, initializing if necessary. See picture above.
     * @param node the node to insert
     * @return node's predecessor
     */
    private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

    /**
     * Creates and enqueues node for current thread and given mode.
     *
     * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
     * @return the new node
     */
    private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }           

在AQS同步器的源碼中,主要是通過靜态内部類Node來實作的這個非阻塞的CHL Node FIFO(先進先出)隊列資料結構, 維護了兩個變量head和tail,其中head對應隊首(Front),tail對應隊尾(Rear)。同時,還定義了addWaiter(Node mode)方法來表示入隊操作,其中有個enq(final Node node)方法,主要用于初始化隊列中head和tail的設定。

其次,AQS同步器以CLH鎖為基礎,其中CLH鎖是一種自旋鎖,對于自旋鎖的實作方式來看,主要可以分為普通自旋鎖和自适應自旋鎖,CLH鎖和MCS鎖等4種,其中:

  • 普通自旋鎖:多個線程不斷自旋,不斷嘗試擷取鎖,其不具備公平性和由于要保證CPU和緩存以及主存之間的資料一緻性,其開銷較大。
  • 自适應自旋鎖:主要是為解決普通自旋鎖的公平性問題,引入了一個排隊機制,一般稱為排他自旋鎖,其具備公平性,但是沒有解決保證CPU和緩存以及主存之間的資料一緻性問題,其開銷較大。
  • CLH鎖:通過一定手段将線程對于某一個共享變量的輪詢競争轉化為一個線程隊列,且隊列中的線程各自輪詢自己本地變量。
  • MCS鎖:主旨在于解決 CLH鎖的問題,也是基于FIFO隊列,與CLH鎖不同是,隻對本地變量自旋,前驅節點負責通知MCS鎖中線程自适結束。

自旋鎖是一種實作同步的方案,屬于一種非阻塞鎖,與正常鎖主要的差別就在于擷取鎖失敗之後的處理方式不同,主要展現在:

  • 一般情況下,正常鎖在擷取鎖失敗之後,會将線程阻塞并适當時重新喚醒
  • 而自旋鎖則是使用自旋來替換阻塞操作,主要是線程會不斷循環檢查該鎖是否被釋放,一旦釋放線程便會擷取鎖資源。

從本質上講,自旋是一鐘忙等待狀态,會一直消耗CPU的執行時間。一般情況下,正常互斥鎖适用于持有鎖長時間的情況,自旋鎖适合持有時間短的情況。

其中,對于CLH鎖來說,其核心是為解決同步帶來的花銷問題,Craig,Landim,Hagersten三人發明了CLH鎖,其中主要是:

  • 建構一個FIFO(先進先出)隊列,建構時主要通過移動尾部節點tail來實作隊列的排隊,每個想獲得鎖的線程都會建立一個新節點(next)并通過CAS操作原子操作将新節點賦予給tail,目前線程輪詢前一個節點的狀态。
  • 執行完線程後,隻需将目前線程對應節點狀态設定為解鎖即可,主要是判斷目前節點是否為尾部節點,如果是直接設定尾部節點設定為空。由于下一個節點一直在輪詢,是以可以獲得鎖。

CLH鎖将衆多線程長時間對資源的競争,通過有序化這些線程将其轉化為隻需要對本地變量檢測。唯一存在競争的地方就是入隊之前對尾部節點tail 的競争,相對來說,目前線程對資源的競争次數減少,這節省了CPU緩存同步的消耗,進而提升了系統性能。

但是同時也有一個問題,CLH鎖雖然解決了大量線程同時操作同一個變量時帶來的開銷問題,如果前驅節點和目前節點在本地主存中不存在,則通路時間過長,也會引起性能問題。

為了讓CLH鎖更容易實作取消和逾時的功能,AQS同步器在設計時進行了改造,主要展現在:節點的結構和節點等待機制。其中:

  • 節點的結構: 主要引入了頭節點和尾節點,分别指向隊列頭部和尾部,對于鎖的相關操作都與其息息相關,并且每個節點都引入了前驅節點和後繼節點。
  • 節點等待機制: 主要在原來的自旋基礎上增加了系統阻塞喚醒,主要展現在 自旋鎖消耗逾時時間閥值(threshold): threshold < 1000ns時,表示競争時選擇自旋;threshold > 1000ns時,表示競争時選擇系統阻塞。

由此可見,主要是通過前驅節點和後繼節點的引用連接配接起來形成一個連結清單隊列,其中對于入隊,檢測節點,出隊,判斷逾時,取消節點等操作主要如下:

  • 入隊(enqueue): 主要采用一個無限循環進行CAS操作,即就是使用自旋方式競争直到成功。
  • 檢測節點(checkedPrev): 一般在入隊完成後,主要是檢測判斷目前節點的前驅節點是否為頭節點, 一般自旋方式是直接進入循環檢測,而系統阻塞方式是目前線程先檢測,其中如果是頭節點并成功擷取鎖,則直接傳回,目前線程不阻塞,否則對目前線程進行阻塞。
  • 出隊(dequeue):主要負責喚醒等待隊列中的後繼節點,并且按照條件往下傳播有序執行
  • 判斷逾時(checkedTimeout): 隊列中等待鎖的線程可能因為中斷或者逾時的情況,當總耗時大于等于自定義耗時就直接傳回,即就是
  • 取消節點(cancel): 主要是對于中斷和逾時而涉及到取消操作,而且這樣的情況不再參與鎖競争,即就是一般通過調用compareAndSetNext(Node node, Node expect,Node update)來進行CAS操作。

特别值得注意的是,AQS基礎同步器中主要有等待隊列和條件隊列兩種對列結構,對比便不難發現:等待隊列采用的底層資料結構是雙向連結清單結構,而對于條件隊列則是單向連結清單結構。

最後,AQS同步器中使用了CAS操作,其中CAS(Compare And Swap,比較并交換)操作時一種樂觀鎖政策,主要涉及三個操作資料:記憶體值,預期值,新值,主要是指當且僅當預期值和記憶體值相等時才去修改記憶體值為新值。

一般來說,CAS操作的具體邏輯,主要可以分為三個步驟:

  • 首先,檢查某個記憶體值是否與該線程之前取到值一樣。
  • 其次,如果不一樣,表示此記憶體值已經被别的線程修改,需要舍棄本次操作。
  • 最後,如果時一樣,表示期間沒有線程更改過,則需要用新值執行更新記憶體值。

除此之外,需要注意的是CAS操作具有原子性,主要是由CPU硬體指令來保證,并且通過Java本地接口(Java Native Interface,JNI)調用本地硬體指令實作。

當然,CAS操作避免了悲觀政策獨占對象的 問題,同時提高了并發性能,但是也有以下三個問題:

  • 樂觀政策隻能保證一個共享變量的原子操作,如果是多個變量,CAS便不如互斥鎖,主要是CAS操作的局限所緻。
  • 長時間循環操作可能導緻開銷過大。
  • 經典的ABA問題: 主要是檢查某個記憶體值是否與該線程之前取到值一樣,這個判斷邏輯不嚴謹。解決ABA問題的核心在于,引入版本号,每次更新變量值更新版本号。

而在AQS同步器中,為了保證并發實作保證原子性,而且是硬體級别的原子性,一般是通過JNI(Java native interface,Java 本地接口)方式讓Java代碼調用C/C++本地代碼。

通過分析源碼可知,一般使用Unsafe類需要隻用關注如下方法即可:

public final class Unsafe {

    private static final Unsafe theUnsafe;

    private static native void registerNatives();

    private Unsafe() {}

    @CallerSensitive
    public static Unsafe getUnsafe() {
        Class var0 = Reflection.getCallerClass();
        if (!VM.isSystemDomainLoader(var0.getClassLoader())) {
            throw new SecurityException("Unsafe");
        } else {
            return theUnsafe;
        }
    }

    public final native boolean compareAndSwapObject(Object var1, long var2, Object var4, Object var5);

    public final native boolean compareAndSwapInt(Object var1, long var2, int var4, int var5);

    public final native boolean compareAndSwapLong(Object var1, long var2, long var4, long var6);

    public native void unpark(Object var1);

    public native void park(boolean var1, long var2);

}           
  • rgisterNatives()方法:是一個靜态 方法,主要用于注冊本地方法
  • 構造函數是private私有化的,一般無法通過構造函數來執行個體化Unsafe對象
  • getUnsafe()方法:是用來擷取Unsafe對象的,雖然是公有化的,但是如果Java語言開發層面的對進行安全檢查

一般地,由于Unsafe類的操作涉及到硬體底層的操作,JDK對其執行個體化做了安全校驗,隻有受系統信任的代碼才對其執行個體化,主要是通過類加載器來解析,其執行個體化方式主要有如下方式:

  • 第一種:直接調用該方法,主要新式是Unsafe.getUnsafe()。但是,對于我們實際開發來說,這種方式無法通過安全校驗行不通,系統會抛出 throw new SecurityException("Unsafe")資訊
  • 第二種:通過反射機制繞過安全檢查,主要是修改Unsafe類中theUnsafe字段的通路權限,讓其能被通路進而達到擷取Unsafe對象的目的

需要注意的是,在Java領域中,對于CAS操作實作,主要有兩點問題:

  • JDK1.8版本之前,CAS操作主要使用Unsafe類來執行底層操作,一般并發和線程操作時,主要用compareAndSwapObject,compareAndSwapInt,compareAndSwapLong等來實作CAS,而對于線程排程主要是park和unpark方法,其主要在sun.misc包下面。
  • JDK1.8版本之後,JDK1.9的CAS操作主要使用VarHandle類,隻是用VarHandle替代了一部分Unsafe類的操作,但是對于新版本中Unsafe,本質上Unsafe類會間接調用jdk.internal.misc包下面Unsafe類來實作。

3. 具體實作

在Java領域中,AQS同步器利用獨享模式和共享模式來實作同步機制,主要為解決多線并發執行中資料競争和競争條件問題。

Java 并發程式設計解析

為解決多線并發執行中資料競争和競争條件問題,引入了同步機制,主要是通過控制共享資料和臨界區的通路,一般比較通用的方式是通過鎖機制來實作。

在Java領域中,JDK對于AQS基礎同步器抽象封裝了鎖的擷取和釋放操作,主要提供了獨享和共享兩種工作模式:

  • 獨享模式(Exclusive Mode) :對應着獨享鎖(Exclusive Lock),表示着對于鎖的擷取和釋放,一次隻能至多一個或者隻有一個線程把持,其他線程無法獲得并獲得持有,必須等待持有線程釋放鎖。
  • 共享模式(Shared Mode) :對應着共享鎖(Shared Lock),表示着對于鎖的擷取和釋放,一次可以至少一個或者允許多個線程把持,其他線程可以獲得并獲得持有,不用等待持有線程釋放鎖。

其中,AQS基礎同步器對于獨享模式和共享模式的工作模式的基本流程,主要如下:

  • 擷取鎖流程: 先嘗試擷取鎖,如果擷取成功則往下繼續進行,否則把線程維護到等待隊列中,線程可能會挂起。
  • 釋放鎖流程:喚醒等待隊列中的一個或者多個線程去嘗試擷取需要釋放的鎖。

一般地,AQS基礎同步器對于獨享模式和共享模式的封裝和實作,其中:

3.1. 獨享模式的技術實作

[1].擷取鎖操作相關的核心邏輯,主要如下:

public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable {

    /**
     * 獨占模式:[1].通過acquire擷取鎖操作
     */

    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }


    /**
     * 獨占模式:[2].通過tryAcquire嘗試擷取鎖操作
     */
    protected boolean tryAcquire(int arg) {
        throw new UnsupportedOperationException();
    }



    /**
     * 獨占模式:[3].通過addWaiter入隊操作
     */
    private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }

    /**
     * 獨占模式:[4].通過acquireQueued檢測節點
     */
    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

    /**
     * 獨占模式:[5].通過cancelAcquire取消鎖擷取
     */
    private void cancelAcquire(Node node) {

        if (node == null)
            return;

        node.thread = null;


        Node pred = node.prev;
        while (pred.waitStatus > 0)
            node.prev = pred = pred.prev;


        Node predNext = pred.next;


        node.waitStatus = Node.CANCELLED;


        if (node == tail && compareAndSetTail(node, pred)) {
            compareAndSetNext(pred, predNext, null);
        } else {

            int ws;
            if (pred != head &&
                ((ws = pred.waitStatus) == Node.SIGNAL ||
                    (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
                pred.thread != null) {
                Node next = node.next;
                if (next != null && next.waitStatus <= 0)
                    compareAndSetNext(pred, predNext, next);
            } else {
                unparkSuccessor(node);
            }

            node.next = node; 
        }
    }


}           

[2]. 釋放鎖操作相關的核心邏輯,主要如下:

public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable {


    /**
     * 獨占模式:[1].通過release釋放鎖操作
     */
    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

    /**
     * 獨占模式:[2].通過tryRelease嘗試釋放鎖操作
     */
    protected boolean tryRelease(int arg) {
        throw new UnsupportedOperationException();
    }

    /**
     * 獨占模式:[3].通過unparkSuccessor喚醒後繼節點
     */
    private void unparkSuccessor(Node node) {

        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);


        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);
    }

}           

由此可見,對于獨占模式的鎖擷取和釋放,主要是依據acquire和release等方法來實作。

3.1. 共享模式的技術實作

[1].擷取鎖操作相關的核心邏輯,主要如下:

public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable {

    /**
     * 共享模式:[1].通過acquireShared擷取鎖操作
     */

    public final void acquireShared(int arg) {
        if (tryAcquireShared(arg) < 0)
            doAcquireShared(arg);
    }

    /**
     * 共享模式:[2].通過tryAcquireShared嘗試擷取鎖操作
     */
    protected int tryAcquireShared(int arg) {
        throw new UnsupportedOperationException();
    }

    /**
     * 共享模式:[3].通過doAcquireShared入隊操作
     */
    private void doAcquireShared(int arg) {
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        if (interrupted)
                            selfInterrupt();
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

}           

[2].釋放鎖操作相關的核心邏輯,主要如下:

public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable {

    /**
     * 共享模式:[1].通過releaseShared釋放鎖操作
     */

    public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }

    /**
     * 共享模式:[2].通過tryReleaseShared嘗試釋放鎖操作
     */


    protected boolean tryReleaseShared(int arg) {
        throw new UnsupportedOperationException();
    }

    /**
     * 共享模式:[3].通過doReleaseShared釋放鎖就緒操作
     */
    private void doReleaseShared() {

        for (;;) {
            Node h = head;
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                if (ws == Node.SIGNAL) {
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;
                    unparkSuccessor(h);
                } else if (ws == 0 &&
                    !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;
            }
            if (h == head)
                break;
        }
    }
        
}           

由此可見,對于共享模式的鎖擷取和釋放,主要是依據acquireShared和releaseShared等方法來實作。

Java 并發程式設計解析

綜上所述,不難看出,AQS同步器設計思想是通過繼承的方式提供一個模闆,其核心原理是管理一個共享狀态,通過對狀态的控制來實作不同的控制。

二. LockSupport的設計與實作

在Java領域中,LockSupport主要從線程資源角度為同步器和鎖提供基本線程阻塞和喚醒原語,是“等待-通知”工作機制的實作。

Java 并發程式設計解析

一般來說,當一個線程(Thread)隻要參與鎖競争時,其經曆的主要流程有:

  • 一旦目前線程進行鎖競争時,線程都會嘗試擷取鎖,根據擷取鎖的情況進行後續處理。
  • 如果擷取鎖失敗,則會建立節點插入到隊列的尾部,會二次嘗試重新擷取鎖,并不會阻塞目前線程。
  • 如果擷取鎖成功,則直接傳回,否則會将節點設定為待運作狀态(SIGNAL)。
  • 最後對目前線程進行阻塞,目前驅節點運作完成後會喚醒後繼節點。

在Java領域中,對于線程的阻塞和喚醒,也許我們最早在學習面向對象原則的時候,一般都使用java.lang.Object類中wait()、notify()、notifyAll() 這三個方法,可以用它們幫助我們實作等待-通知”工作機制。

同時,在講解AQS基礎同步器的實作時,提到說CAS操作的核心是使用Unsafe類來執行底層操作,對于線程排程主要是park和unpark方法,但是一般的Java語言層 main開發對其調用又有安全檢查的限制。

但是,在AQS基礎同步的的阻塞和喚醒操作咋在擷取鎖餓的鎖操作中需要使用,一般地:

  • 如果擷取不到鎖的目前線程在進入排到隊列之後需要阻塞目前線程。
  • 并且,排到隊列中前驅節點運作完成後,需要負責喚醒後繼節點。

于是,在AQS基礎同步器的設計與實作中,封裝一個專門用于實作“等待-通知”工作機制的LockSupport類。

對于LockSupport類,主要是為同步器和鎖提供基本線程阻塞和喚醒原語,AQS同步器和鎖都是使用它來阻塞和喚醒線程。主要源碼如下:

public class LockSupport {

    /**
     *  阻塞操作:利用park阻塞某個線程(指定參數)
     */
    public static void park(Object blocker) {
        Thread t = Thread.currentThread();
        setBlocker(t, blocker);
        UNSAFE.park(false, 0 L);
        setBlocker(t, null);
    }

    /**
     *  阻塞操作:利用park阻塞某個線程(無指定參數)
     */
    public static void park() {
        UNSAFE.park(false, 0 L);
    }


    /**
     *  阻塞操作:根據nanos許可,利用parkNanos阻塞某個線程
     */
    public static void parkNanos(long nanos) {
        if (nanos > 0)
            UNSAFE.park(false, nanos);
    }

    /**
     *  阻塞操作:根據nanos許可,利用parkNanos阻塞某個線程,但是指定阻塞對象
     */
    public static void parkNanos(Object blocker, long nanos) {
        if (nanos > 0) {
            Thread t = Thread.currentThread();
            setBlocker(t, blocker);
            UNSAFE.park(false, nanos);
            setBlocker(t, null);
        }
    }

    /**
     *  阻塞操作:根據deadline最大等待時間,利用parkUntil阻塞某個線程
     */
    public static void parkUntil(long deadline) {
        UNSAFE.park(true, deadline);
    }

    /**
     *  阻塞操作:根據deadline最大等待時間,利用parkUntil阻塞某個線程,需要指定阻塞對象
     */
    public static void parkUntil(Object blocker, long deadline) {
        Thread t = Thread.currentThread();
        setBlocker(t, blocker);
        UNSAFE.park(true, deadline);
        setBlocker(t, null);
    }


    /**
     *  喚醒操作:利用unpark喚醒某個線程
     */
    public static void unpark(Thread thread) {
        if (thread != null)
            UNSAFE.unpark(thread);
    }

    /**
     *  設定阻塞器: 指定線程和對象
     */
    private static void setBlocker(Thread t, Object arg) {
        // Even though volatile, hotspot doesn't need a write barrier here.
        UNSAFE.putObject(t, parkBlockerOffset, arg);
    }

    /**
     *  擷取阻塞器中線程對象
     */
    public static Object getBlocker(Thread t) {
        if (t == null)
            throw new NullPointerException();
        return UNSAFE.getObjectVolatile(t, parkBlockerOffset);
    }

    /**
     *  擷取阻塞器中線程對象
     */
    static final int nextSecondarySeed() {
        int r;
        Thread t = Thread.currentThread();
        if ((r = UNSAFE.getInt(t, SECONDARY)) != 0) {
            r ^= r << 13; // xorshift
            r ^= r >>> 17;
            r ^= r << 5;
        } else if ((r = java.util.concurrent.ThreadLocalRandom.current().nextInt()) == 0)
            r = 1; // avoid zero
        UNSAFE.putInt(t, SECONDARY, r);
        return r;
    }

    // Hotspot implementation via intrinsics API
    private static final sun.misc.Unsafe UNSAFE;

    private static final long parkBlockerOffset;

    private static final long SEED;

    private static final long PROBE;

    private static final long SECONDARY;

    static {
        try {
            UNSAFE = sun.misc.Unsafe.getUnsafe();
            Class <? > tk = Thread.class;
            parkBlockerOffset = UNSAFE.objectFieldOffset(tk.getDeclaredField("parkBlocker"));
            SEED = UNSAFE.objectFieldOffset(tk.getDeclaredField("threadLocalRandomSeed"));
            PROBE = UNSAFE.objectFieldOffset(tk.getDeclaredField("threadLocalRandomProbe"));
            SECONDARY = UNSAFE.objectFieldOffset(tk.getDeclaredField("threadLocalRandomSecondarySeed"));
        } catch (Exception ex) {
            throw new Error(ex);
        }
    }

}           
  • 設計思想: 相比用java.lang.Object類中的wait/notify方式,其LockSupport類更關注線程本身,解耦了線程之間的同步。
  • 實作原理: 主要還是使用Unsafe類來執行底層操作,主要是間接調用是park和unpark本地方法
  • 阻塞操作涉及方法: 一般以park開頭的方法來阻塞線程操作,大緻可以分為自定義阻塞對象參數和非自定義阻塞對象參數等阻塞方法
  • 喚醒線程操作: 主要通過unpark方法來對目前線程設定可用,相對于喚醒操作

三. Condition接口的設計與實作

在Java領域中,Condition接口是用來實作管程技術,其中 Condition用于解決同步問題。

Java 并發程式設計解析

相對于LockSupport的設計與實作來說,Condition接口隻是在JDK層面對于阻塞和喚醒提供了一個模闆的定義,是AQS基礎同步器中條件隊列的定義,而ConditionObject是在AQS基礎同步器具體實作。

對于Condition接口而言,是提供了可替代wait/notify機制的條件隊列模式,其中:

public interface Condition {

    /**
     * 條件隊列模式:等待await操作
     */
    void await() throws InterruptedException;

    /**
     * 條件隊列模式:等待awaitUninterruptibly操作,可中斷模式
     */
    void awaitUninterruptibly();

    /**
     * 條件隊列模式:等待awaitNanos操作,可逾時模式
     */
    long awaitNanos(long nanosTimeout) throws InterruptedException;

    /**
     * 條件隊列模式:等待await操作,可逾時模式
     */
    boolean await(long time, TimeUnit unit) throws InterruptedException;

    /**
     *條件隊列模式:等待awaitUntil操作,可逾時模式
     */
    boolean awaitUntil(Date deadline) throws InterruptedException;

    /**
     * 條件隊列模式:通知signal操作
     */
    void signal();

    /**
     * 條件隊列模式:通知signalAll操作
     */
    void signalAll();
}           
  • 定義了關于“等待(wait)”機制的相關實作:而以await開頭的所有方法都是關于等待機制的定義。
  • 定義了關于“通知(signal)”機制的相關實作: signal()方法和signalAll()方法,其中 signal()方法是随機地通知等待隊列中的一個線程,而signalAll()方法是通知等待隊列中的所有線程。

四. Lock接口的設計與實作

在Java領域中,Lock接口是用來實作管程技術,其中 Lock 用于解決互斥問題。

Java 并發程式設計解析

Lock接口位于java.util.concurrent.locks包中,是JUC顯式鎖的一個抽象,Lock接口的主要抽象方法:

public interface Lock {

    /**
     * Lock接口-擷取鎖
     */
    void lock();

    /**
     * Lock接口-擷取鎖(可中斷)
     */
    void lockInterruptibly() throws InterruptedException;

    /**
     * Lock接口-嘗試擷取鎖
     *
     */
    boolean tryLock();

    /**
     *Lock接口-嘗試擷取鎖(支援逾時)
     */
    boolean tryLock(long time, TimeUnit unit) throws InterruptedException;

    /**
     *Lock接口-釋放鎖
     *
     */
    void unlock();

    /**
     *  Lock接口-設定條件變量
     */
    Condition newCondition();
}           
  • 擷取鎖: lock()
  • 釋放鎖:unlock()
  • 條件變量: Condition

在JDK中,對于Lock接口的具體實作主要是ReentrantLock類,其中:

public class ReentrantLock implements Lock, java.io.Serializable {

    private static final long serialVersionUID = 7373984872572414699 L;

    /** Synchronizer providing all implementation mechanics */
    private final Sync sync;


    /**
     * 構造鎖的非公平模式(預設模式)
     */
    public ReentrantLock() {
            sync = new NonfairSync();
        }
                
    /**
     * 構造鎖的公平和非公平模式(可選公平或者非公平)
     */
    public ReentrantLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync();
    }

    /**
     * Lock接口-實作嘗試擷取鎖
     */
    public void lock() {
        sync.lock();
    }

    /**
     * Lock接口-實作嘗試擷取鎖
     */
    public void lockInterruptibly() throws InterruptedException {
        sync.acquireInterruptibly(1);
    }

    /**
     * Lock接口-實作嘗試擷取鎖
     */
    public boolean tryLock() {
        return sync.nonfairTryAcquire(1);
    }

    /**
     * Lock接口-實作嘗試擷取鎖
     */
    public boolean tryLock(long timeout, TimeUnit unit)
    throws InterruptedException {
        return sync.tryAcquireNanos(1, unit.toNanos(timeout));
    }

    /**
     * Lock接口-釋放鎖
     */
    public void unlock() {
        sync.release(1);
    }

    /**
     * Lock接口-建立條件變量
     */
    public Condition newCondition() {
        return sync.newCondition();
    }


}           
  • 包含了一個同步器Sync,主要是基于AbstractQueuedSynchronizer實作,同時基于Sync類還實作FairSync類和NonfairSync類,其中FairSync類對應着公平模式,NonfairSync類對應非公平模式。
  • 實作Lock接口設計和定義的相關方法,可以設定其鎖是公平和非公的,預設是非公平模式的。

相對于Java内置鎖,Java SDK 并發包裡的 Lock接口主要差別有能夠響應中斷、支援逾時和非阻塞地擷取鎖等三個特性。

五. ReadWriteLock接口的設計與實作

在Java領域中,ReadWriteLock接口主要是基于Lock接口來封裝了ReadLock鎖和WriteLock鎖等2種鎖的實作方法,其中ReadLock鎖是讀鎖的接口定義,WriteLock鎖是寫鎖的接口定義。

Java 并發程式設計解析

ReadWriteLock接口的内部實作,主要是基于Lock接口來擷取ReadLock鎖和WriteLock鎖的,其具體代碼如下:

public interface ReadWriteLock {
    /**
     * ReadWriteLock接口-基于Lock接口實作ReadLock
     */
    Lock readLock();

    /**
     * ReadWriteLock接口-基于Lock接口實作WriteLock
     */
    Lock writeLock();
}           
  • readLock()方法: 擷取讀鎖,主要是基于基于Lock接口來定義,表示着其具體實作類都會基于AQS基礎同步器實作
  • writeLock()方法:擷取寫鎖,主要是基于基于Lock接口來定義,表示着其具體實作類都會基于AQS基礎同步器實作

在JDK中,對于ReadWriteLock接口的具體實作主要是ReentrantReadWriteLock類,其中:

public class ReentrantReadWriteLock
implements ReadWriteLock, java.io.Serializable {

    private static final long serialVersionUID = -6992448646407690164 L;

    /** Inner class providing readlock */
    private final ReentrantReadWriteLock.ReadLock readerLock;

    /** Inner class providing writelock */
    private final ReentrantReadWriteLock.WriteLock writerLock;

    /** Performs all synchronization mechanics */
    final Sync sync;


    /**
     * 同步器-基于AbstractQueuedSynchronizer實作
     */
    abstract static class Sync extends AbstractQueuedSynchronizer {
        //.....

        abstract boolean readerShouldBlock();

        abstract boolean writerShouldBlock();
    }

    /**
     * 同步器-非公平模式
     */
    static final class NonfairSync extends Sync {
        private static final long serialVersionUID = -8159625535654395037 L;

        final boolean writerShouldBlock() {
            return false;
        }

        final boolean readerShouldBlock() {
            return apparentlyFirstQueuedIsExclusive();
        }
    }

    /**
     * 同步器-公平模式
     */
    static final class FairSync extends Sync {
        private static final long serialVersionUID = -2274990926593161451 L;
        final boolean writerShouldBlock() {
            return hasQueuedPredecessors();
        }
        final boolean readerShouldBlock() {
            return hasQueuedPredecessors();
        }
    }


    /**
     * 構造鎖的非公平模式(預設預設)
     */
    public ReentrantReadWriteLock() {
        this(false);
    }

    /**
     * 構造鎖的公平和非公平模式(可選公平或者非公平)
     */
    public ReentrantReadWriteLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync();
        readerLock = new ReadLock(this);
        writerLock = new WriteLock(this);
    }

    /**
     * ReadWriteLock接口-基于Lock具體實作的writeLock()
     */
    public ReentrantReadWriteLock.WriteLock writeLock() {
        return writerLock;
    }

    /**
     * ReadWriteLock接口-基于Lock具體實作的readLock()
     */
    public ReentrantReadWriteLock.ReadLock readLock() {
        return readerLock;
    }

    /**
     * ReadWriteLock接口-ReadLock内置類
     */
    public static class ReadLock implements Lock, java.io.Serializable {
        private static final long serialVersionUID = -5992448646407690164 L;

        private final Sync sync;

        //.....
    }

    /**
     * ReadWriteLock接口-WriteLock内置類
     */
    public static class WriteLock implements Lock, java.io.Serializable {
        private static final long serialVersionUID = -4992448646407690164 L;

        private final Sync sync;

        //.....
    }

}           
  • 包含了一個同步器Sync,主要是基于AbstractQueuedSynchronizer實作,同時基于Sync類還實作FairSync類和NonfairSync類,其中FairSync類對應着公平模式,NonfairSync類對應非公平模式。
  • 主要是内置定義了ReadLock類和WriteLock類等兩個内部類,其中ReadLock類為讀鎖,而而WriteLock類為寫鎖,可以設定其鎖是公平和非公的,預設是非公平模式的。
  • 實作了ReadWriteLock接口,通過内部類的定義來具體實作對應的鎖,其中ReadLock類對應readLock()方法,而WriteLock類對應writeLock()方法。

綜上所述,從JDK中對于ReadWriteLock接口的具體實作來看,我們不難發現,ReadWriteLock接口是實作讀寫鎖一種模闆定義,我們可以基于這個接口來實作滿足我們實際業務需求的讀寫鎖。

寫在最後

在并發程式設計領域,有兩大核心問題:一個是互斥,即同一時刻隻允許一個線程通路共享資源;另一個是同步,即線程之間如何通信、協作。

主要原因是,對于多線程實作實作并發,一直以來,多線程都存在2個問題:

  • 線程之間記憶體共享,需要通過加鎖進行控制,但是加鎖會導緻性能下降,同時複雜的加鎖機制也會增加程式設計編碼難度
  • 過多線程造成線程之間的上下文切換,導緻效率低下

是以,在并發程式設計領域中,一直有一個很重要的設計原則: “ 不要通過記憶體共享來實作通信,而應該通過通信來實作記憶體共享。”

本文篇幅有點長,能看到最後的感謝你們的支援,我是會寫代碼的猴子,希望大家能夠一鍵三連多多支援

繼續閱讀