天天看點

面試官問:“在項目中用過多線程嗎?”你就把這個案例講給他聽!多線程開發執行個體對線面試官

文章目錄

在面試當中,有時候會問到你在項目中用過多線程麼?

對于普通的應屆生或者工作時間不長的初級開發 ???—— crud仔流下了沒有技術的眼淚。

面試官問:“在項目中用過多線程嗎?”你就把這個案例講給他聽!多線程開發執行個體對線面試官

部落客這裡整理了項目中用到了多線程的一個簡單的執行個體,希望能對你有所啟發。

應用的背景非常簡單,部落客做的項目是一個稽核類的項目,稽核的資料需要推送給第三方監管系統,這隻是一個很簡單的對接,但是存在一個問題。

我們需要推送的資料大概三十萬條,但是第三方監管提供的接口隻支援單條推送(别問為什麼不支援批量,問就是沒讨撕論比好過)。可以估算一下,三十萬條資料,一條資料按3秒算,大概需要250(為什麼恰好會是這個數)個小時。

是以就考慮到引入多線程來進行并發操作,降低資料推送的時間,提高資料推送的實時性。

面試官問:“在項目中用過多線程嗎?”你就把這個案例講給他聽!多線程開發執行個體對線面試官

我們推送給第三方的資料肯定是不能重複推送的,必須要有一個機制保證各個線程推送資料的隔離。

這裡有兩個思路:

    1. 将所有資料取到集合(記憶體)中,然後進行切割,每個線程推送不同段的資料
    2. 利用 資料庫分頁的方式,每個線程取 [start,limit] 區間的資料推送,我們需要保證start的一緻性

這裡采用了第二種方式,因為考慮到可能資料量後續會繼續增加,把所有資料都加載到記憶體中,可能會有比較大的記憶體占用。

我們還得考慮到線程推送資料失敗的情況。

如果是自己的系統,我們可以把多線程調用的方法抽出來加一個事務,一個線程異常,整體復原。

但是是和第三方的對接,我們都沒法做事務的,是以,我們采用了直接在資料庫記錄失敗狀态的方法,可以在後面用其它方式處理失敗的資料。

在實際使用中,我們肯定是要用到線程池來管理線程,關于線程池,我們常用 ThreadPoolExecutor提供的線程池服務,SpringBoot中同樣也提供了線程池異步的方式,雖然SprignBoot異步可能更友善一點,但是使用ThreadPoolExecutor更加直覺地控制線程池,是以我們直接使用ThreadPoolExecutor構造方法建立線程池。

大概的技術設計示意圖:

上面叭叭了一堆,到了show you code的環節了。我将項目裡的代碼抽取出來,簡化出了一個示例。

核心代碼如下:

/**
 * @Author 三分惡
 * @Date 2021/3/5
 * @Description
 */
@Service
public class PushProcessServiceImpl implements PushProcessService {
    @Autowired
    private PushUtil pushUtil;
    @Autowired
    private PushProcessMapper pushProcessMapper;

    private final static Logger logger = LoggerFactory.getLogger(PushProcessServiceImpl.class);

    //每個線程每次查詢的條數
    private static final Integer LIMIT = 5000;
    //起的線程數
    private static final Integer THREAD_NUM = 5;
    //建立線程池
    ThreadPoolExecutor pool = new ThreadPoolExecutor(THREAD_NUM, THREAD_NUM * 2, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<>(100));

    @Override
    public void pushData() throws ExecutionException, InterruptedException {
        //計數器,需要保證線程安全
        int count = 0;
        //未推送資料總數
        Integer total = pushProcessMapper.countPushRecordsByState(0);
        logger.info("未推送資料條數:{}", total);
        //計算需要多少輪
        int num = total / (LIMIT * THREAD_NUM) + 1;
        logger.info("要經過的輪數:{}", num);
        //統計總共推送成功的資料條數
        int totalSuccessCount = 0;
        for (int i = 0; i < num; i++) {
            //接收線程傳回結果
            List<Future<Integer>> futureList = new ArrayList<>(32);
            //起THREAD_NUM個線程并行查詢更新庫,加鎖
            for (int j = 0; j < THREAD_NUM; j++) {
                synchronized (PushProcessServiceImpl.class) {
                    int start = count * LIMIT;
                    count++;
                    //送出線程,用資料起始位置辨別線程
                    Future<Integer> future = pool.submit(new PushDataTask(start, LIMIT, start));
                    //先不取值,防止阻塞,放進集合
                    futureList.add(future);
                }
            }
            //統計本輪推送成功資料
            for (Future f : futureList) {
                totalSuccessCount = totalSuccessCount + (int) f.get();
            }
        }
        //更新推送标志
        pushProcessMapper.updateAllState(1);
        logger.info("推送資料完成,需推送資料:{},推送成功:{}", total, totalSuccessCount);
    }

    /**
     * 推送資料線程類
     */
    class PushDataTask implements Callable<Integer> {
        int start;
        int limit;
        int threadNo;   //線程編号

        PushDataTask(int start, int limit, int threadNo) {
            this.start = start;
            this.limit = limit;
            this.threadNo = threadNo;
        }

        @Override
        public Integer call() throws Exception {
            int count = 0;
            //推送的資料
            List<PushProcess> pushProcessList = pushProcessMapper.findPushRecordsByStateLimit(0, start, limit);
            if (CollectionUtils.isEmpty(pushProcessList)) {
                return count;
            }
            logger.info("線程{}開始推送資料", threadNo);
            for (PushProcess process : pushProcessList) {
                boolean isSuccess = pushUtil.sendRecord(process);
                if (isSuccess) {   //推送成功
                    //更新推送辨別
                    pushProcessMapper.updateFlagById(process.getId(), 1);
                    count++;
                } else {  //推送失敗
                    pushProcessMapper.updateFlagById(process.getId(), 2);
                }
            }
            logger.info("線程{}推送成功{}條", threadNo, count);
            return count;
        }
    }
}      

代碼很長,我們簡單說一下關鍵的地方:

  • 線程建立:線程内部類選擇了實作Callable接口,這樣友善擷取線程任務執行的結果,在示例裡用于統計線程推送成功的數量
class PushDataTask implements Callable<Integer> {      
  • 使用 ThreadPoolExecutor 建立線程池,
//建立線程池
      ThreadPoolExecutor pool = new ThreadPoolExecutor(THREAD_NUM, THREAD_NUM * 2, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<>(100));      

主要構造參數如下:

- corePoolSize:線程核心參數選擇了5

- maximumPoolSize:最大線程數選擇了核心線程數2倍數

- keepAliveTime:非核心閑置線程存活時間直接置為0

- unit:非核心線程保持存活的時間選擇了 TimeUnit.SECONDS 秒

- workQueue:線程池等待隊列,使用 容量初始為100的 LinkedBlockingQueue阻塞隊列

這裡還有沒寫出來的線程池拒絕政策,采用了預設AbortPolicy:直接丢棄任務,抛出異常。

  • 使用 synchronized 來保證線程安全,保證計數器的增加是有序的
synchronized (PushProcessServiceImpl.class) {      
  • 使用集合來接收線程的運作結果,防止阻塞
List<Future<Integer>> futureList = new ArrayList<>(32);      

好了,主要的代碼和簡單的解析就到這裡了。

關于這個簡單的demo,這裡隻是簡單地做推送資料處理。考慮一下,這個執行個體是不是可以用在你項目的某些地方。例如監管系統的資料校驗、審計系統的資料統計、電商系統的資料分析等等,隻要是有大量資料處理的地方,都可以把這個例子結合到你的項目裡,這樣你就有了多線程開發的經驗。

完整代碼倉庫位址在文章底部👇👇

  • 面試官:小夥子,不錯,你這個整挺好。
  • 老三:那是自然。
  • 面試官:呦,小夥子,挺自信,那我得好好考考你。
  • 老三:放馬過來,但考無妨。
面試官問:“在項目中用過多線程嗎?”你就把這個案例講給他聽!多線程開發執行個體對線面試官

要說線程,必先說程序。

程序是程式的⼀次執⾏過程,是系統運⾏程式的基本機關,是以程序是動态的。系統運⾏⼀個程式即是⼀個程序從建立,運⾏到消亡的過程。

線程與程序相似,但線程是⼀個⽐程序更⼩的執⾏機關。⼀個程序在其執⾏的過程中可以産⽣多個線程。與程序不同的是同類的多個線程共享程序的堆和⽅法區資源,但每個線程有⾃⼰的程式計數器、虛拟機棧和本地⽅法棧,是以系統在産⽣⼀個線程,或是在各個線程之間作切換⼯作時,負擔要⽐程序⼩得多,也正因為如此,線程也被稱為輕量級程序。

Java裡建立線程主要有三種方式:

  • 繼承 Thread類:Thread 類本質上是實作了 Runnable 接口的一個執行個體,代表一個線程的執行個體。啟動線程的唯一方法就是通過 Thread 類的 start()執行個體方法。start()方法是一個 native 方法,它将啟動一個新線程,并執行 run()方法。
  • 實作 Runnable接口:如果自己的類已經 extends 另一個類,就無法直接 extends Thread,此時,可以實作一個Runnable 接口。
  • 實作 Callable接口:實作Callable接口,重寫call()方法,可以傳回一個 Future類型的傳回值。我在上面的例子裡就是用到了這種方式。

在Java中,線程共有六種狀态:

狀态 說明
NEW 初始狀态:線程被建立,但還沒有調用start()方法
RUNNABLE 運作狀态:Java線程将作業系統中的就緒和運作兩種狀态籠統的稱作“運作”
BLOCKED 阻塞狀态:表示線程阻塞于鎖
WAITING 等待狀态:表示線程進入等待狀态,進入該狀态表示目前線程需要等待其他線程做出一些特定動作(通知或中斷)
TIME_WAITING 逾時等待狀态:該狀态不同于 WAITIND,它是可以在指定的時間自行傳回的
TERMINATED 終止狀态:表示目前線程已經執行完畢

線程在自身的生命周期中, 并不是固定地處于某個狀态,而是随着代碼的執行在不同的狀态之間進行切換,Java線程狀态變化如圖示:

面試官問:“在項目中用過多線程嗎?”你就把這個案例講給他聽!多線程開發執行個體對線面試官

線程死鎖描述的是這樣⼀種情況:多個線程同時被阻塞,它們中的⼀個或者全部都在等待某個資源被釋放。由于線程被⽆限期地阻塞,是以程式不可能正常終⽌。

如下圖所示,線程 A 持有資源 2,線程 B 持有資源 1,他們同時都想申請對⽅的資源,是以這兩個線程就會互相等待⽽進⼊死鎖狀态。

面試官問:“在項目中用過多線程嗎?”你就把這個案例講給他聽!多線程開發執行個體對線面試官

産生死鎖必須滿足四個條件:

  1. 互斥條件:該資源任意⼀個時刻隻由⼀個線程占⽤。
  2. 請求與保持條件:⼀個程序因請求資源⽽阻塞時,對已獲得的資源保持不放。
  3. 不剝奪條件:線程已獲得的資源在末使⽤完之前不能被其他線程強⾏剝奪,隻有⾃⼰使⽤完畢後才釋放資源。
  4. 循環等待條件:若⼲程序之間形成⼀種頭尾相接的循環等待資源關系。

我上⾯說了産⽣死鎖的四個必要條件,為了避免死鎖,我們隻要破壞産⽣死鎖的四個條件中的其中⼀個就可以了。

  1. 破壞互斥條件 :這個條件我們沒有辦法破壞,因為我們⽤鎖本來就是想讓他們互斥的(臨界資源需要互斥通路)。
  2. 破壞請求與保持條件 :⼀次性申請所有的資源。
  3. 破壞不剝奪條件 :占⽤部分資源的線程進⼀步申請其他資源時,如果申請不到,可以主動釋放它占有的資源。
  4. 破壞循環等待條件 :靠按序申請資源來預防。按某⼀順序申請資源,釋放資源則反序釋放。破壞循環等待條件。

synchronized 關鍵字最主要的三種使⽤⽅式:

1.修飾執行個體⽅法: 作⽤于目前對象執行個體加鎖,進⼊同步代碼前要獲得 目前對象執行個體的鎖

synchronized void method() {
 //業務代碼
}      

2.修飾靜态⽅法: 也就是給目前類加鎖,會作⽤于類的所有對象執行個體 ,進⼊同步代碼前要獲得目前 class 的鎖。因為靜态成員不屬于任何⼀個執行個體對象,是類成員( static 表明這是該類的⼀個靜态資源,不管 new 了多少個對象,隻有⼀份)。是以,如果⼀個線程 A 調⽤⼀個執行個體對象的⾮靜态 synchronized ⽅法,⽽線程 B 需要調⽤這個執行個體對象所屬類的靜态 synchronized ⽅法,是允許的,不會發⽣互斥現象,因為通路靜态 synchronized ⽅法占⽤的鎖是目前類的鎖,⽽通路⾮靜态 synchronized ⽅法占⽤的鎖是目前執行個體對象鎖。

synchronized void staic method() {
 //業務代碼
}      

**3.**修飾代碼塊 :指定加鎖對象,對給定對象/類加鎖。 synchronized(this|object) 表示進⼊同步代碼庫前要獲得給定對象的鎖。 synchronized(類.class) 表示進⼊同步代碼前要獲得 目前 class 的鎖

synchronized(this) {
 //業務代碼
}      

在我的例子裡使用synchronized修飾代碼塊,給PushProcessServiceImpl類加鎖,進⼊同步代碼前要獲得 目前 class 的鎖,防止PushProcessServiceImpl類的對象在控制層調用推送資料的方法。

可以使用juc包提供的鎖。Lock接口主要相關的類和接口如下。

面試官問:“在項目中用過多線程嗎?”你就把這個案例講給他聽!多線程開發執行個體對線面試官

Lock中的主要方法:

  • lock:用來擷取鎖,如果鎖被其他線程擷取,進入等待狀态。
  • lockInterruptibly:通過這個方法去擷取鎖時,如果線程正在等待擷取鎖,則這個線程能夠響應中斷,即中斷線程的等待狀态。
  • tryLock:tryLock方法是有傳回值的,它表示用來嘗試擷取鎖,如果擷取成功,則傳回true,如果擷取失敗(即鎖已被其他線程擷取),則傳回false。
  • tryLock(long,TimeUnit):與tryLock類似,隻不過是有等待時間,在等待時間内擷取到鎖傳回true,逾時傳回false。
  • unlock:釋放鎖。

其它接口和類:

  • ReetrantLock(可重入鎖):實作了Lock接口,可重入鎖,内部定義了公平鎖與非公平鎖。可以完成synchronized 所能完成的所有工作。
  • ReadWriteLock(讀寫鎖):
public interface ReadWriteLock {  
    Lock readLock();       //擷取讀鎖  
    Lock writeLock();      //擷取寫鎖  
}        

一個用來擷取讀鎖,一個用來擷取寫鎖。也就是說将檔案的讀寫操作分開,分成2個鎖來配置設定給線程,進而使得多個線程可以同時進行讀操作。

  • ReetrantReadWriteLock(可重入讀寫鎖):ReetrantReadWriteLock同樣支援公平性選擇,支援重進入,鎖降級。

類别 synchronized Lock
存在層次 Java的關鍵字,在jvm層面上 是一個接口,api級别
鎖的釋放 1、以擷取鎖的線程執行完同步代碼,釋放鎖 2、線程執行發生異常,jvm會讓線程釋放鎖 在finally中必須釋放鎖,不然容易造成線程死鎖
鎖的擷取 假設A線程獲得鎖,B線程等待。如果A線程阻塞,B線程會一直等待 分情況而定,Lock有多個鎖擷取的方式,具體下面會說道,大緻就是可以嘗試獲得鎖,線程可以不用一直等待
鎖狀态 無法判斷 可以判斷
鎖類型 可重入 不可中斷 非公平 可重入 可判斷 可公平(兩者皆可)
性能 少量同步 大量同步

synchronized是利用java提供的原⼦性内置鎖(monitor 對象),每個對象中都内置了⼀個 ObjectMonitor 對象。這種内置的并且使⽤者看不到的鎖也被稱為螢幕鎖。

同步語句塊

synchronized 同步語句塊的實作使⽤的是 monitorenter 和 monitorexit 指令,其中monitorenter 指令指向同步代碼塊的開始位置monitorexit 指令則指明同步代碼塊的結束位置。

執⾏monitorenter指令時會嘗試擷取内置鎖,如果對象沒有被鎖定或者已經獲得了鎖,鎖的計數器+1。此時其他競争鎖的線程則會進⼊等待隊列中。

執⾏monitorexit指令時則會把計數器-1,當計數器值為0時,則鎖釋放,處于等待隊列中的線程再繼續競争鎖。

synchronized 修飾⽅法

synchronized 修飾的⽅法并沒有 monitorenter 指令和 monitorexit 指令,取得代之的确實是ACC_SYNCHRONIZED 辨別,該辨別指明了該⽅法是⼀個同步⽅法。JVM 通過該ACC_SYNCHRONIZED 通路标志來辨識⼀個⽅法是否聲明為同步⽅法,從⽽執⾏相應的同步調⽤。

當然,二者細節略有不同,但本質上都是擷取原子性内置鎖。

再深入一點,synchronized實際上有兩個隊列waitSet和entryList。

  1. 當多個線程進⼊同步代碼塊時,⾸先進⼊entryList
  2. 有⼀個線程擷取到monitor鎖後,就指派給目前線程,并且計數器+1
  3. 如果線程調⽤wait⽅法,将釋放鎖,目前線程置為null,計數器-1,同時進⼊waitSet等待被喚醒,調⽤notify或者notifyAll之後⼜會進⼊entryList競争鎖
  4. 如果線程執⾏完畢,同樣釋放鎖,計數器-1,目前線程置為null
面試官問:“在項目中用過多線程嗎?”你就把這個案例講給他聽!多線程開發執行個體對線面試官

從JDK1.6版本之後,synchronized本身也在不斷優化鎖的機制,有些情況下他并不會是⼀個很重量級的鎖。優化機制包括⾃适應鎖、⾃旋鎖、鎖消除、鎖粗化、偏向鎖、輕量級鎖。

鎖的狀态從低到⾼依次為⽆鎖**->偏向鎖->輕量級鎖->**重量級鎖,更新的過程就是從低到⾼。

面試官問:“在項目中用過多線程嗎?”你就把這個案例講給他聽!多線程開發執行個體對線面試官

自旋鎖:由于⼤部分時候,鎖被占⽤的時間很短,共享變量的鎖定時間也很短,所有沒有必要挂起線程,⽤戶态和核心态的來回上下⽂切換嚴重影響性能。⾃旋的概念就是讓線程執⾏⼀個忙循環,可以了解為就是啥也不⼲,防⽌從⽤戶态轉⼊核心态,⾃旋鎖可以通過設定-XX:+UseSpining來開啟,⾃旋的預設次數是10次,可以使⽤-XX:PreBlockSpin設定。

自适應鎖:自适應鎖就是自适應的自旋鎖,自旋鎖的時間不是固定時間,而是由前⼀次在同⼀個鎖上的⾃旋時間和鎖的持有者狀态來決定。

鎖消除:鎖消除指的是JVM檢測到⼀些同步的代碼塊,完全不存在資料競争的場景,也就是不需要加鎖,就會進⾏鎖消除。

鎖粗化:鎖粗化指的是有很多操作都是對同⼀個對象進⾏加鎖,就會把鎖的同步範圍擴充到整個操作序列之外。

偏向鎖:當線程通路同步塊擷取鎖時,會在對象頭和棧幀中的鎖記錄⾥存儲偏向鎖的線程ID,之後這個線程再次進⼊同步塊時都不需要CAS來加鎖和解鎖了,偏向鎖會永遠偏向第⼀個獲得鎖的線程,如果後續沒有其他線程獲得過這個鎖,持有鎖的線程就永遠不需要進⾏同步,反之,當有其他線程競争偏向鎖時,持有偏向鎖的線程就會釋放偏向鎖。可以⽤過設定-XX:+UseBiasedLocking開啟偏向鎖。

輕量級鎖:JVM的對象的對象頭中包含有⼀些鎖的标志位,代碼進⼊同步塊的時候,JVM将會使⽤CAS⽅式來嘗試擷取鎖,如果更新成功則會把對象頭中的狀态位标記為輕量級鎖,如果更新失敗,目前線程就嘗試⾃旋來獲得鎖。

鎖更新的過程非常複雜,簡單點說,偏向鎖就是通過對象頭的偏向線程ID來對⽐,甚⾄都不需要CAS了,⽽輕量級鎖主要就是通過CAS修改對象頭鎖記錄和⾃旋來實作,重量級鎖則是除了擁有鎖的線程其他全部阻塞。

面試官問:“在項目中用過多線程嗎?”你就把這個案例講給他聽!多線程開發執行個體對線面試官

CAS(Compare And Swap/Set)比較并交換,CAS 算法的過程是這樣:它包含 3 個參數CAS(V,E,N)。V 表示要更新的變量(記憶體值),E 表示預期值(舊的),N 表示新值。當且僅當 V 值等于 E 值時,才會将 V 的值設為 N,如果 V 值和 E 值不同,則說明已經有其他線程做了更新,則目前線程什麼都不做。最後,CAS 傳回目前 V 的真實值。

CAS是一種樂觀鎖,它總是認為自己可以成功完成操作。當多個線程同時使用 CAS 操作一個變量時,隻有一個會勝出,并成功更新,其餘均會失敗。失敗的線程不會被挂起,僅是被告知失敗,并且允許再次嘗試,當然也允許失敗的線程放棄操作。基于這樣的原理,CAS 操作即使沒有鎖,也可以發現其他線程對目前線程的幹擾,并進行恰當的處理。

java.util.concurrent.atomic 包下的類大多是使用 CAS 操作來實作的 (AtomicInteger,AtomicBoolean,AtomicLong)。

  1. ABA 問題:

比如說一個線程 one 從記憶體位置 V 中取出 A,這時候另一個線程 two 也從記憶體中取出 A,并且 two 進行了一些操作變成了 B,然後 two 又将 V 位置的資料變成 A,這時候線程 one 進行 CAS 操作發現記憶體中仍然是 A,然後 one 操作成功。盡管線程 one 的 CAS 操作成功,但可能存在潛藏的問題。從 Java1.5 開始 JDK 的 atomic 包裡提供了一個類 AtomicStampedReference 來解決 ABA 問題。

  1. 循環時間長開銷大:

對于資源競争嚴重(線程沖突嚴重)的情況,CAS 自旋的機率會比較大,進而浪費更多的 CPU 資源,效率低于 synchronized。

  1. 隻能保證一個共享變量的原子操作:

當對一個共享變量執行操作時,我們可以使用循環 CAS 的方式來保證原子操作,但是對多個共享變量操作時,循環 CAS 就無法保證操作的原子性,這個時候就可以用鎖。

ReentrantLock 是基于 Lock 實作的可重入鎖,所有的 Lock 都是基于 AQS 實作的,AQS 和 Condition 各自維護不同的對象,在使用 Lock 和 Condition 時,其實就是兩個隊列的互相移動。它所提供的共享鎖、互斥鎖都是基于對 state 的操作。

AbstractQueuedSynchronizer,抽象的隊列式的同步器,AQS 定義了一套多線程通路共享資源的同步器架構,許多同步類實作都依賴于它,如常用的

ReentrantLock/Semaphore/CountDownLatch。

AQS 核⼼思想是,如果被請求的共享資源空閑,則将目前請求資源的線程設定為有效的⼯作線程,并且将共享資源設定為鎖定狀态。如果被請求的共享資源被占⽤,那麼就需要⼀套線程阻塞等待以及被喚醒時鎖配置設定的機制,這個機制 AQS 是⽤ CLH 隊列鎖實作的,即将暫時擷取不到鎖的線程加⼊到隊列中。

看個 AQS原理圖:

面試官問:“在項目中用過多線程嗎?”你就把這個案例講給他聽!多線程開發執行個體對線面試官

AQS 使⽤⼀個 int 成員變量來表示同步狀态,通過内置的 FIFO 隊列來完成擷取資源線程的排隊⼯作。AQS 使⽤ CAS 對該同步狀态進⾏原⼦操作實作對其值的修改。

private volatile int state;//共享變量,使⽤volatile修飾保證線程可⻅性      

狀态資訊通過 protected 類型的 getState,setState,compareAndSetState 進⾏操作

//傳回同步狀态的目前值
protected final int getState() {
 return state; }
// 設定同步狀态的值
protected final void setState(int newState) {
 state = newState; }
//原⼦地(CAS操作)将同步狀态值設定為給定值update如果目前同步狀态的值等于expect(期望值)
protected final boolean compareAndSetState(int expect, int update) {
 return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}      

嘗試加鎖的時候通過CAS(CompareAndSwap)修改值,如果成功設定為1,并且把目前線程ID指派,則代表加鎖成功,⼀旦擷取到鎖,其他的線程将會被阻塞進⼊阻塞隊列⾃旋,獲得鎖的線程釋放鎖的時候将會喚醒阻塞隊列中的線程,釋放鎖的時候則會把state重新置為0,同時目前線程ID置為空。

面試官問:“在項目中用過多線程嗎?”你就把這個案例講給他聽!多線程開發執行個體對線面試官

  • Semaphore(信号量)-允許多個線程同時通路: synchronized 和 ReentrantLock 都是一次隻允許一個線程通路某個資源,Semaphore(信号量)可以指定多個線程同時通路某個資源。
  • CountDownLatch(倒計時器): CountDownLatch是一個同步工具類,用來協調多個線程之間的同步。這個工具通常用來控制線程等待,它可以讓某一個線程等待直到倒計時結束,再開始執行。
  • CyclicBarrier(循環栅欄): CyclicBarrier 和 CountDownLatch 非常類似,它也可以實作線程間的技術等待,但是它的功能比 CountDownLatch 更加複雜和強大。主要應用場景和 CountDownLatch 類似。CyclicBarrier 的字面意思是可循環使用(Cyclic)的屏障(Barrier)。它要做的事情是,讓一組線程到達一個屏障(也可以叫同步點)時被阻塞,直到最後一個線程到達屏障時,屏障才會開門,所有被屏障攔截的線程才會繼續幹活。CyclicBarrier預設的構造方法是 CyclicBarrier(int parties),其參數表示屏障攔截的線程數量,每個線程調用await()方法告訴 CyclicBarrier 我已經到達了屏障,然後目前線程被阻塞。

相⽐synchronized的加鎖⽅式來解決共享變量的記憶體可⻅性問題,volatile就是更輕量的選擇,他沒有上下⽂切換的額外開銷成本。使⽤volatile聲明的變量,可以確定值被更新的時候對其他線程⽴刻可⻅。

volatile使⽤記憶體屏障來保證不會發⽣指令重排,解決了記憶體可⻅性的問題。

我們知道,線程都是從主記憶體中讀取共享變量到⼯作記憶體來操作,完成之後再把結果寫會主記憶體,但是這樣就會帶來可⻅性問題。舉個例⼦,假設現在我們是兩級緩存的雙核CPU架構,包含L1、L2兩級緩存。

面試官問:“在項目中用過多線程嗎?”你就把這個案例講給他聽!多線程開發執行個體對線面試官

那麼,如果X變量⽤volatile修飾的話,當線程A再次讀取變量X的話,CPU就會根據緩存⼀緻性協定強制線程A重新從主記憶體加載最新的值到⾃⼰的⼯作記憶體,⽽不是直接⽤緩存中的值。

再來說記憶體屏障的問題,volatile修飾之後會加⼊不同的記憶體屏障來保證可⻅性的問題能正确執⾏。這⾥寫的屏障基于書中提供的内容,但是實際上由于CPU架構不同,重排序的政策不同,提供的記憶體屏障也不⼀樣,⽐如x86平台上,隻有StoreLoad⼀種記憶體屏障。

  1. StoreStore屏障,保證上⾯的普通寫不和volatile寫發⽣重排序
  2. StoreLoad屏障,保證volatile寫與後⾯可能的volatile讀寫不發⽣重排序
  3. LoadLoad屏障,禁⽌volatile讀與後⾯的普通讀重排序
  4. LoadStore屏障,禁⽌volatile讀和後⾯的普通寫重排序
面試官問:“在項目中用過多線程嗎?”你就把這個案例講給他聽!多線程開發執行個體對線面試官

本身随着CPU和記憶體的發展速度差異的問題,導緻CPU的速度遠快于記憶體,是以現在的CPU加⼊了⾼速緩存,⾼速緩存⼀般可以分為L1、L2、L3三級緩存。基于上⾯的例⼦我們知道了這導緻了緩存⼀緻性的問題,是以加⼊了緩存⼀緻性協定,同時導緻了記憶體可⻅性的問題,⽽編譯器和CPU的重排序導緻了原⼦性和有序性的問題,JMM記憶體模型正是對多線程操作下的⼀系列規範限制,通過JMM我們才屏蔽了不同硬體和作業系統記憶體的通路差異,這樣保證了Java程式在不同的平台下達到⼀緻的記憶體通路效果,同時也是保證在⾼效并發的時候程式能夠正确執⾏。

面試官問:“在項目中用過多線程嗎?”你就把這個案例講給他聽!多線程開發執行個體對線面試官

  1. 提高線程的使用率,降低資源的消耗。
  2. 提高響應速度,線程的建立時間為T1,執行時間T2,銷毀時間T3,用線程池可以免去T1和T3的時間。
  3. 便于統一管理線程對象
  4. 可控制最大并發數

來看一ThreadPoolExecutor的構造方法:

public ThreadPoolExecutor(int corePoolSize,
                         int maximumPoolSize,
                         long keepAliveTime,
                         TimeUnit unit,
                        BlockingQueue<Runnable> workQueue,
                        ThreadFactory threadFactory,
                        RejectedExecutionHandler handler)       
  • 核⼼線程數corePoolSize :此值是用來初始化線程池中核心線程數,當線程池中線程池數<

    corePoolSize

    時,系統預設是添加一個任務才建立一個線程池。可以通過調用

    prestartAllCoreThreads

    方法一次性的啟動

    corePoolSize

    個數的線程。當線程數 = corePoolSize時,新任務會追加到workQueue中。
  • 允許的最大線程數maximumPoolSize:

    maximumPoolSize

    表示允許的最大線程數 = (非核心線程數+核心線程數),當

    BlockingQueue

    也滿了,但線程池中總線程數 <

    maximumPoolSize

    時候就會再次建立新的線程。
  • 活躍時間keepAliveTime:非核心線程 =(maximumPoolSize - corePoolSize ) ,非核心線程閑置下來不幹活最多存活時間。
  • 保持存活時間unit:線程池中非核心線程保持存活的時間
  • 等待隊列workQueue:線程池 等待隊列,維護着等待執行的

    Runnable

    對象。當運作當線程數= corePoolSize時,新的任務會被添加到

    workQueue

    中,如果

    workQueue

    也滿了則嘗試用非核心線程執行任務
  • 線程工廠 threadFactory:建立一個新線程時使用的工廠,可以用來設定線程名、是否為daemon線程等等。
  • 拒絕政策RejectedExecutionHandler:

    corePoolSize

    workQueue

    maximumPoolSize

    都不可用的時候執行的 飽和政策。

  1. 線程池剛建立時,裡面沒有一個線程。任務隊列是作為參數傳進來的。不過,就算隊列裡面有任務,線程池也不會馬上執行它們。
  2. 當調用 execute() 方法添加一個任務時,線程池會做如下判斷:
  • a) 如果正在運作的線程數量小于 corePoolSize,那麼馬上建立線程運作這個任務;
  • b) 如果正在運作的線程數量大于或等于 corePoolSize,那麼将這個任務放入隊列;
  • c) 如果這時候隊列滿了,而且正在運作的線程數量小于 maximumPoolSize,那麼還是要建立非核心線程立刻運作這個任務;
  • d) 如果隊列滿了,而且正在運作的線程數量大于或等于 maximumPoolSize,那麼線程池會根據拒絕政策來對應處理。
  1. 當一個線程完成任務時,它會從隊列中取下一個任務來執行。
  2. 當一個線程無事可做,超過一定的時間(keepAliveTime)時,線程池會判斷,如果目前運作的線程數大于 corePoolSize,那麼這個線程就被停掉。是以線程池的所有任務完成後,它最終會收縮到 corePoolSize 的大小。
面試官問:“在項目中用過多線程嗎?”你就把這個案例講給他聽!多線程開發執行個體對線面試官

主要有4種拒絕政策:

  1. AbortPolicy:直接丢棄任務,抛出異常,這是預設政策
  2. CallerRunsPolicy:隻⽤調⽤者所在的線程來處理任務
  3. DiscardOldestPolicy:丢棄等待隊列中最舊的任務,并執⾏目前任務
  4. DiscardPolicy:直接丢棄任務,也不抛出異常

線程在Java中屬于稀缺資源,線程池不是越大越好也不是越小越好。任務分為計算密集型、IO密集型、混合型。

  1. 計算密集型一般推薦線程池不要過大,一般是CPU數 + 1,+1是因為可能存在頁缺失(就是可能存在有些資料在硬碟中需要多來一個線程将資料讀入記憶體)。如果線程池數太大,可能會頻繁的 進行線程上下文切換跟任務排程。獲得目前CPU核心數代碼如下:
Runtime.getRuntime().availableProcessors();      
  1. IO密集型:線程數适當大一點,機器的Cpu核心數*2。
  2. 混合型:如果密集型站大頭則拆分的必要性不大,如果IO型占據不少有必要,Mark 下。

  1. ArrayBlockingQueue :由數組結構組成的有界阻塞隊列。
  2. LinkedBlockingQueue :由連結清單結構組成的有界阻塞隊列。
  3. PriorityBlockingQueue :支援優先級排序的無界阻塞隊列。
  4. DelayQueue:使用優先級隊列實作的無界阻塞隊列。
  5. SynchronousQueue:不存儲元素的阻塞隊列。
  6. LinkedTransferQueue:由連結清單結構組成的無界阻塞隊列。
  7. LinkedBlockingDeque:由連結清單結構組成的雙向阻塞隊列

在上面我們直接用到了ThreadPoolExecutor的構造方法建立線程池,還有另一種方式,通過Executors 建立線程。

需要注意的是,阿裡巴巴Java開發手冊強制禁止使用Executors建立線程

比較典型常見的四種線程池包括:

newFixedThreadPool

newSingleThreadExecutor

newCachedThreadPool

newScheduledThreadPool

FixedThreadPool

  1. 定長的線程池,有核心線程,核心線程的即為最大的線程數量,沒有非核心線程。
  2. 使用的無界的等待隊列是

    LinkedBlockingQueue

    。使用時候有堵滿等待隊列的風險。
面試官問:“在項目中用過多線程嗎?”你就把這個案例講給他聽!多線程開發執行個體對線面試官

SingleThreadPool

隻有一條線程來執行任務,适用于有順序的任務的應用場景,也是用的無界等待隊列

面試官問:“在項目中用過多線程嗎?”你就把這個案例講給他聽!多線程開發執行個體對線面試官

CachedThreadPool

可緩存的線程池,該線程池中沒有核心線程,非核心線程的數量為Integer.max_value,就是無限大,當有需要時建立線程來執行任務,沒有需要時回收線程,适用于耗時少,任務量大的情況。任務隊列用的是SynchronousQueue如果生産多快消費慢,則會導緻建立很多線程需注意。

面試官問:“在項目中用過多線程嗎?”你就把這個案例講給他聽!多線程開發執行個體對線面試官

ScheduledThreadPoolExecutor

周期性執行任務的線程池,按照某種特定的計劃執行線程中的任務,有核心線程,但也有非核心線程,非核心線程的大小也為無限大。适用于執行周期性的任務。

看構造函數:調用的還是

ThreadPoolExecutor

構造函數,差別不同點在于任務隊列是用的DelayedWorkQueue。

面試官問:“在項目中用過多線程嗎?”你就把這個案例講給他聽!多線程開發執行個體對線面試官
  • 面試官:這些題都能回答出來,很好,小夥子,很有精神!
  • 老三:謝謝。那面試官老師,你看這一輪面試……
  • 面試官:雖然你答的很好,但你的項目資料量隻有十萬級,不符合我們的要求。是以,面試不能讓你過。

老三上去就是一個左刺拳,再接一個右正蹬……

  • 面試官:啊……年輕人不講武德,來偷襲……

代碼位址:

https://gitee.com/fighter3/thread-demo.git
好了,通過本文,相信你對多線程的應用和原理都有了一定的了解。文章開頭提到的crud仔就是部落客本人了,技術水準有限,難免錯漏,歡迎指出,謝謝!