Concurrency Programming 八
- 無鎖并發: CAS(Compare And Swap)
-
- 原子整數類
- 原子引用類
- 原子數組類
- 原子字段更新器類
- 原子累加器類
- Unsafe
無鎖并發: CAS(Compare And Swap)
- 可在多核 CPU環境下無阻塞的方式來保證原子性. 它不是通過加鎖的方式來保護共享變量的線程安全
- *特點: CAS适用于多核 CPU, 同時線程數不能多于核數的環境
原理比較:
- 悲觀鎖(synchronized/ReentrantLock): 目前線程搶到鎖, 則其它線程會被阻塞, 是以線程上下文切換頻次會相對頻繁
- 樂觀鎖(CAS): 沒有阻塞的狀态, 每當線程分到 CPU時間片, 都會不斷地嘗試比較, 當比較成功, 則會交換(更改)
- CAS例子:
interface Account {
// 擷取餘額
Integer getBalance();
// 取款
void withdraw(Integer amount);
}
class AccountSafe implements Account {
// 原子整數類
private AtomicInteger balance;
public AccountSafe(Integer balance) {
this.balance = new AtomicInteger(balance);
}
@Override
public Integer getBalance() {
return balance.get();
}
@Override
public void withdraw(Integer amount) {
// 不斷嘗試, 直到成功
while (true) {
// 舊值
int prev = balance.get();
// 将要設定的值
int next = prev - amount;
/** boolean compareAndSet(int prev, int next)方法, 在設定(Set) next值時, 會先比較 prev值與目前的餘額
* 1. 不一緻, 則失敗. 傳回 false; 進入下次循環重試
* 2. 一緻, 則成功. 傳回 true; 結束循環嘗試 * */
if (balance.compareAndSet(prev, next)) {
// 成功設定, 結束循環嘗試
break;
}
}
// 以上方式, 可以簡化為下面的方式
// balance.addAndGet(-1 * amount);
}
}
public class App {
public static void main(String[] args) {
// 起始餘額 10000
Account account = new AccountSafe(10000);
// 開始時間
long start = System.nanoTime();
List<Thread> ts = new ArrayList<>();
// 建立 1000個線程
for (int i = 0; i < 1000; i++) {
ts.add(new Thread(() -> {
// 每個線程做 -10
account.withdraw(10);
}));
}
// 啟動所有(1000個)線程
ts.forEach(Thread::start);
ts.forEach(t -> {
try {
// 同步等待所有線程運作結束
t.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
});
// 最後正确的餘額為 0
System.out.println("目前餘額: " + account.getBalance() +
", 耗費時間: " + (System.nanoTime()-start) / 1000_000 + " ms");
// 目前餘額: 0, 耗費時間: 556 ms
}
}
原子整數類
- AtomicBoolean, AtomicInteger, AtomicLong
- AtomicInteger為例:
AtomicInteger i = new AtomicInteger(0);
// 擷取并自增(i = 0, 結果 i = 1, 傳回 0),類似于 i++
System.out.println(i.getAndIncrement());
// 自增并擷取(i = 1, 結果 i = 2, 傳回 2),類似于 ++i
System.out.println(i.incrementAndGet());
// 自減并擷取(i = 2, 結果 i = 1, 傳回 1),類似于 --i
System.out.println(i.decrementAndGet());
// 擷取并自減(i = 1, 結果 i = 0, 傳回 1),類似于 i--
System.out.println(i.getAndDecrement());
// 擷取并加值(i = 0, 結果 i = 5, 傳回 0)
System.out.println(i.getAndAdd(5));
// 加值并擷取(i = 5, 結果 i = 0, 傳回 0)
System.out.println(i.addAndGet(-5));
// 擷取并更新(i = 0, p 為 i 的目前值, 結果 i = -2, 傳回 0)
// 其中函數中的操作能保證原子,但函數需要無副作用
System.out.println(i.getAndUpdate(p -> p - 2));
// 更新并擷取(i = -2, p 為 i 的目前值, 結果 i = 0, 傳回 0)
// 其中函數中的操作能保證原子,但函數需要無副作用
System.out.println(i.updateAndGet(p -> p + 2));
// 擷取并計算(i = 0, p 為 i 的目前值, x 為參數1, 結果 i = 10, 傳回 0)
// 其中函數中的操作能保證原子,但函數需要無副作用
// getAndUpdate 如果在 lambda 中引用了外部的局部變量,要保證該局部變量是 final 的
// getAndAccumulate 可以通過 參數1 來引用外部的局部變量,但因為其不在 lambda 中是以不必是 final
System.out.println(i.getAndAccumulate(10, (p, x) -> p + x));
// 計算并擷取(i = 10, p 為 i 的目前值, x 為參數1, 結果 i = 0, 傳回 0)
// 其中函數中的操作能保證原子,但函數需要無副作用
System.out.println(i.accumulateAndGet(-10, (p, x) -> p + x));
原子引用類
- AtomicReference, AtomicStampedReference, AtomicMarkableReference
- AtomicReference為例:
interface Account {
// 擷取餘額
BigDecimal getBalance();
// 取款
void withdraw(BigDecimal amount);
}
class AccountSafe implements Account {
AtomicReference<BigDecimal> ref;
public AccountSafe(BigDecimal balance) {
ref = new AtomicReference<>(balance);
}
@Override
public BigDecimal getBalance() {
return ref.get();
}
@Override
public void withdraw(BigDecimal amount) {
while (true) {
BigDecimal prev = ref.get();
BigDecimal next = prev.subtract(amount);
if (ref.compareAndSet(prev, next)) {
break;
}
}
}
}
public class App {
public static void main(String[] args) {
// 起始餘額 10000
Account account = new AccountSafe(new BigDecimal("10000"));
// 開始時間
long start = System.nanoTime();
List<Thread> ts = new ArrayList<>();
// 建立 1000個線程
for (int i = 0; i < 1000; i++) {
ts.add(new Thread(() -> {
// 每個線程做 -10
account.withdraw(BigDecimal.TEN);
}));
}
// 啟動所有(1000個)線程
ts.forEach(Thread::start);
ts.forEach(t -> {
try {
// 同步等待所有線程運作結束
t.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
});
// 最後正确的餘額為 0
System.out.println("目前餘額: " + account.getBalance() +
", 耗費時間: " + (System.nanoTime()-start) / 1000_000 + " ms");
// 目前餘額: 0, 耗費時間: 395 ms
}
}
- ABA問題: 比如 AtomicReference共享變量最初值 A, 在 CAS比較時, 無法感覺到從 A改為 B, 再改回 A的情況.
*為了追蹤此種變化過程就有了附帶版本号的原子引用 AtomicStampedReference, 一旦出現以上情況, 則會更改失敗以及可以查詢目前版本号
public class App {
// 原子引用(附加版本)
static AtomicStampedReference<String> ref = new AtomicStampedReference<>("A", 0);
public static void main(String[] args) throws InterruptedException {
// 擷取版本号
int stamp = ref.getStamp();
System.out.println("起始版本 " + stamp);
// 通過子線程修改
other();
TimeUnit.MILLISECONDS.sleep(1000);
// 在主線程, 延遲1秒後嘗試修改, 請求版本号為0, 改後1
System.out.print("A嘗試改為 C " +
ref.compareAndSet(ref.getReference(), "C", stamp, stamp + 1));
System.out.println(", 目前版本為 " + ref.getStamp());
}
private static void other() throws InterruptedException {
// 線程1
new Thread(() -> {
System.out.print("A嘗試改為 B " +
ref.compareAndSet(ref.getReference(), "B", ref.getStamp(), ref.getStamp() + 1));
System.out.println(", 版本更新為 " + ref.getStamp());
}, "t1").start();
// 睡眠, 延遲0.5秒
TimeUnit.MILLISECONDS.sleep(500);
// 線程2
new Thread(() -> {
System.out.print("B嘗試改為 A " +
ref.compareAndSet(ref.getReference(), "A", ref.getStamp(), ref.getStamp() + 1));
System.out.println(", 版本更新為 " + ref.getStamp());
}, "t2").start();
}
}
起始版本 0
A嘗試改為 B true, 版本更新為 1 # t1
B嘗試改為 A true, 版本更新為 2 # t2
A嘗試改為 C false, 目前版本為 2 # 主線程
原子數組類
- AtomicIntegerArray, AtomicLongArray, AtomicReferenceArray
- 線程[安全& 不安全]的數組示範例子: 保護數組内元素的線程安全
public class App {
private static <T> void demo(
Supplier<T> arraySupplier, // 提供數組(線程不安全數組/線程安全數組)
Function<T, Integer> lengthFun, // 擷取數組長度的方法
BiConsumer<T, Integer> putConsumer, // 自增方法, 回傳 array, index
Consumer<T> printConsumer ) { // 列印數組的方法
List<Thread> ts = new ArrayList<>();
T array = arraySupplier.get();
int length = lengthFun.apply(array);
for (int i = 0; i < length; i++) {
// 建立線程, 同時對提供的數組, 做10000次操作
ts.add(new Thread(() -> {
for (int j = 0; j < 10000; j++) {
putConsumer.accept(array, j % length);
}
}));
}
// 啟動所有線程
ts.forEach(t -> t.start());
ts.forEach(t -> {
try {
// 同步等待所有線程運作結束
t.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
});
printConsumer.accept(array);
}
}
- 線程不安全的數組示範例子:
public static void main(String[] args) {
demo(
() -> new int[10],
(array) -> array.length,
(array, index) -> array[index]++,
array-> System.out.println(Arrays.toString(array))
);
}
> [9902, 9905, 9907, 9906, 9918, 9918, 9916, 9913, 9902, 9903]
- 線程安全的數組示範例子:
public static void main(String[] args) {
demo(
() -> new AtomicIntegerArray(10),
(array) -> array.length(),
(array, index) -> array.getAndIncrement(index),
array -> System.out.println(array)
);
}
> [10000, 10000, 10000, 10000, 10000, 10000, 10000, 10000, 10000, 10000]
原子字段更新器類
- AtomicReferenceFieldUpdater, AtomicIntegerFieldUpdater, AtomicLongFieldUpdater
- 可以對對象内的域(Field)進行原子操作, 注: 屬性必須 volatile修飾, 否則會抛異常
public class App {
private volatile int field;
public static void main(String[] args) {
AtomicIntegerFieldUpdater fieldUpdater = AtomicIntegerFieldUpdater.newUpdater(App.class, "field");
App app = new App();
fieldUpdater.compareAndSet(app, 0, 10);
// 修改成功 field = 10
System.out.println(app.field);
// 修改成功 field = 20
fieldUpdater.compareAndSet(app, 10, 20);
System.out.println(app.field);
// 修改失敗 field = 20
fieldUpdater.compareAndSet(app, 10, 30);
System.out.println(app.field);
// 修改成功 field = 40
fieldUpdater.compareAndSet(app, 20, 40);
System.out.println(app.field);
}
}
10
20
20
40
原子累加器類
- LongAdder, DoubleAdder
- LongAdder使用例子:
public class App33 {
private static <T> void demo(Supplier<T> adderSupplier, Consumer<T> action) {
T adder = adderSupplier.get();
long start = System.nanoTime();
List<Thread> ts = new ArrayList<>();
// 建立 10個線程, 每個線程累加100萬次
for (int i = 0; i < 10; i++) {
ts.add(new Thread(() -> {
for (int j = 0; j < 1000000; j++) {
action.accept(adder);
}
}));
}
// 啟動所有(10個)線程
ts.forEach(t -> t.start());
ts.forEach(t -> {
try {
// 同步等待所有線程運作結束
t.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
});
System.out.println("總計: " + adder +
", 耗費時間: " + (System.nanoTime()-start) / 1000_000 + " ms");
}
public static void main(String[] args) {
System.out.println("LongAdder:");
for (int i = 0; i < 5; i++) {
demo(() -> new LongAdder(), adder -> adder.increment());
}
}
}
LongAdder:
總計: 10000000, 耗費時間: 229 ms
總計: 10000000, 耗費時間: 94 ms
總計: 10000000, 耗費時間: 134 ms
總計: 10000000, 耗費時間: 65 ms
總計: 10000000, 耗費時間: 92 ms
- 與累加器 AtomicLong性能比較:
public static void main(String[] args) {
System.out.println("AtomicLong:");
for (int i = 0; i < 5; i++) {
demo(() -> new AtomicLong(), adder -> adder.getAndIncrement());
}
}
AtomicLong:
總計: 10000000, 耗費時間: 192 ms
總計: 10000000, 耗費時間: 180 ms
總計: 10000000, 耗費時間: 209 ms
總計: 10000000, 耗費時間: 209 ms
總計: 10000000, 耗費時間: 247 ms
Unsafe
- Unsafe對象無法直接調用, 而隻能通過反射獲得
- 使用例子:
class UnsafeAccessor {
static Unsafe unsafe;
static {
try {
Field theUnsafe = Unsafe.class.getDeclaredField("theUnsafe");
// 無法讀取 public以外的, 此項預設 false. 設定為 true來取消封裝
theUnsafe.setAccessible(true);
unsafe = (Unsafe) theUnsafe.get(null);
} catch (NoSuchFieldException | IllegalAccessException e) {
throw new Error(e);
}
}
static Unsafe getUnsafe() {
return unsafe;
}
}
@Data
class Student {
volatile int id;
volatile String name;
}
public class App34 {
public static void main(String[] args) throws NoSuchFieldException {
Unsafe unsafe = UnsafeAccessor.getUnsafe();
Field id = Student.class.getDeclaredField("id");
Field name = Student.class.getDeclaredField("name");
// 獲得成員變量的偏移量
long idOffset = unsafe.objectFieldOffset(id);
long nameOffset = unsafe.objectFieldOffset(name);
Student student = new Student();
// 使用 CAS方法替換成員變量的值
System.out.println("設定 id: " + unsafe.compareAndSwapInt(student, idOffset, 0, 20));
System.out.println("設定 name: " + unsafe.compareAndSwapObject(student, nameOffset, null, "張三"));
System.out.println(student);
}
}
設定 id: true
設定 name: true
Student(id=20, name=張三)
- 自定義原子整數例子:
class AtomicData {
private volatile int data;
static final Unsafe unsafe;
static final long DATA_OFFSET;
static {
unsafe = UnsafeAccessor.getUnsafe();
try {
// data屬性在對象中的偏移量, 用于 Unsafe直接通路該屬性
DATA_OFFSET = unsafe.objectFieldOffset(AtomicData.class.getDeclaredField("data"));
} catch (NoSuchFieldException e) {
throw new Error(e);
}
}
public AtomicData(int data) {
this.data = data;
}
public void withdraw(int amount) {
int oldValue;
while(true) {
oldValue = data;
// CAS嘗試修改 data為(舊值 - amount)
if (unsafe.compareAndSwapInt(this, DATA_OFFSET, oldValue, oldValue - amount)) {
return;
}
}
}
public int getData() {
return data;
}
}
interface Account {
// 擷取餘額
Integer getBalance();
// 取款
void withdraw(Integer amount);
}
class AccountSafe implements Account {
private AtomicData balance;
public AccountSafe(Integer balance) {
this.balance = new AtomicData(balance);
}
@Override
public Integer getBalance() {
return balance.getData();
}
@Override
public void withdraw(Integer amount) {
balance.withdraw(amount);
}
}
public class App {
public static void main(String[] args) {
// 起始餘額 10000
Account account = new AccountSafe(10000);
// 開始時間
long start = System.nanoTime();
List<Thread> ts = new ArrayList<>();
// 建立 1000個線程
for (int i = 0; i < 1000; i++) {
ts.add(new Thread(() -> {
// 每個線程做 -10
account.withdraw(10);
}));
}
// 啟動所有(1000個)線程
ts.forEach(Thread::start);
ts.forEach(t -> {
try {
// 同步等待所有線程運作結束
t.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
});
// 最後正确的餘額為 0
System.out.println("目前餘額: " + account.getBalance() +
", 耗費時間: " + (System.nanoTime()-start) / 1000_000 + " ms");
// 目前餘額: 0, 耗費時間: 421 ms
}
}
如果您覺得有幫助,歡迎點贊哦 ~ 謝謝!!