天天看点

Unsafe API及 Unsafe对java并发的支持Unsafe 对 LockSupport 的支持Unsafe 源码Unsafe对并发工具包的支持

Unsafe 对 LockSupport 的支持

AQS同步器、阻塞队列等类中,对线程的阻塞和恢复都是调用LockSupport类的park/unpark方法。

而LockSupport类的park/unpark方法,则是调用Unsafe类中的本地方法park/unpark,这两个本地方法为调用方提供了硬件级别的线程阻塞和恢复的支撑。

阻塞在unsafe::park方法上的线程t,如果被应用了t.interrupte()方法,则会被检测到线程t的中断状态被置位,线程t就会被恢复为可运行状态,不再被阻塞。

// 以下方法为JDK的并发工具包中线程的阻塞和恢复操作提供了硬件支持
public native void park(boolean isAbsolute, long time);  // 阻塞当前线程。直到线程被其它线程unpark、或被interrupt中断、或超时、或者其它任何未知原因,则从阻塞中恢复。(如果isAbsolute==false,则time的单位是纳秒;否则time的单位是毫秒),
public native void unpark(Object thread);  // 取消阻塞正在阻塞在park方法上的线程thread,
// 如果thread并未处于阻塞状态,那么之后thread对park方法的调用将不会导致thread阻塞。
           

LockSupport.park方法通常都写在循环语句中个,这是因为从LockSupport.park中恢复的原因有多种,而线程应根据这些不同情况做不同的处理:

  1. 线程应首先检测是否是被中断而从park中恢复的:t.isInterrupted();如果是的话,应该执行一些相应的业务逻辑代码,比如在调用park的方法中返回特殊值null或者false、或者使用一个本地变量记录这个中断情况,标明是由于线程被中断了,而非是其等待的条件有可能成立了、或者其等待的资源有可能被释放了、可以参加资源的抢夺了。
  2. 接着应该检测是否超时:如果调用park方法的方法本身是需要超时退出的(比如,time-await或者tryLock),那么方法应该检测从park中恢复的线程是否已经超时,因为除了第一种情况即由于中断而退出外,park方法有可能是可能是由于条件成立或者资源被释放后被其它线程unpak后退出、也有可能是超时而退出。如果检测到线程已经超时,则应该在调用park的方法中返回false,或者其他特殊值等。
  3. 最后检测条件是否成立、资源锁是否已经被释放等,如果条件并未成立(虽然已经排除中断、超时的原因,但线程也可能由于任何其它未知原因从阻塞中恢复),或者抢夺资源锁失败,则继续调用park方法阻塞,直到再次从阻塞中恢复、则进入下次循环执行步骤1、2、3。

    以AQS中对Condition.await()方法的实现代码为例:

public final boolean await(long time, TimeUnit unit)
                throws InterruptedException {
     		...
            final long deadline = System.nanoTime() + nanosTimeout;		// 用于超时检测
            boolean timedout = false;		//	超时标志位
            int interruptMode = 0;		//	中断状态
            while (!isOnSyncQueue(node)) {		//  条件判断,如果条件未成立,继续循环阻塞等待
                if (nanosTimeout <= 0L) {		//	超时检测
                    timedout = transferAfterCancelledWait(node);	//  此方法将会检测到是否已经被unpark恢复、还是超时恢复的。
                    break;		//  超时,取消循环阻塞等待。
                }
                if (nanosTimeout >= spinForTimeoutThreshold)	//  没有超时,则继续阻塞等待
                    LockSupport.parkNanos(this, nanosTimeout);	//	park,进入阻塞状态
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)	//  park方法返回了,当前线程从阻塞中恢复了,首先需要检测是否是由于interrupte中断而从阻塞中恢复的。
                    break;		//  中断,则取消循环阻塞等待
                nanosTimeout = deadline - System.nanoTime();
            }		//  如果在本次循环中,没有出现超时、中断的情况,不会从循环中break退出。则从park中恢复后,应再次进入循环检查等待的条件是否已经满足,如果条件已经满足,则循环自动退出。如果条件没有满足,则会进入下一次的循环、阻塞等待。
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;        
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
            return !timedout;	//	返回超时标志位
        }
           

又如ReentrantLock类的lock方法,当需要获取资源锁的线程进入等待获取锁的队列后,也需要使用循环语句结合park阻塞等待。因为lock方法并不支持中断,因此对于检测到中断后,只是记录、并没有放弃lock:

/**
	*	阻塞等待获取同步队列所代表的资源锁。
	*	如果检测到等待过程中线程被中断,则返回true。否则返回false。
	*/
   final boolean acquireQueued(final Node node, int arg) {		
        boolean failed = true;		//  异常标志位。如果在整个 循环尝试获取资源 + park  过程中,抛出了异常,则应取消lock
        try {
            boolean interrupted = false;	//	中断标志位
            for (;;) {						//  循环 + park
                final Node p = node.predecessor();		//	找到当前线程所在节点的前一个状态正常的节点。
                if (p == head && tryAcquire(arg)) {		// 	首先查看当前节点是否已经阻塞排队排到了队头位置:如果当前节点的前驱节点是head节点,则线程可以尝试抢夺资源锁tryAcquire(arg)
                    setHead(node);		
                    p.next = null; 		// 	help GC
                    failed = false;	
                    return interrupted;		//	正常获得了资源锁,直接退出。返回中断标志位
                }
                //	获取资源锁失败,进入 park阻塞+恢复后中断检测+进入下次循环(即,park+循环)的逻辑
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);	//  在排队过程中出现了异常,从队列中移除node,取消lock
        }
    }
           

而ReentrantLock的lockInterruptibly()方法,支持中断。当检测到中断后,直接抛出异常。

private void doAcquireInterruptibly(int arg)
        throws InterruptedException {
        final Node node = addWaiter(Node.EXCLUSIVE);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    throw new InterruptedException();	//	如果检测到中断,直接抛出异常
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
           

Unsafe 源码

Unsafe类提供了硬件级别的直接操作内存的方法

// 以下方法为JDK的并发工具包提供了硬件支持
public native void park(boolean isAbsolute, long time);  // 阻塞当前线程。当线程被unpark、或者被interrupte、或者已经超过设置的阻塞时间time(如果isAbsolute是false,time的单位是纳秒;否则time的单位是毫秒),
public native void unpark(Object thread);  // 取消阻塞正在阻塞在park方法上的线程thread,
// 如果thread并未处于阻塞状态,那么之后thread对park方法的调用将不会导致thread阻塞。
   
// 以下方法是获取静态字段(or实例域)在内存中相对于静态字段所属类的class对象(or实例域所在对象)的偏移量offset
public native long 	objectFieldOffset(Field f);		// 获取某个对象的实例域相对于此对象在内存中的偏移量
public native long 	staticFieldOffset(Field f);   // 获取某个类的字段相对其所在类的class对象在内存中的偏移量offset
public native Object staticFieldBase(Field f);		// 获取某个类字段所属类的class对象在内存中的起始位置,返回值object的其实是一个指针,也就是class对象的内存起始值。这个方法通常与staticFieldOffset结合使用.
// 以Integer类中的MIN_VALUE为例:
Field field = Integer.class.getDeclaredField("MIN_VALUE");
long offset = unsafe.staticFieldOffset(field);
Object base = unsafe.staticFieldBase(field);
int IntegerMinValue = unsafe.getInt(base, offset);
// 上例就是直接从内存中读取一个值

// 使用以上方法从内存读取静态字段的值时,必须确保其所在类已经被初始化过,以下方法提供了这方面的支持
public native boolean shouldBeInitialized(Class<?> c);  // 检测某个类是否需要初始化。通常与staticFieldBase合用,因为如果类还未被初始化,其静态字段就还没被赋初值(未被初始化时字段的值是其字段类型的默认值)。
public native void ensureClassInitialized(Class<?> c);	//	确保某个类已经被初始化,如果还没有被初始化,就初始化它。 同方法shouldBeInitialized一样,用在staticFieldBase方法出现的地方

// 以下方法是用于获取基本类型数组的元素的相对于数组起始地址的内存偏移量的API
public native long arrayBaseOffset(Class<?> arrayClass);  // 如:arrayBaseOffset(byte[].class)


// 以下方法:getXXXVolatile / putXXXVolatile,是使用volatile load语义从内存加载/更新数据,方法的参数值来源有三种:
// 1. o是一个实例对象的引用,offset是o的一个实例域相对于o的内存偏移量,通过objectFieldOffset方法获取的;
// 2. o是某java类型C的class对象的引用,offset是类型C的一个全局变量(静态字段)在内存中相对于o的偏移量,o通过方法staticFieldBase获取,offset是通过staticFieldOffset方法获取的。
// 3. o是某个数组对象的引用,offset是数组中的某个元素相对于数组起始地址的偏移量,offset需要通过类似arrayBaseOffset(int[].class)+元素下标*arrayIndexScale(int[].class)来获取。
public final native int 	getIntVolatile(Object o, long offset);   // 使用volatile语义直接从内存(o指针+offset)处开始读取一个int值,实际就是读取了对象o的一个int字段的值,只不过是直接从内存取的
public final native Object 	getObjectVolatile(Object o,long offset); // 使用volatile语义直接从内存(o指针+offset)处开始读取一个对象的引用,实际就是读取了对象o的一个复合类型字段的值,只不过是直接从内存取的
// 所有java基本类型都提供了类似方法:getCharVolatile / getByteVolatile / getShortVolatile / getLongVolatile / getFloatVolatile / getDoubleVolatile / getBooleanVolatile
public final native void putIntVolatile(Object o, long offset, int x);
public final native void putObjectVolatile(Object o, long offset, Object x);

// putXXXVolatile的另一版本 ,与putXXXVolatile不同的是,putXXXVolatile方法会确保其操作的内存更新对其它线程立即可见,但是putOrderedXXX提供的这些内存更新操作并不会保证对其它线程立即可见。
// 通常,只有在offset所指向的字段是java volatile的,或者是只提供了通过volatile语义访问的数组元素时,这类方法才有用
public native void    putOrderedObject(Object o, long offset, Object x);
public native void    putOrderedInt(Object o, long offset, int x);
public native void    putOrderedLong(Object o, long offset, long x);

// 以下是硬件级别的CAS,用于确保多线程场景下对一个数据进行(读 + 算 + 写)操作时的数据安全。通常会与getXXXVolatile方法合用。
public final native boolean compareAndSwapObject(Object o, long offset,
                                                     Object expected,
                                                     Object x);
public final native boolean compareAndSwapInt(Object o, long offset,                          int expected, int x);
public final native boolean compareAndSwapLong(Object o, long offset,                          long expected, long x);
//getXXXVolatile方法与CompareAndSwapXXX方法确保并发环境下的数据安全的示例:
public final int getAndAddInt(Object o,long offset,int added){
	int pre;
	int updateTo;
	do{ 
		pre = getIntVolatile(o,offset);
		updateTo = pre + added;
	}while(!compareAndSwapInt(o, offset, pre, updateTo));
	return pre;
}


// 以下方法是对内存做 分配、回收、赋初值(初始化) 操作
public native long 	allocateMemory(long bytes);	// 分配一块大小为bytes(字节数)的内存块,返回值是分配的内存块的起始地址
public native void	setMemory(long address, long bytes, byte value);	// 用值value对以address为起始地址的bytes个字节赋初值
public native void	setMemory(Object o, long offset, long bytes, byte value);	// 用值value对以(object指针+offset)为起始地址的bytes个字节赋初值
public native long reallocateMemory(long address, long bytes);	// 重新分配一块大小为bytes(字节数)的内存块,返回值是分配的内存块的起始地址。会将原来的以address为起始地址的内存块废弃掉,注意系统不会自动把原来内存块中的数据复制到新的内存块中,如果需要复制,可以调用copyMemory
public native void	copyMemory(long srcAddress, long destAddress, long bytes);	// 将以address为起始地址的bytes个字节的数据复制到以destAddress为起始地址的内存块中
public native void copyMemory(Object srcBase, long srcOffset,                                  Object destBase, long destOffset, long bytes);	// 将以(srcBase指针+srcOffset)为起始地址的bytes个字节的数据复制到以(destBase指针+destOffset)为起始地址的内存块中


// 以下是直接根据一个内存块的地址address参数,操作内存
public native int     getInt(long address);    // 从以address为起始地址的内存块中读取一个int值
public native double  getDouble(long address);	// 从以address为起始地址的内存块中读取一个double值
public native long		getAddress(long address);	// 针对引用类型,从以address为起始地址的内存块中读取一个指针,这个指针指向一个复合类型对象,即是对复合类型对象的引用

public native void   putInt(long address, int x);   // 向以address为起始地址的内存块中写入一个int值
public native void   putDouble(long address, double x);   // 向以address为起始地址的内存块中写入一个double值
public native void   putAddress(long address, long x);   // 针对引用类型,向以address为起始地址的内存块中写入一个指针,这个指针指向一个复合类型对象,即是对复合类型对象的引用

// 以下方法是给出对象o和其某个实例域相对于o的内存偏移量offset,操作以(o对象指针+offset)为起始地址的内存块,从中读取或者写入数据,实际就是直接从指定的内存位置读取或者更新对象o的某个实例域的值。实例域的偏移量offset是通过objectFieldOffset方法获取的。
// 这里当o=null时,就是直接对起始地址为offset的内存区域做操作,看起来与getInt(long address)应该是同样结果,但是由于这里的方法有两个参数引用变量,实际机器会为java变量提供双寄存器寻址模式。而getInt(long address)方法则是为非java变量提供单寄存器寻址模式。由于在内存中,java变量与非java变量的布局不同,因此我们不能假设这两种寻址模式是等价的。此外,不能混淆双寄存器寻址模式中偏移量和单寄存器寻址模式中使用的long。
public native int     getInt(Object o, long offset);    // 从以(object指针+offset)为起始地址的内存块中读取一个int值
public native double     getDouble(Object o, long offset);    // 从以(object指针+offset)为起始地址的内存块中读取一个double值
public native Object	getObject(Object o, long offset);    // 从以(object指针+offset)为起始地址的内存块中读取一个复合类型对象的引用值

public native void 	putInt(Object o, long offset, int x);    // 向以(object指针+offset)为起始地址的内存块中写入一个int值   
public native void 	putObject(Object o, long offset, Object x);    // 向以(object指针+offset)为起始地址的内存块中写入一个复合类型对象的引用值   
 

// 以下方法的结果是取决于本机操作系统或JVM厂商实现的
public native int addressSize();	// 存储指针所占用的字节宽度,就是操作系统选择用几个字节来存储指针(即内存地址),或者是4 或者是8.
public static final int ADDRESS_SIZE = theUnsafe.addressSize();  //  搞个常量,来给出当前服务器的内存地址宽度,如果是4字节=32位,即32位操作系统,编址可以从0-2^32;如果是8字节=64位,即64位操作系统。

public native int pageSize();  // 内存页的页面大小

// 以下方法是直接从内存加载类、直接在内存上创建一个类实例
public native Class<?> defineClass(String name, byte[] b, int off, int len,
                                       ClassLoader loader,
                                       ProtectionDomain protectionDomain);  // 直接从内存加载一个类,默认用调用此方法的类的类加载器和保护域

public native Class<?> defineAnonymousClass(Class<?> hostClass, byte[] data, Object[] cpPatches);		// 直接从内存加载一个匿名类,但是这个类不会被任何加载器管理和识别

public native Object allocateInstance(Class<?> cls)    // 创建一个类实例,但是不会运行任何构造方法来初始化这个对象。如果类还没被初始化,则会先初始化这个类。
        throws InstantiationException;

           

Unsafe对并发工具包的支持

对java.util.concurrent.atomic包的支持

java.util.concurrent.atomic工具包下的类提供了多线程环境下对单个变量的无锁、线程安全的编码支持。这些类实质上是将volatile的概念从单个的变量、字段、数组元素,扩展到了提供有条件的原子性更新操作的类中的。

而这些类则无一不是调用Unsafe类中的方法得以使用现代处理器上高效、高可用、硬件级别的(machine-level)原子性指令。

layset操作具有更新一个volatile变量的内存效应。不过,它允许其后的内存操作指令可以重排序,但是在它之前的内存操作指令不可以重排。并且这些内存操作本身不会禁止对普通的非volatile的写入指令的重排序。

weakCompareAndSet操作只能确保对某内存区域的读写的原子性,但是不提供volatile语义,即不会禁止与此内存区域无关的指令重排,不会禁止对其它任何变量的读写指令的重排。

atomic工具包中不止提供了对单个数值型变量进行原子性操作的类,还提供了对任何复合类型中的任意类型的volatile字段提供原子操作的Updater类。包括如下Updater:

AtomicIntegerFieldUpdater / AtomicLongFieldUpdater / AtomicReferenceFieldUpdater

以上三个都是抽象类,但是每个类内部各自定义了一个名为XXXUpdateImpl的实现类,抽象类本是提供了静态方法newUpdater方法来为用户提供创建 实例的入口。

继续阅读