Java 并發包中的進階同步工具
Java 中的并發包指的是 java.util.concurrent(簡稱 JUC)包和其子包下的類和接口,它為 Java 的并發提供了各種功能支援,比如:
- 提供了線程池的建立類 ThreadPoolExecutor、Executors 等;
- 提供了各種鎖,如 Lock、ReentrantLock 等;
- 提供了各種線程安全的資料結構,如 ConcurrentHashMap、LinkedBlockingQueue、DelayQueue 等;
- 提供了更加進階的線程同步結構,如 CountDownLatch、CyclicBarrier、Semaphore 等。
在前面的章節中我們已經詳細地介紹了線程池的使用、線程安全的資料結構等,本文我們就重點學習一下 Java 并發包中更進階的線程同步類:CountDownLatch、CyclicBarrier、Semaphore 和 Phaser 等。
CountDownLatch 介紹和使用
CountDownLatch(閉鎖)可以看作一個隻能做減法的計數器,可以讓一個或多個線程等待執行。
CountDownLatch 有兩個重要的方法:
- countDown():使計數器減 1;
- await():當計數器不為 0 時,則調用該方法的線程阻塞,當計數器為 0 時,可以喚醒等待的一個或者全部線程。
CountDownLatch 使用場景:
以生活中的情景為例,比如去醫院體檢,通常人們會提前去醫院排隊,但隻有等到醫生開始上班,才能正式開始體檢,醫生也要給所有人體檢完才能下班,這種情況就要使用 CountDownLatch,流程為:患者排隊 → 醫生上班 → 體檢完成 → 醫生下班。
CountDownLatch 示例代碼如下:
// 醫院閉鎖
CountDownLatch hospitalLatch = new CountDownLatch(1);
// 患者閉鎖
CountDownLatch patientLatch = new CountDownLatch(5);
System.out.println("患者排隊");
ExecutorService executorService = Executors.newCachedThreadPool();
for (int i = 0; i < 5; i++) {
final int j = i;
executorService.execute(() -> {
try {
hospitalLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("體檢:" + j);
patientLatch.countDown();
});
}
System.out.println("醫生上班");
hospitalLatch.countDown();
patientLatch.await();
System.out.println("醫生下班");
executorService.shutdown();
以上程式執行結果如下:
患者排隊
醫生上班
體檢:4
體檢:0
體檢:1
體檢:3
體檢:2
醫生下班
執行流程如下圖:
CyclicBarrier 介紹和使用
CyclicBarrier(循環屏障)通過它可以實作讓一組線程等待滿足某個條件後同時執行。
CyclicBarrier 經典使用場景是公交發車,為了簡化了解我們這裡定義,每輛公共汽車隻要上滿 4 個人就發車,後面來的人都會排隊依次遵循相應的标準。
它的構造方法為
CyclicBarrier(int parties,Runnable barrierAction)
其中,parties 表示有幾個線程來參與等待,barrierAction 表示滿足條件之後觸發的方法。CyclicBarrier 使用 await() 方法來辨別目前線程已到達屏障點,然後被阻塞。
CyclicBarrier 示例代碼如下:
import java.util.concurrent.*;
public class CyclicBarrierTest {
public static void main(String[] args) throws InterruptedException {
CyclicBarrier cyclicBarrier = new CyclicBarrier(4, new Runnable() {
@Override
public void run() {
System.out.println("發車了");
}
});
for (int i = 0; i < 4; i++) {
new Thread(new CyclicWorker(cyclicBarrier)).start();
}
}
static class CyclicWorker implements Runnable {
private CyclicBarrier cyclicBarrier;
CyclicWorker(CyclicBarrier cyclicBarrier) {
this.cyclicBarrier = cyclicBarrier;
}
@Override
public void run() {
for (int i = 0; i < 2; i++) {
System.out.println("乘客:" + i);
try {
cyclicBarrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}
}
}
}
以上程式執行結果如下:
乘客:0
乘客:0
乘客:0
乘客:0
發車了
乘客:1
乘客:1
乘客:1
乘客:1
發車了
執行流程如下圖:
Semaphore 介紹和使用
Semaphore(信号量)用于管理多線程中控制資源的通路與使用。Semaphore 就好比停車場的門衛,可以控制車位的使用資源。比如來了 5 輛車,隻有 2 個車位,門衛可以先放兩輛車進去,等有車出來之後,再讓後面的車進入。
Semaphore 示例代碼如下:
Semaphore semaphore = new Semaphore(2);
ThreadPoolExecutor semaphoreThread = new ThreadPoolExecutor(10, 50, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
for (int i = 0; i < 5; i++) {
semaphoreThread.execute(() -> {
try {
// 堵塞擷取許可
semaphore.acquire();
System.out.println("Thread:" + Thread.currentThread().getName() + " 時間:" + LocalDateTime.now());
TimeUnit.SECONDS.sleep(2);
// 釋放許可
semaphore.release();
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
以上程式執行結果如下:
Thread:pool-1-thread-1 時間:2019-07-10 21:18:42
Thread:pool-1-thread-2 時間:2019-07-10 21:18:42
Thread:pool-1-thread-3 時間:2019-07-10 21:18:44
Thread:pool-1-thread-4 時間:2019-07-10 21:18:44
Thread:pool-1-thread-5 時間:2019-07-10 21:18:46
執行流程如下圖:
Phaser(移相器)是 JDK 7 提供的,它的功能是等待所有線程到達之後,才繼續或者開始進行新的一組任務。
比如有一個旅行團,我們規定所有成員必須都到達指定地點之後,才能發車去往景點一,到達景點之後可以各自遊玩,之後必須全部到達指定地點之後,才能繼續發車去往下一個景點,類似這種場景就非常适合使用 Phaser。
Phaser 示例代碼如下:
public class Lesson5\_6 {
public static void main(String[] args) throws InterruptedException {
Phaser phaser = new MyPhaser();
PhaserWorker[] phaserWorkers = new PhaserWorker[5];
for (int i = 0; i < phaserWorkers.length; i++) {
phaserWorkers[i] = new PhaserWorker(phaser);
// 注冊 Phaser 等待的線程數,執行一次等待線程數 +1
phaser.register();
}
for (int i = 0; i < phaserWorkers.length; i++) {
// 執行任務
new Thread(new PhaserWorker(phaser)).start();
}
}
static class PhaserWorker implements Runnable {
private final Phaser phaser;
public PhaserWorker(Phaser phaser) {
this.phaser = phaser;
}
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + " | 到達" );
phaser.arriveAndAwaitAdvance(); // 集合完畢發車
try {
Thread.sleep(new Random().nextInt(5) * 1000);
System.out.println(Thread.currentThread().getName() + " | 到達" );
phaser.arriveAndAwaitAdvance(); // 景點 1 集合完畢發車
Thread.sleep(new Random().nextInt(5) * 1000);
System.out.println(Thread.currentThread().getName() + " | 到達" );
phaser.arriveAndAwaitAdvance(); // 景點 2 集合完畢發車
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
// Phaser 每個階段完成之後的事件通知
static class MyPhaser extends Phaser{
@Override
protected boolean onAdvance(int phase, int registeredParties) { // 每個階段執行完之後的回調
switch (phase) {
case 0:
System.out.println("==== 集合完畢發車 ====");
return false;
case 1:
System.out.println("==== 景點1集合完畢,發車去下一個景點 ====");
return false;
case 2:
System.out.println("==== 景點2集合完畢,發車回家 ====");
return false;
default:
return true;
}
}
}
}
以上程式執行結果如下:
Thread-0 | 到達
Thread-4 | 到達
Thread-3 | 到達
Thread-1 | 到達
Thread-2 | 到達
==== 集合完畢發車 ====
Thread-0 | 到達
Thread-4 | 到達
Thread-1 | 到達
Thread-3 | 到達
Thread-2 | 到達
==== 景點1集合完畢,發車去下一個景點 ====
Thread-4 | 到達
Thread-3 | 到達
Thread-2 | 到達
Thread-1 | 到達
Thread-0 | 到達
==== 景點2集合完畢,發車回家 ====
相關面試題
1.以下哪個類用于控制某組資源的通路權限?
A:Phaser
B:Semaphore
C:CountDownLatch
D:CyclicBarrier
答:B
2.以下哪個類不能被重用?
A:Phaser
B:Semaphore
C:CountDownLatch
D:CyclicBarrier
答:C
3.以下哪個方法不屬于 CountDownLatch 類?
A:await()
B:countDown()
C:getCount()
D:release()
答:D
題目解析:release() 是 Semaphore 的釋放許可的方法,CountDownLatch 類并不包含此方法。
4.CyclicBarrier 與 CountDownLatch 有什麼差別?
答:CyclicBarrier 與 CountDownLatch 本質上都是依賴 volatile 和 CAS 實作的,它們差別如下:
- CountDownLatch 隻能使用一次,而 CyclicBarrier 可以使用多次。
- CountDownLatch 是手動指定等待一個或多個線程執行完成再執行,而 CyclicBarrier 是 n 個線程互相等待,任何一個線程完成之前,所有的線程都必須等待。
5.以下哪個類不包含 await() 方法?
A:Semaphore
B:CountDownLatch
C:CyclicBarrier
答:A
6.以下程式執行花費了多長時間?
Semaphore semaphore = new Semaphore(2);
ThreadPoolExecutor semaphoreThread = new ThreadPoolExecutor(10, 50, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
for (int i = 0; i < 3; i++) {
semaphoreThread.execute(() -> {
try {
semaphore.release();
System.out.println("Hello");
TimeUnit.SECONDS.sleep(2);
semaphore.acquire();
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
A:1s 以内
B:2s 以上
答:A
題目解析:循環先執行了 release() 也就是釋放許可的方法,是以程式可以一次性執行 3 個線程,同時會在 1s 以内執行完。
7.Semaphore 有哪些常用的方法?
答:常用方法如下:
- acquire():擷取一個許可。
- release():釋放一個許可。
- availablePermits():目前可用的許可數。
- acquire(int n):擷取并使用 n 個許可。
- release(int n):釋放 n 個許可。
8.Phaser 常用方法有哪些?
答:常用方法如下:
- register():注冊新的參與者到 Phaser
- arriveAndAwaitAdvance():等待其他線程執行
- arriveAndDeregister():登出此線程
- forceTermination():強制 Phaser 進入終止态
- isTerminated():判斷 Phaser 是否終止
9.以下程式是否可以正常執行?“發車了”列印了多少次?
import java.util.concurrent.*;
public class TestMain {
public static void main(String[] args) {
CyclicBarrier cyclicBarrier = new CyclicBarrier(4, new Runnable() {
@Override
public void run() {
System.out.println("發車了");
}
});
for (int i = 0; i < 4; i++) {
new Thread(new CyclicWorker(cyclicBarrier)).start();
}
}
static class CyclicWorker implements Runnable {
private CyclicBarrier cyclicBarrier;
CyclicWorker(CyclicBarrier cyclicBarrier) {
this.cyclicBarrier = cyclicBarrier;
}
@Override
public void run() {
for (int i = 0; i < 2; i++) {
System.out.println("乘客:" + i);
try {
cyclicBarrier.await();
System.out.println("乘客 II:" + i);
cyclicBarrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}
}
}
}
答:可以正常執行,因為執行了兩次 await(),是以“發車了”列印了 4 次。
總結
本文我們介紹了四種比 synchronized 更進階的線程同步類,其中 CountDownLatch、CyclicBarrier、Phaser 功能比較類似都是實作線程間的等待,隻是它們的側重點有所不同,其中 CountDownLatch 一般用于等待一個或多個線程執行完,才執行目前線程,并且 CountDownLatch 不能重複使用;CyclicBarrier 用于等待一組線程資源都進入屏障點再共同執行;Phaser 是 JDK 7 提供的功能更加強大和更加靈活的線程輔助工具,等待所有線程達到之後,繼續或開始新的一組任務,Phaser 提供了動态增加和消除線程同步個數功能。而 Semaphore 提供的功能更像鎖,用于控制一組資源的通路權限。
歡迎關注我的公衆号,回複關鍵字“Java” ,将會有大禮相送!!! 祝各位面試成功!!!