文章目錄
- 五、共享模型之無鎖
- 5.1 問題提出
- 5.1.1 為麼不安全
- 5.1.2 解決思路1 - 鎖
- 5.1.3 解決思路2 - 無鎖
- 5.2 CAS 與 volatile
- 5.2.1 CAS和volatile的關系
- 5.2.2 為什麼無鎖效率高
- 5.2.3 CAS 的特點
- 5.3 原子整數
- 5.4 原子引用
- 5.4.1 不安全實作
- 5.4.2 安全實作-使用鎖
- 5.4.3 安全實作-使用CAS
- 5.4.4 ABA 問題及解決
- 5.5 原子數組
- 5.6 字段更新器
- 5.7 原子累加器
- 5.7.1 累加器性能比較
- * 源碼之 LongAdder
- 5.7.2 CAS鎖
- * 原理之僞共享
- 5.8 Unsafe
- 5.8.1 Unsafe CAS 操作
- 6.8.2 unsafe對象模拟實作原子整數
- 本章小結
五、共享模型之無鎖
5.1 問題提出
有如下需求,保證
account.withdraw
取款方法的線程安全
interface Account {
// 擷取餘額
Integer getBalance();
// 取款
void withdraw(Integer amount);
/**
* 方法内會啟動 1000 個線程,每個線程做 -10 元 的操作
* 如果初始餘額為 10000 那麼正确的結果應當是 0
*/
static void demo(Account account) {
List<Thread> ts = new ArrayList<>();
for (int i = 0; i < 1000; i++) {
ts.add(new Thread(() -> {
account.withdraw(10);
}));
}
long start = System.nanoTime();
ts.forEach(Thread::start);
ts.forEach(t -> {
try {
t.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
});
long end = System.nanoTime();
System.out.println(account.getBalance()
+ " cost: " + (end-start)/1000_000 + " ms");
}
}
原有實作并不是線程安全的
class AccountUnsafe implements Account {
private Integer balance;
public AccountUnsafe(Integer balance) {
this.balance = balance;
}
@Override
public Integer getBalance() {
return this.balance;
}
@Override
public void withdraw(Integer amount) {
this.balance -= amount;
}
}
測試代碼
public static void main(String[] args) {
Account account = new AccountUnsafe(10000);
Account.demo(account);
}
執行測試代碼,某次執行結果
280 cost: 82
5.1.1 為麼不安全
withdraw
方法是臨界區,會存線上程安全問題
檢視下位元組碼
ALOAD 0 // <- this
ALOAD 0
GETFIELD cn/itcast/AccountUnsafe.balance : Ljava/lang/Integer; // <- this.balance
INVOKEVIRTUAL java/lang/Integer.intValue ()I // 拆箱
ALOAD 1 // <- amount
INVOKEVIRTUAL java/lang/Integer.intValue ()I // 拆箱
ISUB // 減法
INVOKESTATIC java/lang/Integer.valueOf (I)Ljava/lang/Integer; // 結果裝箱
PUTFIELD cn/itcast/AccountUnsafe.balance : Ljava/lang/Integer; // -> this.balance
多線程在執行過程中可能會出現指令的交錯,進而結果錯誤!
5.1.2 解決思路1 - 鎖
首先想到的是給 Account 對象加鎖
class AccountUnsafe implements Account{
private Integer balance;
public AccountUnsafe(Integer balance) {
this.balance = balance;
}
@Override
public Integer getBalance() {
return this.balance;
}
@Override
public void withdraw(Integer amount) {
synchronized (this){
this.balance -= amount;
}
}
}
運作結果
0 cost: 122
5.1.3 解決思路2 - 無鎖
class AccountCas implements Account{
private AtomicInteger balance;
public AccountCas(int balance) {
this.balance = new AtomicInteger(balance);
}
@Override
public Integer getBalance() {
return balance.get();
}
@Override
public void withdraw(Integer amount) {
while (true){
//擷取金額的最新值
int prev = balance.get();
//要修改的金額
int next = prev - amount;
//真正修改
if(balance.compareAndSet(prev,next)){
break;
}
}
}
}
運作結果
0 cost: 63
5.2 CAS 與 volatile
前面看到的 AtomicInteger 的解決方法,内部并沒有用鎖來保護共享變量的線程安全。那麼它是如何實作的呢?
public void withdraw(Integer amount) {
while(true) {
// 需要不斷嘗試,直到成功為止
while (true) {
// 比如拿到了舊值 1000
int prev = balance.get();
// 在這個基礎上 1000-10 = 990
int next = prev - amount;
/*
compareAndSet 正是做這個檢查,在 set 前,先比較 prev 與目前值
- 不一緻了,next 廢棄,傳回 false 表示失敗
比如,别的線程已經做了減法,目前值已經被減成了 990
那麼本線程的這次 990 就廢棄了,進入 while 下次循環重試
- 一緻,以 next 設定為新值,傳回 true 表示成功
*/
if (balance.compareAndSet(prev, next)) {
break;
}
}
}
}
其中的關鍵是 compareAndSet,它的簡稱就是 CAS (也有 Compare And Swap 的說法),它必須是原子操作。
//判斷期望值和目前主存中的值是否一緻,如果一緻則将其更新成update,否則繼續重試
public final boolean compareAndSet(int expect,int update)
分析:
- 線程1拿到 balance後,進行-10運算 。在這個過程中,balance已經被線程2修改成90了。當線程1進行cas操作時,發現oldBalance和newBalance不一緻,就放棄進行設定next,嘗試進行下一輪
- 當進行到第三輪,cas比較時,發現oldBalance和newBalance一緻,說明并沒有被其他線程修改,就将balance更新成 next
注意
- 其實 CAS 的底層是 lock cmpxchg 指令(X86 架構),在單核 CPU 和多核 CPU 下都能夠保證【比較-交換】的原子性。
- 在多核狀态下,某個核執行到帶 lock 的指令時,CPU 會讓總線鎖住,當這個核把此指令執行完畢,再開啟總線。這個過程中不會被線程的排程機制所打斷,保證了多個線程對記憶體操作的準确性,是原子的。
5.2.1 CAS和volatile的關系
擷取共享變量時,為了保證該變量的可見性,需要使用
volatile
修飾。
它可以用來修飾成員變量和靜态成員變量,可以避免線程從自己的工作緩存中查找變量的值,必須到主存中擷取它的值,線程操作 volatile 變量都是直接操作主存。即一個線程對 volatile 變量的修改,對另一個線程可見。
CAS 必須借助 volatile 才能讀取到共享變量的最新值來實作【比較并交換】的效果
**注意:**volatile 僅僅保證了共享變量的可見性,讓其它線程能夠看到最新值,但不能解決指令交錯問題(不能保證原子性)
5.2.2 為什麼無鎖效率高
- 無鎖情況下,即使重試失敗,線程始終在高速運作,沒有停歇,而
會讓線程在沒有獲得鎖的時候,發生上下文切換,進入阻塞。synchronized
- 打個比喻:線程就好像高速跑道上的賽車,高速運作時,速度超快,一旦發生上下文切換,就好比賽車要減速、熄火,等被喚醒又得重新打火、啟動、加速… 恢複到高速運作,代價比較大
- 但無鎖情況下,因為線程要保持運作,需要額外 CPU 的支援,CPU 在這裡就好比高速跑道,沒有額外的跑道,線程想高速運作也無從談起,雖然不會進入阻塞,但由于沒有分到時間片,仍然會進入可運作狀态,還是會導緻上下文切換。
是以CAS在多核CPU下才能發揮作用,而且線程數最好不要超過CPU數,否則也會發生上下文切換,影響效率。
5.2.3 CAS 的特點
結合 CAS 和 volatile 可以實作無鎖并發,适用于線程數少、多核 CPU 的場景下。
- CAS 是基于樂觀鎖的思想:最樂觀的估計,不怕别的線程來修改共享變量,就算改了也沒關系,我吃虧點再重試呗。(基于樂觀鎖,但本質上沒有上鎖)
- synchronized 是基于悲觀鎖的思想:最悲觀的估計,得防着其它線程來修改共享變量,我上了鎖你們都别想改,我改完了解開鎖,你們才有機會。
- CAS 展現的是無鎖并發(不加鎖)、無阻塞并發(上下文切換少),請仔細體會這兩句話的意思
- 因為沒有使用
,是以線程不會陷入阻塞,這是效率提升的因素之一synchronized
- 但如果競争激烈,可以想到重試必然頻繁發生,反而效率會受影響
5.3 原子整數
J.U.C 并發包提供了:
- AtomicBoolean
- AtomicInteger
- AtomicLong
以 AtomicInteger 為例
public class Test43 {
public static void main(String[] args) {
AtomicInteger i = new AtomicInteger(0);
System.out.println(i.getAndIncrement());// i++ 結果 i = 1,傳回 0
System.out.println(i.incrementAndGet());// ++i 結果 i = 2,傳回 2
System.out.println(i.getAndDecrement());// i-- 結果 i = 1,傳回 2
System.out.println(i.decrementAndGet());// --i 結果 i = 0,傳回 0
System.out.println(i.getAndAdd(5));// 擷取并加值 結果 i = 5, 傳回 0
System.out.println(i.addAndGet(5));// 加值并擷取 結果 i = 10,傳回 10
System.out.println(i.updateAndGet(value -> value * 10));// 更新(+-*/)并擷取 結果 i=100,傳回100
System.out.println(i.getAndUpdate(value -> value / 2));// 擷取并更新 結果 i=50,傳回1002
System.out.println(i.get());//50
}
}
updateAndGet()的原理
- 使用CAS實作updateAndGet()方法
- 檢視一下JDK的updateAndGet的方法源碼,和咱們實作的幾乎一樣!Nice!
5.4 原子引用
為什麼需要原子引用類型?— 用于保證引用類型資料的原子性
- AtomicReference
- AtomicMarkableReference
- AtomicStampedReference
有如下方法
interface DecimalAccount {
// 擷取餘額
BigDecimal getBalance();
// 取款
void withdraw(BigDecimal amount);
/**
* 方法内會啟動 1000 個線程,每個線程做 -10 元 的操作
* 如果初始餘額為 10000 那麼正确的結果應當是 0
*/
static void demo(DecimalAccount account) {
List <Thread> ts = new ArrayList <>();
for (int i = 0; i < 1000; i++) {
ts.add(new Thread(() -> {
account.withdraw(BigDecimal.TEN);
}));
}
ts.forEach(Thread::start);
ts.forEach(t -> {
try {
t.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
});
System.out.println(account.getBalance());
}
}
試着提供不同的 DecimalAccount 實作,實作安全的取款操作
5.4.1 不安全實作
public class Test27 {
public static void main(String[] args) {
DecimalAccount.demo(new DecimalAccountUnsafe(new BigDecimal("10000")));
}
}
class DecimalAccountUnsafe implements DecimalAccount {
BigDecimal balance;
public DecimalAccountUnsafe(BigDecimal balance) {
this.balance = balance;
}
@Override
public BigDecimal getBalance() {
return balance;
}
@Override
public void withdraw(BigDecimal amount) {
BigDecimal balance = this.getBalance();
this.balance = balance.subtract(amount);
}
}
5.4.2 安全實作-使用鎖
class DecimalAccountUnsafe implements DecimalAccount {
BigDecimal balance;
public DecimalAccountUnsafe(BigDecimal balance) {
this.balance = balance;
}
@Override
public BigDecimal getBalance() {
return balance;
}
@Override
public void withdraw(BigDecimal amount) {
synchronized (this){
BigDecimal balance = this.getBalance();
this.balance = balance.subtract(amount);
}
}
}
5.4.3 安全實作-使用CAS
public class Test27 {
public static void main(String[] args) {
DecimalAccount.demo(new DecimalAccountCas(new BigDecimal("10000")));
}
}
class DecimalAccountCas implements DecimalAccount{
private AtomicReference<BigDecimal> balance;
public DecimalAccountCas(BigDecimal balance) {
this.balance = new AtomicReference <>(balance);
}
@Override
public BigDecimal getBalance() {
return balance.get();
}
@Override
public void withdraw(BigDecimal amount) {
while (true){
BigDecimal prev = balance.get();
BigDecimal next = prev.subtract(amount);
if(balance.compareAndSet(prev,next)){
break;
}
}
}
}
5.4.4 ABA 問題及解決
1. ABA 問題
**問題描述:**一個線程t1在修改共享變量w前,另外一個線程t2也對w進行了修改,但是w修改前後值相等。最終會導緻t1也會修改成功! 我們期望的是,隻有其他線程修改w了,t1本輪就修改失敗!
/**
* @author lxy
* @version 1.0
* @Description
* @date 2022/7/22 15:15
*/
@Slf4j(topic = "c.Test28")
public class Test28 {
static AtomicReference<String> ref = new AtomicReference <>("A");
public static void main(String[] args) {
log.debug("main start...");
//擷取值
//這個共享變量被其他線程修改過且修改前後值未變能否可以檢測出來呢? ---目前的寫發不可以
String prev = ref.get();
//如果中間有其他線程幹擾,發生了ABA現象,則依舊可以修改成功
other();
Sleeper.sleep(1);
//嘗試改為C
log.debug("change A->C {}",ref.compareAndSet(prev,"C"));
}
private static void other(){
new Thread(()->{
log.debug("change A->B {}",ref.compareAndSet(ref.get(),"B"));
},"t1").start();
Sleeper.sleep(0.5);
new Thread(()->{
log.debug("change B->A {}",ref.compareAndSet(ref.get(),"A"));
},"t2").start();
}
}
輸出為:
16:03:10.444 c.Test28 [main] - main start...
16:03:10.481 c.Test28 [t1] - change A->B true
16:03:10.995 c.Test28 [t2] - change B->A true
16:03:12.009 c.Test28 [main] - change A->C true
**結論:**主線程僅能判斷出共享變量的值與最初值 A 是否相同,不能感覺到這種從 A 改為 B 又 改回 A 的情況,如果主線程希望:
隻要有其它線程【動過了】共享變量,那麼自己的 cas 就算失敗,這時,僅比較值是不夠的,需要再加一個版本号 — 可以通過我們接下來的
AtomicStampedReference
實作
2. AtomicStampedReference
/**
* @author lxy
* @version 1.0
* @Description
* @date 2022/7/22 15:15
*/
@Slf4j(topic = "c.Test29")
public class Test29 {
static AtomicStampedReference <String> ref = new AtomicStampedReference <>("A",0);
public static void main(String[] args) {
log.debug("main start...");
//擷取值
//這個共享變量被其他線程修改過且修改前後值未變能否可以檢測出來呢? ---目前的寫發可以
String prev = ref.getReference();
//擷取版本号
int stamp = ref.getStamp();
log.debug("版本 {}",stamp);
//如果中間有其他線程幹擾,發生了ABA現象,由于含有stamp,則無法修改成功
other();
Sleeper.sleep(1);
//嘗試改為C
log.debug("change A->C {}",ref.compareAndSet(prev,"C",stamp,stamp+1));
}
//注意:必須先擷取版本号,再擷取值,才能解決ABA。
//如果先擷取值在擷取版本号之前就ABA了再擷取版本号,就是ABA之後的。
private static void other(){
new Thread(()->{
log.debug("change A->B {}",ref.compareAndSet(ref.getReference(),"B",ref.getStamp(),ref.getStamp()+1));
log.debug("版本 {}",ref.getStamp());
},"t1").start();
Sleeper.sleep(0.5);
new Thread(()->{
log.debug("change B->A {}",ref.compareAndSet(ref.getReference(),"A",ref.getStamp(),ref.getStamp()+1));
log.debug("版本 {}",ref.getStamp());
},"t2").start();
}
}
輸出為:
16:08:56.826 c.Test29 [main] - main start...
16:08:56.828 c.Test29 [main] - 版本 0
16:08:56.864 c.Test29 [t1] - change A->B true
16:08:56.864 c.Test29 [t1] - 版本 1
16:08:57.373 c.Test29 [t2] - change B->A true
16:08:57.373 c.Test29 [t2] - 版本 2
16:08:58.386 c.Test29 [main] - change A->C false
結論:
AtomicStampedReference
可以給原子引用加上版本号,追蹤原子引用整個的變化過程,如: A -> B -> A ->C ,通過
AtomicStampedReference
,我們可以知道,引用變量中途被更改了幾次。
**改進:**但是有時候,并不關心引用變量更改了幾次,隻是單純的關心是否更改過,是以就有了
AtomicMarkableReference
3. AtomicMarkableReference
AtomicMarkableReference 可以不關心引用變量的修改次數,隻關心是否被修改過 ~ ~~
如下圖:主人隻關心 垃圾袋是否為空,并不關心垃圾袋被保潔倒了幾次
代碼實作
/**
* @author lxy
* @version 1.0
* @Description
* @date 2022/7/22 15:39
*/
@Slf4j(topic = "c.Test30")
public class Test30 {
public static void main(String[] args) {
GarbageBag bag = new GarbageBag("裝滿了垃圾");
// 參數2 mark可以看做一個标記,表示垃圾裝滿了
AtomicMarkableReference <GarbageBag> ref = new AtomicMarkableReference <>(bag, true);
log.debug("start...");
GarbageBag prev = ref.getReference();
log.debug(prev.toString());
new Thread(()->{
log.debug("start...");
bag.setDesc("空垃圾袋");
ref.compareAndSet(bag,bag,true,false);
log.debug(bag.toString());
},"保潔阿姨").start();
Sleeper.sleep(1);
log.debug("想換一隻新垃圾袋?");
boolean success = ref.compareAndSet(prev, new GarbageBag("空垃圾袋"), true, false);
log.debug("換了麼?"+success);
log.debug(ref.getReference().toString());
}
}
class GarbageBag {
String desc;
public GarbageBag(String desc) {
this.desc = desc;
}
public void setDesc(String desc) {
this.desc = desc;
}
@Override
public String toString() {
return super.toString() + " " + desc;
}
}
輸出為:
16:17:12.629 c.Test30 [main] - start...
16:17:12.631 c.Test30 [main] - com.rg.thread.GarbageBag@33e5ccce 裝滿了垃圾
16:17:12.667 c.Test30 [保潔阿姨] - start...
16:17:12.667 c.Test30 [保潔阿姨] - com.rg.thread.GarbageBag@33e5ccce 空垃圾袋
16:17:13.671 c.Test30 [main] - 想換一隻新垃圾袋?
16:17:13.671 c.Test30 [main] - 換了麼?false
16:17:13.671 c.Test30 [main] - com.rg.thread.GarbageBag@33e5ccce
5.5 原子數組
通過原子的方式,更新數組裡的某個元素(修改元素的内容,而非元素的引用位址)
- AtomicIntegerArray
- AtomicLongArray
- AtomicReferenceArray
注意:
compareAndSet()
的比較是通過引用位址比較的,之前是對String舉例,String的不可變性導緻了我們每次對String的更改也導緻了引用的更改
代碼示範:
public class Test31 {
public static void main(String[] args) {
// 普通數組
demo(
()->new int[10],
array->array.length,
(array,index)->array[index]++,
array->System.out.println(Arrays.toString(array))
);
//原子數組
demo(
()->new AtomicIntegerArray(10),
array->array.length(),
(array,index)->array.getAndIncrement(index),
array-> System.out.println(array)
);
}
/**
參數1,提供數組、可以是線程不安全數組或線程安全數組
參數2,擷取數組長度的方法
參數3,自增方法,回傳 array, index
參數4,列印數組的方法
*/
// supplier 提供者 無中生有 ()->結果
// function 函數 一個參數一個結果 (參數)->結果 , BiFunction (參數1,參數2)->結果
// consumer 消費者 一個參數沒結果 (參數)->void, BiConsumer (參數1,參數2)->
private static <T> void demo(
Supplier <T> arraySupplier,
Function <T, Integer> lengthFun,
BiConsumer <T, Integer> putConsumer,
Consumer <T> printConsumer ) {
List <Thread> ts = new ArrayList <>();
T array = arraySupplier.get();
int length = lengthFun.apply(array);
for (int i = 0; i < length; i++) {// 10個線程 10000 自增操作,最終每個線程的值應該都是10000
// 每個線程對數組作 10000 次操作
ts.add(new Thread(() -> {
for (int j = 0; j < 10000; j++) {
putConsumer.accept(array, j%length);
}
}));
}
ts.forEach(t -> t.start()); // 啟動所有線程
ts.forEach(t -> {
try {
t.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}); // 等所有線程結束
printConsumer.accept(array);
}
}
輸出為:
[9984, 9983, 9979, 9985, 9985, 9985, 9985, 9982, 9982, 9986] //普通數組
[10000, 10000, 10000, 10000, 10000, 10000, 10000, 10000, 10000, 10000]//原子數組
5.6 字段更新器
用于保證多個線程通路對象時,保證成員變量(屬性)的安全性
- AtomicReferenceFieldUpdater // 域 字段是引用類型
- AtomicIntegerFieldUpdater //字段時Integer類型
- AtomicLongFieldUpdater //字段時Long類型
利用字段更新器,可以針對對象的某個域(Field)進行原子操作,隻能配合 volatile 修飾的字段使用,否則會出現異常
/**
* @author lxy
* @version 1.0
* @Description
* @date 2022/7/22 17:21
*/
@Slf4j(topic = "c.Test32")
public class Test32 {
public static void main(String[] args) {
Student student = new Student();
AtomicReferenceFieldUpdater <Student, String> updater = AtomicReferenceFieldUpdater.newUpdater(Student.class, String.class, "name");
System.out.println(updater.compareAndSet(student,null,"張三"));
System.out.println(student);
}
}
class Student {
//字段一定要用volatile修飾哦,否則會報異常
volatile String name;
@Override
public String toString() {
return "Student{" +
"name='" + name + '\'' +
'}';
}
}
5.7 原子累加器
5.7.1 累加器性能比較
/**
* @author lxy
* @version 1.0
* @Description
* @date 2022/7/22 17:27
*/
@Slf4j(topic = "c.Test33")
public class Test33 {
public static void main(String[] args) {
demo(
()->new AtomicLong(0),
adder->adder.getAndIncrement()
);
demo(
()->new LongAdder(),
adder->adder.increment()
);
}
/*
() -> 結果 提供累加器對象
(參數) -> 執行累加操作
*/
private static <T> void demo(Supplier <T> adderSupplier, Consumer <T> action) {
T adder = adderSupplier.get();
List <Thread> ts = new ArrayList <>();
// 4 個線程,每人累加 50 萬
for (int i = 0; i < 4; i++) {
ts.add(new Thread(() -> {
for (int j = 0; j < 500000; j++) {
action.accept(adder);
}
}));
}
long start = System.nanoTime();
ts.forEach(t -> t.start());
ts.forEach(t -> {
try {
t.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
});
long end = System.nanoTime();
System.out.println(adder + " cost:" + (end - start) / 1000_000);
}
}
輸出為:
2000000 cost:32
2000000 cost:14
**分析:**性能提升的原因很簡單,就是在有競争時,設定多個累加單元,Therad-0 累加 Cell[0],而 Thread-1 累加Cell[1]… 最後将結果彙總。這樣它們在累加時操作的不同的 Cell 變量,是以減少了 CAS 重試失敗,進而提高性能。
* 源碼之 LongAdder
LongAdder 是并發大師 @author Doug Lea (大哥李)的作品,設計的非常精巧
LongAdder 類有幾個關鍵域
// 累加單元數組, 懶惰初始化
transient volatile Cell[] cells;
// 基礎值, 如果沒有競争, 則用 cas 累加這個域
transient volatile long base;
// 在 cells 建立或擴容時, 置為 1, 表示加鎖
transient volatile int cellsBusy;
注意:volatile保證共享變量的可見性,transient保證序列化下這些屬性資訊不會被序列化
5.7.2 CAS鎖
//使用CAS實作Lock
//不推薦使用:因為會造成CPU的空轉...
@Slf4j(topic = "c.Test42")
public class LockCas {
// 0 沒加鎖
// 1 加鎖
private AtomicInteger state = new AtomicInteger(0);
public void lock() {
while (true) {
if (state.compareAndSet(0, 1)) {
break;
}
}
}
public void unlock() {
log.debug("unlock...");
state.set(0);
}
public static void main(String[] args) {
LockCas lock = new LockCas();
new Thread(() -> {
log.debug("begin...");
lock.lock();
try {
log.debug("lock...");
sleep(1);
} finally {
lock.unlock();
}
}).start();
new Thread(() -> {
log.debug("begin...");
lock.lock();
try {
log.debug("lock...");
} finally {
lock.unlock();
}
}).start();
}
}
輸出為:
16:35:41.029 c.Test42 [Thread-1] - begin...
16:35:41.028 c.Test42 [Thread-0] - begin...
16:35:41.032 c.Test42 [Thread-1] - lock...
16:35:41.032 c.Test42 [Thread-1] - unlock...
16:35:41.032 c.Test42 [Thread-0] - lock...
16:35:42.040 c.Test42 [Thread-0] - unlock...
* 原理之僞共享
其中 Cell 即為累加單元
// 防止緩存行僞共享
@sun.misc.Contended
static final class Cell {
volatile long value;
Cell(long x) { value = x; }
// 最重要的方法, 用來 cas 方式進行累加, prev 表示舊值, next 表示新值
final boolean cas(long prev, long next) {
return UNSAFE.compareAndSwapLong(this, valueOffset, prev, next);
}
// 省略不重要代碼
}
關于緩存行僞共享,得從緩存說起~ ~~
1. 緩存與記憶體的速度比較
從 cpu 到 | 大約需要的時鐘周期 |
寄存器 | 1 cycle (4GHz 的 CPU 約為0.25ns) |
L1 | 3~4 cycle |
L2 | 10~20 cycle |
L3 | 40~45 cycle |
記憶體 | 120~240 cycle |
因為 CPU 與 記憶體的速度差異很大,需要靠預讀資料至緩存來提升效率。
而緩存以緩存行為機關,每個緩存行對應着一塊記憶體,一般是 64 byte(8 個 long)
緩存的加入會造成資料副本的産生,即同一份資料會緩存在不同核心的緩存行中
CPU 要保證資料的一緻性,如果某個 CPU 核心更改了資料,其它 CPU 核心對應的整個緩存行必須失效
- 比如核心Core1和Core2都從記憶體中讀入資料a = 2 到緩存中。之後Core1把資料改成了2000,此時為了保持資料的一緻性,Core2中的資料也需要修改!即把Core2中的緩存行失效,重新去記憶體中取最新的資料
因為 Cell 是數組形式,在記憶體中是連續存儲的,一個 Cell 為 24 位元組(16 位元組的對象頭和 8 位元組的 value),是以緩存行可以存下 2 個的 Cell 對象(1行64個位元組)。這樣問題來了:
- Core-0 要修改 Cell[0]
- Core-1 要修改 Cell[1]
無論誰修改成功,都會導緻對方 Core 的緩存行失效,比如 Core-0 中
Cell[0]=6000, Cell[1]=8000
要修改為Cell[0]=6001, Cell[1]=8000 ,這時會讓 Core-1 的緩存行失效
@sun.misc.Contended
用來解決這個問題,它的原理是在使用此注解的對象或字段的前後各增加 128 位元組大小的padding,進而讓 CPU 将對象預讀至緩存時占用不同的緩存行,這樣,不會造成對方緩存行的失效 。
我們把一個緩存行加入了多個cell對象稱為僞共享,注解的作用就是防止僞共享
累加主要調用下面的方法
2. add()方法
public void add(long x) {
// as 為累加單元數組
// b 為基礎值
// x 為累加值
Cell[] as;
long b, v;
int m;
Cell a;
// 進入 if 的兩個條件
// 1. as 有值, 表示已經發生過競争, 進入 if
// 2. cas 給 base 累加時失敗了, 表示 base 發生了競争, 進入 if
if ((as = cells) != null || !casBase(b = base, b + x)) {
// uncontended 表示 cell 沒有競争
boolean uncontended = true;
if (
// as 還沒有建立
as == null || (m = as.length - 1) < 0 ||
// 目前線程對應的 cell 還沒有
(a = as[getProbe() & m]) == null ||
// cas 給目前線程的 cell 累加失敗 uncontended=false ( a 為目前線程的 cell )
!(uncontended = a.cas(v = a.value, v + x))
) {
// 進入 cell 數組建立、cell 建立的流程
longAccumulate(x, null, uncontended);
}
}
}
add流程圖
3. longAccumulate()
final void longAccumulate(long x, LongBinaryOperator fn,
boolean wasUncontended) {
int h;
// 目前線程還沒有對應的 cell, 需要随機生成一個 h 值用來将目前線程綁定到 cell
if ((h = getProbe()) == 0) {
// 初始化 probe
ThreadLocalRandom.current();
// h 對應新的 probe 值, 用來對應 cell
h = getProbe();
wasUncontended = true;
}
// collide 為 true 表示需要擴容
boolean collide = false;
for (; ; ) {
Cell[] as;
Cell a;
int n;
long v;
// 已經有了 cells
if ((as = cells) != null && (n = as.length) > 0) {
// cells存在,但是裡面還沒有 cell
if ((a = as[(n - 1) & h]) == null) {
//如果cellsBusy沒有加鎖,為 cellsBusy 加鎖, 建立 cell
if (cellsBusy == 0) { // Try to attach new Cell
// 建立cell,且cell 的初始累加值為 x
Cell r = new Cell(x); // Optimistically create
if (cellsBusy == 0 && casCellsBusy()) {//進行加鎖,并修改cellsBusy=1
boolean created = false;
try { // Recheck under lock
Cell[] rs; int m, j;
if ((rs = cells) != null &&//判斷槽位是否為空
(m = rs.length) > 0 &&
rs[j = (m - 1) & h] == null) {//不為空則進行下一次循環嘗試
rs[j] = r;//為空,則将元素放到槽位上去
created = true;
}
} finally {
cellsBusy = 0;
}
if (created)//如果上面cell建立成功,則退出
break;
continue; // Slot is now non-empty
}
// 成功則 break, 否則繼續 continue 循環
}
// 有競争, 改變線程對應的 cell 來重試 cas
else if (!wasUncontended)
wasUncontended = true;
// cas 嘗試累加, fn 配合 LongAccumulator 不為 null, 配合 LongAdder 為 null
else if (a.cas(v = a.value, ((fn == null) ? v + x : fn.applyAsLong(v, x))))
break;
// 如果 cells 長度已經超過了最大長度(CPU的核數), 或者已經擴容, 改變線程對應的 cell 來重試 cas
else if (n >= NCPU || cells != as)
collide = false;
// 確定 collide 為 false 進入此分支, 就不會進入最後的 else if 進行擴容了
else if (!collide)
collide = true;
//修改标志位進行加鎖
else if (cellsBusy == 0 && casCellsBusy()) {
// 加鎖成功, 擴容
try {
if (cells == as) { // 擴容cells
Cell[] rs = new Cell[n << 1];
for (int i = 0; i < n; ++i)
rs[i] = as[i];
cells = rs;
}
} finally {
cellsBusy = 0;
}
collide = false;
continue;
}
// 改變線程對應的 cell,再重新循環
h = advanceProbe(h);
}
// 還沒有 cells, 嘗試給 cellsBusy 加鎖
//cellsBusy == 0表示還沒有其他線程為其加鎖
//cells == as表示還有沒其他線程建立該cells數組(未建立)
//casCellsBusy() 采用CAS方法将cellsBusy 從0 改為 1。這樣做是為了防止其他線程也對其進行加鎖..
else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
boolean init = false;
// 加鎖成功, 建立 cell數組, 最開始長度為 2。 并建立一個累加單元 cell
try { // Initialize table
if (cells == as) {
Cell[] rs = new Cell[2];
//注意:雖然我們建立的Cell數組大小是2,但是我們隻向其加入了一個元素,
//不到萬不得已不建立另外一個cell,懶惰化的思想~~~
rs[h & 1] = new Cell(x);
cells = rs;//指派給cells
init = true;
}
} finally {//标志位設定為0,也就是解鎖
cellsBusy = 0;
}
// 成功則 break,退出循環;
if (init)
break;
}
// 上兩種情況失敗(如加鎖失敗), 嘗試給 base 累加,如果失敗則重新循環
else if (casBase(v = base, ((fn == null) ? v + x : fn.applyAsLong(v, x))))
break;
}
}
longAccumulate 流程圖
每個線程剛進入 longAccumulate 時,會嘗試對應一個 cell 對象(找到一個坑位)
4. 擷取最終結果通過 sum 方法
//對cells數組的元素進行累加
public long sum() {
Cell[] as = cells; Cell a;
long sum = base;
if (as != null) {
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null)
sum += a.value;
}
}
return sum;
}
5.8 Unsafe
Unsafe 對象提供了非常底層的,操作記憶體、線程的方法,Unsafe 對象不能直接調用,隻能通過反射獲得
unsafe 與線程安全性無關。因為比較接近底層,用來操作記憶體、線程,不建議我們直接使用
5.8.1 Unsafe CAS 操作
/**
* @author lxy
* @version 1.0
* @Description
* @date 2022/7/22 17:47
*/
@Slf4j(topic = "c.TestUnsafe")
public class TestUnsafe {
public static void main(String[] args) throws NoSuchFieldException, IllegalAccessException {
//使用反射擷取到Unsafe對象
Field theUnsafe = Unsafe.class.getDeclaredField("theUnsafe");
theUnsafe.setAccessible(true);
Unsafe unsafe = (Unsafe) theUnsafe.get(null);
System.out.println(unsafe);
//1. 擷取域的偏移位址
long idOffset = unsafe.objectFieldOffset(Teacher.class.getDeclaredField("id"));
long nameOffset = unsafe.objectFieldOffset(Teacher.class.getDeclaredField("name"));
Teacher teacher = new Teacher();
//2. 執行cas操作
unsafe.compareAndSwapInt(teacher,idOffset,0,1);
unsafe.compareAndSwapObject(teacher,nameOffset,null,"張三");
//3. 驗證
System.out.println(teacher);
}
}
@Data
class Teacher{
volatile int id;
volatile String name;
}
sun.misc.Unsafe@490d6c15
Teacher(id=1, name=張三)
6.8.2 unsafe對象模拟實作原子整數
/**
* @author lxy
* @version 1.0
* @Description
* @date 2022/7/22 18:13
*/
public class Test35 {
public static void main(String[] args) {
Account.demo(new MyAtomicInteger(10000));
}
}
class MyAtomicInteger implements Account {
private volatile int value;
private static final long valueOffset;
private static final Unsafe UNSAFE;
public MyAtomicInteger(int value) {
this.value = value;
}
static {
UNSAFE = UnsafeAccessor.getUnsafe();
try {
valueOffset = UNSAFE.objectFieldOffset(MyAtomicInteger.class.getDeclaredField("value"));
} catch (NoSuchFieldException e) {
e.printStackTrace();
throw new RuntimeException();
}
}
private int getValue(){
return value;
}
public void decrement(int amount){
while (true){
int prev = this.value;
int next = prev-amount;
if(UNSAFE.compareAndSwapInt(this,valueOffset,prev,next)){
break;
}
}
}
@Override
public Integer getBalance() {
return getValue();
}
@Override
public void withdraw(Integer amount) {
decrement(amount);
}
}
interface Account {
// 擷取餘額
Integer getBalance();
// 取款
void withdraw(Integer amount);
/**
* 方法内會啟動 1000 個線程,每個線程做 -10 元 的操作
* 如果初始餘額為 10000 那麼正确的結果應當是 0
*/
static void demo(Account account) {
List <Thread> ts = new ArrayList <>();
for (int i = 0; i < 1000; i++) {
ts.add(new Thread(() -> {
account.withdraw(10);
}));
}
long start = System.nanoTime();
ts.forEach(Thread::start);
ts.forEach(t -> {
try {
t.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
});
long end = System.nanoTime();
System.out.println(account.getBalance()
+ " cost: " + (end-start)/1000_000 + " ms");
}
}
本章小結
- CAS 與 volatile
- API
- 原子整數
- 原子引用
- 原子數組
- 字段更新器
- 原子累加器
- Unsafe
- 原理方面*
- LongAdder 源碼
- 僞共享