CountDownLatch原理
CountDownLatch是通過一個計數器來實作的,計數器的初始化值為線程的數量。每當一個線程完成了自己的任務後,計數器的值就相應得減1。當計數器到達0時,表示所有的線程都已完成任務,然後在閉鎖上等待的線程就可以恢複執行任務。CountDownLatch可以起到阻塞線程,并保證線程在滿足某種特定的情況後繼續執行。
CountDownLatch 的兩種典型用法
- 某一線程在開始運作前等待n個線程執行完畢。将 CountDownLatch 的計數器初始化為n :new CountDownLatch(n),每當一個任務線程執行完畢,就将計數器減1 countdownlatch.countDown(),當計數器的值變為0時,在CountDownLatch上 await()的線程就會被喚醒。一個典型應用場景就是啟動一個服務時,主線程需要等待多個元件加載完畢,之後再繼續執行。即可以通過初始化,定義線程個數。
- 實作多個線程開始執行任務的最大并行性。注意是并行性,不是并發,強調的是多個線程在某一時刻同時開始執行。類似于賽跑,将多個線程放到起點,等待發令槍響,然後同時開跑。做法是初始化一個共享的 CountDownLatch 對象,将其計數器初始化為 1 :new CountDownLatch(1),多個線程在開始執行任務前首先 coundownlatch.await(),當主線程調用 countDown() 時,計數器變為0,多個線程同時被喚醒。
源碼分析
/**
* 構造器中的計數值(count)實際上就是閉鎖需要等待的線程數量。這個值隻能被設定一次,
*而且CountDownLatch沒有提供任何機制去重新設定這個計數值。
*
* @param count 線上程可以通過 {@link #await} 之前必須調用 {@link #countDown} 的次數
* @throws IllegalArgumentException 如果給定參數小于0則抛出異常
*/
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
/**
* 使目前線程等待,直到計數器為零,除非目前線程被中斷
* 1.目前計數為零,則此方法立即傳回
* 2.如果目前計數大于零,則目前線程将因線程排程目的而被禁用并處于休眠狀态,
* 直到發生如下可能:
* 由于調用了 {@link #countDown} 方法,計數達到零
* 線程等待被中斷。
* @throws 如果目前線程在等待時被中斷
*/
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
/**
* 可以設定等待時間,超過這個時間就會執行,不會等到計數器變為0,但是
* 之前給定的線程還是會執行完
*
* @param timeout 等待時間長度
* @param unit 等待時間機關
* @return
* @throws 如果目前線程在等待時被中斷
*/
public boolean await(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}
與CountDownLatch的第一次互動是主線程等待其他線程。主線程必須在啟動其他線程後立即調用CountDownLatch.await()方法。這樣主線程的操作就會在這個方法上阻塞,直到其他線程完成各自的任務。
其他N 個線程必須引用閉鎖對象,因為他們需要通知CountDownLatch對象,他們已經完成了各自的任務。這種通知機制是通過 CountDownLatch.countDown()方法來完成的;每調用一次這個方法,在構造函數中初始化的count值就減1。是以當N個線程都調用了這個方法,count的值等于0,然後主線程就能通過await()方法,恢複執行自己的任務。
注意:
- CountDownLatch的構造函數 CountDownLatch countDownLatch = new CountDownLatch(7); //7表示需要等待執行完畢的線程數量。
- 在每一個線程執行完畢之後,都需要執行 countDownLatch.countDown() 方法,不然計數器就不會準确;
- 隻有所有的線程執行完畢之後,才會執行 countDownLatch.await() 之後的 代碼;
- CountDownLatch 阻塞的是主線程;
CountDownLatch 的使用示例
@Slf4j
public class CountDownLatchExample1 {
/**
* 線程數量
*/
private final static int threadCount = 200;
public static void main(String[] args) throws Exception {
ExecutorService exec = Executors.newCachedThreadPool();
final CountDownLatch countDownLatch = new CountDownLatch(threadCount);
for (int i = 0; i < threadCount; i++) {
final int threadNum = i;
exec.execute(() -> {
try {
test(threadNum);
} catch (Exception e) {
log.error("exception", e);
} finally {
// 表示一個請求已經完成
countDownLatch.countDown();
}
});
}
//使目前線程等待,直到計數器為零,除非目前線程被中斷
countDownLatch.await();
//當這200個請求被處理完成之後,才會執行
log.info("finish");
exec.shutdown();
}
private static void test(int threadNum) throws Exception {
// 模拟請求的耗時操作
Thread.sleep(100);
log.info("{}", threadNum);
Thread.sleep(100);
}
}
上面的代碼中,我們定義了請求的數量為200,當這200個請求被處理完成之後,才會執行System.out.println("finish");。
//可以設定等待時間,超過這個時間就會執行,不會等到計數器變為0,但是之前給定的線程還是會執行完
countDownLatch.await(20, TimeUnit.MILLISECONDS);
上面代碼中其他跟第一個代碼一緻,使用了wait設定等待一定時間後繼續執行方法。
/**
* CountDownLatch 模拟并發調用多個任務
*
* @author zjq
*/
@Slf4j
public class CountDownLatchExample3 {
public static void main(String[] args) throws InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(2) {
@Override
public void await() throws InterruptedException {
super.await();
log.info("其他線程執行完畢後主線程執行的内容");
log.info("threadName:{},", Thread.currentThread().getName() + " count down is ok");
}
};
Thread thread1 = new Thread(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(1000);
log.info(Thread.currentThread().getName() + "任務已完成");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
//計數器減1
countDownLatch.countDown();
}
}
}, "thread111");
Thread thread2 = new Thread(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(2000);
log.info(Thread.currentThread().getName() + "任務已完成");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
//計數器減1
countDownLatch.countDown();
}
}
}, "thread222");
thread1.start();
thread2.start();
countDownLatch.await();
log.info("====everything is end====");
}
}
上述代碼執行輸出結果:
[thread111] INFO com.zjq.concurrency.aqs.CountDownLatchExample3 - thread111任務已完成
[thread222] INFO com.zjq.concurrency.aqs.CountDownLatchExample3 - thread222任務已完成
[main] INFO com.zjq.concurrency.aqs.CountDownLatchExample3 - 其他線程執行完畢後主線程執行的内容
[main] INFO com.zjq.concurrency.aqs.CountDownLatchExample3 - threadName:main count down is ok,
[main] INFO com.zjq.concurrency.aqs.CountDownLatchExample3 - ====everything is end====
/**
* CountDownLatch 模拟多個任務并發執行完畢後等待主線程發令同時執行後續操作
*
* @author zjq
*/
@Slf4j
public class CountDownLatchExample4 {
public static void main(String[] args) throws InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(1);
for (int i = 0; i < 5; i++) {
new Thread(() -> {
try {
//所有線程阻塞在這,等待主線程号令
log.info(Thread.currentThread().getName() + "已準備完畢!!");
countDownLatch.await();
log.info("【" + Thread.currentThread().getName() + "】" + "開始執行……");
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
}
// 主線程準備發令
Thread.sleep(2000);
log.info(Thread.currentThread().getName() + "發号施令,給我沖!!");
// 主線程:執行發令
countDownLatch.countDown();
}
}
實作最大并行性輸出結果:
[Thread-3] INFO com.zjq.aqs.CountDownLatchExample4 - Thread-3已準備完畢!!
[Thread-0] INFO com.zjq.aqs.CountDownLatchExample4 - Thread-0已準備完畢!!
[Thread-2] INFO com.zjq.aqs.CountDownLatchExample4 - Thread-2已準備完畢!!
[Thread-1] INFO com.zjq.aqs.CountDownLatchExample4 - Thread-1已準備完畢!!
[Thread-4] INFO com.zjq.aqs.CountDownLatchExample4 - Thread-4已準備完畢!!
[main] INFO com.zjq.aqs.CountDownLatchExample4 - main發号施令,給我沖!!
[Thread-3] INFO com.zjq.aqs.CountDownLatchExample4 - 【Thread-3】開始執行……
[Thread-1] INFO com.zjq.aqs.CountDownLatchExample4 - 【Thread-1】開始執行……
[Thread-0] INFO com.zjq.aqs.CountDownLatchExample4 - 【Thread-0】開始執行……
[Thread-2] INFO com.zjq.aqs.CountDownLatchExample4 - 【Thread-2】開始執行……
[Thread-4] INFO com.zjq.aqs.CountDownLatchExample4 - 【Thread-4】開始執行……
CountDownLatch 的不足
CountDownLatch是一次性的,計數器的值隻能在構造方法中初始化一次,之後沒有任何機制再次對其設定值,當CountDownLatch使用完畢後,它不能再次被使用。
CountDownLatch應用場景
(1)實作最大的并行性:有時我們想同時啟動多個線程,實作最大程度的并行性。例 如,我們想測試一個單例類。如果我們建立一個初始計數為1的CountDownLatch,并 讓所有線程都在這個鎖上等待,那麼我們可以很輕松地完成測試。我們隻需調用 一次 countDown()方法就可以讓所有的等待線程同時恢複執行。
(2)開始執行前等待n個線程完成各自任務:例如應用程式啟動類要確定在處理使用者 請求前,所有N個外部系統已經啟動和運作了。
(3)死鎖檢測:一個非常友善的使用場景是,你可以使用n個線程通路共享資源,在每次測試階段的線程數目是不同的,并嘗試産生死鎖。