天天看點

【java多線程】3、Unsafe類和CASUnsafe類CAS

Unsafe類

問題引入:常見的多個線程對一個成員變量(不是局部變量)i進行i++操作。

public Person{

    private int i =0;

    public static void main(String[] args){

        final Person person = new Person();

        // 線程1
        new Thread(new Runnable(){
            @Override
            public void run(){
                while(true){
                    person.i++;
                    System.out.println(person.i);
                }
            }
        }).start();

        // 線程2
        new Thread(new Runnable(){
            @Override
            public void run(){
                while(true){
                    person.i++;
                    System.out.println(person.i);
                }

            }
        }).start();

    }
           

Unsafe類的報錯

如果你模仿jdk源碼使用Unsafe類,

public class Person{
    private static final sun.misc.Unsafe UNSAFE;
    static{
        UNSAFE =sun.misc.Unsafe.getUnsafe();//這裡會進行一些檢查
    }
}
           

會報錯。因為他内部會檢查使用unsafe類的類。比如我們是Person類調用的unsafe類,而Person的類加載器不是跟加載器,而是系統加載器,此時就會報錯。因為getUnsafe()要求原來調用Unsafe類的類的加載器是根加載器。

public final class Unsafe {
    //UNSAFE類成員 //靜态成員
    private static final Unsafe theUnsafe;

    public static Unsafe getUnsafe() {
        Class var0 = Reflection.getCallerClass();//得到調用者的類,即Person類
        if (var0.getClassLoader() != null) {//如果加載器不是根加載器
            throw new SecurityException("Unsafe");//報錯
        } else {//如果是根加載器,傳回unsafe對象
            return theUnsafe;
        }
    }
           

那麼怎麼在我們代碼裡使用unsafe呢?如下

擷取unsafe

我們可以如下使用反射讓他強行在某些地方設定為可通路。

import sun.misc.Unsafe;
import java.lang.reflect.Field;

public Person{

    private int i =0;
    private static Unsafe UNSAFE;
    private static long I_OFFSET ;

    static {
        try {
            Field field = Unsafe.class.getDeclaredField("theUnsafe");
            field.setAccessible(true);//設定為可通路
            UNSAFE = field.get(null);//擷取該屬性值

            // 擷取person類i屬性的偏移量 // 列印的時候有可能被其他線程修改
            I_OFFSET = UNSAFE.objectFieldOffset(Person.class.getDeclaredField("i"));
        } catch (NoSuchFieldException e) {
            e.printStackTrace();
        } catch (IllegalAccessException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args){

        final Person person = new Person();

        Boolean b = UNSAFE.compareAndSwapInt(person,//對象
                                             I_OFFSET,//偏移量
                                             person.i,//原始值
                                             person.i+1);//改後的值
           

使用unsafe改變屬性

import sun.misc.Unsafe;
import java.lang.reflect.Field;

public Person{

    private int i =0;
    private static Unsafe UNSAFE;
    private static long I_OFFSET ;

    static {
        try {
            Field field = Unsafe.class.getDeclaredField("theUnsafe");
            field.setAccessible(true);//設定為可通路
            UNSAFE = field.get(null);//擷取該屬性值

            // 擷取person類i屬性的偏移量 // 列印的時候有可能被其他線程修改
            I_OFFSET = UNSAFE.objectFieldOffset(Person.class.getDeclaredField("i"));
        } catch (NoSuchFieldException e) {
            e.printStackTrace();
        } catch (IllegalAccessException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args){

        final Person person = new Person();

        // 線程1
        new Thread(new Runnable(){
            @Override
            public void run(){
                while(true){
                    // 我們可以類似于如下的操作進行person.i++操作
                    //person.i++;
                    
                    // 如果通過cas修改成功,則傳回true,比如修改前的版本值不是我們希望的舊值,就沒修改成功,傳回的是false
                    Boolean b = UNSAFE.compareAndSwapInt(person,//對象
                                             I_OFFSET,//偏移量
                                             person.i,//原始值
                                             person.i+1);//改後的值
                    if (b){
                        // 下面這樣讀的是線程裡緩存的值
                    //System.out.println(person.i);
                    // 使用下面的方法讀記憶體中的值
                    System.out.println(UNSAFE.getIntVolatile(person,I_OFFSET));
                    }
                    
                }

            }
        }).start();

        // 線程2
        new Thread(new Runnable(){
            @Override
            public void run(){
                while(true){
                    // 我們可以類似于如下的操作進行person.i++操作
                    //person.i++;
                    UNSAFE.compareAndSwapInt(person,//對象
                                             I_OFFSET,//偏移量
                                             person.i,//原始值
                                             person.i+1);//改後的值
                    // 下面這樣讀的是線程裡緩存的值
                    //System.out.println(person.i);
                    // 使用下面的方法讀記憶體中的值
                    System.out.println(UNSAFE.getIntVolatile(person,I_OFFSET));
                }

            }
        }).start();


    }
           

使用unsafe操作數組

private transient volatile Person[] person;

int base = unsafe.arrayBaseOffset(Peron[].class);//擷取base

long scale = unsafe.arrayIndexScale(String[].class);//擷取每個索引的長度

String[] strings = new String[]{"1", "2", "3"};
//擷取元素值
unsafe.getObject(strings, base + scale * 0);//擷取對應索引的值//0代表[0]
//設定元素值
unsafe.putObject(strings, base + scale * 0, "222");//設定對應位置的值//0代表[0]
           
/**
 * 操作數組:
 * 可以擷取數組的在内容中的基本偏移量(arrayBaseOffset),擷取數組内元素的間隔(比例),
 * 根據數組對象和偏移量擷取元素值(getObject),設定數組元素值(putObject),示例如下。
 */
String[] strings = new String[]{"1", "2", "3"};
long i = unsafe.arrayBaseOffset(String[].class);
System.out.println("string[] base offset is :" + i);

//every index scale
long scale = unsafe.arrayIndexScale(String[].class);
System.out.println("string[] index scale is " + scale);

//print first string in strings[]
System.out.println("first element is :" + unsafe.getObject(strings, i));

//set 100 to first string
unsafe.putObject(strings, i + scale * 0, "100");

//print first string in strings[] again
System.out.println("after set ,first element is :" + unsafe.getObject(strings, i + scale * 0));

//設定[1]為222
unsafe.putObject(strings, i + scale * 1, "222");

//print first string in strings[] again
System.out.println("after set ,first element is :" + unsafe.getObject(strings, i + scale * 1));
System.out.println(strings[1]);
           
package org.fantasy.unsafe;

import java.lang.reflect.Field;
import java.util.Arrays;

import sun.misc.Unsafe;

public class ArrayUnsafe<T> implements Array<T> {

    private static final int DEFAULT_INITIAL_CAPACITY = 10;
    private transient volatile Object[] elements;
    private volatile int size = 0;

    private static final Unsafe UNSAFE;
    private static final long ARRAY_OFFSET;
    private static final int ARRAY_SHIFT;
    private static final int INDEX_SCALE;
    private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
    private static final long SIZE_OFFSET;

    static {
        try {
            // 擷取unsafe
            Field unsafeField = Unsafe.class.getDeclaredField("theUnsafe");
            unsafeField.setAccessible(true);
            UNSAFE = (Unsafe)unsafeField.get(null);

            // base
            ARRAY_OFFSET = UNSAFE.arrayBaseOffset(Object[].class);
            // 
            INDEX_SCALE = UNSAFE.arrayIndexScale(Object[].class);
            ARRAY_SHIFT = 31 - Integer.numberOfLeadingZeros(INDEX_SCALE);
            // 屬性size的偏移
            SIZE_OFFSET = UNSAFE.objectFieldOffset(ArrayUnsafe.class.getDeclaredField("size"));
        } catch (Exception e) {
            throw new Error(e);
        }
    }

    public ArrayUnsafe() {
        this(DEFAULT_INITIAL_CAPACITY);
    }

    public ArrayUnsafe(int initialCapacity) {
        this.elements = new Object[initialCapacity];
    }

    private long getIndexScale(long index) {
        return (index << ARRAY_SHIFT) + ARRAY_OFFSET;
    }

    public T getObject(int index) {
        return (T)UNSAFE.getObjectVolatile(elements, getIndexScale(index));
    }

    public boolean setObject(int index, T element) {
        rangeCheck(index);
        T oldElement = getObject(index);
        UNSAFE.putOrderedObject(elements, getIndexScale(index), element);
        if(oldElement != null && element == null) {
            int numMoved = size - index - 1;
            if(numMoved > 0) {
                System.arraycopy(elements, index + 1, elements, index, numMoved);
            }
            UNSAFE.compareAndSwapInt(this, SIZE_OFFSET, size, size - 1);
            UNSAFE.putOrderedObject(elements, getIndexScale(size), null);
        }
        return true;
    }



    private void ensureCapacityInternal(int minCapacity) {
        if(minCapacity - elements.length > 0)
            grow(minCapacity);
    }

    private void grow(int minCapacity) {
        int oldCapacity = elements.length;
        int newCapacity = oldCapacity + (oldCapacity >> 1);
        if(newCapacity - minCapacity < 0)
            newCapacity = minCapacity;
        if(newCapacity - MAX_ARRAY_SIZE > 0)
            newCapacity = hugeCapacity(minCapacity);
        elements = Arrays.copyOf(elements, newCapacity);
    }

    private static int hugeCapacity(int minCapacity) {
        if(minCapacity < 0)
            throw new OutOfMemoryError();
        return minCapacity > MAX_ARRAY_SIZE ? Integer.MAX_VALUE : MAX_ARRAY_SIZE;
    }

    public boolean addObject(T element) {
        ensureCapacityInternal(size + 1);
        UNSAFE.putOrderedObject(elements, getIndexScale(size), element);
        UNSAFE.compareAndSwapInt(this, SIZE_OFFSET, size, size + 1);
        return true;
    }

    private void rangeCheck(int index) {
        if(index >= size || index < 0) {
            throw new IndexOutOfBoundsException("Index: " + index + ", Size: " + size);
        }
    }

    public T remove(T element) {
        for(int i = 0; i < elements.length; i++) {
            int index = i;
            T e = getObject(index);
            if(e != null) {
                if(e.equals(element)) {
                    setObject(index, null);
                    return e;
                }
            }
        }
        return null;
    }

    public boolean remove(int index) {
        rangeCheck(index);
        T removedElement = getObject(index);
        if(removedElement != null)
            return setObject(index, null);
        return false;
    }

    public static void main(String[] args) {
        Array<String> strArray = new ArrayUnsafe<String>();
        for(int i = 0; i < 10; i++) {
            strArray.addObject(String.valueOf(i));
        }
        strArray.addObject("hello");
        strArray.addObject("world");
        String s0 = strArray.getObject(0);// get "0"
        String s2 = strArray.getObject(2);// get "2"
        strArray.setObject(1, null);
        String s6 = strArray.remove("6"); // remove "6"
        String sNull = strArray.remove("foo");// null
        boolean result = strArray.remove(9);
    }
}
           

unsafe擷取數組的另外一種思路

回顧一下數組怎麼取值。比如Person類有一個屬性strs

public class Person{
    String[] strs = new String[]{"1", "2", "3"};

    //擷取base
    int base = unsafe.arrayBaseOffset(Person[].class);

    //擷取每個索引的長度
    long scale = unsafe.arrayIndexScale(String[].class);


    //擷取元素值
    unsafe.getObject(strs, base + scale * 0);//擷取對應索引的值//0代表[0]
           
Class sc = Segment[].class;//concurrentHashMap.segments[]并發數組

SBASE = UNSAFE.arrayBaseOffset(sc);

ss = UNSAFE.arrayIndexScale(sc);

SSHIFT = 31 - Integer.numberOfLeadingZeros(ss);

UNSAFE.getObject (segments, (索引j << SSHIFT) + SBASE)
           
//而 SSHIFT=31-Integer.numberOfLeadingZeros(ss);
//numberOfLeadingZeros二進制時ss前面有多少個0,SSHIFT表示ss表示的二進制的1後面有多少個0
//我們不妨再轉換回來,用i表示1後面有多少個
//驗證:j<<31-Integer.numberOfLeadingZeros(ss) == ss×j
//ss為2的次幂,假設為2的i次幂,那麼==左式就為j<<i,而==右式為2^i ×j,結合計算機組成原理知識,顯然相等

就是驗證:
    SSHIFT = 31 - Integer.numberOfLeadingZeros(ss);
    索引j << SSHIFT
        
    是否等于
        
        j*ss
        
        結合移位的知識可得,
        SSHIFT = 31 - Integer.numberOfLeadingZeros(ss);
實際上就是這個長度的二進制位數中0的個數
    
    而j<<長度中0的個數 與 j*ss等價
           

使用unsafe操作屬性

unsafe.compareAndSwapInt(person,I_OFFSET,原值,新值);//設定對象的對應屬性
    
unsafe.compareAndSwapInt(person,I_OFFSET,person.i,person.i+1);//設定對象的對應屬性
    
unsafe.getIntVolatile(person,I_OFFSET);//擷取對應屬性值
           
/**
 * 對象操作
 * 執行個體化Data
 *
 * 可以通過類的class對象建立類對象(allocateInstance),擷取對象屬性的偏移量(objectFieldOffset)
 * ,通過偏移量設定對象的值(putObject)
 *
 * 對象的反序列化
 * 當使用架構反序列化或者建構對象時,會假設從已存在的對象中重建,你期望使用反射來調用類的設定函數,
 * 或者更準确一點是能直接設定内部字段甚至是final字段的函數。問題是你想建立一個對象的執行個體,
 * 但你實際上又不需要構造函數,因為它可能會使問題更加困難而且會有副作用。
 *
 */
//調用allocateInstance函數避免了在我們不需要構造函數的時候卻調用它
Data data = (Data) unsafe.allocateInstance(Data.class);
data.setId(1L);
data.setName("unsafe");
System.out.println(data);
 
//傳回成員屬性在記憶體中的位址相對于對象記憶體位址的偏移量
Field nameField = Data.class.getDeclaredField("name");
long fieldOffset = unsafe.objectFieldOffset(nameField);
//putLong,putInt,putDouble,putChar,putObject等方法,直接修改記憶體資料(可以越過通路權限)
unsafe.putObject(data,fieldOffset,"這是新的值");
System.out.println(data.getName());
 
 
/**
 * 我們可以在運作時建立一個類,比如從已編譯的.class檔案中。将類内容讀取為位元組數組,
 * 并正确地傳遞給defineClass方法;當你必須動态建立類,而現有代碼中有一些代理, 這是很有用的
 */
File file = new File("C:\\workspace\\idea2\\disruptor\\target\\classes\\com\\onyx\\distruptor\\test\\Data.class");
FileInputStream input = new FileInputStream(file);
byte[] content = new byte[(int)file.length()];
input.read(content);
Class c = unsafe.defineClass(null, content, 0, content.length,null,null);
c.getMethod("getId").invoke(c.newInstance(), null);
 
 
 
/**
 * 記憶體操作
 * 可以在java記憶體區域中配置設定記憶體(allocateMemory),設定記憶體(setMemory,用于初始化),
 * 在指定的記憶體位置中設定值(putInt\putBoolean\putDouble等基本類型)
 */
//配置設定一個8byte的記憶體
long address = unsafe.allocateMemory(8L);
//初始化記憶體填充1
unsafe.setMemory(address, 8L, (byte) 1);
//測試輸出
System.out.println("add byte to memory:" + unsafe.getInt(address));
//設定0-3 4個byte為0x7fffffff
unsafe.putInt(address, 0x7fffffff);
//設定4-7 4個byte為0x80000000
unsafe.putInt(address + 4, 0x80000000);
//int占用4byte
System.out.println("add byte to memory:" + unsafe.getInt(address));
System.out.println("add byte to memory:" + unsafe.getInt(address + 4));
           
/**
 * CAS操作
 * Compare And Swap(比較并交換),當需要改變的值為期望的值時,那麼就替換它為新的值,是原子
 * (不可在分割)的操作。很多并發架構底層都用到了CAS操作,CAS操作優勢是無鎖,可以減少線程切換耗費
 * 的時間,但CAS經常失敗運作容易引起性能問題,也存在ABA問題。在Unsafe中包含compareAndSwapObject、
 * compareAndSwapInt、compareAndSwapLong三個方法,compareAndSwapInt的簡單示例如下。
 */
Data data = new Data();
data.setId(1L);
Field id = data.getClass().getDeclaredField("id");
long l = unsafe.objectFieldOffset(id);
id.setAccessible(true);
//比較并交換,比如id的值如果是所期望的值1,那麼就替換為2,否則不做處理
unsafe.compareAndSwapLong(data,1L,1L,2L);
System.out.println(data.getId());
           
/**
 * 常量擷取
 *
 * 可以擷取位址大小(addressSize),頁大小(pageSize),基本類型數組的偏移量
 * (Unsafe.ARRAY_INT_BASE_OFFSET\Unsafe.ARRAY_BOOLEAN_BASE_OFFSET等)、
 * 基本類型數組内元素的間隔(Unsafe.ARRAY_INT_INDEX_SCALE\Unsafe.ARRAY_BOOLEAN_INDEX_SCALE等)
 */
//get os address size
System.out.println("address size is :" + unsafe.addressSize());
//get os page size
System.out.println("page size is :" + unsafe.pageSize());
//int array base offset
System.out.println("unsafe array int base offset:" + Unsafe.ARRAY_INT_BASE_OFFSET);



/**
 * 線程許可
 * 許可線程通過(park),或者讓線程等待許可(unpark),
 */
Thread packThread = new Thread(() -> {
    long startTime = System.currentTimeMillis();
    //納秒,相對時間park
    unsafe.park(false,3000000000L);
    //毫秒,絕對時間park
    //unsafe.park(true,System.currentTimeMillis()+3000);

    System.out.println("main thread end,cost :"+(System.currentTimeMillis()-startTime)+"ms");
});
packThread.start();
TimeUnit.SECONDS.sleep(1);
//注釋掉下一行後,線程3秒數後進行輸出,否則在1秒後輸出
unsafe.unpark(packThread);
           
/* java數組大小的最大值為Integer.MAX_VALUE。使用直接記憶體配置設定,我們建立的數組大小受限于堆大小;
 * 實際上,這是堆外記憶體(off-heap memory)技術,在java.nio包中部分可用;
 *
 * 這種方式的記憶體配置設定不在堆上,且不受GC管理,是以必須小心Unsafe.freeMemory()的使用。
 * 它也不執行任何邊界檢查,是以任何非法通路可能會導緻JVM崩潰
 */
public class SuperArray {

    private static Unsafe unsafe = null;
    private static Field getUnsafe = null;

    static {
        try {
            getUnsafe = Unsafe.class.getDeclaredField("theUnsafe");
            getUnsafe.setAccessible(true);
            unsafe = (Unsafe) getUnsafe.get(null);
        } catch (NoSuchFieldException e) {
            e.printStackTrace();
        } catch (IllegalAccessException e) {
            e.printStackTrace();
        }
    }


    private final static int BYTE = 1;

    private long size;
    private long address;

    public SuperArray(long size) {
        this.size = size;
        address = unsafe.allocateMemory(size * BYTE);
    }

    public void set(long i, byte value) {
        unsafe.putByte(address + i * BYTE, value);
    }

    public int get(long idx) {
        return unsafe.getByte(address + idx * BYTE);
    }

    public long size() {
        return size;
    }


    public static void main(String[] args) {
        long SUPER_SIZE = (long)Integer.MAX_VALUE * 2;
        SuperArray array = new SuperArray(SUPER_SIZE);
        System.out.println("Array size:" + array.size()); // 4294967294
        int sum=0;
        for (int i = 0; i < 100; i++) {
            array.set((long)Integer.MAX_VALUE + i, (byte)3);
            sum += array.get((long)Integer.MAX_VALUE + i);
        }
        System.out.println(sum);
    }


}
           

CAS

Cas保證了任意多個線程隻有1個能先改成功。

作業系統級别CAS實作

摘自CAS操作在ARM和x86下的不同實作。

cmpxchg是X86比較交換指令,這個指令在各大底層系統實作的原子操作和各種同步原語中都有廣泛的使用,比如linux核心,JVM,GCC編譯器等,cmpxchg就是比較交換指令。

intel P6以及最新系列處理器保證了以下操作是原子的:1.讀寫一個位元組。2.讀寫16位對齊的字。3.讀寫32位對齊的雙字。4.讀寫64位對齊的四字。5.讀寫16位,32位,64位在cache line内的未對齊的字。是以普通的load store指令都是原子的。cache一緻性協定保證了不可能有兩個cpu同時寫一個記憶體。對于cmpxchg這種比較交換指令肯定不是原子的,intel是CISC複雜指令集架構,在内部流水線執行的時候,肯定會将cmpxchg指令翻譯成幾條微碼執行(對比ARM精簡指令集)。是以英特爾對于一些指令提供了LOCK字首來保證這個指令的原子性。Intel 64和IA-32處理器提供LOCK#信号,該信号在某些關鍵存儲器操作期間自動置位,以鎖定系統總線或等效鍊路。當該輸出信号被斷言時,來自其他處理器或總線代理的用于控制總線的請求被阻止。

為了更清楚了解cmxchg,需要同時看ARM和x86兩種架構下的實作一個RISC,一個CISC,linux核心提供了兩種架構下的實作。linux核心的原子變量定義如下:

//原子變量
typedef struct {
    volatile int counter; //volatile禁止編譯器把變量緩沖到寄存器
} atomic_t;
           

先看ARM架構下,ARM架構是精簡指令集,沒有提供cmpxchg這種複雜指令,和其它所有RISC架構一樣提供了LL/SC(連結加載,條件存儲)操作,這個操作是很多原子操作的基礎。ARMv8指令是LDXR\STXR,屬于獨占通路,需要有local monitor和global monitor配合使用。這兩條指令一般需要成對出現。ldrex是從記憶體取出資料放到寄存器,然後螢幕将此位址标記為獨占,strex會先測試是否是目前cpu的獨占,如果是則存儲成功傳回0,如果不是則存儲失敗傳回1。例如cpu0将位址m标記為獨占,在strex執行前,線程被調出了,cpu1調用ldrex會清除cpu0的獨占,而将自己标記為獨占,然後執行strxr,然後cpu0的線程重新被排程,此時執行strex會失敗,因為自己的獨占位被清除了。這樣也會導緻後進入ldrex的線程可能比先進入的先執行。标記為獨占的位址調用strex後都會清除獨占标志。

/**
 *  比較ptr->counter和old的值如果相等,則ptr->counter = new,并且傳回old,否則ptr->counter不變
 * 傳回ptr->counter
 */
static inline int atomic_cmpxchg(atomic_t *ptr, int old, int new)
{
    unsigned long oldval, res;
 
    smp_mb(); //記憶體屏障,保證cmpxchg不會在屏障前執行
 
    do {
        __asm__ __volatile__("@ atomic_cmpxchg\n"
        "ldrex  %1, [%2]\n" //獨占通路,螢幕會将此位址标志獨占并且将ptr->counter給oldvalue
        "mov    %0, #0\n"   //res = 0
        "teq    %1, %3\n"   //測試oldvalue是否和old相等也就是ptr->counter和old
 
        //獨占通路成功并且如果相等則把new指派給ptr->counter,否則不執行這條指令
        "strexeq %0, %4, [%2]\n" 
            : "=&r" (res), "=&r" (oldval)
            : "r" (&ptr->counter), "Ir" (old), "r" (new)
            : "cc");
    } while (res);  //while res是因為strexeq指令是獨占訪存指令從,此時可能未标記訪存,而res為1
 
    smp_mb();//記憶體屏障,保證cmpxchg不會在屏障後執行
 
    return oldval;
}
           

X86架構類似:

/*
 *  根據size大小比較交換位元組,字或者雙字,如果傳回old則交換成功,否則交換失敗
 */
static inline unsigned long __cmpxchg(volatile void *ptr, unsigned long old,
                      unsigned long new, int size)
{
    unsigned long prev;
    switch (size) {
    case 1:
        __asm__ __volatile__(LOCK_PREFIX "cmpxchgb %b1,%2"
                     : "=a"(prev)
                     : "q"(new), "m"(*__xg(ptr)), "0"(old)
                     : "memory");
        return prev;
    case 2:
        __asm__ __volatile__(LOCK_PREFIX "cmpxchgw %w1,%2"
                     : "=a"(prev)
                     : "r"(new), "m"(*__xg(ptr)), "0"(old)
                     : "memory");
        return prev;
//eax = old,比較%2 = ptr->counter和eax是否相等,如果相等則ZF置位,并把%1 = new指派給ptr->counter,傳回old值,否則ZF清除,并且将ptr->counter指派給eax
    case 4:
        __asm__ __volatile__(LOCK_PREFIX "cmpxchgl %1,%2"
                     : "=a"(prev)
                     : "r"(new), "m"(*__xg(ptr)), "0"(old)  //0表示eax = old
                     : "memory");
        return prev;
    }
    return old;
}
           

代碼解釋:

以最為常用的4位元組交換為例,主要的操作就是彙編指令

cmpxchgl %1,%2

,注意一下其中的%2,也就是後面的

"m"(*__xg(ptr))

__xg

是在這個檔案中定義的宏:

struct __xchg_dummy { unsigned long a[100]; };

#define __xg(x) ((struct __xchg_dummy *)(x))
           

那麼%2經過預處理,展開就是

"m"(*((struct __xchg_dummy *)(ptr)))

,這種做法,就可以達到在cmpxchg中的%2是一個位址,就是ptr指向的位址。如果%2是

"m"(ptr)

,那麼指針本身的值就出現在cmpxchg指令中。

簡單點就是:

cmpxchg %ecx, %ebx;如果EAX與EBX相等,則ECX送EBX且ZF置1;否則EBX送ECX,且ZF清0
           

在cmpxchg指令前加了lock字首,保證在進行操作的時候,不會讓其它cpu操作同一個記憶體。使得整個操作保持原子性。對比來看雖然X86隻用了一條指令,但是處理器内部肯定将這條指令轉成了類RISC的微碼。

性能

基于

AtomicLong

可能性能有點差,是以出現了

LongAdder

。接下來通過基準測試,來看看兩者的性能差别。

基準測試

記憶體:4G,CPU:Intel® Core™ i5-4250U CPU @ 1.30GHz,硬碟: 128G SSD

Mode設為

Throughput

,測試吞吐量。

Mode設為

AverageTime

,測試平均耗時。

@OutputTimeUnit(TimeUnit.MICROSECONDS)
@BenchmarkMode(Mode.Throughput)
public class Main {
    private static AtomicLong count = new AtomicLong();

    private static LongAdder longAdder = new LongAdder();

    public static void main(String[] args) throws RunnerException {
        Options options = (Options) new OptionsBuilder().include(Main.class.getName());
        new Runner(options).run();
    }

    @Benchmark
    @Threads(10)
    public void run0() {
        count.getAndIncrement();
    }

    @Benchmark
    @Threads(10)
    public void run1() {
        longAdder.increment();
    }
}
           

吞吐量:

線程為1,結果:

Benchmark   Mode  Cnt    Score   Error   Units
Main.run0  thrpt   25  125.268 ± 2.471  ops/us
Main.run1  thrpt   25   74.587 ± 1.484  ops/us
           

線程為10,結果:

Benchmark   Mode  Cnt    Score   Error   Units
Main.run0  thrpt   25   24.897 ± 2.888  ops/us
Main.run1  thrpt   25  192.474 ± 2.824  ops/us
           

線程為30,結果:

Benchmark   Mode  Cnt    Score   Error   Units
Main.run0  thrpt   25   21.096 ± 1.897  ops/us
Main.run1  thrpt   25  189.732 ± 4.089  ops/us
           

平均耗時:

線程為1,結果:

Main.run0  avgt   25  0.008 ±  0.001  us/op
Main.run1  avgt   25  0.013 ±  0.001  us/op
           

線程為10,結果:

Benchmark  Mode  Cnt  Score   Error  Units
Main.run0  avgt   25  0.485 ± 0.048  us/op
Main.run1  avgt   25  0.050 ± 0.001  us/op
           

線程為30,結果:

Benchmark  Mode  Cnt  Score   Error  Units
Main.run0  avgt   25  1.364 ± 0.033  us/op
Main.run1  avgt   25  0.158 ± 0.003  us/op
           

可以看到除了線程為1的情況下,其他情況下,LogAdder明顯比AtomicLong要好的多。

ABA問題

設想如下場景:

  1. 線程1準備用CAS将變量的值由A替換為B。
  2. 在此之前,線程2将變量的值由A替換為C,又由C替換為A,
  3. 然後線程1執行CAS時發現變量的值仍然為A,是以CAS成功。

但實際上這時的現場已經和最初不同了,盡管CAS成功,但可能存在潛藏的問題。

比如:

  • 有一個用單向連結清單實作的棧,棧頂為A。
  • 線程T1擷取A.next為B,然後希望用CAS将棧頂替換為B,head.compareAndSet(A,B)。
  • 在T1執行上面這條指令之前,線程T2介入,将A、B出棧,再pushD、C、A。此時B.next為null。
  • 此時輪到線程T1執行CAS操作,檢測發現棧頂仍為A,是以CAS成功,棧頂變為B。
  • 但實際上B.next為null,其中堆棧中隻有B一個元素,C和D組成的連結清單不再存在于堆棧中,這樣就造成C、D被抛棄的現象。

解決方法

各種樂觀鎖的實作中通常都會用版本戳version來對記錄或對象标記,避免并發操作帶來的問題,在Java中,AtomicStampedReference也實作了這個作用,它通過包裝[E,Integer]的元組來對對象标記版本戳stamp,進而避免ABA問題。