天天看點

并發程式設計(八)無鎖并發: CAS(Compare And Swap)

Concurrency Programming 八

  • 無鎖并發: CAS(Compare And Swap)
    • 原子整數類
    • 原子引用類
    • 原子數組類
    • 原子字段更新器類
    • 原子累加器類
    • Unsafe

無鎖并發: CAS(Compare And Swap)

  • 可在多核 CPU環境下無阻塞的方式來保證原子性. 它不是通過加鎖的方式來保護共享變量的線程安全
  • *特點: CAS适用于多核 CPU, 同時線程數不能多于核數的環境
原理比較:
  • 悲觀鎖(synchronized/ReentrantLock): 目前線程搶到鎖, 則其它線程會被阻塞, 是以線程上下文切換頻次會相對頻繁
  • 樂觀鎖(CAS): 沒有阻塞的狀态, 每當線程分到 CPU時間片, 都會不斷地嘗試比較, 當比較成功, 則會交換(更改)
  • CAS例子:
interface Account {
    // 擷取餘額
    Integer getBalance();
    // 取款
    void withdraw(Integer amount);
}

class AccountSafe implements Account {
    // 原子整數類
    private AtomicInteger balance;

    public AccountSafe(Integer balance) {
        this.balance = new AtomicInteger(balance);
    }

    @Override
    public Integer getBalance() {
        return balance.get();
    }

    @Override
    public void withdraw(Integer amount) {
        // 不斷嘗試, 直到成功
        while (true) {
            // 舊值
            int prev = balance.get();
            // 将要設定的值
            int next = prev - amount;
            /** boolean compareAndSet(int prev, int next)方法, 在設定(Set) next值時, 會先比較 prev值與目前的餘額
             * 1. 不一緻, 則失敗. 傳回 false; 進入下次循環重試
             * 2. 一緻, 則成功. 傳回 true; 結束循環嘗試 * */
            if (balance.compareAndSet(prev, next)) {
                // 成功設定, 結束循環嘗試
                break;
            }
        }
        // 以上方式, 可以簡化為下面的方式
        // balance.addAndGet(-1 * amount);
    }
}

public class App {
    public static void main(String[] args) {
        // 起始餘額 10000
        Account account = new AccountSafe(10000);
        // 開始時間
        long start = System.nanoTime();
        List<Thread> ts = new ArrayList<>();
        // 建立 1000個線程
        for (int i = 0; i < 1000; i++) {
            ts.add(new Thread(() -> {
                // 每個線程做 -10
                account.withdraw(10);
            }));
        }
        // 啟動所有(1000個)線程
        ts.forEach(Thread::start);
        ts.forEach(t -> {
            try {
                // 同步等待所有線程運作結束
                t.join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        // 最後正确的餘額為 0
        System.out.println("目前餘額: " + account.getBalance() +
                ", 耗費時間: " + (System.nanoTime()-start) / 1000_000 + " ms");
        // 目前餘額: 0, 耗費時間: 556 ms
    }
}

           

原子整數類

  • AtomicBoolean, AtomicInteger, AtomicLong
  • AtomicInteger為例:
AtomicInteger i = new AtomicInteger(0);
// 擷取并自增(i = 0, 結果 i = 1, 傳回 0),類似于 i++
System.out.println(i.getAndIncrement());
// 自增并擷取(i = 1, 結果 i = 2, 傳回 2),類似于 ++i
System.out.println(i.incrementAndGet());
// 自減并擷取(i = 2, 結果 i = 1, 傳回 1),類似于 --i
System.out.println(i.decrementAndGet());
// 擷取并自減(i = 1, 結果 i = 0, 傳回 1),類似于 i--
System.out.println(i.getAndDecrement());
// 擷取并加值(i = 0, 結果 i = 5, 傳回 0)
System.out.println(i.getAndAdd(5));
// 加值并擷取(i = 5, 結果 i = 0, 傳回 0)
System.out.println(i.addAndGet(-5));
// 擷取并更新(i = 0, p 為 i 的目前值, 結果 i = -2, 傳回 0)
// 其中函數中的操作能保證原子,但函數需要無副作用
System.out.println(i.getAndUpdate(p -> p - 2));
// 更新并擷取(i = -2, p 為 i 的目前值, 結果 i = 0, 傳回 0)
// 其中函數中的操作能保證原子,但函數需要無副作用
System.out.println(i.updateAndGet(p -> p + 2));
// 擷取并計算(i = 0, p 為 i 的目前值, x 為參數1, 結果 i = 10, 傳回 0)
// 其中函數中的操作能保證原子,但函數需要無副作用
// getAndUpdate 如果在 lambda 中引用了外部的局部變量,要保證該局部變量是 final 的
// getAndAccumulate 可以通過 參數1 來引用外部的局部變量,但因為其不在 lambda 中是以不必是 final
System.out.println(i.getAndAccumulate(10, (p, x) -> p + x));
// 計算并擷取(i = 10, p 為 i 的目前值, x 為參數1, 結果 i = 0, 傳回 0)
// 其中函數中的操作能保證原子,但函數需要無副作用
System.out.println(i.accumulateAndGet(-10, (p, x) -> p + x));

           

原子引用類

  • AtomicReference, AtomicStampedReference, AtomicMarkableReference
  • AtomicReference為例:
interface Account {
    // 擷取餘額
    BigDecimal getBalance();
    // 取款
    void withdraw(BigDecimal amount);
}

class AccountSafe implements Account {
    AtomicReference<BigDecimal> ref;

    public AccountSafe(BigDecimal balance) {
        ref = new AtomicReference<>(balance);
    }

    @Override
    public BigDecimal getBalance() {
        return ref.get();
    }

    @Override
    public void withdraw(BigDecimal amount) {
        while (true) {
            BigDecimal prev = ref.get();
            BigDecimal next = prev.subtract(amount);
            if (ref.compareAndSet(prev, next)) {
                break;
            }
        }
    }
}

public class App {
    public static void main(String[] args) {
        // 起始餘額 10000
        Account account = new AccountSafe(new BigDecimal("10000"));
        // 開始時間
        long start = System.nanoTime();
        List<Thread> ts = new ArrayList<>();
        // 建立 1000個線程
        for (int i = 0; i < 1000; i++) {
            ts.add(new Thread(() -> {
                // 每個線程做 -10
                account.withdraw(BigDecimal.TEN);
            }));
        }
        // 啟動所有(1000個)線程
        ts.forEach(Thread::start);
        ts.forEach(t -> {
            try {
                // 同步等待所有線程運作結束
                t.join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        // 最後正确的餘額為 0
        System.out.println("目前餘額: " + account.getBalance() +
                ", 耗費時間: " + (System.nanoTime()-start) / 1000_000 + " ms");
        // 目前餘額: 0, 耗費時間: 395 ms
    }
}

           
  • ABA問題: 比如 AtomicReference共享變量最初值 A, 在 CAS比較時, 無法感覺到從 A改為 B, 再改回 A的情況.

    *為了追蹤此種變化過程就有了附帶版本号的原子引用 AtomicStampedReference, 一旦出現以上情況, 則會更改失敗以及可以查詢目前版本号

public class App {
    // 原子引用(附加版本)
    static AtomicStampedReference<String> ref = new AtomicStampedReference<>("A", 0);
    public static void main(String[] args) throws InterruptedException {
        // 擷取版本号
        int stamp = ref.getStamp();
        System.out.println("起始版本 " + stamp);
        // 通過子線程修改
        other();
        TimeUnit.MILLISECONDS.sleep(1000);
        // 在主線程, 延遲1秒後嘗試修改, 請求版本号為0, 改後1
        System.out.print("A嘗試改為 C " +
                ref.compareAndSet(ref.getReference(), "C", stamp, stamp + 1));
        System.out.println(", 目前版本為 " + ref.getStamp());
    }

    private static void other() throws InterruptedException {
        // 線程1
        new Thread(() -> {
            System.out.print("A嘗試改為 B " +
                    ref.compareAndSet(ref.getReference(), "B", ref.getStamp(), ref.getStamp() + 1));
            System.out.println(", 版本更新為 " + ref.getStamp());
        }, "t1").start();
        // 睡眠, 延遲0.5秒
        TimeUnit.MILLISECONDS.sleep(500);
        // 線程2
        new Thread(() -> {
            System.out.print("B嘗試改為 A " +
                    ref.compareAndSet(ref.getReference(), "A", ref.getStamp(), ref.getStamp() + 1));
            System.out.println(", 版本更新為 " + ref.getStamp());
        }, "t2").start();
    }
}
起始版本 0
A嘗試改為 B true, 版本更新為 1 # t1
B嘗試改為 A true, 版本更新為 2 # t2
A嘗試改為 C false, 目前版本為 2 # 主線程

           

原子數組類

  • AtomicIntegerArray, AtomicLongArray, AtomicReferenceArray
  • 線程[安全& 不安全]的數組示範例子: 保護數組内元素的線程安全
public class App {
    private static <T> void demo(
            Supplier<T> arraySupplier, // 提供數組(線程不安全數組/線程安全數組)
            Function<T, Integer> lengthFun, // 擷取數組長度的方法
            BiConsumer<T, Integer> putConsumer, // 自增方法, 回傳 array, index
            Consumer<T> printConsumer ) { // 列印數組的方法
        List<Thread> ts = new ArrayList<>();
        T array = arraySupplier.get();
        int length = lengthFun.apply(array);
        for (int i = 0; i < length; i++) {
            // 建立線程, 同時對提供的數組, 做10000次操作
            ts.add(new Thread(() -> {
                for (int j = 0; j < 10000; j++) {
                    putConsumer.accept(array, j % length);
                }
            }));
        }
        // 啟動所有線程
        ts.forEach(t -> t.start());
        ts.forEach(t -> {
            try {
                // 同步等待所有線程運作結束
                t.join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        printConsumer.accept(array);
    }
}

           
  • 線程不安全的數組示範例子:
public static void main(String[] args) {
        demo(
            () -> new int[10],
            (array) -> array.length,
            (array, index) -> array[index]++,
            array-> System.out.println(Arrays.toString(array))
        );
    }

> [9902, 9905, 9907, 9906, 9918, 9918, 9916, 9913, 9902, 9903]

           
  • 線程安全的數組示範例子:
public static void main(String[] args) {
        demo(
            () -> new AtomicIntegerArray(10),
            (array) -> array.length(),
            (array, index) -> array.getAndIncrement(index),
            array -> System.out.println(array)
        );
    }

> [10000, 10000, 10000, 10000, 10000, 10000, 10000, 10000, 10000, 10000]

           

原子字段更新器類

  • AtomicReferenceFieldUpdater, AtomicIntegerFieldUpdater, AtomicLongFieldUpdater
  • 可以對對象内的域(Field)進行原子操作, 注: 屬性必須 volatile修飾, 否則會抛異常
public class App {
    private volatile int field;
    public static void main(String[] args) {
        AtomicIntegerFieldUpdater fieldUpdater = AtomicIntegerFieldUpdater.newUpdater(App.class, "field");
        App app = new App();
        fieldUpdater.compareAndSet(app, 0, 10);
        // 修改成功 field = 10
        System.out.println(app.field);
        // 修改成功 field = 20
        fieldUpdater.compareAndSet(app, 10, 20);
        System.out.println(app.field);
        // 修改失敗 field = 20
        fieldUpdater.compareAndSet(app, 10, 30);
        System.out.println(app.field);
        // 修改成功 field = 40
        fieldUpdater.compareAndSet(app, 20, 40);
        System.out.println(app.field);
    }
}
10
20
20
40

           

原子累加器類

  • LongAdder, DoubleAdder
  • LongAdder使用例子:
public class App33 {
    private static <T> void demo(Supplier<T> adderSupplier, Consumer<T> action) {
        T adder = adderSupplier.get();
        long start = System.nanoTime();
        List<Thread> ts = new ArrayList<>();
        // 建立 10個線程, 每個線程累加100萬次
        for (int i = 0; i < 10; i++) {
            ts.add(new Thread(() -> {
                for (int j = 0; j < 1000000; j++) {
                    action.accept(adder);
                }
            }));
        }
        // 啟動所有(10個)線程
        ts.forEach(t -> t.start());
        ts.forEach(t -> {
            try {
                // 同步等待所有線程運作結束
                t.join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        System.out.println("總計: " + adder +
                ", 耗費時間: " + (System.nanoTime()-start) / 1000_000 + " ms");
    }

    public static void main(String[] args) {
        System.out.println("LongAdder:");
        for (int i = 0; i < 5; i++) {
            demo(() -> new LongAdder(), adder -> adder.increment());
        }
    }
}
LongAdder:
總計: 10000000, 耗費時間: 229 ms
總計: 10000000, 耗費時間: 94 ms
總計: 10000000, 耗費時間: 134 ms
總計: 10000000, 耗費時間: 65 ms
總計: 10000000, 耗費時間: 92 ms

           
  • 與累加器 AtomicLong性能比較:
public static void main(String[] args) {
        System.out.println("AtomicLong:");
        for (int i = 0; i < 5; i++) {
            demo(() -> new AtomicLong(), adder -> adder.getAndIncrement());
        }
    }
AtomicLong:
總計: 10000000, 耗費時間: 192 ms
總計: 10000000, 耗費時間: 180 ms
總計: 10000000, 耗費時間: 209 ms
總計: 10000000, 耗費時間: 209 ms
總計: 10000000, 耗費時間: 247 ms

           

Unsafe

  • Unsafe對象無法直接調用, 而隻能通過反射獲得
  • 使用例子:
class UnsafeAccessor {
    static Unsafe unsafe;
    static {
        try {
            Field theUnsafe = Unsafe.class.getDeclaredField("theUnsafe");
            // 無法讀取 public以外的, 此項預設 false. 設定為 true來取消封裝
            theUnsafe.setAccessible(true);
            unsafe = (Unsafe) theUnsafe.get(null);
        } catch (NoSuchFieldException | IllegalAccessException e) {
            throw new Error(e);
        }
    }
    static Unsafe getUnsafe() {
        return unsafe;
    }
}

@Data
class Student {
    volatile int id;
    volatile String name;
}

public class App34 {
    public static void main(String[] args) throws NoSuchFieldException {
        Unsafe unsafe = UnsafeAccessor.getUnsafe();
        Field id = Student.class.getDeclaredField("id");
        Field name = Student.class.getDeclaredField("name");
        // 獲得成員變量的偏移量
        long idOffset = unsafe.objectFieldOffset(id);
        long nameOffset = unsafe.objectFieldOffset(name);
        Student student = new Student();
        // 使用 CAS方法替換成員變量的值
        System.out.println("設定 id: " + unsafe.compareAndSwapInt(student, idOffset, 0, 20));
        System.out.println("設定 name: " + unsafe.compareAndSwapObject(student, nameOffset, null, "張三"));
        System.out.println(student);
    }
}
設定 id: true
設定 name: true
Student(id=20, name=張三)

           
  • 自定義原子整數例子:
class AtomicData {
    private volatile int data;
    static final Unsafe unsafe;
    static final long DATA_OFFSET;
    static {
        unsafe = UnsafeAccessor.getUnsafe();
        try {
            // data屬性在對象中的偏移量, 用于 Unsafe直接通路該屬性
            DATA_OFFSET = unsafe.objectFieldOffset(AtomicData.class.getDeclaredField("data"));
        } catch (NoSuchFieldException e) {
            throw new Error(e);
        }
    }
    
    public AtomicData(int data) {
        this.data = data;
    }

    public void withdraw(int amount) {
        int oldValue;
        while(true) {
            oldValue = data;
            // CAS嘗試修改 data為(舊值 - amount)
            if (unsafe.compareAndSwapInt(this, DATA_OFFSET, oldValue, oldValue - amount)) {
                return;
            }
        }
    }

    public int getData() {
        return data;
    }
}

interface Account {
    // 擷取餘額
    Integer getBalance();
    // 取款
    void withdraw(Integer amount);
}

class AccountSafe implements Account {
    private AtomicData balance;

    public AccountSafe(Integer balance) {
        this.balance = new AtomicData(balance);
    }

    @Override
    public Integer getBalance() {
        return balance.getData();
    }

    @Override
    public void withdraw(Integer amount) {
        balance.withdraw(amount);
    }
}

public class App {
    public static void main(String[] args) {
        // 起始餘額 10000
        Account account = new AccountSafe(10000);
        // 開始時間
        long start = System.nanoTime();
        List<Thread> ts = new ArrayList<>();
        // 建立 1000個線程
        for (int i = 0; i < 1000; i++) {
            ts.add(new Thread(() -> {
                // 每個線程做 -10
                account.withdraw(10);
            }));
        }
        // 啟動所有(1000個)線程
        ts.forEach(Thread::start);
        ts.forEach(t -> {
            try {
                // 同步等待所有線程運作結束
                t.join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        // 最後正确的餘額為 0
        System.out.println("目前餘額: " + account.getBalance() +
                ", 耗費時間: " + (System.nanoTime()-start) / 1000_000 + " ms");
        // 目前餘額: 0, 耗費時間: 421 ms
    }
}

           
如果您覺得有幫助,歡迎點贊哦 ~ 謝謝!!