天天看點

深入了解(7)Java無鎖CAS與Unsafe類及其并發包Atomic無鎖的概念無鎖的執行者-CAS鮮為人知的指針: Unsafe類并發包中的原子操作類(Atomic系列)再談自旋鎖

原文:https://blog.csdn.net/javazejian/article/details/72772461

深入了解(1)Java注解類型(@Annotation)

深入了解(2)Java枚舉類型(enum)

深入了解(3)Java類加載器(ClassLoader)

深入了解(4)Java類型資訊(Class對象)與反射機制

深入了解(5)Java記憶體模型(JMM)及volatile關鍵字

深入了解(6)Java并發AQS的共享鎖的實作(基于信号量Semaphore)

深入了解(7)Java無鎖CAS與Unsafe類及其并發包Atomic

深入了解(8)Java并發之synchronized實作原理

深入了解(9)Java基于并發AQS的(獨占鎖)重入鎖(ReetrantLock)及其Condition實作原理

深入了解(10)java并發之阻塞隊列LinkedBlockingQueue與ArrayBlockingQueue

文章目錄

  • 無鎖的概念
  • 無鎖的執行者-CAS
    • CAS
    • CPU指令對CAS的支援
  • 鮮為人知的指針: Unsafe類
  • 并發包中的原子操作類(Atomic系列)
    • 原子更新基本類型
    • 原子更新引用
    • 原子更新數組
    • 原子更新屬性
    • CAS的ABA問題及其解決方案
  • 再談自旋鎖

無鎖的概念

在談論無鎖概念時,總會關聯起樂觀派與悲觀派,對于樂觀派而言,他們認為事情總會往好的方向發展,總是認為壞的情況發生的機率特别小,可以無所顧忌地做事,但對于悲觀派而已,他們總會認為發展事态如果不及時控制,以後就無法挽回了,即使無法挽回的局面幾乎不可能發生。這兩種派系映射到并發程式設計中就如同加鎖與無鎖的政策,即加鎖是一種悲觀政策,無鎖是一種樂觀政策,因為對于加鎖的并發程式來說,它們總是認為每次通路共享資源時總會發生沖突,是以必須對每一次資料操作實施加鎖政策。而無鎖則總是假設對共享資源的通路沒有沖突,線程可以不停執行,無需加鎖,無需等待,一旦發現沖突,無鎖政策則采用一種稱為CAS的技術來保證線程執行的安全性,這項CAS技術就是無鎖政策實作的關鍵,下面我們進一步了解CAS技術的奇妙之處。

無鎖的執行者-CAS

CAS

CAS的全稱是Compare And Swap 即比較交換,其算法核心思想如下

執行函數:CAS(V,E,N)

其包含3個參數

  • V表示要更新的變量
  • E表示預期值
  • N表示新值

如果V值等于E值,則将V的值設為N。若V值和E值不同,則說明已經有其他線程做了更新,則目前線程什麼都不做。通俗的了解就是CAS操作需要我們提供一個期望值,當期望值與目前線程的變量值相同時,說明還沒線程修改該值,目前線程可以進行修改,也就是執行CAS操作,但如果期望值與目前線程不符,則說明該值已被其他線程修改,此時不執行更新操作,但可以選擇重新讀取該變量再嘗試再次修改該變量,也可以放棄操作,原理圖如下

深入了解(7)Java無鎖CAS與Unsafe類及其并發包Atomic無鎖的概念無鎖的執行者-CAS鮮為人知的指針: Unsafe類并發包中的原子操作類(Atomic系列)再談自旋鎖

由于CAS操作屬于樂觀派,它總認為自己可以成功完成操作,當多個線程同時使用CAS操作一個變量時,隻有一個會勝出,并成功更新,其餘均會失敗,但失敗的線程并不會被挂起,僅是被告知失敗,并且允許再次嘗試,當然也允許失敗的線程放棄操作,這點從圖中也可以看出來。基于這樣的原理,CAS操作即使沒有鎖,同樣知道其他線程對共享資源操作影響,并執行相應的處理措施。同時從這點也可以看出,由于無鎖操作中沒有鎖的存在,是以不可能出現死鎖的情況,也就是說無鎖操作天生免疫死鎖。

CPU指令對CAS的支援

或許我們可能會有這樣的疑問,假設存在多個線程執行CAS操作并且CAS的步驟很多,有沒有可能在判斷V和E相同後,正要指派時,切換了線程,更改了值。造成了資料不一緻呢?答案是否定的,因為CAS是一種系統原語,原語屬于作業系統用語範疇,是由若幹條指令組成的,用于完成某個功能的一個過程,并且原語的執行必須是連續的,在執行過程中不允許被中斷,也就是說CAS是一條CPU的原子指令,不會造成所謂的資料不一緻問題。

鮮為人知的指針: Unsafe類

Unsafe類存在于

sun.misc

包中,其内部方法操作可以像C的指針一樣直接操作記憶體,單從名稱看來就可以知道該類是非安全的,畢竟Unsafe擁有着類似于C的指針操作,是以總是不應該首先使用Unsafe類,Java官方也不建議直接使用的Unsafe類,據說Oracle正在計劃從Java 9中去掉Unsafe類,但我們還是很有必要了解該類,因為Java中CAS操作的執行依賴于Unsafe類的方法,注意Unsafe類中的所有方法都是native修飾的,也就是說Unsafe類中的方法都直接調用作業系統底層資源執行相應任務,關于Unsafe類的主要功能點如下:

  • 記憶體管理,Unsafe類中存在直接操作記憶體的方法
    //配置設定記憶體指定大小的記憶體
    public native long allocateMemory(long bytes);
    //根據給定的記憶體位址address設定重新配置設定指定大小的記憶體
    public native long reallocateMemory(long address, long bytes);
    //用于釋放allocateMemory和reallocateMemory申請的記憶體
    public native void freeMemory(long address);
    //将指定對象的給定offset偏移量記憶體塊中的所有位元組設定為固定值
    public native void setMemory(Object o, long offset, long bytes, byte value);
    //設定給定記憶體位址的值
    public native void putAddress(long address, long x);
    //擷取指定記憶體位址的值
    public native long getAddress(long address);
    
    //設定給定記憶體位址的long值
    public native void putLong(long address, long x);
    //擷取指定記憶體位址的long值
    public native long getLong(long address);
    //設定或擷取指定記憶體的byte值
    public native byte  getByte(long address);
    public native void  putByte(long address, byte x);
    //其他基本資料類型(long,char,float,double,short等)的操作與putByte及getByte相同
    
    //作業系統的記憶體頁大小
    public native int pageSize();123456789101112131415161718192021222324
               
  • 提供執行個體對象新途徑。
    //傳入一個對象的class并建立該執行個體對象,但不會調用構造方法
    public native Object allocateInstance(Class cls) throws InstantiationException;12
               
  • 類和執行個體對象以及變量的操作,主要方法如下
    //擷取字段f在執行個體對象中的偏移量
    public native long objectFieldOffset(Field f);
    //靜态屬性的偏移量,用于在對應的Class對象中讀寫靜态屬性
    public native long staticFieldOffset(Field f);
    //傳回值就是f.getDeclaringClass()
    public native Object staticFieldBase(Field f);
    
    
    //獲得給定對象偏移量上的int值,所謂的偏移量可以簡單了解為指針指向該變量的記憶體位址,
    //通過偏移量便可得到該對象的變量,進行各種操作
    public native int getInt(Object o, long offset);
    //設定給定對象上偏移量的int值
    public native void putInt(Object o, long offset, int x);
    
    //獲得給定對象偏移量上的引用類型的值
    public native Object getObject(Object o, long offset);
    //設定給定對象偏移量上的引用類型的值
    public native void putObject(Object o, long offset, Object x);
    //其他基本資料類型(long,char,byte,float,double)的操作與getInthe及putInt相同
    
    //設定給定對象的int值,使用volatile語義,即設定後立馬更新到記憶體對其他線程可見
    public native void  putIntVolatile(Object o, long offset, int x);
    //獲得給定對象的指定偏移量offset的int值,使用volatile語義,總能擷取到最新的int值。
    public native int getIntVolatile(Object o, long offset);
    
    //其他基本資料類型(long,char,byte,float,double)的操作與putIntVolatile及getIntVolatile相同,引用類型putObjectVolatile也一樣。
    
    //與putIntVolatile一樣,但要求被操作字段必須有volatile修飾
    public native void putOrderedInt(Object o,long offset,int x);1234567891011121314151617181920212223242526272829
               
    下面通過一個簡單的Demo來示範上述的一些方法以便加深對Unsafe類的了解
    public class UnSafeDemo {
    
        public  static  void main(String[] args) throws NoSuchFieldException, IllegalAccessException, InstantiationException {
            // 通過反射得到theUnsafe對應的Field對象
            Field field = Unsafe.class.getDeclaredField("theUnsafe");
            // 設定該Field為可通路
            field.setAccessible(true);
            // 通過Field得到該Field對應的具體對象,傳入null是因為該Field為static的
            Unsafe unsafe = (Unsafe) field.get(null);
            System.out.println(unsafe);
    
            //通過allocateInstance直接建立對象
            User user = (User) unsafe.allocateInstance(User.class);
    
            Class userClass = user.getClass();
            Field name = userClass.getDeclaredField("name");
            Field age = userClass.getDeclaredField("age");
            Field id = userClass.getDeclaredField("id");
    
            //擷取執行個體變量name和age在對象記憶體中的偏移量并設定值
            unsafe.putInt(user,unsafe.objectFieldOffset(age),18);
            unsafe.putObject(user,unsafe.objectFieldOffset(name),"android TV");
    
            // 這裡傳回 User.class,
            Object staticBase = unsafe.staticFieldBase(id);
            System.out.println("staticBase:"+staticBase);
    
            //擷取靜态變量id的偏移量staticOffset
            long staticOffset = unsafe.staticFieldOffset(userClass.getDeclaredField("id"));
            //擷取靜态變量的值
            System.out.println("設定前的ID:"+unsafe.getObject(staticBase,staticOffset));
            //設定值
            unsafe.putObject(staticBase,staticOffset,"SSSSSSSS");
            //擷取靜态變量的值
            System.out.println("設定前的ID:"+unsafe.getObject(staticBase,staticOffset));
            //輸出USER
            System.out.println("輸出USER:"+user.toString());
    
            long data = 1000;
            byte size = 1;//機關位元組
    
            //調用allocateMemory配置設定記憶體,并擷取記憶體位址memoryAddress
            long memoryAddress = unsafe.allocateMemory(size);
            //直接往記憶體寫入資料
            unsafe.putAddress(memoryAddress, data);
            //擷取指定記憶體位址的資料
            long addrData=unsafe.getAddress(memoryAddress);
            System.out.println("addrData:"+addrData);
    
            /**
             * 輸出結果:
             [email protected]
             staticBase:class geym.conc.ch4.atomic.User
             設定前的ID:USER_ID
             設定前的ID:SSSSSSSS
             輸出USER:User{name='android TV', age=18', id=SSSSSSSS'}
             addrData:1000
             */
    
        }
    }
    
    class User{
        public User(){
            System.out.println("user 構造方法被調用");
        }
        private String name;
        private int age;
        private static String id="USER_ID";
    
        @Override
        public String toString() {
            return "User{" +
                    "name='" + name + '\'' +
                    ", age=" + age +'\'' +
                    ", id=" + id +'\'' +
                    '}';
        }
    }12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879
               
    雖然在Unsafe類中存在getUnsafe()方法,但該方法隻提供給進階的Bootstrap類加載器使用,普通使用者調用将抛出異常,是以我們在Demo中使用了反射技術擷取了Unsafe執行個體對象并進行相關操作。
    public static Unsafe getUnsafe() {
          Class cc = sun.reflect.Reflection.getCallerClass(2);
          if (cc.getClassLoader() != null)
              throw new SecurityException("Unsafe");
          return theUnsafe;
      }123456
               
  • 數組操作
    //擷取數組第一個元素的偏移位址
    public native int arrayBaseOffset(Class arrayClass);
    //數組中一個元素占據的記憶體空間,arrayBaseOffset與arrayIndexScale配合使用,可定位數組中每個元素在記憶體中的位置
    public native int arrayIndexScale(Class arrayClass);
    12345
               
  • CAS 操作相關

    CAS是一些CPU直接支援的指令,也就是我們前面分析的無鎖操作,在Java中無鎖操作CAS基于以下3個方法實作,在稍後講解Atomic系列内部方法是基于下述方法的實作的。

    //第一個參數o為給定對象,offset為對象記憶體的偏移量,通過這個偏移量迅速定位字段并設定或擷取該字段的值,
    //expected表示期望值,x表示要設定的值,下面3個方法都通過CAS原子指令執行操作。
    public final native boolean compareAndSwapObject(Object o, long offset,Object expected, Object x);                                                                                                  
    
    public final native boolean compareAndSwapInt(Object o, long offset,int expected,int x);
    
    public final native boolean compareAndSwapLong(Object o, long offset,long expected,long x);1234567
               
    這裡還需介紹Unsafe類中JDK 1.8新增的幾個方法,它們的實作是基于上述的CAS方法,如下
    //1.8新增,給定對象o,根據擷取記憶體偏移量指向的字段,将其增加delta,
     //這是一個CAS操作過程,直到設定成功方能退出循環,傳回舊值
     public final int getAndAddInt(Object o, long offset, int delta) {
         int v;
         do {
             //擷取記憶體中最新值
             v = getIntVolatile(o, offset);
           //通過CAS操作
         } while (!compareAndSwapInt(o, offset, v, v + delta));
         return v;
     }
    
    //1.8新增,方法作用同上,隻不過這裡操作的long類型資料
     public final long getAndAddLong(Object o, long offset, long delta) {
         long v;
         do {
             v = getLongVolatile(o, offset);
         } while (!compareAndSwapLong(o, offset, v, v + delta));
         return v;
     }
    
     //1.8新增,給定對象o,根據擷取記憶體偏移量對于字段,将其 設定為新值newValue,
     //這是一個CAS操作過程,直到設定成功方能退出循環,傳回舊值
     public final int getAndSetInt(Object o, long offset, int newValue) {
         int v;
         do {
             v = getIntVolatile(o, offset);
         } while (!compareAndSwapInt(o, offset, v, newValue));
         return v;
     }
    
    // 1.8新增,同上,操作的是long類型
     public final long getAndSetLong(Object o, long offset, long newValue) {
         long v;
         do {
             v = getLongVolatile(o, offset);
         } while (!compareAndSwapLong(o, offset, v, newValue));
         return v;
     }
    
     //1.8新增,同上,操作的是引用類型資料
     public final Object getAndSetObject(Object o, long offset, Object newValue) {
         Object v;
         do {
             v = getObjectVolatile(o, offset);
         } while (!compareAndSwapObject(o, offset, v, newValue));
         return v;
     }12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849
               
    上述的方法我們在稍後的Atomic系列分析中還會見到它們的身影。
  • 挂起與恢複

    将一個線程進行挂起是通過park方法實作的,調用 park後,線程将一直阻塞直到逾時或者中斷等條件出現。unpark可以終止一個挂起的線程,使其恢複正常。Java對線程的挂起操作被封裝在 LockSupport類中,LockSupport類中有各種版本pack方法,其底層實作最終還是使用Unsafe.park()方法和Unsafe.unpark()方法

    //線程調用該方法,線程将一直阻塞直到逾時,或者是中斷條件出現。  
    public native void park(boolean isAbsolute, long time);  
    
    //終止挂起的線程,恢複正常.java.util.concurrent包中挂起操作都是在LockSupport類實作的,其底層正是使用這兩個方法,  
    public native void unpark(Object thread); 12345
               
  • 記憶體屏障

    這裡主要包括了loadFence、storeFence、fullFence等方法,這些方法是在Java 8新引入的,用于定義記憶體屏障,避免代碼重排序,與Java記憶體模型相關,感興趣的可以看部落客的另一篇博文全面了解Java記憶體模型(JMM)及volatile關鍵字,這裡就不展開了

    //在該方法之前的所有讀操作,一定在load屏障之前執行完成
    public native void loadFence();
    //在該方法之前的所有寫操作,一定在store屏障之前執行完成
    public native void storeFence();
    //在該方法之前的所有讀寫操作,一定在full屏障之前執行完成,這個記憶體屏障相當于上面兩個的合體功能
    public native void fullFence();123456
               
  • 其他操作
    //擷取持有鎖,已不建議使用
    @Deprecated
    public native void monitorEnter(Object var1);
    //釋放鎖,已不建議使用
    @Deprecated
    public native void monitorExit(Object var1);
    //嘗試擷取鎖,已不建議使用
    @Deprecated
    public native boolean tryMonitorEnter(Object var1);
    
    //擷取本機記憶體的頁數,這個值永遠都是2的幂次方  
    public native int pageSize();  
    
    //告訴虛拟機定義了一個沒有安全檢查的類,預設情況下這個類加載器和保護域來着調用者類  
    public native Class defineClass(String name, byte[] b, int off, int len, ClassLoader loader, ProtectionDomain protectionDomain);  
    
    //加載一個匿名類
    public native Class defineAnonymousClass(Class hostClass, byte[] data, Object[] cpPatches);
    //判斷是否需要加載一個類
    public native boolean shouldBeInitialized(Class<?> c);
    //確定類一定被加載 
    public native  void ensureClassInitialized(Class<?> c)12345678910111213141516171819202122
               

并發包中的原子操作類(Atomic系列)

通過前面的分析我們已基本了解了無鎖CAS的原理并對Java中的指針類Unsafe類有了比較全面的認識,下面進一步分析CAS在Java中的應用,即并發包中的原子操作類(Atomic系列),從JDK 1.5開始提供了

java.util.concurrent.atomic

包,在該包中提供了許多基于CAS實作的原子操作類,用法友善,性能高效,主要分以下4種類型。

原子更新基本類型

原子更新基本類型主要包括3個類:

  • AtomicBoolean:原子更新布爾類型
  • AtomicInteger:原子更新整型
  • AtomicLong:原子更新長整型

這3個類的實作原理和使用方式幾乎是一樣的,這裡我們以AtomicInteger為例進行分析,AtomicInteger主要是針對int類型的資料執行原子操作,它提供了原子自增方法、原子自減方法以及原子指派方法等,鑒于AtomicInteger的源碼不多,我們直接看源碼

public class AtomicInteger extends Number implements java.io.Serializable {
    private static final long serialVersionUID = 6214790243416807050L;

    // 擷取指針類Unsafe
    private static final Unsafe unsafe = Unsafe.getUnsafe();

    //下述變量value在AtomicInteger執行個體對象内的記憶體偏移量
    private static final long valueOffset;

    static {
        try {
           //通過unsafe類的objectFieldOffset()方法,擷取value變量在對象記憶體中的偏移
           //通過該偏移量valueOffset,unsafe類的内部方法可以擷取到變量value對其進行取值或指派操作
            valueOffset = unsafe.objectFieldOffset
                (AtomicInteger.class.getDeclaredField("value"));
        } catch (Exception ex) { throw new Error(ex); }
    }
   //目前AtomicInteger封裝的int變量value
    private volatile int value;

    public AtomicInteger(int initialValue) {
        value = initialValue;
    }
    public AtomicInteger() {
    }
   //擷取目前最新值,
    public final int get() {
        return value;
    }
    //設定目前值,具備volatile效果,方法用final修飾是為了更進一步的保證線程安全。
    public final void set(int newValue) {
        value = newValue;
    }
    //最終會設定成newValue,使用該方法後可能導緻其他線程在之後的一小段時間内可以擷取到舊值,有點類似于延遲加載
    public final void lazySet(int newValue) {
        unsafe.putOrderedInt(this, valueOffset, newValue);
    }
   //設定新值并擷取舊值,底層調用的是CAS操作即unsafe.compareAndSwapInt()方法
    public final int getAndSet(int newValue) {
        return unsafe.getAndSetInt(this, valueOffset, newValue);
    }
   //如果目前值為expect,則設定為update(目前值指的是value變量)
    public final boolean compareAndSet(int expect, int update) {
        return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
    }
    //目前值加1傳回舊值,底層CAS操作
    public final int getAndIncrement() {
        return unsafe.getAndAddInt(this, valueOffset, 1);
    }
    //目前值減1,傳回舊值,底層CAS操作
    public final int getAndDecrement() {
        return unsafe.getAndAddInt(this, valueOffset, -1);
    }
   //目前值增加delta,傳回舊值,底層CAS操作
    public final int getAndAdd(int delta) {
        return unsafe.getAndAddInt(this, valueOffset, delta);
    }
    //目前值加1,傳回新值,底層CAS操作
    public final int incrementAndGet() {
        return unsafe.getAndAddInt(this, valueOffset, 1) + 1;
    }
    //目前值減1,傳回新值,底層CAS操作
    public final int decrementAndGet() {
        return unsafe.getAndAddInt(this, valueOffset, -1) - 1;
    }
   //目前值增加delta,傳回新值,底層CAS操作
    public final int addAndGet(int delta) {
        return unsafe.getAndAddInt(this, valueOffset, delta) + delta;
    }
   //省略一些不常用的方法....
}1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071
           

通過上述的分析,可以發現AtomicInteger原子類的内部幾乎是基于前面分析過Unsafe類中的CAS相關操作的方法實作的,這也同時證明AtomicInteger是基于無鎖實作的,這裡重點分析自增操作實作過程,其他方法自增實作原理一樣。

//目前值加1,傳回新值,底層CAS操作
public final int incrementAndGet() {
     return unsafe.getAndAddInt(this, valueOffset, 1) + 1;
 }1234
           

我們發現AtomicInteger類中所有自增或自減的方法都間接調用Unsafe類中的getAndAddInt()方法實作了CAS操作,進而保證了線程安全,關于getAndAddInt其實前面已分析過,它是Unsafe類中1.8新增的方法,源碼如下

//Unsafe類中的getAndAddInt方法
public final int getAndAddInt(Object o, long offset, int delta) {
        int v;
        do {
            v = getIntVolatile(o, offset);
        } while (!compareAndSwapInt(o, offset, v, v + delta));
        return v;
    }12345678
           

可看出getAndAddInt通過一個while循環不斷的重試更新要設定的值,直到成功為止,調用的是Unsafe類中的compareAndSwapInt方法,是一個CAS操作方法。這裡需要注意的是,上述源碼分析是基于JDK1.8的,如果是1.8之前的方法,AtomicInteger源碼實作有所不同,是基于for死循環的,如下

//JDK 1.7的源碼,由for的死循環實作,并且直接在AtomicInteger實作該方法,
//JDK1.8後,該方法實作已移動到Unsafe類中,直接調用getAndAddInt方法即可
public final int incrementAndGet() {
    for (;;) {
        int current = get();
        int next = current + 1;
        if (compareAndSet(current, next))
            return next;
    }
}12345678910
           

ok~,下面簡單看個Demo,感受一下AtomicInteger使用方式

public class AtomicIntegerDemo {
    //建立AtomicInteger,用于自增操作
    static AtomicInteger i=new AtomicInteger();

    public static class AddThread implements Runnable{
        public void run(){
           for(int k=0;k<10000;k++)
               i.incrementAndGet();
        }

    }
    public static void main(String[] args) throws InterruptedException {
        Thread[] ts=new Thread[10];
        //開啟10條線程同時執行i的自增操作
        for(int k=0;k<10;k++){
            ts[k]=new Thread(new AddThread());
        }
        //啟動線程
        for(int k=0;k<10;k++){ts[k].start();}

        for(int k=0;k<10;k++){ts[k].join();}

        System.out.println(i);//輸出結果:100000
    }
}12345678910111213141516171819202122232425
           

在Demo中,使用原子類型AtomicInteger替換普通int類型執行自增的原子操作,保證了線程安全。至于AtomicBoolean和AtomicLong的使用方式以及實作原理是一樣,大家可以自行查閱源碼。

原子更新引用

原子更新引用類型可以同時更新引用類型,這裡主要分析一下AtomicReference原子類,即原子更新引用類型。先看看其使用方式,如下

public class AtomicReferenceDemo2 {

    public static AtomicReference<User> atomicUserRef = new AtomicReference<User>();

    public static void main(String[] args) {
        User user = new User("zejian", 18);
        atomicUserRef.set(user);
        User updateUser = new User("Shine", 25);
        atomicUserRef.compareAndSet(user, updateUser);
        //執行結果:User{name='Shine', age=25}
              System.out.println(atomicUserRef.get().toString());  
    }

    static class User {
        public String name;
        private int age;

        public User(String name, int age) {
            this.name = name;
            this.age = age;
        }

        public String getName() {
            return name;
        }

        @Override
        public String toString() {
            return "User{" +
                    "name='" + name + '\'' +
                    ", age=" + age +
                    '}';
        }
    }
}1234567891011121314151617181920212223242526272829303132333435
           

那麼AtomicReference原子類内部是如何實作CAS操作的呢?

public class AtomicReference<V> implements java.io.Serializable {
    private static final Unsafe unsafe = Unsafe.getUnsafe();
    private static final long valueOffset;

    static {
        try {
            valueOffset = unsafe.objectFieldOffset
                (AtomicReference.class.getDeclaredField("value"));
        } catch (Exception ex) { throw new Error(ex); }
    }
    //内部變量value,Unsafe類通過valueOffset記憶體偏移量即可擷取該變量
    private volatile V value;

//CAS方法,間接調用unsafe.compareAndSwapObject(),它是一個
//實作了CAS操作的native方法
public final boolean compareAndSet(V expect, V update) {
        return unsafe.compareAndSwapObject(this, valueOffset, expect, update);
}

//設定并擷取舊值
public final V getAndSet(V newValue) {
        return (V)unsafe.getAndSetObject(this, valueOffset, newValue);
    }
    //省略其他代碼......
}

//Unsafe類中的getAndSetObject方法,實際調用還是CAS操作
public final Object getAndSetObject(Object o, long offset, Object newValue) {
      Object v;
      do {
          v = getObjectVolatile(o, offset);
      } while (!compareAndSwapObject(o, offset, v, newValue));
      return v;
  }12345678910111213141516171819202122232425262728293031323334
           

從源碼看來,AtomicReference與AtomicInteger的實作原理基本是一樣的,最終執行的還是Unsafe類,關于AtomicReference的其他方法也是一樣的,如下

深入了解(7)Java無鎖CAS與Unsafe類及其并發包Atomic無鎖的概念無鎖的執行者-CAS鮮為人知的指針: Unsafe類并發包中的原子操作類(Atomic系列)再談自旋鎖

紅框内的方法是Java8新增的,可以基于Lambda表達式對傳遞進來的期望值或要更新的值進行其他操作後再進行CAS操作,說白了就是對期望值或要更新的值進行額外修改後再執行CAS更新,在所有的Atomic原子類中幾乎都存在這幾個方法。

原子更新數組

原子更新數組指的是通過原子的方式更新數組裡的某個元素,主要有以下3個類

  • AtomicIntegerArray:原子更新整數數組裡的元素
  • AtomicLongArray:原子更新長整數數組裡的元素
  • AtomicReferenceArray:原子更新引用類型數組裡的元素

這裡以AtomicIntegerArray為例進行分析,其餘兩個使用方式和實作原理基本一樣,簡單案例如下,

public class AtomicIntegerArrayDemo {
    static AtomicIntegerArray arr = new AtomicIntegerArray(10);

    public static class AddThread implements Runnable{
        public void run(){
           for(int k=0;k<10000;k++)
               //執行數組中元素自增操作,參數為index,即數組下标
               arr.getAndIncrement(k%arr.length());
        }
    }
    public static void main(String[] args) throws InterruptedException {

        Thread[] ts=new Thread[10];
        //建立10條線程
        for(int k=0;k<10;k++){
            ts[k]=new Thread(new AddThread());
        }
        //啟動10條線程
        for(int k=0;k<10;k++){ts[k].start();}
        for(int k=0;k<10;k++){ts[k].join();}
        //執行結果
        //[10000, 10000, 10000, 10000, 10000, 10000, 10000, 10000, 10000, 10000]
        System.out.println(arr);
    }
}12345678910111213141516171819202122232425
           

啟動10條線程對數組中的元素進行自增操作,執行結果符合預期。使用方式比較簡單,接着看看AtomicIntegerArray内部是如何實作,先看看部分源碼

public class AtomicIntegerArray implements java.io.Serializable {
    //擷取unsafe類的執行個體對象
    private static final Unsafe unsafe = Unsafe.getUnsafe();
    //擷取數組的第一個元素記憶體起始位址
    private static final int base = unsafe.arrayBaseOffset(int[].class);

    private static final int shift;
    //内部數組
    private final int[] array;

    static {
        //擷取數組中一個元素占據的記憶體空間
        int scale = unsafe.arrayIndexScale(int[].class);
        //判斷是否為2的次幂,一般為2的次幂否則抛異常
        if ((scale & (scale - 1)) != 0)
            throw new Error("data type scale not a power of two");
        //
        shift = 31 - Integer.numberOfLeadingZeros(scale);
    }

    private long checkedByteOffset(int i) {
        if (i < 0 || i >= array.length)
            throw new IndexOutOfBoundsException("index " + i);

        return byteOffset(i);
    }
    //計算數組中每個元素的的記憶體位址
    private static long byteOffset(int i) {
        return ((long) i << shift) + base;
    }
    //省略其他代碼......
}1234567891011121314151617181920212223242526272829303132
           

通過前面對Unsafe類的分析,我們知道arrayBaseOffset方法可以擷取數組的第一個元素起始位址,而arrayIndexScale方法可以擷取每個數組元素占用的記憶體空間,由于這裡是Int類型,而Java中一個int類型占用4個位元組,也就是scale的值為4,那麼如何根據數組下标值計算每個元素的記憶體位址呢?顯然應該是

每個數組元素的記憶體位址=起始位址+元素下标 * 每個元素所占用的記憶體空間

與該方法原理相同

//計算數組中每個元素的的記憶體位址
private static long byteOffset(int i) {
     return ((long) i << shift) + base;
 }1234
           

這是為什麼,首先來計算出shift的值

shift = 31 - Integer.numberOfLeadingZeros(scale);1
           

其中Integer.numberOfLeadingZeros(scale)是計算出scale的前導零個數(必須是連續的),scale=4,轉成二進制為

00000000 00000000 00000000 00000100

即前導零數為29,也就是shift=2,然後利用shift來定位數組中的記憶體位置,在數組不越界時,計算出前3個數組元素記憶體位址

//第一個數組元素,index=0 , 其中base為起始位址,4代表int類型占用的位元組數 
address = base + 0 * 4 即address= base + 0 << 2
//第二個數組元素,index=1
address = base + 1 * 4 即address= base + 1 << 2
//第三個數組元素,index=2
address = base + 2 * 4 即address= base + 2 << 2
//........1234567
           

顯然shift=2,替換去就是

address= base + i << shift1
           

這就是 byteOffset(int i) 方法的計算原理。是以byteOffset(int)方法可以根據數組下标計算出每個元素的記憶體位址。至于其他方法就比較簡單了,都是間接調用Unsafe類的CAS原子操作方法,如下簡單看其中幾個常用方法

//執行自增操作,傳回舊值,i是指數組元素下标
public final int getAndIncrement(int i) {
      return getAndAdd(i, 1);
}
//指定下标元素執行自增操作,并傳回新值
public final int incrementAndGet(int i) {
    return getAndAdd(i, 1) + 1;
}

//指定下标元素執行自減操作,并傳回新值
public final int decrementAndGet(int i) {
    return getAndAdd(i, -1) - 1;
}
//間接調用unsafe.getAndAddInt()方法
public final int getAndAdd(int i, int delta) {
    return unsafe.getAndAddInt(array, checkedByteOffset(i), delta);
}

//Unsafe類中的getAndAddInt方法,執行CAS操作
public final int getAndAddInt(Object o, long offset, int delta) {
        int v;
        do {
            v = getIntVolatile(o, offset);
        } while (!compareAndSwapInt(o, offset, v, v + delta));
        return v;
    }1234567891011121314151617181920212223242526
           

至于AtomicLongArray和AtomicReferenceArray原子類,使用方式和實作原理基本一樣。

原子更新屬性

如果我們隻需要某個類裡的某個字段,也就是說讓普通的變量也享受原子操作,可以使用原子更新字段類,如在某些時候由于項目前期考慮不周全,項目需求又發生變化,使得某個類中的變量需要執行多線程操作,由于該變量多處使用,改動起來比較麻煩,而且原來使用的地方無需使用線程安全,隻要求新場景需要使用時,可以借助原子更新器處理這種場景,Atomic并發包提供了以下三個類:

  • AtomicIntegerFieldUpdater:原子更新整型的字段的更新器。
  • AtomicLongFieldUpdater:原子更新長整型字段的更新器。
  • AtomicReferenceFieldUpdater:原子更新引用類型裡的字段。

請注意原子更新器的使用存在比較苛刻的條件如下

  • 操作的字段不能是static類型。
  • 操作的字段不能是final類型的,因為final根本沒法修改。
  • 字段必須是volatile修飾的,也就是資料本身是讀一緻的。
  • 屬性必須對目前的Updater所在的區域是可見的,如果不是目前類内部進行原子更新器操作不能使用private,protected子類操作父類時修飾符必須是protect權限及以上,如果在同一個package下則必須是default權限及以上,也就是說無論何時都應該保證操作類與被操作類間的可見性。

下面看看AtomicIntegerFieldUpdater和AtomicReferenceFieldUpdater的簡單使用方式

public class AtomicIntegerFieldUpdaterDemo {
    public static class Candidate{
        int id;
        volatile int score;
    }

    public static class Game{
        int id;
        volatile String name;

        public Game(int id, String name) {
            this.id = id;
            this.name = name;
        }

        @Override
        public String toString() {
            return "Game{" +
                    "id=" + id +
                    ", name='" + name + '\'' +
                    '}';
        }
    }

    static AtomicIntegerFieldUpdater<Candidate> atIntegerUpdater
        = AtomicIntegerFieldUpdater.newUpdater(Candidate.class, "score");

    static AtomicReferenceFieldUpdater<Game,String> atRefUpdate =
            AtomicReferenceFieldUpdater.newUpdater(Game.class,String.class,"name");


    //用于驗證分數是否正确
    public static AtomicInteger allScore=new AtomicInteger(0);


    public static void main(String[] args) throws InterruptedException {
        final Candidate stu=new Candidate();
        Thread[] t=new Thread[10000];
        //開啟10000個線程
        for(int i = 0 ; i < 10000 ; i++) {
            t[i]=new Thread() {
                public void run() {
                    if(Math.random()>0.4){
                        atIntegerUpdater.incrementAndGet(stu);
                        allScore.incrementAndGet();
                    }
                }
            };
            t[i].start();
        }

        for(int i = 0 ; i < 10000 ; i++) {  t[i].join();}
        System.out.println("最終分數score="+stu.score);
        System.out.println("校驗分數allScore="+allScore);

        //AtomicReferenceFieldUpdater 簡單的使用
        Game game = new Game(2,"zh");
        atRefUpdate.compareAndSet(game,game.name,"JAVA-HHH");
        System.out.println(game.toString());

        /**
         * 輸出結果:
         * 最終分數score=5976
           校驗分數allScore=5976
           Game{id=2, name='JAVA-HHH'}
         */
    }
}
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869
           

我們使用AtomicIntegerFieldUpdater更新候選人(Candidate)的分數score,開啟了10000條線程投票,當随機值大于0.4時算一票,分數自增一次,其中allScore用于驗證分數是否正确(其實用于驗證AtomicIntegerFieldUpdater更新的字段是否線程安全),當allScore與score相同時,則說明投票結果無誤,也代表AtomicIntegerFieldUpdater能正确更新字段score的值,是線程安全的。對于AtomicReferenceFieldUpdater,我們在代碼中簡單示範了其使用方式,注意在AtomicReferenceFieldUpdater注明泛型時需要兩個泛型參數,一個是修改的類類型,一個修改字段的類型。至于AtomicLongFieldUpdater則與AtomicIntegerFieldUpdater類似,不再介紹。接着簡單了解一下AtomicIntegerFieldUpdater的實作原理,實際就是反射和Unsafe類結合,AtomicIntegerFieldUpdater是個抽象類,實際實作類為AtomicIntegerFieldUpdaterImpl

public abstract class AtomicIntegerFieldUpdater<T> {

    public static <U> AtomicIntegerFieldUpdater<U> newUpdater(Class<U> tclass,
                                                              String fieldName) {
         //實際實作類AtomicIntegerFieldUpdaterImpl                                          
        return new AtomicIntegerFieldUpdaterImpl<U>
            (tclass, fieldName, Reflection.getCallerClass());
    }
 }123456789
           

看看AtomicIntegerFieldUpdaterImpl

private static class AtomicIntegerFieldUpdaterImpl<T>
            extends AtomicIntegerFieldUpdater<T> {
        private static final Unsafe unsafe = Unsafe.getUnsafe();
        private final long offset;//記憶體偏移量
        private final Class<T> tclass;
        private final Class<?> cclass;

        AtomicIntegerFieldUpdaterImpl(final Class<T> tclass,
                                      final String fieldName,
                                      final Class<?> caller) {
            final Field field;//要修改的字段
            final int modifiers;//字段修飾符
            try {
                field = AccessController.doPrivileged(
                    new PrivilegedExceptionAction<Field>() {
                        public Field run() throws NoSuchFieldException {
                            return tclass.getDeclaredField(fieldName);//反射擷取字段對象
                        }
                    });
                    //擷取字段修飾符
                modifiers = field.getModifiers();
            //對字段的通路權限進行檢查,不在通路範圍内抛異常
                sun.reflect.misc.ReflectUtil.ensureMemberAccess(
                    caller, tclass, null, modifiers);
                ClassLoader cl = tclass.getClassLoader();
                ClassLoader ccl = caller.getClassLoader();
                if ((ccl != null) && (ccl != cl) &&
                    ((cl == null) || !isAncestor(cl, ccl))) {
              sun.reflect.misc.ReflectUtil.checkPackageAccess(tclass);
                }
            } catch (PrivilegedActionException pae) {
                throw new RuntimeException(pae.getException());
            } catch (Exception ex) {
                throw new RuntimeException(ex);
            }

            Class<?> fieldt = field.getType();
            //判斷是否為int類型
            if (fieldt != int.class)
                throw new IllegalArgumentException("Must be integer type");
            //判斷是否被volatile修飾
            if (!Modifier.isVolatile(modifiers))
                throw new IllegalArgumentException("Must be volatile type");

            this.cclass = (Modifier.isProtected(modifiers) &&
                           caller != tclass) ? caller : null;
            this.tclass = tclass;
            //擷取該字段的在對象記憶體的偏移量,通過記憶體偏移量可以擷取或者修改該字段的值
            offset = unsafe.objectFieldOffset(field);
        }
        }123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051
           

從AtomicIntegerFieldUpdaterImpl的構造器也可以看出更新器為什麼會有這麼多限制條件了,當然最終其CAS操作肯定是通過unsafe完成的,簡單看一個方法

public int incrementAndGet(T obj) {
        int prev, next;
        do {
            prev = get(obj);
            next = prev + 1;
            //CAS操作
        } while (!compareAndSet(obj, prev, next));
        return next;
}

//最終調用的還是unsafe.compareAndSwapInt()方法
public boolean compareAndSet(T obj, int expect, int update) {
            if (obj == null || obj.getClass() != tclass || cclass != null) fullCheck(obj);
            return unsafe.compareAndSwapInt(obj, offset, expect, update);
        }123456789101112131415
           

CAS的ABA問題及其解決方案

假設這樣一種場景,當第一個線程執行CAS(V,E,U)操作,在擷取到目前變量V,準備修改為新值U前,另外兩個線程已連續修改了兩次變量V的值,使得該值又恢複為舊值,這樣的話,我們就無法正确判斷這個變量是否已被修改過,如下圖

深入了解(7)Java無鎖CAS與Unsafe類及其并發包Atomic無鎖的概念無鎖的執行者-CAS鮮為人知的指針: Unsafe類并發包中的原子操作類(Atomic系列)再談自旋鎖

這就是典型的CAS的ABA問題,一般情況這種情況發現的機率比較小,可能發生了也不會造成什麼問題,比如說我們對某個做加減法,不關心數字的過程,那麼發生ABA問題也沒啥關系。但是在某些情況下還是需要防止的,那麼該如何解決呢?在Java中解決ABA問題,我們可以使用以下兩個原子類

  • AtomicStampedReference

    AtomicStampedReference原子類是一個帶有時間戳的對象引用,在每次修改後,AtomicStampedReference不僅會設定新值而且還會記錄更改的時間。當AtomicStampedReference設定對象值時,對象值以及時間戳都必須滿足期望值才能寫入成功,這也就解決了反複讀寫時,無法預知值是否已被修改的窘境,測試demo如下

    /**
     * Created by zejian on 2017/7/2.
     * Blog : http://blog.csdn.net/javazejian [原文位址,請尊重原創]
     */
    public class ABADemo {
    
        static AtomicInteger atIn = new AtomicInteger(100);
    
        //初始化時需要傳入一個初始值和初始時間
        static AtomicStampedReference<Integer> atomicStampedR =
                new AtomicStampedReference<Integer>(200,0);
    
    
        static Thread t1 = new Thread(new Runnable() {
            @Override
            public void run() {
                //更新為200
                atIn.compareAndSet(100, 200);
                //更新為100
                atIn.compareAndSet(200, 100);
            }
        });
    
    
        static Thread t2 = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                boolean flag=atIn.compareAndSet(100,500);
                System.out.println("flag:"+flag+",newValue:"+atIn);
            }
        });
    
    
        static Thread t3 = new Thread(new Runnable() {
            @Override
            public void run() {
                int time=atomicStampedR.getStamp();
                //更新為200
                atomicStampedR.compareAndSet(100, 200,time,time+1);
                //更新為100
                int time2=atomicStampedR.getStamp();
                atomicStampedR.compareAndSet(200, 100,time2,time2+1);
            }
        });
    
    
        static Thread t4 = new Thread(new Runnable() {
            @Override
            public void run() {
                int time = atomicStampedR.getStamp();
                System.out.println("sleep 前 t4 time:"+time);
                try {
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                boolean flag=atomicStampedR.compareAndSet(100,500,time,time+1);
                System.out.println("flag:"+flag+",newValue:"+atomicStampedR.getReference());
            }
        });
    
        public static  void  main(String[] args) throws InterruptedException {
            t1.start();
            t2.start();
            t1.join();
            t2.join();
    
            t3.start();
            t4.start();
            /**
             * 輸出結果:
             flag:true,newValue:500
             sleep 前 t4 time:0
             flag:false,newValue:200
             */
        }
    }12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182
               
    對比輸出結果可知,AtomicStampedReference類确實解決了ABA的問題,下面我們簡單看看其内部實作原理
    public class AtomicStampedReference<V> {
        //通過Pair内部類存儲資料和時間戳
        private static class Pair<T> {
            final T reference;
            final int stamp;
            private Pair(T reference, int stamp) {
                this.reference = reference;
                this.stamp = stamp;
            }
            static <T> Pair<T> of(T reference, int stamp) {
                return new Pair<T>(reference, stamp);
            }
        }
        //存儲數值和時間的内部類
        private volatile Pair<V> pair;
    
        //構造器,建立時需傳入初始值和時間初始值
        public AtomicStampedReference(V initialRef, int initialStamp) {
            pair = Pair.of(initialRef, initialStamp);
        }
    }123456789101112131415161718192021
               
    接着看看其compareAndSet方法的實作:
    public boolean compareAndSet(V   expectedReference,
                                     V   newReference,
                                     int expectedStamp,
                                     int newStamp) {
            Pair<V> current = pair;
            return
                expectedReference == current.reference &&
                expectedStamp == current.stamp &&
                ((newReference == current.reference &&
                  newStamp == current.stamp) ||
                 casPair(current, Pair.of(newReference, newStamp)));
        }123456789101112
               
    同時對目前資料和目前時間進行比較,隻有兩者都相等是才會執行casPair()方法,單從該方法的名稱就可知是一個CAS方法,最終調用的還是Unsafe類中的compareAndSwapObject方法
    private boolean casPair(Pair<V> cmp, Pair<V> val) {
            return UNSAFE.compareAndSwapObject(this, pairOffset, cmp, val);
        }123
               
    到這我們就很清晰AtomicStampedReference的内部實作思想了,通過一個鍵值對Pair存儲資料和時間戳,在更新時對資料和時間戳進行比較,隻有兩者都符合預期才會調用Unsafe的compareAndSwapObject方法執行數值和時間戳替換,也就避免了ABA的問題。
  • AtomicMarkableReference類

    AtomicMarkableReference與AtomicStampedReference不同的是,AtomicMarkableReference維護的是一個boolean值的辨別,也就是說至于true和false兩種切換狀态,經過部落客測試,這種方式并不能完全防止ABA問題的發生,隻能減少ABA問題發生的機率。

    public class ABADemo {
        static AtomicMarkableReference<Integer> atMarkRef =
                  new AtomicMarkableReference<Integer>(100,false);
    
     static Thread t5 = new Thread(new Runnable() {
            @Override
            public void run() {
                boolean mark=atMarkRef.isMarked();
                System.out.println("mark:"+mark);
                //更新為200
                System.out.println("t5 result:"+atMarkRef.compareAndSet(atMarkRef.getReference(), 200,mark,!mark));
            }
        });
    
        static Thread t6 = new Thread(new Runnable() {
            @Override
            public void run() {
                boolean mark2=atMarkRef.isMarked();
                System.out.println("mark2:"+mark2);
                System.out.println("t6 result:"+atMarkRef.compareAndSet(atMarkRef.getReference(), 100,mark2,!mark2));
            }
        });
    
        static Thread t7 = new Thread(new Runnable() {
            @Override
            public void run() {
                boolean mark=atMarkRef.isMarked();
                System.out.println("sleep 前 t7 mark:"+mark);
                try {
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                boolean flag=atMarkRef.compareAndSet(100,500,mark,!mark);
                System.out.println("flag:"+flag+",newValue:"+atMarkRef.getReference());
            }
        });
    
        public static  void  main(String[] args) throws InterruptedException {        
            t5.start();t5.join();
            t6.start();t6.join();
            t7.start();
    
            /**
             * 輸出結果:
             mark:false
             t5 result:true
             mark2:true
             t6 result:true
             sleep 前 t5 mark:false
             flag:true,newValue:500 ---->成功了.....說明還是發生ABA問題
             */
        }
    }123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354
               
    AtomicMarkableReference的實作原理與AtomicStampedReference類似,這裡不再介紹。到此,我們也明白了如果要完全杜絕ABA問題的發生,我們應該使用AtomicStampedReference原子類更新對象,而對于AtomicMarkableReference來說隻能減少ABA問題的發生機率,并不能杜絕。

再談自旋鎖

自旋鎖是一種假設在不久将來,目前的線程可以獲得鎖,是以虛拟機會讓目前想要擷取鎖的線程做幾個空循環(這也是稱為自旋的原因),在經過若幹次循環後,如果得到鎖,就順利進入臨界區。如果還不能獲得鎖,那就會将線程在作業系統層面挂起,這種方式确實也是可以提升效率的。但問題是當線程越來越多競争很激烈時,占用CPU的時間變長會導緻性能急劇下降,是以Java虛拟機内部一般對于自旋鎖有一定的次數限制,可能是50或者100次循環後就放棄,直接挂起線程,讓出CPU資源。如下通過AtomicReference可實作簡單的自旋鎖。

public class SpinLock {
  private AtomicReference<Thread> sign =new AtomicReference<>();

  public void lock(){
    Thread current = Thread.currentThread();
    while(!sign .compareAndSet(null, current)){
    }
  }

  public void unlock (){
    Thread current = Thread.currentThread();
    sign .compareAndSet(current, null);
  }
}1234567891011121314
           

使用CAS原子操作作為底層實作,lock()方法将要更新的值設定為目前線程,并将預期值設定為null。unlock()函數将要更新的值設定為null,并預期值設定為目前線程。然後我們通過lock()和unlock來控制自旋鎖的開啟與關閉,注意這是一種非公平鎖。事實上AtomicInteger(或者AtomicLong)原子類内部的CAS操作也是通過不斷的自循環(while循環)實作,不過這種循環的結束條件是線程成功更新對于的值,但也是自旋鎖的一種。