天天看點

Unsafe API及 Unsafe對java并發的支援Unsafe 對 LockSupport 的支援Unsafe 源碼Unsafe對并發工具包的支援

Unsafe 對 LockSupport 的支援

AQS同步器、阻塞隊列等類中,對線程的阻塞和恢複都是調用LockSupport類的park/unpark方法。

而LockSupport類的park/unpark方法,則是調用Unsafe類中的本地方法park/unpark,這兩個本地方法為調用方提供了硬體級别的線程阻塞和恢複的支撐。

阻塞在unsafe::park方法上的線程t,如果被應用了t.interrupte()方法,則會被檢測到線程t的中斷狀态被置位,線程t就會被恢複為可運作狀态,不再被阻塞。

// 以下方法為JDK的并發工具包中線程的阻塞和恢複操作提供了硬體支援
public native void park(boolean isAbsolute, long time);  // 阻塞目前線程。直到線程被其它線程unpark、或被interrupt中斷、或逾時、或者其它任何未知原因,則從阻塞中恢複。(如果isAbsolute==false,則time的機關是納秒;否則time的機關是毫秒),
public native void unpark(Object thread);  // 取消阻塞正在阻塞在park方法上的線程thread,
// 如果thread并未處于阻塞狀态,那麼之後thread對park方法的調用将不會導緻thread阻塞。
           

LockSupport.park方法通常都寫在循環語句中個,這是因為從LockSupport.park中恢複的原因有多種,而線程應根據這些不同情況做不同的處理:

  1. 線程應首先檢測是否是被中斷而從park中恢複的:t.isInterrupted();如果是的話,應該執行一些相應的業務邏輯代碼,比如在調用park的方法中傳回特殊值null或者false、或者使用一個本地變量記錄這個中斷情況,标明是由于線程被中斷了,而非是其等待的條件有可能成立了、或者其等待的資源有可能被釋放了、可以參加資源的搶奪了。
  2. 接着應該檢測是否逾時:如果調用park方法的方法本身是需要逾時退出的(比如,time-await或者tryLock),那麼方法應該檢測從park中恢複的線程是否已經逾時,因為除了第一種情況即由于中斷而退出外,park方法有可能是可能是由于條件成立或者資源被釋放後被其它線程unpak後退出、也有可能是逾時而退出。如果檢測到線程已經逾時,則應該在調用park的方法中傳回false,或者其他特殊值等。
  3. 最後檢測條件是否成立、資源鎖是否已經被釋放等,如果條件并未成立(雖然已經排除中斷、逾時的原因,但線程也可能由于任何其它未知原因從阻塞中恢複),或者搶奪資源鎖失敗,則繼續調用park方法阻塞,直到再次從阻塞中恢複、則進入下次循環執行步驟1、2、3。

    以AQS中對Condition.await()方法的實作代碼為例:

public final boolean await(long time, TimeUnit unit)
                throws InterruptedException {
     		...
            final long deadline = System.nanoTime() + nanosTimeout;		// 用于逾時檢測
            boolean timedout = false;		//	逾時标志位
            int interruptMode = 0;		//	中斷狀态
            while (!isOnSyncQueue(node)) {		//  條件判斷,如果條件未成立,繼續循環阻塞等待
                if (nanosTimeout <= 0L) {		//	逾時檢測
                    timedout = transferAfterCancelledWait(node);	//  此方法将會檢測到是否已經被unpark恢複、還是逾時恢複的。
                    break;		//  逾時,取消循環阻塞等待。
                }
                if (nanosTimeout >= spinForTimeoutThreshold)	//  沒有逾時,則繼續阻塞等待
                    LockSupport.parkNanos(this, nanosTimeout);	//	park,進入阻塞狀态
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)	//  park方法傳回了,目前線程從阻塞中恢複了,首先需要檢測是否是由于interrupte中斷而從阻塞中恢複的。
                    break;		//  中斷,則取消循環阻塞等待
                nanosTimeout = deadline - System.nanoTime();
            }		//  如果在本次循環中,沒有出現逾時、中斷的情況,不會從循環中break退出。則從park中恢複後,應再次進入循環檢查等待的條件是否已經滿足,如果條件已經滿足,則循環自動退出。如果條件沒有滿足,則會進入下一次的循環、阻塞等待。
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;        
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
            return !timedout;	//	傳回逾時标志位
        }
           

又如ReentrantLock類的lock方法,當需要擷取資源鎖的線程進入等待擷取鎖的隊列後,也需要使用循環語句結合park阻塞等待。因為lock方法并不支援中斷,是以對于檢測到中斷後,隻是記錄、并沒有放棄lock:

/**
	*	阻塞等待擷取同步隊列所代表的資源鎖。
	*	如果檢測到等待過程中線程被中斷,則傳回true。否則傳回false。
	*/
   final boolean acquireQueued(final Node node, int arg) {		
        boolean failed = true;		//  異常标志位。如果在整個 循環嘗試擷取資源 + park  過程中,抛出了異常,則應取消lock
        try {
            boolean interrupted = false;	//	中斷标志位
            for (;;) {						//  循環 + park
                final Node p = node.predecessor();		//	找到目前線程所在節點的前一個狀态正常的節點。
                if (p == head && tryAcquire(arg)) {		// 	首先檢視目前節點是否已經阻塞排隊排到了隊頭位置:如果目前節點的前驅節點是head節點,則線程可以嘗試搶奪資源鎖tryAcquire(arg)
                    setHead(node);		
                    p.next = null; 		// 	help GC
                    failed = false;	
                    return interrupted;		//	正常獲得了資源鎖,直接退出。傳回中斷标志位
                }
                //	擷取資源鎖失敗,進入 park阻塞+恢複後中斷檢測+進入下次循環(即,park+循環)的邏輯
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);	//  在排隊過程中出現了異常,從隊列中移除node,取消lock
        }
    }
           

而ReentrantLock的lockInterruptibly()方法,支援中斷。當檢測到中斷後,直接抛出異常。

private void doAcquireInterruptibly(int arg)
        throws InterruptedException {
        final Node node = addWaiter(Node.EXCLUSIVE);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    throw new InterruptedException();	//	如果檢測到中斷,直接抛出異常
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
           

Unsafe 源碼

Unsafe類提供了硬體級别的直接操作記憶體的方法

// 以下方法為JDK的并發工具包提供了硬體支援
public native void park(boolean isAbsolute, long time);  // 阻塞目前線程。當線程被unpark、或者被interrupte、或者已經超過設定的阻塞時間time(如果isAbsolute是false,time的機關是納秒;否則time的機關是毫秒),
public native void unpark(Object thread);  // 取消阻塞正在阻塞在park方法上的線程thread,
// 如果thread并未處于阻塞狀态,那麼之後thread對park方法的調用将不會導緻thread阻塞。
   
// 以下方法是擷取靜态字段(or執行個體域)在記憶體中相對于靜态字段所屬類的class對象(or執行個體域所在對象)的偏移量offset
public native long 	objectFieldOffset(Field f);		// 擷取某個對象的執行個體域相對于此對象在記憶體中的偏移量
public native long 	staticFieldOffset(Field f);   // 擷取某個類的字段相對其所在類的class對象在記憶體中的偏移量offset
public native Object staticFieldBase(Field f);		// 擷取某個類字段所屬類的class對象在記憶體中的起始位置,傳回值object的其實是一個指針,也就是class對象的記憶體起始值。這個方法通常與staticFieldOffset結合使用.
// 以Integer類中的MIN_VALUE為例:
Field field = Integer.class.getDeclaredField("MIN_VALUE");
long offset = unsafe.staticFieldOffset(field);
Object base = unsafe.staticFieldBase(field);
int IntegerMinValue = unsafe.getInt(base, offset);
// 上例就是直接從記憶體中讀取一個值

// 使用以上方法從記憶體讀取靜态字段的值時,必須確定其所在類已經被初始化過,以下方法提供了這方面的支援
public native boolean shouldBeInitialized(Class<?> c);  // 檢測某個類是否需要初始化。通常與staticFieldBase合用,因為如果類還未被初始化,其靜态字段就還沒被賦初值(未被初始化時字段的值是其字段類型的預設值)。
public native void ensureClassInitialized(Class<?> c);	//	確定某個類已經被初始化,如果還沒有被初始化,就初始化它。 同方法shouldBeInitialized一樣,用在staticFieldBase方法出現的地方

// 以下方法是用于擷取基本類型數組的元素的相對于數組起始位址的記憶體偏移量的API
public native long arrayBaseOffset(Class<?> arrayClass);  // 如:arrayBaseOffset(byte[].class)


// 以下方法:getXXXVolatile / putXXXVolatile,是使用volatile load語義從記憶體加載/更新資料,方法的參數值來源有三種:
// 1. o是一個執行個體對象的引用,offset是o的一個執行個體域相對于o的記憶體偏移量,通過objectFieldOffset方法擷取的;
// 2. o是某java類型C的class對象的引用,offset是類型C的一個全局變量(靜态字段)在記憶體中相對于o的偏移量,o通過方法staticFieldBase擷取,offset是通過staticFieldOffset方法擷取的。
// 3. o是某個數組對象的引用,offset是數組中的某個元素相對于數組起始位址的偏移量,offset需要通過類似arrayBaseOffset(int[].class)+元素下标*arrayIndexScale(int[].class)來擷取。
public final native int 	getIntVolatile(Object o, long offset);   // 使用volatile語義直接從記憶體(o指針+offset)處開始讀取一個int值,實際就是讀取了對象o的一個int字段的值,隻不過是直接從記憶體取的
public final native Object 	getObjectVolatile(Object o,long offset); // 使用volatile語義直接從記憶體(o指針+offset)處開始讀取一個對象的引用,實際就是讀取了對象o的一個複合類型字段的值,隻不過是直接從記憶體取的
// 所有java基本類型都提供了類似方法:getCharVolatile / getByteVolatile / getShortVolatile / getLongVolatile / getFloatVolatile / getDoubleVolatile / getBooleanVolatile
public final native void putIntVolatile(Object o, long offset, int x);
public final native void putObjectVolatile(Object o, long offset, Object x);

// putXXXVolatile的另一版本 ,與putXXXVolatile不同的是,putXXXVolatile方法會確定其操作的記憶體更新對其它線程立即可見,但是putOrderedXXX提供的這些記憶體更新操作并不會保證對其它線程立即可見。
// 通常,隻有在offset所指向的字段是java volatile的,或者是隻提供了通過volatile語義通路的數組元素時,這類方法才有用
public native void    putOrderedObject(Object o, long offset, Object x);
public native void    putOrderedInt(Object o, long offset, int x);
public native void    putOrderedLong(Object o, long offset, long x);

// 以下是硬體級别的CAS,用于確定多線程場景下對一個資料進行(讀 + 算 + 寫)操作時的資料安全。通常會與getXXXVolatile方法合用。
public final native boolean compareAndSwapObject(Object o, long offset,
                                                     Object expected,
                                                     Object x);
public final native boolean compareAndSwapInt(Object o, long offset,                          int expected, int x);
public final native boolean compareAndSwapLong(Object o, long offset,                          long expected, long x);
//getXXXVolatile方法與CompareAndSwapXXX方法確定并發環境下的資料安全的示例:
public final int getAndAddInt(Object o,long offset,int added){
	int pre;
	int updateTo;
	do{ 
		pre = getIntVolatile(o,offset);
		updateTo = pre + added;
	}while(!compareAndSwapInt(o, offset, pre, updateTo));
	return pre;
}


// 以下方法是對記憶體做 配置設定、回收、賦初值(初始化) 操作
public native long 	allocateMemory(long bytes);	// 配置設定一塊大小為bytes(位元組數)的記憶體塊,傳回值是配置設定的記憶體塊的起始位址
public native void	setMemory(long address, long bytes, byte value);	// 用值value對以address為起始位址的bytes個位元組賦初值
public native void	setMemory(Object o, long offset, long bytes, byte value);	// 用值value對以(object指針+offset)為起始位址的bytes個位元組賦初值
public native long reallocateMemory(long address, long bytes);	// 重新配置設定一塊大小為bytes(位元組數)的記憶體塊,傳回值是配置設定的記憶體塊的起始位址。會将原來的以address為起始位址的記憶體塊廢棄掉,注意系統不會自動把原來記憶體塊中的資料複制到新的記憶體塊中,如果需要複制,可以調用copyMemory
public native void	copyMemory(long srcAddress, long destAddress, long bytes);	// 将以address為起始位址的bytes個位元組的資料複制到以destAddress為起始位址的記憶體塊中
public native void copyMemory(Object srcBase, long srcOffset,                                  Object destBase, long destOffset, long bytes);	// 将以(srcBase指針+srcOffset)為起始位址的bytes個位元組的資料複制到以(destBase指針+destOffset)為起始位址的記憶體塊中


// 以下是直接根據一個記憶體塊的位址address參數,操作記憶體
public native int     getInt(long address);    // 從以address為起始位址的記憶體塊中讀取一個int值
public native double  getDouble(long address);	// 從以address為起始位址的記憶體塊中讀取一個double值
public native long		getAddress(long address);	// 針對引用類型,從以address為起始位址的記憶體塊中讀取一個指針,這個指針指向一個複合類型對象,即是對複合類型對象的引用

public native void   putInt(long address, int x);   // 向以address為起始位址的記憶體塊中寫入一個int值
public native void   putDouble(long address, double x);   // 向以address為起始位址的記憶體塊中寫入一個double值
public native void   putAddress(long address, long x);   // 針對引用類型,向以address為起始位址的記憶體塊中寫入一個指針,這個指針指向一個複合類型對象,即是對複合類型對象的引用

// 以下方法是給出對象o和其某個執行個體域相對于o的記憶體偏移量offset,操作以(o對象指針+offset)為起始位址的記憶體塊,從中讀取或者寫入資料,實際就是直接從指定的記憶體位置讀取或者更新對象o的某個執行個體域的值。執行個體域的偏移量offset是通過objectFieldOffset方法擷取的。
// 這裡當o=null時,就是直接對起始位址為offset的記憶體區域做操作,看起來與getInt(long address)應該是同樣結果,但是由于這裡的方法有兩個參數引用變量,實際機器會為java變量提供雙寄存器尋址模式。而getInt(long address)方法則是為非java變量提供單寄存器尋址模式。由于在記憶體中,java變量與非java變量的布局不同,是以我們不能假設這兩種尋址模式是等價的。此外,不能混淆雙寄存器尋址模式中偏移量和單寄存器尋址模式中使用的long。
public native int     getInt(Object o, long offset);    // 從以(object指針+offset)為起始位址的記憶體塊中讀取一個int值
public native double     getDouble(Object o, long offset);    // 從以(object指針+offset)為起始位址的記憶體塊中讀取一個double值
public native Object	getObject(Object o, long offset);    // 從以(object指針+offset)為起始位址的記憶體塊中讀取一個複合類型對象的引用值

public native void 	putInt(Object o, long offset, int x);    // 向以(object指針+offset)為起始位址的記憶體塊中寫入一個int值   
public native void 	putObject(Object o, long offset, Object x);    // 向以(object指針+offset)為起始位址的記憶體塊中寫入一個複合類型對象的引用值   
 

// 以下方法的結果是取決于本機作業系統或JVM廠商實作的
public native int addressSize();	// 存儲指針所占用的位元組寬度,就是作業系統選擇用幾個位元組來存儲指針(即記憶體位址),或者是4 或者是8.
public static final int ADDRESS_SIZE = theUnsafe.addressSize();  //  搞個常量,來給出目前伺服器的記憶體位址寬度,如果是4位元組=32位,即32位作業系統,編址可以從0-2^32;如果是8位元組=64位,即64位作業系統。

public native int pageSize();  // 記憶體頁的頁面大小

// 以下方法是直接從記憶體加載類、直接在記憶體上建立一個類執行個體
public native Class<?> defineClass(String name, byte[] b, int off, int len,
                                       ClassLoader loader,
                                       ProtectionDomain protectionDomain);  // 直接從記憶體加載一個類,預設用調用此方法的類的類加載器和保護域

public native Class<?> defineAnonymousClass(Class<?> hostClass, byte[] data, Object[] cpPatches);		// 直接從記憶體加載一個匿名類,但是這個類不會被任何加載器管理和識别

public native Object allocateInstance(Class<?> cls)    // 建立一個類執行個體,但是不會運作任何構造方法來初始化這個對象。如果類還沒被初始化,則會先初始化這個類。
        throws InstantiationException;

           

Unsafe對并發工具包的支援

對java.util.concurrent.atomic包的支援

java.util.concurrent.atomic工具包下的類提供了多線程環境下對單個變量的無鎖、線程安全的編碼支援。這些類實質上是将volatile的概念從單個的變量、字段、數組元素,擴充到了提供有條件的原子性更新操作的類中的。

而這些類則無一不是調用Unsafe類中的方法得以使用現代處理器上高效、高可用、硬體級别的(machine-level)原子性指令。

layset操作具有更新一個volatile變量的記憶體效應。不過,它允許其後的記憶體操作指令可以重排序,但是在它之前的記憶體操作指令不可以重排。并且這些記憶體操作本身不會禁止對普通的非volatile的寫入指令的重排序。

weakCompareAndSet操作隻能確定對某記憶體區域的讀寫的原子性,但是不提供volatile語義,即不會禁止與此記憶體區域無關的指令重排,不會禁止對其它任何變量的讀寫指令的重排。

atomic工具包中不止提供了對單個數值型變量進行原子性操作的類,還提供了對任何複合類型中的任意類型的volatile字段提供原子操作的Updater類。包括如下Updater:

AtomicIntegerFieldUpdater / AtomicLongFieldUpdater / AtomicReferenceFieldUpdater

以上三個都是抽象類,但是每個類内部各自定義了一個名為XXXUpdateImpl的實作類,抽象類本是提供了靜态方法newUpdater方法來為使用者提供建立 執行個體的入口。

繼續閱讀