天天看點

線程安全性-原子性、可見性、有序性

線程安全性

  • 當多個線程通路某個類時,不管運作時環境采用何種排程方式或者這些線程将如何交替執行,并且在主調代碼中不需要任何額外的同步或協同,這個類都能表現出正确的行為,那麼這個類就是線程安全的。
  • 原子性:提供了互斥通路,同一時刻隻能有一個線程來對它進行操作
  • 可見性:一個線程對主記憶體的修改可以及時被其他線程觀察到
  • 有序性:一個線程觀察其他線程中的指令執行順序,由于指令重排序的存在,該觀察結果一般雜亂無序

原子性

Atomic包

這裡使用AtomicInteger進行計數,Java底層是使用CAS進行的悲觀鎖的同步。

詳解CAS:

https://blog.csdn.net/v123411739/article/details/79561458

Java中的CAS:

https://blog.csdn.net/mmoren/article/details/79185862
上文提到的CAS都有三個操作數,記憶體位置(V)、預期原值(A)和新值(B)。 如果記憶體位置的值與預期原值相比對,那麼處理器會自動将該位置值更新為新值 。否則,處理器不做任何操作。我再了解這個概念時遇到了一個問題,在多線程的情況下,java如何知道預期原值。這實際上和之前的JVM記憶體模型有關。
一個線程間共享的變量,首先在主存中會保留一份,然後每個線程的工作記憶體也會保留一份副本。這裡說的預期值,就是線程保留的副本。當該線程從主存中擷取該變量的值後,主存中該變量可能已經被其他線程重新整理了,但是該線程工作記憶體中該變量卻還是原來的值,這就是所謂的預期值了。當你要用 CAS 重新整理該值的時候,如果發現線程工作記憶體和主存中不一緻了,就會失敗,如果一緻,就可以更新成功。
package com.ice.concurrency.example.count;
import com.ice.concurrency.annoations.ThreadSafe;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

@Slf4j
@ThreadSafe
public class CountExample2 {

    public static int clientTotal = 5000;

    public static int threadTotal = 200;

    public static AtomicInteger count = new AtomicInteger();

    public static void main(String[] args) throws InterruptedException {
        ExecutorService executorService = Executors.newCachedThreadPool();
        final Semaphore semaphore = new Semaphore(threadTotal);
        final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
        for(int i = 0;i<clientTotal;i++){
            executorService.execute(()->{
                try {
                    semaphore.acquire();
                    add();
                    semaphore.release();
                } catch (InterruptedException e) {
                    log.error("exception", e);
                }
                countDownLatch.countDown();
            });
        }
        countDownLatch.await();
        executorService.shutdown();
        log.info("count:{}",count.get());
    }

    private static void add(){
        count.incrementAndGet();
    }
}
           

實際上Java底層到底是如何實作CAS的,我還不是十厘清楚。但目前而言,已經大緻了解了CAS的原理和優缺點即可。

AtomicReferce類

@Slf4j
@ThreadSafe
public class AtomicExample4 {

    private static AtomicReference<Integer> count = new AtomicReference<>(0);

    public static void main(String[] args) {
        count.compareAndSet(0,2); // 2
        count.compareAndSet(0,1); // no
        count.compareAndSet(1,3); // no
        count.compareAndSet(2,4); // 4
        count.compareAndSet(3,5); // no
        log.info("count:{}",count.get());
    }
}
>>> count:4           

AtomicIntegerFieldUpdater類

@Slf4j
@ThreadSafe
public class AtomicExample5 {

    private static AtomicIntegerFieldUpdater<AtomicExample5> updater =
            AtomicIntegerFieldUpdater.newUpdater(AtomicExample5.class,"count");

    // count字段必須為volatile修飾且非static才能被AtomicIntegerFieldUpdater改變。
    @Getter
    public volatile int count=100;

    private static AtomicExample5 example5 = new AtomicExample5();

    public static void main(String[] args) {
        if(updater.compareAndSet(example5,100,120)){
            log.info("update success, {}",example5.getCount());
        }

        if(updater.compareAndSet(example5,100,120)){
            log.info("update success, {}",example5.getCount());
        }else{
            log.info("update failed, {}",example5.getCount());
        }
    }
}           

AtomicStampReference類(解決ABA問題)

AtomicLongArray類

AtomicBoolean類

@Slf4j
@ThreadSafe
public class AtomicExample6 {

    public static int clientTotal = 5000;

    public static int threadTotal = 200;

    public static AtomicBoolean count = new AtomicBoolean(false);

    public static void main(String[] args) throws InterruptedException {
        ExecutorService executorService = Executors.newCachedThreadPool();
        final Semaphore semaphore = new Semaphore(threadTotal);
        final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
        for(int i = 0;i<clientTotal;i++){
            executorService.execute(()->{
                try {
                    semaphore.acquire();
                    test();
                    semaphore.release();
                } catch (InterruptedException e) {
                    log.error("exception", e);
                }
                countDownLatch.countDown();
            });
        }
        countDownLatch.await();
        executorService.shutdown();
        log.info("isHappened:{}",count.get());
    }

    private static void test(){
        if(count.compareAndSet(false,true)){
            log.info("execute");
        }
    }
}           

運作這段代碼,可以看到log隻列印了一次execute。因為不論多少個線程同時通路AtomicBoolean,隻有一個能成功修改它的值。

sychronized關鍵字
package com.ice.concurrency.example.sync;

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

@Slf4j
public class SynchronizedExample1 {

    public void test(){
        for(int i = 0;i<10;i++){
            log.info("test - {}",i);
        }
    }

    // synchronized修飾一個代碼塊
    public void test1(int j){
        synchronized (this){
            for(int i = 0;i<10;i++){
                log.info("test1 {} - {}",j,i);
            }
        }
    }

    // 修飾一個方法
    public synchronized void test2(int j){
        for(int i = 0;i<10;i++){
            log.info("test2 {} - {}",j,i);
        }
    }

    // 修飾一個類
    public static void test3(int j){
        synchronized (SynchronizedExample1.class){
            for(int i = 0;i<10;i++){
                log.info("test3 {} - {}",j,i);
            }
        }
    }

    // 修飾一個靜态方法
    public static synchronized void test4(int j){
        for(int i = 0;i<10;i++){
            log.info("test4 {} - {}",j,i);
        }
    }

    public static void main(String[] args) {
        SynchronizedExample1 example1 = new SynchronizedExample1();
        SynchronizedExample1 example2 = new SynchronizedExample1();
        ExecutorService executorService = Executors.newCachedThreadPool();
        executorService.execute(()->{
            example1.test1(1);
        });
        executorService.execute(()->{
            example2.test1(2);
        });
    }
}           

可見性

@Slf4j
@NotThreadSafe
public class CountExample4 {

    public static int clientTotal = 5000;

    public static int threadTotal = 200;

    public static volatile int count = 0;

    public static void main(String[] args) throws InterruptedException {
        ExecutorService executorService = Executors.newCachedThreadPool();
        final Semaphore semaphore = new Semaphore(threadTotal);
        final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
        for(int i = 0;i<clientTotal;i++){
            executorService.execute(()->{
                try {
                    semaphore.acquire();
                    add();
                    semaphore.release();
                } catch (InterruptedException e) {
                    log.error("exception", e);
                }
                countDownLatch.countDown();
            });
        }
        countDownLatch.await();
        executorService.shutdown();
        log.info("count:{}",count);
    }

    private static void add(){
        // 雖然這裡count變量被volatile修飾,是可見的,但依然沒有原子性,線程不安全
        // 當兩個線程同時操作count時,同時讀一個數,又同時重新整理進記憶體,依然會浪費一次操作
        count++;
    }
}           

volatile的應用

雖然volatile修飾的變量時刻讀取都是他的真實值,是以特别适合用于作為标示量。

有序性

volatile可以保證一定的有序性

synchronized,lock保證了單線程的運作,是以肯定時有序的

java記憶體模型具有一些先天的有序性(不需要通過任何手段就能得到的有序性,即happens-before原則)

happens-before原則

happens-before規則如下:

  • 程式順序規則:一個線程中的每個操作,happens- before 于該線程中的任意後續操作。 
  • 螢幕鎖規則:對一個螢幕鎖的解鎖,happens- before 于随後對這個螢幕鎖的加鎖。 
  • volatile變量規則:對一個volatile域的寫,happens- before 于任意後續對這個volatile域的讀。 
  • 傳遞性:如果A happens- before B,且B happens- before C,那麼A happens- before C。 
  • Thread.start()的調用會happens-before于啟動線程裡面的動作。 
  • Thread中的所有動作都happens-before于其他線程從Thread.join中成功傳回。
如果兩個操作執行的次序無法從happens-before原則中推導出來,那麼就不能保證他們的有序性,jvm就可以随意的對他們進行重排序