天天看點

Java 并發包中的進階同步工具 + 面試題Java 并發包中的進階同步工具

Java 并發包中的進階同步工具

Java 中的并發包指的是 java.util.concurrent(簡稱 JUC)包和其子包下的類和接口,它為 Java 的并發提供了各種功能支援,比如:

  • 提供了線程池的建立類 ThreadPoolExecutor、Executors 等;
  • 提供了各種鎖,如 Lock、ReentrantLock 等;
  • 提供了各種線程安全的資料結構,如 ConcurrentHashMap、LinkedBlockingQueue、DelayQueue 等;
  • 提供了更加進階的線程同步結構,如 CountDownLatch、CyclicBarrier、Semaphore 等。

在前面的章節中我們已經詳細地介紹了線程池的使用、線程安全的資料結構等,本文我們就重點學習一下 Java 并發包中更進階的線程同步類:CountDownLatch、CyclicBarrier、Semaphore 和 Phaser 等。

CountDownLatch 介紹和使用

CountDownLatch(閉鎖)可以看作一個隻能做減法的計數器,可以讓一個或多個線程等待執行。

CountDownLatch 有兩個重要的方法:

  • countDown():使計數器減 1;
  • await():當計數器不為 0 時,則調用該方法的線程阻塞,當計數器為 0 時,可以喚醒等待的一個或者全部線程。

CountDownLatch 使用場景:

以生活中的情景為例,比如去醫院體檢,通常人們會提前去醫院排隊,但隻有等到醫生開始上班,才能正式開始體檢,醫生也要給所有人體檢完才能下班,這種情況就要使用 CountDownLatch,流程為:患者排隊 → 醫生上班 → 體檢完成 → 醫生下班。

CountDownLatch 示例代碼如下:

// 醫院閉鎖
CountDownLatch hospitalLatch = new CountDownLatch(1);
// 患者閉鎖
CountDownLatch patientLatch = new CountDownLatch(5);
System.out.println("患者排隊");
ExecutorService executorService = Executors.newCachedThreadPool();
for (int i = 0; i < 5; i++) {
    final int j = i;
    executorService.execute(() -> {
        try {
            hospitalLatch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("體檢:" + j);
        patientLatch.countDown();
    });
}
System.out.println("醫生上班");
hospitalLatch.countDown();
patientLatch.await();
System.out.println("醫生下班");
executorService.shutdown();

           

以上程式執行結果如下:

患者排隊

醫生上班

體檢:4

體檢:0

體檢:1

體檢:3

體檢:2

醫生下班

執行流程如下圖:

Java 并發包中的進階同步工具 + 面試題Java 并發包中的進階同步工具

CyclicBarrier 介紹和使用

CyclicBarrier(循環屏障)通過它可以實作讓一組線程等待滿足某個條件後同時執行。

CyclicBarrier 經典使用場景是公交發車,為了簡化了解我們這裡定義,每輛公共汽車隻要上滿 4 個人就發車,後面來的人都會排隊依次遵循相應的标準。

它的構造方法為

CyclicBarrier(int parties,Runnable barrierAction)

其中,parties 表示有幾個線程來參與等待,barrierAction 表示滿足條件之後觸發的方法。CyclicBarrier 使用 await() 方法來辨別目前線程已到達屏障點,然後被阻塞。

CyclicBarrier 示例代碼如下:

import java.util.concurrent.*;
public class CyclicBarrierTest {
    public static void main(String[] args) throws InterruptedException {
        CyclicBarrier cyclicBarrier = new CyclicBarrier(4, new Runnable() {
            @Override
            public void run() {
                System.out.println("發車了");
            }
        });
        for (int i = 0; i < 4; i++) {
            new Thread(new CyclicWorker(cyclicBarrier)).start();
        }
    }
    static class CyclicWorker implements Runnable {
        private CyclicBarrier cyclicBarrier;
        CyclicWorker(CyclicBarrier cyclicBarrier) {
            this.cyclicBarrier = cyclicBarrier;
        }
        @Override
        public void run() {
            for (int i = 0; i < 2; i++) {
                System.out.println("乘客:" + i);
                try {
                    cyclicBarrier.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (BrokenBarrierException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

           

以上程式執行結果如下:

乘客:0

乘客:0

乘客:0

乘客:0

發車了

乘客:1

乘客:1

乘客:1

乘客:1

發車了

執行流程如下圖:

Java 并發包中的進階同步工具 + 面試題Java 并發包中的進階同步工具

Semaphore 介紹和使用

Semaphore(信号量)用于管理多線程中控制資源的通路與使用。Semaphore 就好比停車場的門衛,可以控制車位的使用資源。比如來了 5 輛車,隻有 2 個車位,門衛可以先放兩輛車進去,等有車出來之後,再讓後面的車進入。

Semaphore 示例代碼如下:

Semaphore semaphore = new Semaphore(2);
ThreadPoolExecutor semaphoreThread = new ThreadPoolExecutor(10, 50, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
for (int i = 0; i < 5; i++) {
    semaphoreThread.execute(() -> {
        try {
            // 堵塞擷取許可
            semaphore.acquire();
            System.out.println("Thread:" + Thread.currentThread().getName() + " 時間:" + LocalDateTime.now());
            TimeUnit.SECONDS.sleep(2);
            // 釋放許可
            semaphore.release();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    });
}

           

以上程式執行結果如下:

Thread:pool-1-thread-1 時間:2019-07-10 21:18:42

Thread:pool-1-thread-2 時間:2019-07-10 21:18:42

Thread:pool-1-thread-3 時間:2019-07-10 21:18:44

Thread:pool-1-thread-4 時間:2019-07-10 21:18:44

Thread:pool-1-thread-5 時間:2019-07-10 21:18:46

執行流程如下圖:

Java 并發包中的進階同步工具 + 面試題Java 并發包中的進階同步工具

Phaser(移相器)是 JDK 7 提供的,它的功能是等待所有線程到達之後,才繼續或者開始進行新的一組任務。

比如有一個旅行團,我們規定所有成員必須都到達指定地點之後,才能發車去往景點一,到達景點之後可以各自遊玩,之後必須全部到達指定地點之後,才能繼續發車去往下一個景點,類似這種場景就非常适合使用 Phaser。

Phaser 示例代碼如下:

public class Lesson5\_6 {
    public static void main(String[] args) throws InterruptedException {
        Phaser phaser = new MyPhaser();
        PhaserWorker[] phaserWorkers = new PhaserWorker[5];
        for (int i = 0; i < phaserWorkers.length; i++) {
            phaserWorkers[i] = new PhaserWorker(phaser);
            // 注冊 Phaser 等待的線程數,執行一次等待線程數 +1
            phaser.register();
        }
        for (int i = 0; i < phaserWorkers.length; i++) {
            // 執行任務
            new Thread(new PhaserWorker(phaser)).start();
        }
    }
    static class PhaserWorker implements Runnable {
        private final Phaser phaser;
        public PhaserWorker(Phaser phaser) {
            this.phaser = phaser;
        }
        @Override
        public void run() {
            System.out.println(Thread.currentThread().getName() + " | 到達" );
            phaser.arriveAndAwaitAdvance(); // 集合完畢發車
            try {
                Thread.sleep(new Random().nextInt(5) * 1000);
                System.out.println(Thread.currentThread().getName() + " | 到達" );
                phaser.arriveAndAwaitAdvance(); // 景點 1 集合完畢發車
                Thread.sleep(new Random().nextInt(5) * 1000);
                System.out.println(Thread.currentThread().getName() + " | 到達" );
                phaser.arriveAndAwaitAdvance(); // 景點 2 集合完畢發車
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    // Phaser 每個階段完成之後的事件通知
    static class MyPhaser extends Phaser{
        @Override
        protected boolean onAdvance(int phase, int registeredParties) { // 每個階段執行完之後的回調
            switch (phase) {
                case 0:
                    System.out.println("==== 集合完畢發車 ====");
                    return false;
                case 1:
                    System.out.println("==== 景點1集合完畢,發車去下一個景點 ====");
                    return false;
                case 2:
                    System.out.println("==== 景點2集合完畢,發車回家 ====");
                    return false;
                default:
                    return true;
            }
        }
    }
}

           

以上程式執行結果如下:

Thread-0 | 到達

Thread-4 | 到達

Thread-3 | 到達

Thread-1 | 到達

Thread-2 | 到達

==== 集合完畢發車 ====

Thread-0 | 到達

Thread-4 | 到達

Thread-1 | 到達

Thread-3 | 到達

Thread-2 | 到達

==== 景點1集合完畢,發車去下一個景點 ====

Thread-4 | 到達

Thread-3 | 到達

Thread-2 | 到達

Thread-1 | 到達

Thread-0 | 到達

==== 景點2集合完畢,發車回家 ====

相關面試題

1.以下哪個類用于控制某組資源的通路權限?

A:Phaser

B:Semaphore

C:CountDownLatch

D:CyclicBarrier

答:B

2.以下哪個類不能被重用?

A:Phaser

B:Semaphore

C:CountDownLatch

D:CyclicBarrier

答:C

3.以下哪個方法不屬于 CountDownLatch 類?

A:await()

B:countDown()

C:getCount()

D:release()

答:D

題目解析:release() 是 Semaphore 的釋放許可的方法,CountDownLatch 類并不包含此方法。

4.CyclicBarrier 與 CountDownLatch 有什麼差別?

答:CyclicBarrier 與 CountDownLatch 本質上都是依賴 volatile 和 CAS 實作的,它們差別如下:

  • CountDownLatch 隻能使用一次,而 CyclicBarrier 可以使用多次。
  • CountDownLatch 是手動指定等待一個或多個線程執行完成再執行,而 CyclicBarrier 是 n 個線程互相等待,任何一個線程完成之前,所有的線程都必須等待。

5.以下哪個類不包含 await() 方法?

A:Semaphore

B:CountDownLatch

C:CyclicBarrier

答:A

6.以下程式執行花費了多長時間?

Semaphore semaphore = new Semaphore(2);
ThreadPoolExecutor semaphoreThread = new ThreadPoolExecutor(10, 50, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
for (int i = 0; i < 3; i++) {
    semaphoreThread.execute(() -> {
        try {
            semaphore.release();
            System.out.println("Hello");
            TimeUnit.SECONDS.sleep(2);
            semaphore.acquire();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    });
}

           

A:1s 以内

B:2s 以上

答:A

題目解析:循環先執行了 release() 也就是釋放許可的方法,是以程式可以一次性執行 3 個線程,同時會在 1s 以内執行完。

7.Semaphore 有哪些常用的方法?

答:常用方法如下:

  • acquire():擷取一個許可。
  • release():釋放一個許可。
  • availablePermits():目前可用的許可數。
  • acquire(int n):擷取并使用 n 個許可。
  • release(int n):釋放 n 個許可。

8.Phaser 常用方法有哪些?

答:常用方法如下:

  • register():注冊新的參與者到 Phaser
  • arriveAndAwaitAdvance():等待其他線程執行
  • arriveAndDeregister():登出此線程
  • forceTermination():強制 Phaser 進入終止态
  • isTerminated():判斷 Phaser 是否終止

9.以下程式是否可以正常執行?“發車了”列印了多少次?

import java.util.concurrent.*;
public class TestMain {
    public static void main(String[] args) {
        CyclicBarrier cyclicBarrier = new CyclicBarrier(4, new Runnable() {
            @Override
            public void run() {
                System.out.println("發車了");
            }
        });
        for (int i = 0; i < 4; i++) {
            new Thread(new CyclicWorker(cyclicBarrier)).start();
        }
    }
    static class CyclicWorker implements Runnable {
        private CyclicBarrier cyclicBarrier;

        CyclicWorker(CyclicBarrier cyclicBarrier) {
            this.cyclicBarrier = cyclicBarrier;
        }
        @Override
        public void run() {
            for (int i = 0; i < 2; i++) {
                System.out.println("乘客:" + i);
                try {
                    cyclicBarrier.await();
                    System.out.println("乘客 II:" + i);
                    cyclicBarrier.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (BrokenBarrierException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

           

答:可以正常執行,因為執行了兩次 await(),是以“發車了”列印了 4 次。

總結

本文我們介紹了四種比 synchronized 更進階的線程同步類,其中 CountDownLatch、CyclicBarrier、Phaser 功能比較類似都是實作線程間的等待,隻是它們的側重點有所不同,其中 CountDownLatch 一般用于等待一個或多個線程執行完,才執行目前線程,并且 CountDownLatch 不能重複使用;CyclicBarrier 用于等待一組線程資源都進入屏障點再共同執行;Phaser 是 JDK 7 提供的功能更加強大和更加靈活的線程輔助工具,等待所有線程達到之後,繼續或開始新的一組任務,Phaser 提供了動态增加和消除線程同步個數功能。而 Semaphore 提供的功能更像鎖,用于控制一組資源的通路權限。

歡迎關注我的公衆号,回複關鍵字“Java” ,将會有大禮相送!!! 祝各位面試成功!!!

Java 并發包中的進階同步工具 + 面試題Java 并發包中的進階同步工具

繼續閱讀