天天看点

【java多线程】3、Unsafe类和CASUnsafe类CAS

Unsafe类

问题引入:常见的多个线程对一个成员变量(不是局部变量)i进行i++操作。

public Person{

    private int i =0;

    public static void main(String[] args){

        final Person person = new Person();

        // 线程1
        new Thread(new Runnable(){
            @Override
            public void run(){
                while(true){
                    person.i++;
                    System.out.println(person.i);
                }
            }
        }).start();

        // 线程2
        new Thread(new Runnable(){
            @Override
            public void run(){
                while(true){
                    person.i++;
                    System.out.println(person.i);
                }

            }
        }).start();

    }
           

Unsafe类的报错

如果你模仿jdk源码使用Unsafe类,

public class Person{
    private static final sun.misc.Unsafe UNSAFE;
    static{
        UNSAFE =sun.misc.Unsafe.getUnsafe();//这里会进行一些检查
    }
}
           

会报错。因为他内部会检查使用unsafe类的类。比如我们是Person类调用的unsafe类,而Person的类加载器不是跟加载器,而是系统加载器,此时就会报错。因为getUnsafe()要求原来调用Unsafe类的类的加载器是根加载器。

public final class Unsafe {
    //UNSAFE类成员 //静态成员
    private static final Unsafe theUnsafe;

    public static Unsafe getUnsafe() {
        Class var0 = Reflection.getCallerClass();//得到调用者的类,即Person类
        if (var0.getClassLoader() != null) {//如果加载器不是根加载器
            throw new SecurityException("Unsafe");//报错
        } else {//如果是根加载器,返回unsafe对象
            return theUnsafe;
        }
    }
           

那么怎么在我们代码里使用unsafe呢?如下

获取unsafe

我们可以如下使用反射让他强行在某些地方设置为可访问。

import sun.misc.Unsafe;
import java.lang.reflect.Field;

public Person{

    private int i =0;
    private static Unsafe UNSAFE;
    private static long I_OFFSET ;

    static {
        try {
            Field field = Unsafe.class.getDeclaredField("theUnsafe");
            field.setAccessible(true);//设置为可访问
            UNSAFE = field.get(null);//获取该属性值

            // 获取person类i属性的偏移量 // 打印的时候有可能被其他线程修改
            I_OFFSET = UNSAFE.objectFieldOffset(Person.class.getDeclaredField("i"));
        } catch (NoSuchFieldException e) {
            e.printStackTrace();
        } catch (IllegalAccessException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args){

        final Person person = new Person();

        Boolean b = UNSAFE.compareAndSwapInt(person,//对象
                                             I_OFFSET,//偏移量
                                             person.i,//原始值
                                             person.i+1);//改后的值
           

使用unsafe改变属性

import sun.misc.Unsafe;
import java.lang.reflect.Field;

public Person{

    private int i =0;
    private static Unsafe UNSAFE;
    private static long I_OFFSET ;

    static {
        try {
            Field field = Unsafe.class.getDeclaredField("theUnsafe");
            field.setAccessible(true);//设置为可访问
            UNSAFE = field.get(null);//获取该属性值

            // 获取person类i属性的偏移量 // 打印的时候有可能被其他线程修改
            I_OFFSET = UNSAFE.objectFieldOffset(Person.class.getDeclaredField("i"));
        } catch (NoSuchFieldException e) {
            e.printStackTrace();
        } catch (IllegalAccessException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args){

        final Person person = new Person();

        // 线程1
        new Thread(new Runnable(){
            @Override
            public void run(){
                while(true){
                    // 我们可以类似于如下的操作进行person.i++操作
                    //person.i++;
                    
                    // 如果通过cas修改成功,则返回true,比如修改前的版本值不是我们希望的旧值,就没修改成功,返回的是false
                    Boolean b = UNSAFE.compareAndSwapInt(person,//对象
                                             I_OFFSET,//偏移量
                                             person.i,//原始值
                                             person.i+1);//改后的值
                    if (b){
                        // 下面这样读的是线程里缓存的值
                    //System.out.println(person.i);
                    // 使用下面的方法读内存中的值
                    System.out.println(UNSAFE.getIntVolatile(person,I_OFFSET));
                    }
                    
                }

            }
        }).start();

        // 线程2
        new Thread(new Runnable(){
            @Override
            public void run(){
                while(true){
                    // 我们可以类似于如下的操作进行person.i++操作
                    //person.i++;
                    UNSAFE.compareAndSwapInt(person,//对象
                                             I_OFFSET,//偏移量
                                             person.i,//原始值
                                             person.i+1);//改后的值
                    // 下面这样读的是线程里缓存的值
                    //System.out.println(person.i);
                    // 使用下面的方法读内存中的值
                    System.out.println(UNSAFE.getIntVolatile(person,I_OFFSET));
                }

            }
        }).start();


    }
           

使用unsafe操作数组

private transient volatile Person[] person;

int base = unsafe.arrayBaseOffset(Peron[].class);//获取base

long scale = unsafe.arrayIndexScale(String[].class);//获取每个索引的长度

String[] strings = new String[]{"1", "2", "3"};
//获取元素值
unsafe.getObject(strings, base + scale * 0);//获取对应索引的值//0代表[0]
//设置元素值
unsafe.putObject(strings, base + scale * 0, "222");//设置对应位置的值//0代表[0]
           
/**
 * 操作数组:
 * 可以获取数组的在内容中的基本偏移量(arrayBaseOffset),获取数组内元素的间隔(比例),
 * 根据数组对象和偏移量获取元素值(getObject),设置数组元素值(putObject),示例如下。
 */
String[] strings = new String[]{"1", "2", "3"};
long i = unsafe.arrayBaseOffset(String[].class);
System.out.println("string[] base offset is :" + i);

//every index scale
long scale = unsafe.arrayIndexScale(String[].class);
System.out.println("string[] index scale is " + scale);

//print first string in strings[]
System.out.println("first element is :" + unsafe.getObject(strings, i));

//set 100 to first string
unsafe.putObject(strings, i + scale * 0, "100");

//print first string in strings[] again
System.out.println("after set ,first element is :" + unsafe.getObject(strings, i + scale * 0));

//设置[1]为222
unsafe.putObject(strings, i + scale * 1, "222");

//print first string in strings[] again
System.out.println("after set ,first element is :" + unsafe.getObject(strings, i + scale * 1));
System.out.println(strings[1]);
           
package org.fantasy.unsafe;

import java.lang.reflect.Field;
import java.util.Arrays;

import sun.misc.Unsafe;

public class ArrayUnsafe<T> implements Array<T> {

    private static final int DEFAULT_INITIAL_CAPACITY = 10;
    private transient volatile Object[] elements;
    private volatile int size = 0;

    private static final Unsafe UNSAFE;
    private static final long ARRAY_OFFSET;
    private static final int ARRAY_SHIFT;
    private static final int INDEX_SCALE;
    private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
    private static final long SIZE_OFFSET;

    static {
        try {
            // 获取unsafe
            Field unsafeField = Unsafe.class.getDeclaredField("theUnsafe");
            unsafeField.setAccessible(true);
            UNSAFE = (Unsafe)unsafeField.get(null);

            // base
            ARRAY_OFFSET = UNSAFE.arrayBaseOffset(Object[].class);
            // 
            INDEX_SCALE = UNSAFE.arrayIndexScale(Object[].class);
            ARRAY_SHIFT = 31 - Integer.numberOfLeadingZeros(INDEX_SCALE);
            // 属性size的偏移
            SIZE_OFFSET = UNSAFE.objectFieldOffset(ArrayUnsafe.class.getDeclaredField("size"));
        } catch (Exception e) {
            throw new Error(e);
        }
    }

    public ArrayUnsafe() {
        this(DEFAULT_INITIAL_CAPACITY);
    }

    public ArrayUnsafe(int initialCapacity) {
        this.elements = new Object[initialCapacity];
    }

    private long getIndexScale(long index) {
        return (index << ARRAY_SHIFT) + ARRAY_OFFSET;
    }

    public T getObject(int index) {
        return (T)UNSAFE.getObjectVolatile(elements, getIndexScale(index));
    }

    public boolean setObject(int index, T element) {
        rangeCheck(index);
        T oldElement = getObject(index);
        UNSAFE.putOrderedObject(elements, getIndexScale(index), element);
        if(oldElement != null && element == null) {
            int numMoved = size - index - 1;
            if(numMoved > 0) {
                System.arraycopy(elements, index + 1, elements, index, numMoved);
            }
            UNSAFE.compareAndSwapInt(this, SIZE_OFFSET, size, size - 1);
            UNSAFE.putOrderedObject(elements, getIndexScale(size), null);
        }
        return true;
    }



    private void ensureCapacityInternal(int minCapacity) {
        if(minCapacity - elements.length > 0)
            grow(minCapacity);
    }

    private void grow(int minCapacity) {
        int oldCapacity = elements.length;
        int newCapacity = oldCapacity + (oldCapacity >> 1);
        if(newCapacity - minCapacity < 0)
            newCapacity = minCapacity;
        if(newCapacity - MAX_ARRAY_SIZE > 0)
            newCapacity = hugeCapacity(minCapacity);
        elements = Arrays.copyOf(elements, newCapacity);
    }

    private static int hugeCapacity(int minCapacity) {
        if(minCapacity < 0)
            throw new OutOfMemoryError();
        return minCapacity > MAX_ARRAY_SIZE ? Integer.MAX_VALUE : MAX_ARRAY_SIZE;
    }

    public boolean addObject(T element) {
        ensureCapacityInternal(size + 1);
        UNSAFE.putOrderedObject(elements, getIndexScale(size), element);
        UNSAFE.compareAndSwapInt(this, SIZE_OFFSET, size, size + 1);
        return true;
    }

    private void rangeCheck(int index) {
        if(index >= size || index < 0) {
            throw new IndexOutOfBoundsException("Index: " + index + ", Size: " + size);
        }
    }

    public T remove(T element) {
        for(int i = 0; i < elements.length; i++) {
            int index = i;
            T e = getObject(index);
            if(e != null) {
                if(e.equals(element)) {
                    setObject(index, null);
                    return e;
                }
            }
        }
        return null;
    }

    public boolean remove(int index) {
        rangeCheck(index);
        T removedElement = getObject(index);
        if(removedElement != null)
            return setObject(index, null);
        return false;
    }

    public static void main(String[] args) {
        Array<String> strArray = new ArrayUnsafe<String>();
        for(int i = 0; i < 10; i++) {
            strArray.addObject(String.valueOf(i));
        }
        strArray.addObject("hello");
        strArray.addObject("world");
        String s0 = strArray.getObject(0);// get "0"
        String s2 = strArray.getObject(2);// get "2"
        strArray.setObject(1, null);
        String s6 = strArray.remove("6"); // remove "6"
        String sNull = strArray.remove("foo");// null
        boolean result = strArray.remove(9);
    }
}
           

unsafe获取数组的另外一种思路

回顾一下数组怎么取值。比如Person类有一个属性strs

public class Person{
    String[] strs = new String[]{"1", "2", "3"};

    //获取base
    int base = unsafe.arrayBaseOffset(Person[].class);

    //获取每个索引的长度
    long scale = unsafe.arrayIndexScale(String[].class);


    //获取元素值
    unsafe.getObject(strs, base + scale * 0);//获取对应索引的值//0代表[0]
           
Class sc = Segment[].class;//concurrentHashMap.segments[]并发数组

SBASE = UNSAFE.arrayBaseOffset(sc);

ss = UNSAFE.arrayIndexScale(sc);

SSHIFT = 31 - Integer.numberOfLeadingZeros(ss);

UNSAFE.getObject (segments, (索引j << SSHIFT) + SBASE)
           
//而 SSHIFT=31-Integer.numberOfLeadingZeros(ss);
//numberOfLeadingZeros二进制时ss前面有多少个0,SSHIFT表示ss表示的二进制的1后面有多少个0
//我们不妨再转换回来,用i表示1后面有多少个
//验证:j<<31-Integer.numberOfLeadingZeros(ss) == ss×j
//ss为2的次幂,假设为2的i次幂,那么==左式就为j<<i,而==右式为2^i ×j,结合计算机组成原理知识,显然相等

就是验证:
    SSHIFT = 31 - Integer.numberOfLeadingZeros(ss);
    索引j << SSHIFT
        
    是否等于
        
        j*ss
        
        结合移位的知识可得,
        SSHIFT = 31 - Integer.numberOfLeadingZeros(ss);
实际上就是这个长度的二进制位数中0的个数
    
    而j<<长度中0的个数 与 j*ss等价
           

使用unsafe操作属性

unsafe.compareAndSwapInt(person,I_OFFSET,原值,新值);//设置对象的对应属性
    
unsafe.compareAndSwapInt(person,I_OFFSET,person.i,person.i+1);//设置对象的对应属性
    
unsafe.getIntVolatile(person,I_OFFSET);//获取对应属性值
           
/**
 * 对象操作
 * 实例化Data
 *
 * 可以通过类的class对象创建类对象(allocateInstance),获取对象属性的偏移量(objectFieldOffset)
 * ,通过偏移量设置对象的值(putObject)
 *
 * 对象的反序列化
 * 当使用框架反序列化或者构建对象时,会假设从已存在的对象中重建,你期望使用反射来调用类的设置函数,
 * 或者更准确一点是能直接设置内部字段甚至是final字段的函数。问题是你想创建一个对象的实例,
 * 但你实际上又不需要构造函数,因为它可能会使问题更加困难而且会有副作用。
 *
 */
//调用allocateInstance函数避免了在我们不需要构造函数的时候却调用它
Data data = (Data) unsafe.allocateInstance(Data.class);
data.setId(1L);
data.setName("unsafe");
System.out.println(data);
 
//返回成员属性在内存中的地址相对于对象内存地址的偏移量
Field nameField = Data.class.getDeclaredField("name");
long fieldOffset = unsafe.objectFieldOffset(nameField);
//putLong,putInt,putDouble,putChar,putObject等方法,直接修改内存数据(可以越过访问权限)
unsafe.putObject(data,fieldOffset,"这是新的值");
System.out.println(data.getName());
 
 
/**
 * 我们可以在运行时创建一个类,比如从已编译的.class文件中。将类内容读取为字节数组,
 * 并正确地传递给defineClass方法;当你必须动态创建类,而现有代码中有一些代理, 这是很有用的
 */
File file = new File("C:\\workspace\\idea2\\disruptor\\target\\classes\\com\\onyx\\distruptor\\test\\Data.class");
FileInputStream input = new FileInputStream(file);
byte[] content = new byte[(int)file.length()];
input.read(content);
Class c = unsafe.defineClass(null, content, 0, content.length,null,null);
c.getMethod("getId").invoke(c.newInstance(), null);
 
 
 
/**
 * 内存操作
 * 可以在java内存区域中分配内存(allocateMemory),设置内存(setMemory,用于初始化),
 * 在指定的内存位置中设置值(putInt\putBoolean\putDouble等基本类型)
 */
//分配一个8byte的内存
long address = unsafe.allocateMemory(8L);
//初始化内存填充1
unsafe.setMemory(address, 8L, (byte) 1);
//测试输出
System.out.println("add byte to memory:" + unsafe.getInt(address));
//设置0-3 4个byte为0x7fffffff
unsafe.putInt(address, 0x7fffffff);
//设置4-7 4个byte为0x80000000
unsafe.putInt(address + 4, 0x80000000);
//int占用4byte
System.out.println("add byte to memory:" + unsafe.getInt(address));
System.out.println("add byte to memory:" + unsafe.getInt(address + 4));
           
/**
 * CAS操作
 * Compare And Swap(比较并交换),当需要改变的值为期望的值时,那么就替换它为新的值,是原子
 * (不可在分割)的操作。很多并发框架底层都用到了CAS操作,CAS操作优势是无锁,可以减少线程切换耗费
 * 的时间,但CAS经常失败运行容易引起性能问题,也存在ABA问题。在Unsafe中包含compareAndSwapObject、
 * compareAndSwapInt、compareAndSwapLong三个方法,compareAndSwapInt的简单示例如下。
 */
Data data = new Data();
data.setId(1L);
Field id = data.getClass().getDeclaredField("id");
long l = unsafe.objectFieldOffset(id);
id.setAccessible(true);
//比较并交换,比如id的值如果是所期望的值1,那么就替换为2,否则不做处理
unsafe.compareAndSwapLong(data,1L,1L,2L);
System.out.println(data.getId());
           
/**
 * 常量获取
 *
 * 可以获取地址大小(addressSize),页大小(pageSize),基本类型数组的偏移量
 * (Unsafe.ARRAY_INT_BASE_OFFSET\Unsafe.ARRAY_BOOLEAN_BASE_OFFSET等)、
 * 基本类型数组内元素的间隔(Unsafe.ARRAY_INT_INDEX_SCALE\Unsafe.ARRAY_BOOLEAN_INDEX_SCALE等)
 */
//get os address size
System.out.println("address size is :" + unsafe.addressSize());
//get os page size
System.out.println("page size is :" + unsafe.pageSize());
//int array base offset
System.out.println("unsafe array int base offset:" + Unsafe.ARRAY_INT_BASE_OFFSET);



/**
 * 线程许可
 * 许可线程通过(park),或者让线程等待许可(unpark),
 */
Thread packThread = new Thread(() -> {
    long startTime = System.currentTimeMillis();
    //纳秒,相对时间park
    unsafe.park(false,3000000000L);
    //毫秒,绝对时间park
    //unsafe.park(true,System.currentTimeMillis()+3000);

    System.out.println("main thread end,cost :"+(System.currentTimeMillis()-startTime)+"ms");
});
packThread.start();
TimeUnit.SECONDS.sleep(1);
//注释掉下一行后,线程3秒数后进行输出,否则在1秒后输出
unsafe.unpark(packThread);
           
/* java数组大小的最大值为Integer.MAX_VALUE。使用直接内存分配,我们创建的数组大小受限于堆大小;
 * 实际上,这是堆外内存(off-heap memory)技术,在java.nio包中部分可用;
 *
 * 这种方式的内存分配不在堆上,且不受GC管理,所以必须小心Unsafe.freeMemory()的使用。
 * 它也不执行任何边界检查,所以任何非法访问可能会导致JVM崩溃
 */
public class SuperArray {

    private static Unsafe unsafe = null;
    private static Field getUnsafe = null;

    static {
        try {
            getUnsafe = Unsafe.class.getDeclaredField("theUnsafe");
            getUnsafe.setAccessible(true);
            unsafe = (Unsafe) getUnsafe.get(null);
        } catch (NoSuchFieldException e) {
            e.printStackTrace();
        } catch (IllegalAccessException e) {
            e.printStackTrace();
        }
    }


    private final static int BYTE = 1;

    private long size;
    private long address;

    public SuperArray(long size) {
        this.size = size;
        address = unsafe.allocateMemory(size * BYTE);
    }

    public void set(long i, byte value) {
        unsafe.putByte(address + i * BYTE, value);
    }

    public int get(long idx) {
        return unsafe.getByte(address + idx * BYTE);
    }

    public long size() {
        return size;
    }


    public static void main(String[] args) {
        long SUPER_SIZE = (long)Integer.MAX_VALUE * 2;
        SuperArray array = new SuperArray(SUPER_SIZE);
        System.out.println("Array size:" + array.size()); // 4294967294
        int sum=0;
        for (int i = 0; i < 100; i++) {
            array.set((long)Integer.MAX_VALUE + i, (byte)3);
            sum += array.get((long)Integer.MAX_VALUE + i);
        }
        System.out.println(sum);
    }


}
           

CAS

Cas保证了任意多个线程只有1个能先改成功。

操作系统级别CAS实现

摘自CAS操作在ARM和x86下的不同实现。

cmpxchg是X86比较交换指令,这个指令在各大底层系统实现的原子操作和各种同步原语中都有广泛的使用,比如linux内核,JVM,GCC编译器等,cmpxchg就是比较交换指令。

intel P6以及最新系列处理器保证了以下操作是原子的:1.读写一个字节。2.读写16位对齐的字。3.读写32位对齐的双字。4.读写64位对齐的四字。5.读写16位,32位,64位在cache line内的未对齐的字。所以普通的load store指令都是原子的。cache一致性协议保证了不可能有两个cpu同时写一个内存。对于cmpxchg这种比较交换指令肯定不是原子的,intel是CISC复杂指令集架构,在内部流水线执行的时候,肯定会将cmpxchg指令翻译成几条微码执行(对比ARM精简指令集)。所以英特尔对于一些指令提供了LOCK前缀来保证这个指令的原子性。Intel 64和IA-32处理器提供LOCK#信号,该信号在某些关键存储器操作期间自动置位,以锁定系统总线或等效链路。当该输出信号被断言时,来自其他处理器或总线代理的用于控制总线的请求被阻止。

为了更清楚理解cmxchg,需要同时看ARM和x86两种架构下的实现一个RISC,一个CISC,linux内核提供了两种架构下的实现。linux内核的原子变量定义如下:

//原子变量
typedef struct {
    volatile int counter; //volatile禁止编译器把变量缓冲到寄存器
} atomic_t;
           

先看ARM架构下,ARM架构是精简指令集,没有提供cmpxchg这种复杂指令,和其它所有RISC架构一样提供了LL/SC(链接加载,条件存储)操作,这个操作是很多原子操作的基础。ARMv8指令是LDXR\STXR,属于独占访问,需要有local monitor和global monitor配合使用。这两条指令一般需要成对出现。ldrex是从内存取出数据放到寄存器,然后监视器将此地址标记为独占,strex会先测试是否是当前cpu的独占,如果是则存储成功返回0,如果不是则存储失败返回1。例如cpu0将地址m标记为独占,在strex执行前,线程被调出了,cpu1调用ldrex会清除cpu0的独占,而将自己标记为独占,然后执行strxr,然后cpu0的线程重新被调度,此时执行strex会失败,因为自己的独占位被清除了。这样也会导致后进入ldrex的线程可能比先进入的先执行。标记为独占的地址调用strex后都会清除独占标志。

/**
 *  比较ptr->counter和old的值如果相等,则ptr->counter = new,并且返回old,否则ptr->counter不变
 * 返回ptr->counter
 */
static inline int atomic_cmpxchg(atomic_t *ptr, int old, int new)
{
    unsigned long oldval, res;
 
    smp_mb(); //内存屏障,保证cmpxchg不会在屏障前执行
 
    do {
        __asm__ __volatile__("@ atomic_cmpxchg\n"
        "ldrex  %1, [%2]\n" //独占访问,监视器会将此地址标志独占并且将ptr->counter给oldvalue
        "mov    %0, #0\n"   //res = 0
        "teq    %1, %3\n"   //测试oldvalue是否和old相等也就是ptr->counter和old
 
        //独占访问成功并且如果相等则把new赋值给ptr->counter,否则不执行这条指令
        "strexeq %0, %4, [%2]\n" 
            : "=&r" (res), "=&r" (oldval)
            : "r" (&ptr->counter), "Ir" (old), "r" (new)
            : "cc");
    } while (res);  //while res是因为strexeq指令是独占访存指令从,此时可能未标记访存,而res为1
 
    smp_mb();//内存屏障,保证cmpxchg不会在屏障后执行
 
    return oldval;
}
           

X86架构类似:

/*
 *  根据size大小比较交换字节,字或者双字,如果返回old则交换成功,否则交换失败
 */
static inline unsigned long __cmpxchg(volatile void *ptr, unsigned long old,
                      unsigned long new, int size)
{
    unsigned long prev;
    switch (size) {
    case 1:
        __asm__ __volatile__(LOCK_PREFIX "cmpxchgb %b1,%2"
                     : "=a"(prev)
                     : "q"(new), "m"(*__xg(ptr)), "0"(old)
                     : "memory");
        return prev;
    case 2:
        __asm__ __volatile__(LOCK_PREFIX "cmpxchgw %w1,%2"
                     : "=a"(prev)
                     : "r"(new), "m"(*__xg(ptr)), "0"(old)
                     : "memory");
        return prev;
//eax = old,比较%2 = ptr->counter和eax是否相等,如果相等则ZF置位,并把%1 = new赋值给ptr->counter,返回old值,否则ZF清除,并且将ptr->counter赋值给eax
    case 4:
        __asm__ __volatile__(LOCK_PREFIX "cmpxchgl %1,%2"
                     : "=a"(prev)
                     : "r"(new), "m"(*__xg(ptr)), "0"(old)  //0表示eax = old
                     : "memory");
        return prev;
    }
    return old;
}
           

代码解释:

以最为常用的4字节交换为例,主要的操作就是汇编指令

cmpxchgl %1,%2

,注意一下其中的%2,也就是后面的

"m"(*__xg(ptr))

__xg

是在这个文件中定义的宏:

struct __xchg_dummy { unsigned long a[100]; };

#define __xg(x) ((struct __xchg_dummy *)(x))
           

那么%2经过预处理,展开就是

"m"(*((struct __xchg_dummy *)(ptr)))

,这种做法,就可以达到在cmpxchg中的%2是一个地址,就是ptr指向的地址。如果%2是

"m"(ptr)

,那么指针本身的值就出现在cmpxchg指令中。

简单点就是:

cmpxchg %ecx, %ebx;如果EAX与EBX相等,则ECX送EBX且ZF置1;否则EBX送ECX,且ZF清0
           

在cmpxchg指令前加了lock前缀,保证在进行操作的时候,不会让其它cpu操作同一个内存。使得整个操作保持原子性。对比来看虽然X86只用了一条指令,但是处理器内部肯定将这条指令转成了类RISC的微码。

性能

基于

AtomicLong

可能性能有点差,所以出现了

LongAdder

。接下来通过基准测试,来看看两者的性能差别。

基准测试

内存:4G,CPU:Intel® Core™ i5-4250U CPU @ 1.30GHz,硬盘: 128G SSD

Mode设为

Throughput

,测试吞吐量。

Mode设为

AverageTime

,测试平均耗时。

@OutputTimeUnit(TimeUnit.MICROSECONDS)
@BenchmarkMode(Mode.Throughput)
public class Main {
    private static AtomicLong count = new AtomicLong();

    private static LongAdder longAdder = new LongAdder();

    public static void main(String[] args) throws RunnerException {
        Options options = (Options) new OptionsBuilder().include(Main.class.getName());
        new Runner(options).run();
    }

    @Benchmark
    @Threads(10)
    public void run0() {
        count.getAndIncrement();
    }

    @Benchmark
    @Threads(10)
    public void run1() {
        longAdder.increment();
    }
}
           

吞吐量:

线程为1,结果:

Benchmark   Mode  Cnt    Score   Error   Units
Main.run0  thrpt   25  125.268 ± 2.471  ops/us
Main.run1  thrpt   25   74.587 ± 1.484  ops/us
           

线程为10,结果:

Benchmark   Mode  Cnt    Score   Error   Units
Main.run0  thrpt   25   24.897 ± 2.888  ops/us
Main.run1  thrpt   25  192.474 ± 2.824  ops/us
           

线程为30,结果:

Benchmark   Mode  Cnt    Score   Error   Units
Main.run0  thrpt   25   21.096 ± 1.897  ops/us
Main.run1  thrpt   25  189.732 ± 4.089  ops/us
           

平均耗时:

线程为1,结果:

Main.run0  avgt   25  0.008 ±  0.001  us/op
Main.run1  avgt   25  0.013 ±  0.001  us/op
           

线程为10,结果:

Benchmark  Mode  Cnt  Score   Error  Units
Main.run0  avgt   25  0.485 ± 0.048  us/op
Main.run1  avgt   25  0.050 ± 0.001  us/op
           

线程为30,结果:

Benchmark  Mode  Cnt  Score   Error  Units
Main.run0  avgt   25  1.364 ± 0.033  us/op
Main.run1  avgt   25  0.158 ± 0.003  us/op
           

可以看到除了线程为1的情况下,其他情况下,LogAdder明显比AtomicLong要好的多。

ABA问题

设想如下场景:

  1. 线程1准备用CAS将变量的值由A替换为B。
  2. 在此之前,线程2将变量的值由A替换为C,又由C替换为A,
  3. 然后线程1执行CAS时发现变量的值仍然为A,所以CAS成功。

但实际上这时的现场已经和最初不同了,尽管CAS成功,但可能存在潜藏的问题。

比如:

  • 有一个用单向链表实现的栈,栈顶为A。
  • 线程T1获取A.next为B,然后希望用CAS将栈顶替换为B,head.compareAndSet(A,B)。
  • 在T1执行上面这条指令之前,线程T2介入,将A、B出栈,再pushD、C、A。此时B.next为null。
  • 此时轮到线程T1执行CAS操作,检测发现栈顶仍为A,所以CAS成功,栈顶变为B。
  • 但实际上B.next为null,其中堆栈中只有B一个元素,C和D组成的链表不再存在于堆栈中,这样就造成C、D被抛弃的现象。

解决方法

各种乐观锁的实现中通常都会用版本戳version来对记录或对象标记,避免并发操作带来的问题,在Java中,AtomicStampedReference也实现了这个作用,它通过包装[E,Integer]的元组来对对象标记版本戳stamp,从而避免ABA问题。