CAS的全稱為Compare And Swap,直譯就是比較交換。是一條CPU的原子指令,其作用是讓CPU先進行比較兩個值是否相等,然後原子地更新某個位置的值,其實作方式是基于硬體平台的彙編指令,在intel的CPU中,使用的是 cmpxchg
指令,就是說CAS是靠硬體實作的,進而在硬體層面提升效率。
CSA 原理
利用CPU的CAS指令,同時借助JNI來完成Java的非阻塞算法,其它原子操作都是利用類似的特性完成的。
在
java.util.concurrent
下面的源碼中,
Atomic
,
ReentrantLock
都使用了Unsafe類中的方法來保證并發的安全性。
CAS操作是原子性的,是以多線程并發使用CAS更新資料時,可以不使用鎖,JDK中大量使用了CAS來更新資料而防止加鎖來保持原子更新。
CAS 操作包含三個操作數 :記憶體偏移量位置(V)、預期原值(A)和新值(B)。 如果記憶體位置的值與預期原值相比對,那麼處理器會自動将該位置值更新為新值 。否則,處理器不做任何操作。
源碼分析
下面來看一下
java.util.concurrent.atomic.AtomicInteger.java
,
getAndIncrement()
getAndDecrement()
是如何利用CAS實作原子性操作的。
AtomicInteger 源碼解析
// 使用 unsafe 類的原子操作方式
private static final Unsafe unsafe = Unsafe.getUnsafe();
private static final long valueOffset;
static {
try {
//計算變量 value 在類對象中的偏移量
valueOffset = unsafe.objectFieldOffset(AtomicInteger.class.getDeclaredField("value"));
} catch (Exception ex) { throw new Error(ex); }
}
valueOffset
字段表示
"value"
記憶體位置,在
compareAndSwap
方法 ,第二個參數會用到.
關于偏移量
Unsafe
調用C 語言可以通過偏移量對變量進行操作
//volatile變量value
private volatile int value;
/**
* 建立具有給定初始值的新 AtomicInteger
*
* @param initialValue 初始值
*/
public AtomicInteger(int initialValue) {
value = initialValue;
}
//傳回目前的值
public final int get() {
return value;
}
//原子更新為新值并傳回舊值
public final int getAndSet(int newValue) {
return unsafe.getAndSetInt(this, valueOffset, newValue);
}
//最終會設定成新值
public final void lazySet(int newValue) {
unsafe.putOrderedInt(this, valueOffset, newValue);
}
//如果輸入的值等于預期值,則以原子方式更新為新值
public final boolean compareAndSet(int expect, int update) {
return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
}
//方法相當于原子性的 ++i
public final int getAndIncrement() {
//三個參數,1、目前的執行個體 2、value執行個體變量的偏移量 3、遞增的值。
return unsafe.getAndAddInt(this, valueOffset, 1);
}
//方法相當于原子性的 --i
public final int getAndDecrement() {
//三個參數,1、目前的執行個體 2、value執行個體變量的偏移量 3、遞減的值。
return unsafe.getAndAddInt(this, valueOffset, -1);
}
實作邏輯封裝在 Unsafe 中
getAndAddInt
方法,繼續往下看,
Unsafe
源碼解析
Unsafe 源碼解析
在JDK8中追蹤可見
sun.misc.Unsafe
這個類是無法看見源碼的,打開
openjdk8
源碼看
檔案:
openjdk-8-src-b132-03_mar_2014.zip
目錄:
openjdk\jdk\src\share\classes\sun\misc\Unsafe.java
通常我們最好也不要使用
Unsafe
類,除非有明确的目的,并且也要對它有深入的了解才行。要想使用
Unsafe
類需要用一些比較
tricky
的辦法。Unsafe類使用了單例模式,需要通過一個靜态方法
getUnsafe()
來擷取。但Unsafe類做了限制,如果是普通的調用的話,它會抛出一個
SecurityException
異常;隻有由主類加載器加載的類才能調用這個方法。
下面是
sun.misc.Unsafe.java
類源碼
//擷取Unsafe執行個體靜态方法
@CallerSensitive
public static Unsafe getUnsafe() {
Class<?> caller = Reflection.getCallerClass();
if (!VM.isSystemDomainLoader(caller.getClassLoader()))
throw new SecurityException("Unsafe");
return theUnsafe;
}
網上也有一些辦法來用主類加載器加載使用者代碼,最簡單方法是利用Java反射,方法如下:
private static Unsafe unsafe;
static {
try {
//通過反射擷取rt.jar下的Unsafe類
Field field = Unsafe.class.getDeclaredField("theUnsafe");
field.setAccessible(true);
unsafe = (Unsafe) field.get(null);
} catch (Exception e) {
System.out.println("Get Unsafe instance occur error" + e);
}
}
擷取到Unsafe執行個體之後,我們就可以為所欲為了。Unsafe類提供了以下這些功能:
https://www.cnblogs.com/pkufork/p/java_unsafe.html//native硬體級别的原子操作
//類似的有compareAndSwapInt,compareAndSwapLong,compareAndSwapBoolean,compareAndSwapChar等等。
public final native boolean compareAndSwapInt(Object o, long offset,int expected,int x);
//内部使用自旋的方式進行CAS更新(while循環進行CAS更新,如果更新失敗,則循環再次重試)
public final int getAndAddInt(Object o, long offset, int delta) {
int v;
do {
//擷取對象記憶體位址偏移量上的數值v
v = getIntVolatile(o, offset);
//如果現在還是v,設定為 v + delta,否則傳回false,繼續循環再次重試.
} while (!compareAndSwapInt(o, offset, v, v + delta));
return v;
}
利用 Unsafe 類的 JNI compareAndSwapInt 方法實作,使用CAS實作一個原子操作更新,
compareAndSwapInt 四個參數:
1、目前的執行個體
2、執行個體變量的記憶體位址偏移量
3、預期的舊值
4、要更新的值
unsafe.cpp 深層次解析
// unsafe.cpp
/*
* 這個看起來好像不像一個函數,不過不用擔心,不是重點。UNSAFE_ENTRY 和 UNSAFE_END 都是宏,
* 在預編譯期間會被替換成真正的代碼。下面的 jboolean、jlong 和 jint 等是一些類型定義(typedef):
*
* 省略部分内容
*/
UNSAFE_ENTRY(jboolean, Unsafe_CompareAndSwapInt(JNIEnv *env, jobject unsafe, jobject obj, jlong offset, jint e, jint x))
UnsafeWrapper("Unsafe_CompareAndSwapInt");
oop p = JNIHandles::resolve(obj);
// 根據偏移量,計算 value 的位址。這裡的 offset 就是 AtomaicInteger 中的 valueOffset
jint* addr = (jint *) index_oop_from_field_offset_long(p, offset);
// 調用 Atomic 中的函數 cmpxchg,該函數聲明于 Atomic.hpp 中
return (jint)(Atomic::cmpxchg(x, addr, e)) == e;
UNSAFE_END
// atomic.cpp
unsigned Atomic::cmpxchg(unsigned int exchange_value, volatile unsigned int* dest, unsigned int compare_value) {
assert(sizeof(unsigned int) == sizeof(jint), "more work to do");
/*
* 根據作業系統類型調用不同平台下的重載函數,這個在預編譯期間編譯器會決定調用哪個平台下的重載
* 函數。相關的預編譯邏輯如下:
*
* atomic.inline.hpp:
* #include "runtime/atomic.hpp"
*
* // Linux
* #ifdef TARGET_OS_ARCH_linux_x86
* # include "atomic_linux_x86.inline.hpp"
* #endif
*
* // 省略部分代碼
*
* // Windows
* #ifdef TARGET_OS_ARCH_windows_x86
* # include "atomic_windows_x86.inline.hpp"
* #endif
*
* // BSD
* #ifdef TARGET_OS_ARCH_bsd_x86
* # include "atomic_bsd_x86.inline.hpp"
* #endif
*
* 接下來分析 atomic_windows_x86.inline.hpp 中的 cmpxchg 函數實作
*/
return (unsigned int)Atomic::cmpxchg((jint)exchange_value, (volatile jint*)dest,
(jint)compare_value);
}
上面的分析看起來比較多,不過主流程并不複雜。如果不糾結于代碼細節,還是比較容易看懂的。接下來,我會分析 Windows 平台下的 Atomic::cmpxchg 函數。繼續往下看吧。
// atomic_windows_x86.inline.hpp
#define LOCK_IF_MP(mp) __asm cmp mp, 0 \
__asm je L0 \
__asm _emit 0xF0 \
__asm L0:
inline jint Atomic::cmpxchg (jint exchange_value, volatile jint* dest, jint compare_value) {
// alternative for InterlockedCompareExchange
int mp = os::is_MP();
__asm {
mov edx, dest
mov ecx, exchange_value
mov eax, compare_value
LOCK_IF_MP(mp)
cmpxchg dword ptr [edx], ecx
}
}
上面的代碼由 LOCK_IF_MP 預編譯辨別符和 cmpxchg 函數組成。為了看到更清楚一些,我們将 cmpxchg 函數中的 LOCK_IF_MP 替換為實際内容。如下:
inline jint Atomic::cmpxchg (jint exchange_value, volatile jint* dest, jint compare_value) {
// 判斷是否是多核 CPU
int mp = os::is_MP();
__asm {
// 将參數值放入寄存器中
mov edx, dest // 注意: dest 是指針類型,這裡是把記憶體位址存入 edx 寄存器中
mov ecx, exchange_value
mov eax, compare_value
// LOCK_IF_MP
cmp mp, 0
/*
* 如果 mp = 0,表明是線程運作在單核 CPU 環境下。此時 je 會跳轉到 L0 标記處,
* 也就是越過 _emit 0xF0 指令,直接執行 cmpxchg 指令。也就是不在下面的 cmpxchg 指令
* 前加 lock 字首。
*/
je L0
/*
* 0xF0 是 lock 字首的機器碼,這裡沒有使用 lock,而是直接使用了機器碼的形式。至于這樣做的
* 原因可以參考知乎的一個回答:
* https://www.zhihu.com/question/50878124/answer/123099923
*/
_emit 0xF0
L0:
/*
* 比較并交換。簡單解釋一下下面這條指令,熟悉彙編的朋友可以略過下面的解釋:
* cmpxchg: 即“比較并交換”指令
* dword: 全稱是 double word,在 x86/x64 體系中,一個
* word = 2 byte,dword = 4 byte = 32 bit
* ptr: 全稱是 pointer,與前面的 dword 連起來使用,表明通路的記憶體單元是一個雙字單元
* [edx]: [...] 表示一個記憶體單元,edx 是寄存器,dest 指針值存放在 edx 中。
* 那麼 [edx] 表示記憶體位址為 dest 的記憶體單元
*
* 這一條指令的意思就是,将 eax 寄存器中的值(compare_value)與 [edx] 雙字記憶體單元中的值
* 進行對比,如果相同,則将 ecx 寄存器中的值(exchange_value)存入 [edx] 記憶體單元中。
*/
cmpxchg dword ptr [edx], ecx
}
}
到這裡 CAS 的實作過程就講了,CAS 的實作離不開處理器的支援。以上這麼多代碼,其實核心代碼就是一條帶
lock
字首的
cmpxchg
指令,即
lock cmpxchg dword ptr [edx], ecx
。
通過上述的分析,可以發現
AtomicInteger
原子類的内部幾乎是基于前面分析過
Unsafe
類中的
CAS
相關操作的方法實作的,這也同時證明
AtomicInteger
getAndIncrement
自增操作實作過程,是基于無鎖實作的。
CAS的ABA問題及其解決方案
假設這樣一種場景,當第一個線程執行CAS(V,E,U)操作。在擷取到目前變量V,準備修改為新值U前,另外兩個線程已連續修改了兩次變量V的值,使得該值又恢複為舊值,這樣的話,我們就無法正确判斷這個變量是否已被修改過,如下圖:
這就是典型的CAS的ABA問題,一般情況這種情況發現的機率比較小,可能發生了也不會造成什麼問題,比如說我們對某個做加減法,不關心數字的過程,那麼發生ABA問題也沒啥關系。但是在某些情況下還是需要防止的,那麼該如何解決呢?在Java中解決ABA問題,我們可以使用以下原子類
AtomicStampedReference類
AtomicStampedReference原子類是一個帶有時間戳的對象引用,在每次修改後,AtomicStampedReference不僅會設定新值而且還會記錄更改的時間。當AtomicStampedReference設定對象值時,對象值以及時間戳都必須滿足期望值才能寫入成功,這也就解決了反複讀寫時,無法預知值是否已被修改的窘境
底層實作為: 通過Pair私有内部類存儲資料和時間戳, 并構造volatile修飾的私有執行個體
接着看
java.util.concurrent.atomic.AtomicStampedReference
類的compareAndSet()方法的實作:
private static class Pair<T> {
final T reference;
final int stamp;
//最好不要重複的一個資料,決定資料是否能設定成功,時間戳會重複
private Pair(T reference, int stamp) {
this.reference = reference;
this.stamp = stamp;
}
static <T> Pair<T> of(T reference, int stamp) {
return new Pair<T>(reference, stamp);
}
}
同時對目前資料和目前時間進行比較,隻有兩者都相等是才會執行casPair()方法,
單從該方法的名稱就可知是一個CAS方法,最終調用的還是
Unsafe
compareAndSwapObject
方法
到這我們就很清晰
AtomicStampedReference
的内部實作思想了,
通過一個鍵值對
Pair
存儲資料和時間戳,在更新時對資料和時間戳進行比較,
隻有兩者都符合預期才會調用
Unsafe
的
compareAndSwapObject
方法執行數值和時間戳替換,也就避免了ABA的問題。
/**
* 原子更新帶有版本号的引用類型。
* 該類将整數值與引用關聯起來,可用于原子的更資料和資料的版本号。
* 可以解決使用CAS進行原子更新時,可能出現的ABA問題。
*/
public class AtomicStampedReference<V> {
//靜态内部類Pair将對應的引用類型和版本号stamp作為它的成員
private static class Pair<T> {
//最好不要重複的一個資料,決定資料是否能設定成功,建議時間戳
final T reference;
final int stamp;
private Pair(T reference, int stamp) {
this.reference = reference;
this.stamp = stamp;
}
//根據reference和stamp來生成一個Pair的執行個體
static <T> Pair<T> of(T reference, int stamp) {
return new Pair<T>(reference, stamp);
}
}
//作為一個整體的pair變量被volatile修飾
private volatile Pair<V> pair;
//構造方法,參數分别是初始引用變量的值和初始版本号
public AtomicStampedReference(V initialRef, int initialStamp) {
pair = Pair.of(initialRef, initialStamp);
}
....
private static final sun.misc.Unsafe UNSAFE = sun.misc.Unsafe.getUnsafe();
private static final long pairOffset = objectFieldOffset(UNSAFE, "pair", AtomicStampedReference.class);
//擷取pair成員的偏移位址
static long objectFieldOffset(sun.misc.Unsafe UNSAFE,
String field, Class<?> klazz) {
try {
return UNSAFE.objectFieldOffset(klazz.getDeclaredField(field));
} catch (NoSuchFieldException e) {
NoSuchFieldError error = new NoSuchFieldError(field);
error.initCause(e);
throw error;
}
}
}
/**
* @param 期望(老的)引用
* @param (新的)引用資料
* @param 期望(老的)标志stamp(時間戳)值
* @param (新的)标志stamp(時間戳)值
* @return 是否成功
*/
public boolean compareAndSet(V expectedReference,V newReference,int expectedStamp,int newStamp) {
Pair<V> current = pair;
return
// 期望(老的)引用 == 目前引用
expectedReference == current.reference &&
// 期望(老的)标志stamp(時間戳)值 == 目前标志stamp(時間戳)值
expectedStamp == current.stamp &&
// (新的)引用資料 == 目前引用資料 并且 (新的)标志stamp(時間戳)值 ==目前标志stamp(時間戳)值
((newReference == current.reference && newStamp == current.stamp) ||
#原子更新值
casPair(current, Pair.of(newReference, newStamp)));
}
//當引用類型的值與期望的一緻的時候,原子的更改版本号為新的值。該方法隻修改版本号,不修改引用變量的值,成功傳回true
public boolean attemptStamp(V expectedReference, int newStamp) {
Pair<V> current = pair;
return
expectedReference == current.reference &&
(newStamp == current.stamp ||
casPair(current, Pair.of(expectedReference, newStamp)));
}
/**
* CAS真正實作方法
*/
private boolean casPair(Pair<V> cmp, Pair<V> val) {
return UNSAFE.compareAndSwapObject(this, pairOffset, cmp, val);
}
期望 Pair cmp(A) == 目前記憶體存偏移量位置 Pair(V),就更新值 Pair val(B)成功傳回true 否則 false
public static void main(String[] args) {
AtomicStampedReference<Integer> num = new AtomicStampedReference<Integer>(1, 0);
Integer i = num.getReference();
int stamped = num.getStamp();
if (num.compareAndSet(i, i + 1, stamped, stamped + 1)) {
System.out.println("測試成功");
}
}
通過以上原子更新方法,可見 AtomicStampedReference就是利用了Unsafe的CAS方法+Volatile關鍵字對存儲實際的引用變量和int的版本号的Pair執行個體進行更新。
參考:
https://www.cnblogs.com/nullllun/p/9039049.html https://blog.csdn.net/a67474506/article/details/48310515Contact
- 作者:鵬磊
- 出處: http://www.ymq.io/2018/05/09/dubbo
- 版權歸作者所有,轉載請注明出處
- Wechat:關注公衆号,搜雲庫,專注于開發技術的研究與知識分享