天天看點

Java 并發:分析分析 CAS

大明哥@cmsblogs.com

Java 并發:分析分析 CAS

本文目錄

  • 一、分析
  • 二、對比優缺及 ABA 問題
  • 三、小結
“七十二行,行行出狀元” 
  1. ​​泥瓦匠是誰?​​
  2. ​​誠邀:碼出高效未來交流群​​
工程源碼來自:JDK 1.8

CAS,Compare And Swap,即比較并交換。Doug lea大神在同步元件中大量使用CAS技術鬼斧神工地實作了Java多線程的并發操作。整個AQS同步元件、Atomic原子類操作等等都是以CAS實作的,甚至ConcurrentHashMap在1.8的版本中也調整為了CAS+Synchronized。可以說CAS是整個JUC的基石。

Java 并發:分析分析 CAS

CAS分析

在CAS中有三個參數:記憶體值V、舊的預期值A、要更新的值B,當且僅當記憶體值V的值等于舊的預期值A時才會将記憶體值V的值修改為B,否則什麼都不幹。其僞代碼如下:

if(this.value == A){


    this.value = B


    return true;


}else{


    return false;


}      

JUC下的atomic類都是通過CAS來實作的,下面就以AtomicInteger為例來闡述CAS的實作。如下:

private static final Unsafe unsafe = Unsafe.getUnsafe();

    private static final long valueOffset;



    static {

        try {

            valueOffset = unsafe.objectFieldOffset

                (AtomicInteger.class.getDeclaredField("value"));

        } catch (Exception ex) { throw new Error(ex); }

    }



    private volatile int value;      

Unsafe是CAS的核心類,Java無法直接通路底層作業系統,而是通過本地(native)方法來通路。不過盡管如此,JVM還是開了一個後門:Unsafe,它提供了硬體級别的原子操作。

valueOffset為變量值在記憶體中的偏移位址,unsafe就是通過偏移位址來得到資料的原值的。

value目前值,使用volatile修飾,保證多線程環境下看見的是同一個。

我們就以AtomicInteger的addAndGet()方法來做說明,先看源代碼:

public final int addAndGet(int delta) {

        return unsafe.getAndAddInt(this, valueOffset, delta) + delta;

    }



    public final int getAndAddInt(Object var1, long var2, int var4) {

        int var5;

        do {

            var5 = this.getIntVolatile(var1, var2);

        } while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));



        return var5;

    }      

内部調用unsafe的getAndAddInt方法,在getAndAddInt方法中主要是看compareAndSwapInt方法:

public final native boolean compareAndSwapInt(Object var1, long var2, int var4, int var5);      

該方法為本地方法,有四個參數,分别代表:對象、對象的位址、預期值、修改值(有位夥伴告訴我他面試的時候就問到這四個變量是啥意思...)。該方法的實作這裡就不做詳細介紹了,有興趣的夥伴可以看看openjdk的源碼。

CAS可以保證一次的讀-改-寫操作是原子操作,在單處理器上該操作容易實作,但是在多處理器上實作就有點兒複雜了。

CPU提供了兩種方法來實作多處理器的原子操作:總線加鎖或者緩存加鎖。

  • 總線加鎖:總線加鎖就是就是使用處理器提供的一個LOCK#信号,當一個處理器在總線上輸出此信号時,其他處理器的請求将被阻塞住,那麼該處理器可以獨占使用共享記憶體。但是這種處理方式顯得有點兒霸道,不厚道,他把CPU和記憶體之間的通信鎖住了,在鎖定期間,其他處理器都不能其他記憶體位址的資料,其開銷有點兒大。是以就有了緩存加鎖。
  • 緩存加鎖:其實針對于上面那種情況我們隻需要保證在同一時刻對某個記憶體位址的操作是原子性的即可。緩存加鎖就是緩存在記憶體區域的資料如果在加鎖期間,當它執行鎖操作寫回記憶體時,處理器不在輸出LOCK#信号,而是修改内部的記憶體位址,利用緩存一緻性協定來保證原子性。緩存一緻性機制可以保證同一個記憶體區域的資料僅能被一個處理器修改,也就是說當CPU1修改緩存行中的i時使用緩存鎖定,那麼CPU2就不能同時緩存了i的緩存行。

CAS缺陷

CAS雖然高效地解決了原子操作,但是還是存在一些缺陷的,主要表現在三個方法:循環時間太長、隻能保證一個共享變量原子操作、ABA問題。

循環時間太長

如果CAS一直不成功呢?這種情況絕對有可能發生,如果自旋CAS長時間地不成功,則會給CPU帶來非常大的開銷。在JUC中有些地方就限制了CAS自旋的次數,例如BlockingQueue的SynchronousQueue。

隻能保證一個共享變量原子操作

看了CAS的實作就知道這隻能針對一個共享變量,如果是多個共享變量就隻能使用鎖了,當然如果你有辦法把多個變量整成一個變量,利用CAS也不錯。例如讀寫鎖中state的高地位

ABA問題

CAS需要檢查操作值有沒有發生改變,如果沒有發生改變則更新。但是存在這樣一種情況:如果一個值原來是A,變成了B,然後又變成了A,那麼在CAS檢查的時候會發現沒有改變,但是實質上它已經發生了改變,這就是所謂的ABA問題。對于ABA問題其解決方案是加上版本号,即在每個變量都加上一個版本号,每次改變時加1,即A —> B —> A,變成1A —> 2B —> 3A。

用一個例子來闡述ABA問題所帶來的影響。

有如下連結清單

Java 并發:分析分析 CAS

假如我們想要把B替換為A,也就是compareAndSet(this,A,B)。線程1執行B替換A操作,線程2主要執行如下動作,A 、B出棧,然後C、A入棧,最終該連結清單如下:

Java 并發:分析分析 CAS

完成後線程1發現仍然是A,那麼compareAndSet(this,A,B)成功,但是這時會存在一個問題就是B.next = null,compareAndSet(this,A,B)後,會導緻C丢失,改棧僅有一個B元素,平白無故把C給丢失了。

CAS的ABA隐患問題,解決方案則是版本号,Java提供了AtomicStampedReference來解決。AtomicStampedReference通過包裝[E,Integer]的元組來對對象标記版本戳stamp,進而避免ABA問題。對于上面的案例應該線程1會失敗。

AtomicStampedReference的compareAndSet()方法定義如下:

public boolean compareAndSet(V   expectedReference,


                                 V   newReference,


                                 int expectedStamp,


                                 int newStamp) {


        Pair<V> current = pair;


        return


            expectedReference == current.reference &&


            expectedStamp == current.stamp &&


            ((newReference == current.reference &&


              newStamp == current.stamp) ||


             casPair(current, Pair.of(newReference, newStamp)));


    }      

compareAndSet有四個參數,分别表示:預期引用、更新後的引用、預期标志、更新後的标志。源碼部門很好了解預期的引用 == 目前引用,預期的辨別 == 目前辨別,如果更新後的引用和标志和目前的引用和标志相等則直接傳回true,否則通過Pair生成一個新的pair對象與目前pair CAS替換。Pair為AtomicStampedReference的内部類,主要用于記錄引用和版本戳資訊(辨別),定義如下:

private static class Pair<T> {

        final T reference;

        final int stamp;

        private Pair(T reference, int stamp) {

            this.reference = reference;

            this.stamp = stamp;

        }

        static <T> Pair<T> of(T reference, int stamp) {

            return new Pair<T>(reference, stamp);

        }

    }



    private volatile Pair<V> pair;      

Pair記錄着對象的引用和版本戳,版本戳為int型,保持自增。同時Pair是一個不可變對象,其所有屬性全部定義為final,對外提供一個of方法,該方法傳回一個建立的Pari對象。pair對象定義為volatile,保證多線程環境下的可見性。在AtomicStampedReference中,大多方法都是通過調用Pair的of方法來産生一個新的Pair對象,然後指派給變量pair。如set方法:

public void set(V newReference, int newStamp) {


        Pair<V> current = pair;


        if (newReference != current.reference || newStamp != current.stamp)


            this.pair = Pair.of(newReference, newStamp);


    }      

下面我們将通過一個例子可以可以看到AtomicStampedReference和AtomicInteger的差別。我們定義兩個線程,線程1負責将100 —> 110 —> 100,線程2執行 100 —>120,看兩者之間的差別。

public class Test {

    private static AtomicInteger atomicInteger = new AtomicInteger(100);

    private static AtomicStampedReference atomicStampedReference = new AtomicStampedReference(100,1);



    public static void main(String[] args) throws InterruptedException {



        //AtomicInteger

        Thread at1 = new Thread(new Runnable() {

            @Override

            public void run() {

                atomicInteger.compareAndSet(100,110);

                atomicInteger.compareAndSet(110,100);

            }

        });



        Thread at2 = new Thread(new Runnable() {

            @Override

            public void run() {

                try {

                    TimeUnit.SECONDS.sleep(2);      // at1,執行完

                } catch (InterruptedException e) {

                    e.printStackTrace();

                }

                System.out.println("AtomicInteger:" + atomicInteger.compareAndSet(100,120));

            }

        });



        at1.start();

        at2.start();



        at1.join();

        at2.join();



        //AtomicStampedReference



        Thread tsf1 = new Thread(new Runnable() {

            @Override

            public void run() {

                try {

                    //讓 tsf2先擷取stamp,導緻預期時間戳不一緻

                    TimeUnit.SECONDS.sleep(2);

                } catch (InterruptedException e) {

                    e.printStackTrace();

                }

                // 預期引用:100,更新後的引用:110,預期辨別getStamp() 更新後的辨別getStamp() + 1

                atomicStampedReference.compareAndSet(100,110,atomicStampedReference.getStamp(),atomicStampedReference.getStamp() + 1);

                atomicStampedReference.compareAndSet(110,100,atomicStampedReference.getStamp(),atomicStampedReference.getStamp() + 1);

            }

        });



        Thread tsf2 = new Thread(new Runnable() {

            @Override

            public void run() {

                int stamp = atomicStampedReference.getStamp();



                try {

                    TimeUnit.SECONDS.sleep(2);      //線程tsf1執行完

                } catch (InterruptedException e) {

                    e.printStackTrace();

                }

                System.out.println("AtomicStampedReference:" +atomicStampedReference.compareAndSet(100,120,stamp,stamp + 1));

            }

        });



        tsf1.start();

        tsf2.start();

    }



}      

運作結果:

Java 并發:分析分析 CAS

運作結果充分展示了AtomicInteger的ABA問題和AtomicStampedReference解決ABA問題。

參考資料

  1. Doug Lea:《Java并發程式設計實戰》
  2. 方騰飛:《Java并發程式設計的藝術》

以下專題教程也許您會有興趣

  • 《Spring Boot 2.x 系列教程》https://www.bysocket.com/springboot
  • 《Java 核心系列教程》https://www.bysocket.com/archives/2100
由于能力有限,若有錯誤或者不當之處,還請大家批評指正,一起學習交流!

-The End-

​​​​