在前面一篇博文中,我們曾經詳談過有鎖并發的典型代表synchronized關鍵字,通過該關鍵字可以控制并發執行過程中有且隻有一個線程可以通路共享資源,其原理是通過目前線程持有目前對象鎖,進而擁有通路權限,而其他沒有持有目前對象鎖的線程無法擁有通路權限,也就保證了線程安全。但在本篇中,我們将會詳聊另外一種反向而行的并發政策,即無鎖并發,即不加鎖也能保證并發執行的安全性。
本篇的思路是先闡明無鎖執行者CAS的核心算法原理然後分析Java執行CAS的實踐者Unsafe類,該類中的方法都是native修飾的,是以我們會以說明方法作用為主介紹Unsafe類,最後再介紹并發包中的Atomic系統使用CAS原理實作的并發類,以下是主要内容
- 無鎖的概念
- 無鎖的執行者-CAS
- CAS
- CPU指令對CAS的支援
- 鮮為人知的指針 Unsafe類
- 并發包中的原子操作類Atomic系列
- 原子更新基本類型
- 原子更新引用
- 原子更新數組
- 原子更新屬性
- CAS的ABA問題及其解決方案
- 再談自旋鎖
無鎖的概念
在談論無鎖概念時,總會關聯起樂觀派與悲觀派,對于樂觀派而言,他們認為事情總會往好的方向發展,總是認為壞的情況發生的機率特别小,可以無所顧忌地做事,但對于悲觀派而已,他們總會認為發展事态如果不及時控制,以後就無法挽回了,即使無法挽回的局面幾乎不可能發生。這兩種派系映射到并發程式設計中就如同加鎖與無鎖的政策,即加鎖是一種悲觀政策,無鎖是一種樂觀政策,因為對于加鎖的并發程式來說,它們總是認為每次通路共享資源時總會發生沖突,是以必須對每一次資料操作實施加鎖政策。而無鎖則總是假設對共享資源的通路沒有沖突,線程可以不停執行,無需加鎖,無需等待,一旦發現沖突,無鎖政策則采用一種稱為CAS的技術來保證線程執行的安全性,這項CAS技術就是無鎖政策實作的關鍵,下面我們進一步了解CAS技術的奇妙之處。
無鎖的執行者-CAS
CAS
CAS的全稱是Compare And Swap 即比較交換,其算法核心思想如下
執行函數:CAS(V,E,N)
其包含3個參數
- V表示要更新的變量
- E表示預期值
- N表示新值
如果V值等于E值,則将V的值設為N。若V值和E值不同,則說明已經有其他線程做了更新,則目前線程什麼都不做。通俗的了解就是CAS操作需要我們提供一個期望值,當期望值與目前線程的變量值相同時,說明還沒線程修改該值,目前線程可以進行修改,也就是執行CAS操作,但如果期望值與目前線程不符,則說明該值已被其他線程修改,此時不執行更新操作,但可以選擇重新讀取該變量再嘗試再次修改該變量,也可以放棄操作,原理圖如下
由于CAS操作屬于樂觀派,它總認為自己可以成功完成操作,當多個線程同時使用CAS操作一個變量時,隻有一個會勝出,并成功更新,其餘均會失敗,但失敗的線程并不會被挂起,僅是被告知失敗,并且允許再次嘗試,當然也允許失敗的線程放棄操作,這點從圖中也可以看出來。基于這樣的原理,CAS操作即使沒有鎖,同樣知道其他線程對共享資源操作影響,并執行相應的處理措施。同時從這點也可以看出,由于無鎖操作中沒有鎖的存在,是以不可能出現死鎖的情況,也就是說無鎖操作天生免疫死鎖。
CPU指令對CAS的支援
或許我們可能會有這樣的疑問,假設存在多個線程執行CAS操作并且CAS的步驟很多,有沒有可能在判斷V和E相同後,正要指派時,切換了線程,更改了值。造成了資料不一緻呢?答案是否定的,因為CAS是一種系統原語,原語屬于作業系統用語範疇,是由若幹條指令組成的,用于完成某個功能的一個過程,并且原語的執行必須是連續的,在執行過程中不允許被中斷,也就是說CAS是一條CPU的原子指令,不會造成所謂的資料不一緻問題。
鮮為人知的指針: Unsafe類
Unsafe類存在于
sun.misc
包中,其内部方法操作可以像C的指針一樣直接操作記憶體,單從名稱看來就可以知道該類是非安全的,畢竟Unsafe擁有着類似于C的指針操作,是以總是不應該首先使用Unsafe類,Java官方也不建議直接使用的Unsafe類,據說Oracle正在計劃從Java 9中去掉Unsafe類,但我們還是很有必要了解該類,因為Java中CAS操作的執行依賴于Unsafe類的方法,注意Unsafe類中的所有方法都是native修飾的,也就是說Unsafe類中的方法都直接調用作業系統底層資源執行相應任務,關于Unsafe類的主要功能點如下:
- 記憶體管理,Unsafe類中存在直接操作記憶體的方法
//配置設定記憶體指定大小的記憶體 public native long allocateMemory(long bytes); //根據給定的記憶體位址address設定重新配置設定指定大小的記憶體 public native long reallocateMemory(long address, long bytes); //用于釋放allocateMemory和reallocateMemory申請的記憶體 public native void freeMemory(long address); //将指定對象的給定offset偏移量記憶體塊中的所有位元組設定為固定值 public native void setMemory(Object o, long offset, long bytes, byte value); //設定給定記憶體位址的值 public native void putAddress(long address, long x); //擷取指定記憶體位址的值 public native long getAddress(long address); //設定給定記憶體位址的long值 public native void putLong(long address, long x); //擷取指定記憶體位址的long值 public native long getLong(long address); //設定或擷取指定記憶體的byte值 public native byte getByte(long address); public native void putByte(long address, byte x); //其他基本資料類型(long,char,float,double,short等)的操作與putByte及getByte相同 //作業系統的記憶體頁大小 public native int pageSize();
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 提供執行個體對象新途徑。
//傳入一個對象的class并建立該執行個體對象,但不會調用構造方法 public native Object allocateInstance(Class cls) throws InstantiationException;
- 1
- 2
- 類和執行個體對象以及變量的操作,主要方法如下
//擷取字段f在執行個體對象中的偏移量 public native long objectFieldOffset(Field f); //靜态屬性的偏移量,用于在對應的Class對象中讀寫靜态屬性 public native long staticFieldOffset(Field f); //傳回值就是f.getDeclaringClass() public native Object staticFieldBase(Field f); //獲得給定對象偏移量上的int值,所謂的偏移量可以簡單了解為指針指向該變量的記憶體位址, //通過偏移量便可得到該對象的變量,進行各種操作 public native int getInt(Object o, long offset); //設定給定對象上偏移量的int值 public native void putInt(Object o, long offset, int x); //獲得給定對象偏移量上的引用類型的值 public native Object getObject(Object o, long offset); //設定給定對象偏移量上的引用類型的值 public native void putObject(Object o, long offset, Object x); //其他基本資料類型(long,char,byte,float,double)的操作與getInthe及putInt相同 //設定給定對象的int值,使用volatile語義,即設定後立馬更新到記憶體對其他線程可見 public native void putIntVolatile(Object o, long offset, int x); //獲得給定對象的指定偏移量offset的int值,使用volatile語義,總能擷取到最新的int值。 public native int getIntVolatile(Object o, long offset); //其他基本資料類型(long,char,byte,float,double)的操作與putIntVolatile及getIntVolatile相同,引用類型putObjectVolatile也一樣。 //與putIntVolatile一樣,但要求被操作字段必須有volatile修飾 public native void putOrderedInt(Object o,long offset,int x);
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
public class UnSafeDemo { public static void main(String[] args) throws NoSuchFieldException, IllegalAccessException, InstantiationException { // 通過反射得到theUnsafe對應的Field對象 Field field = Unsafe.class.getDeclaredField("theUnsafe"); // 設定該Field為可通路 field.setAccessible(true); // 通過Field得到該Field對應的具體對象,傳入null是因為該Field為static的 Unsafe unsafe = (Unsafe) field.get(null); System.out.println(unsafe); //通過allocateInstance直接建立對象 User user = (User) unsafe.allocateInstance(User.class); Class userClass = user.getClass(); Field name = userClass.getDeclaredField("name"); Field age = userClass.getDeclaredField("age"); Field id = userClass.getDeclaredField("id"); //擷取執行個體變量name和age在對象記憶體中的偏移量并設定值 unsafe.putInt(user,unsafe.objectFieldOffset(age),); unsafe.putObject(user,unsafe.objectFieldOffset(name),"android TV"); // 這裡傳回 User.class, Object staticBase = unsafe.staticFieldBase(id); System.out.println("staticBase:"+staticBase); //擷取靜态變量id的偏移量staticOffset long staticOffset = unsafe.staticFieldOffset(userClass.getDeclaredField("id")); //擷取靜态變量的值 System.out.println("設定前的ID:"+unsafe.getObject(staticBase,staticOffset)); //設定值 unsafe.putObject(staticBase,staticOffset,"SSSSSSSS"); //擷取靜态變量的值 System.out.println("設定前的ID:"+unsafe.getObject(staticBase,staticOffset)); //輸出USER System.out.println("輸出USER:"+user.toString()); long data = ; byte size = ;//機關位元組 //調用allocateMemory配置設定記憶體,并擷取記憶體位址memoryAddress long memoryAddress = unsafe.allocateMemory(size); //直接往記憶體寫入資料 unsafe.putAddress(memoryAddress, data); //擷取指定記憶體位址的資料 long addrData=unsafe.getAddress(memoryAddress); System.out.println("addrData:"+addrData); /** * 輸出結果: [email protected] staticBase:class geym.conc.ch4.atomic.User 設定前的ID:USER_ID 設定前的ID:SSSSSSSS 輸出USER:User{name='android TV', age=18', id=SSSSSSSS'} addrData:1000 */ } } class User{ public User(){ System.out.println("user 構造方法被調用"); } private String name; private int age; private static String id="USER_ID"; @Override public String toString() { return "User{" + "name='" + name + '\'' + ", age=" + age +'\'' + ", id=" + id +'\'' + '}'; } }
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
- 52
- 53
- 54
- 55
- 56
- 57
- 58
- 59
- 60
- 61
- 62
- 63
- 64
- 65
- 66
- 67
- 68
- 69
- 70
- 71
- 72
- 73
- 74
- 75
- 76
- 77
- 78
- 79
public static Unsafe getUnsafe() { Class cc = sun.reflect.Reflection.getCallerClass(); if (cc.getClassLoader() != null) throw new SecurityException("Unsafe"); return theUnsafe; }
- 1
- 2
- 3
- 4
- 5
- 6
- 數組操作
//擷取數組第一個元素的偏移位址 public native int arrayBaseOffset(Class arrayClass); //數組中一個元素占據的記憶體空間,arrayBaseOffset與arrayIndexScale配合使用,可定位數組中每個元素在記憶體中的位置 public native int arrayIndexScale(Class arrayClass);
- 1
- 2
- 3
- 4
- 5
-
CAS 操作相關
CAS是一些CPU直接支援的指令,也就是我們前面分析的無鎖操作,在Java中無鎖操作CAS基于以下3個方法實作,在稍後講解Atomic系列内部方法是基于下述方法的實作的。
//第一個參數o為給定對象,offset為對象記憶體的偏移量,通過這個偏移量迅速定位字段并設定或擷取該字段的值, //expected表示期望值,x表示要設定的值,下面3個方法都通過CAS原子指令執行操作。 public final native boolean compareAndSwapObject(Object o, long offset,Object expected, Object x); public final native boolean compareAndSwapInt(Object o, long offset,int expected,int x); public final native boolean compareAndSwapLong(Object o, long offset,long expected,long x);
- 1
- 2
- 3
- 4
- 5
- 6
- 7
//1.8新增,給定對象o,根據擷取記憶體偏移量指向的字段,将其增加delta, //這是一個CAS操作過程,直到設定成功方能退出循環,傳回舊值 public final int getAndAddInt(Object o, long offset, int delta) { int v; do { //擷取記憶體中最新值 v = getIntVolatile(o, offset); //通過CAS操作 } while (!compareAndSwapInt(o, offset, v, v + delta)); return v; } //1.8新增,方法作用同上,隻不過這裡操作的long類型資料 public final long getAndAddLong(Object o, long offset, long delta) { long v; do { v = getLongVolatile(o, offset); } while (!compareAndSwapLong(o, offset, v, v + delta)); return v; } //1.8新增,給定對象o,根據擷取記憶體偏移量對于字段,将其 設定為新值newValue, //這是一個CAS操作過程,直到設定成功方能退出循環,傳回舊值 public final int getAndSetInt(Object o, long offset, int newValue) { int v; do { v = getIntVolatile(o, offset); } while (!compareAndSwapInt(o, offset, v, newValue)); return v; } // 1.8新增,同上,操作的是long類型 public final long getAndSetLong(Object o, long offset, long newValue) { long v; do { v = getLongVolatile(o, offset); } while (!compareAndSwapLong(o, offset, v, newValue)); return v; } //1.8新增,同上,操作的是引用類型資料 public final Object getAndSetObject(Object o, long offset, Object newValue) { Object v; do { v = getObjectVolatile(o, offset); } while (!compareAndSwapObject(o, offset, v, newValue)); return v; }
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
-
挂起與恢複
将一個線程進行挂起是通過park方法實作的,調用 park後,線程将一直阻塞直到逾時或者中斷等條件出現。unpark可以終止一個挂起的線程,使其恢複正常。Java對線程的挂起操作被封裝在 LockSupport類中,LockSupport類中有各種版本pack方法,其底層實作最終還是使用Unsafe.park()方法和Unsafe.unpark()方法
//線程調用該方法,線程将一直阻塞直到逾時,或者是中斷條件出現。 public native void park(boolean isAbsolute, long time); //終止挂起的線程,恢複正常.java.util.concurrent包中挂起操作都是在LockSupport類實作的,其底層正是使用這兩個方法, public native void unpark(Object thread);
- 1
- 2
- 3
- 4
- 5
-
記憶體屏障
這裡主要包括了loadFence、storeFence、fullFence等方法,這些方法是在Java 8新引入的,用于定義記憶體屏障,避免代碼重排序,與Java記憶體模型相關,感興趣的可以看部落客的另一篇博文全面了解Java記憶體模型(JMM)及volatile關鍵字,這裡就不展開了
//在該方法之前的所有讀操作,一定在load屏障之前執行完成 public native void loadFence(); //在該方法之前的所有寫操作,一定在store屏障之前執行完成 public native void storeFence(); //在該方法之前的所有讀寫操作,一定在full屏障之前執行完成,這個記憶體屏障相當于上面兩個的合體功能 public native void fullFence();
- 1
- 2
- 3
- 4
- 5
- 6
- 其他操作
//擷取持有鎖,已不建議使用 @Deprecated public native void monitorEnter(Object var1); //釋放鎖,已不建議使用 @Deprecated public native void monitorExit(Object var1); //嘗試擷取鎖,已不建議使用 @Deprecated public native boolean tryMonitorEnter(Object var1); //擷取本機記憶體的頁數,這個值永遠都是2的幂次方 public native int pageSize(); //告訴虛拟機定義了一個沒有安全檢查的類,預設情況下這個類加載器和保護域來着調用者類 public native Class defineClass(String name, byte[] b, int off, int len, ClassLoader loader, ProtectionDomain protectionDomain); //加載一個匿名類 public native Class defineAnonymousClass(Class hostClass, byte[] data, Object[] cpPatches); //判斷是否需要加載一個類 public native boolean shouldBeInitialized(Class<?> c); //確定類一定被加載 public native void ensureClassInitialized(Class<?> c)
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
并發包中的原子操作類(Atomic系列)
通過前面的分析我們已基本了解了無鎖CAS的原理并對Java中的指針類Unsafe類有了比較全面的認識,下面進一步分析CAS在Java中的應用,即并發包中的原子操作類(Atomic系列),從JDK 1.5開始提供了
java.util.concurrent.atomic
包,在該包中提供了許多基于CAS實作的原子操作類,用法友善,性能高效,主要分以下4種類型。
原子更新基本類型
原子更新基本類型主要包括3個類:
- AtomicBoolean:原子更新布爾類型
- AtomicInteger:原子更新整型
- AtomicLong:原子更新長整型
這3個類的實作原理和使用方式幾乎是一樣的,這裡我們以AtomicInteger為例進行分析,AtomicInteger主要是針對int類型的資料執行原子操作,它提供了原子自增方法、原子自減方法以及原子指派方法等,鑒于AtomicInteger的源碼不多,我們直接看源碼
public class AtomicInteger extends Number implements java.io.Serializable {
private static final long serialVersionUID = L;
// 擷取指針類Unsafe
private static final Unsafe unsafe = Unsafe.getUnsafe();
//下述變量value在AtomicInteger執行個體對象内的記憶體偏移量
private static final long valueOffset;
static {
try {
//通過unsafe類的objectFieldOffset()方法,擷取value變量在對象記憶體中的偏移
//通過該偏移量valueOffset,unsafe類的内部方法可以擷取到變量value對其進行取值或指派操作
valueOffset = unsafe.objectFieldOffset
(AtomicInteger.class.getDeclaredField("value"));
} catch (Exception ex) { throw new Error(ex); }
}
//目前AtomicInteger封裝的int變量value
private volatile int value;
public AtomicInteger(int initialValue) {
value = initialValue;
}
public AtomicInteger() {
}
//擷取目前最新值,
public final int get() {
return value;
}
//設定目前值,具備volatile效果,方法用final修飾是為了更進一步的保證線程安全。
public final void set(int newValue) {
value = newValue;
}
//最終會設定成newValue,使用該方法後可能導緻其他線程在之後的一小段時間内可以擷取到舊值,有點類似于延遲加載
public final void lazySet(int newValue) {
unsafe.putOrderedInt(this, valueOffset, newValue);
}
//設定新值并擷取舊值,底層調用的是CAS操作即unsafe.compareAndSwapInt()方法
public final int getAndSet(int newValue) {
return unsafe.getAndSetInt(this, valueOffset, newValue);
}
//如果目前值為expect,則設定為update(目前值指的是value變量)
public final boolean compareAndSet(int expect, int update) {
return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
}
//目前值加1傳回舊值,底層CAS操作
public final int getAndIncrement() {
return unsafe.getAndAddInt(this, valueOffset, );
}
//目前值減1,傳回舊值,底層CAS操作
public final int getAndDecrement() {
return unsafe.getAndAddInt(this, valueOffset, -);
}
//目前值增加delta,傳回舊值,底層CAS操作
public final int getAndAdd(int delta) {
return unsafe.getAndAddInt(this, valueOffset, delta);
}
//目前值加1,傳回新值,底層CAS操作
public final int incrementAndGet() {
return unsafe.getAndAddInt(this, valueOffset, ) + ;
}
//目前值減1,傳回新值,底層CAS操作
public final int decrementAndGet() {
return unsafe.getAndAddInt(this, valueOffset, -) - ;
}
//目前值增加delta,傳回新值,底層CAS操作
public final int addAndGet(int delta) {
return unsafe.getAndAddInt(this, valueOffset, delta) + delta;
}
//省略一些不常用的方法....
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
- 52
- 53
- 54
- 55
- 56
- 57
- 58
- 59
- 60
- 61
- 62
- 63
- 64
- 65
- 66
- 67
- 68
- 69
- 70
- 71
通過上述的分析,可以發現AtomicInteger原子類的内部幾乎是基于前面分析過Unsafe類中的CAS相關操作的方法實作的,這也同時證明AtomicInteger是基于無鎖實作的,這裡重點分析自增操作實作過程,其他方法自增實作原理一樣。
//目前值加1,傳回新值,底層CAS操作
public final int incrementAndGet() {
return unsafe.getAndAddInt(this, valueOffset, ) + ;
}
- 1
- 2
- 3
- 4
我們發現AtomicInteger類中所有自增或自減的方法都間接調用Unsafe類中的getAndAddInt()方法實作了CAS操作,進而保證了線程安全,關于getAndAddInt其實前面已分析過,它是Unsafe類中1.8新增的方法,源碼如下
//Unsafe類中的getAndAddInt方法
public final int getAndAddInt(Object o, long offset, int delta) {
int v;
do {
v = getIntVolatile(o, offset);
} while (!compareAndSwapInt(o, offset, v, v + delta));
return v;
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
可看出getAndAddInt通過一個while循環不斷的重試更新要設定的值,直到成功為止,調用的是Unsafe類中的compareAndSwapInt方法,是一個CAS操作方法。這裡需要注意的是,上述源碼分析是基于JDK1.8的,如果是1.8之前的方法,AtomicInteger源碼實作有所不同,是基于for死循環的,如下
//JDK 1.7的源碼,由for的死循環實作,并且直接在AtomicInteger實作該方法,
//JDK1.8後,該方法實作已移動到Unsafe類中,直接調用getAndAddInt方法即可
public final int incrementAndGet() {
for (;;) {
int current = get();
int next = current + ;
if (compareAndSet(current, next))
return next;
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
ok~,下面簡單看個Demo,感受一下AtomicInteger使用方式
public class AtomicIntegerDemo {
//建立AtomicInteger,用于自增操作
static AtomicInteger i=new AtomicInteger();
public static class AddThread implements Runnable{
public void run(){
for(int k=;k<;k++)
i.incrementAndGet();
}
}
public static void main(String[] args) throws InterruptedException {
Thread[] ts=new Thread[];
//開啟10條線程同時執行i的自增操作
for(int k=;k<;k++){
ts[k]=new Thread(new AddThread());
}
//啟動線程
for(int k=;k<;k++){ts[k].start();}
for(int k=;k<;k++){ts[k].join();}
System.out.println(i);//輸出結果:100000
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
在Demo中,使用原子類型AtomicInteger替換普通int類型執行自增的原子操作,保證了線程安全。至于AtomicBoolean和AtomicLong的使用方式以及實作原理是一樣,大家可以自行查閱源碼。