CountDownLatch
概述
同步輔助類,通過它可以阻塞目前線程。也就是說,能夠實作一個線程或者多個線程一直等待,直到其他線程執行的操作完成。使用一個給定的計數器進行初始化,該計數器的操作是原子操作,即同時隻能有一個線程操作該計數器。
調用該類await()方法的線程會一直阻塞,直到其他線程調用該類的countDown()方法,使目前計數器的值變為0為止。每次調用該類的countDown()方法,目前計數器的值就會減1。當計數器的值減為0的時候,所有因調用await()方法而處于等待狀态的線程就會繼續往下執行。這種操作隻能出現一次,因為該類中的計數器不能被重置。如果需要一個可以重置計數次數的版本,可以考慮使用CyclicBarrier類。
CountDownLatch支援給定時間的等待,超過一定的時間不再等待,使用時隻需要在countDown()方法中傳入需要等待的時間即可。此時,countDown()方法的方法簽名如下:
public boolean await(long timeout, TimeUnit unit)
使用場景
在某些業務場景中,程式執行需要等待某個條件完成後才能繼續執行後續的操作。典型的應用為并行計算:當某個處理的運算量很大時,可以将該運算任務拆分成多個子任務,等待所有的子任務都完成之後,父任務再拿到所有子任務的運算結果進行彙總。
代碼示例
調用ExecutorService類的shutdown()方法,并不會第一時間内把所有線程全部都銷毀掉,而是讓目前已有的線程全部執行完,之後,再把線程池銷毀掉。
示例代碼如下:
package io.binghe.concurrency.example.aqs;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@Slf4j
public class CountDownLatchExample {
private static final int threadCount = 200;
public static void main(String[] args) throws InterruptedException {
ExecutorService exec = Executors.newCachedThreadPool();
final CountDownLatch countDownLatch = new CountDownLatch(threadCount);
for (int i = 0; i < threadCount; i++){
final int threadNum = i;
exec.execute(() -> {
try {
test(threadNum);
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
countDownLatch.countDown();
}
});
}
countDownLatch.await();
log.info("finish");
exec.shutdown();
}
private static void test(int threadNum) throws InterruptedException {
Thread.sleep(100);
log.info("{}", threadNum);
Thread.sleep(100);
}
}
支援給定時間等待的示例代碼如下:
package io.binghe.concurrency.example.aqs;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
@Slf4j
public class CountDownLatchExample {
private static final int threadCount = 200;
public static void main(String[] args) throws InterruptedException {
ExecutorService exec = Executors.newCachedThreadPool();
final CountDownLatch countDownLatch = new CountDownLatch(threadCount);
for (int i = 0; i < threadCount; i++){
final int threadNum = i;
exec.execute(() -> {
try {
test(threadNum);
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
countDownLatch.countDown();
}
});
}
countDownLatch.await(10, TimeUnit.MICROSECONDS);
log.info("finish");
exec.shutdown();
}
private static void test(int threadNum) throws InterruptedException {
Thread.sleep(100);
log.info("{}", threadNum);
}
}
Semaphore
控制同一時間并發線程的數目。能夠完成對于信号量的控制,可以控制某個資源可被同時通路的個數。
提供了兩個核心方法——acquire()方法和release()方法。acquire()方法表示擷取一個許可,如果沒有則等待,release()方法則是在操作完成後釋放對應的許可。Semaphore維護了目前通路的個數,通過提供同步機制來控制同時通路的個數。Semaphore可以實作有限大小的連結清單。
Semaphore常用于僅能提供有限通路的資源,比如:資料庫連接配接數。
每次擷取并釋放一個許可,示例代碼如下:
package io.binghe.concurrency.example.aqs;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
@Slf4j
public class SemaphoreExample {
private static final int threadCount = 200;
public static void main(String[] args) throws InterruptedException {
ExecutorService exec = Executors.newCachedThreadPool();
final Semaphore semaphore = new Semaphore(3);
for (int i = 0; i < threadCount; i++){
final int threadNum = i;
exec.execute(() -> {
try {
semaphore.acquire(); //擷取一個許可
test(threadNum);
semaphore.release(); //釋放一個許可
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
exec.shutdown();
}
private static void test(int threadNum) throws InterruptedException {
log.info("{}", threadNum);
Thread.sleep(1000);
}
}
每次擷取并釋放多個許可,示例代碼如下:
package io.binghe.concurrency.example.aqs;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
@Slf4j
public class SemaphoreExample {
private static final int threadCount = 200;
public static void main(String[] args) throws InterruptedException {
ExecutorService exec = Executors.newCachedThreadPool();
final Semaphore semaphore = new Semaphore(3);
for (int i = 0; i < threadCount; i++){
final int threadNum = i;
exec.execute(() -> {
try {
semaphore.acquire(3); //擷取多個許可
test(threadNum);
semaphore.release(3); //釋放多個許可
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
log.info("finish");
exec.shutdown();
}
private static void test(int threadNum) throws InterruptedException {
log.info("{}", threadNum);
Thread.sleep(1000);
}
}
假設有這樣一個場景,并發太高了,即使使用Semaphore進行控制,處理起來也比較棘手。假設系統目前允許的最高并發數是3,超過3後就需要丢棄,使用Semaphore也能實作這樣的場景,示例代碼如下:
package io.binghe.concurrency.example.aqs;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
@Slf4j
public class SemaphoreExample {
private static final int threadCount = 200;
public static void main(String[] args) throws InterruptedException {
ExecutorService exec = Executors.newCachedThreadPool();
final Semaphore semaphore = new Semaphore(3);
for (int i = 0; i < threadCount; i++){
final int threadNum = i;
exec.execute(() -> {
try {
//嘗試擷取一個許可,也可以嘗試擷取多個許可,
//支援嘗試擷取許可逾時設定,逾時後不再等待後續線程的執行
//具體可以參見Semaphore的源碼
if (semaphore.tryAcquire()) {
test(threadNum);
semaphore.release(); //釋放一個許可
}
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
log.info("finish");
exec.shutdown();
}
private static void test(int threadNum) throws InterruptedException {
log.info("{}", threadNum);
Thread.sleep(1000);
}
}
CyclicBarrier
是一個同步輔助類,允許一組線程互相等待,直到到達某個公共的屏障點,通過它可以完成多個線程之間互相等待,隻有當每個線程都準備就緒後,才能各自繼續往下執行後面的操作。
與CountDownLatch有相似的地方,都是使用計數器實作,當某個線程調用了CyclicBarrier的await()方法後,該線程就進入了等待狀态,而且計數器執行加1操作,當計數器的值達到了設定的初始值,調用await()方法進入等待狀态的線程會被喚醒,繼續執行各自後續的操作。CyclicBarrier在釋放等待線程後可以重用,是以,CyclicBarrier又被稱為循環屏障。
可以用于多線程計算資料,最後合并計算結果的場景。
CyclicBarrier與CountDownLatch的差別
- CountDownLatch的計數器隻能使用一次,而CyclicBarrier的計數器可以使用reset()方法進行重置,并且可以循環使用
- CountDownLatch主要實作1個或n個線程需要等待其他線程完成某項操作之後,才能繼續往下執行,描述的是1個或n個線程等待其他線程的關系。而CyclicBarrier主要實作了多個線程之間互相等待,直到所有的線程都滿足了條件之後,才能繼續執行後續的操作,描述的是各個線程内部互相等待的關系。
- CyclicBarrier能夠處理更複雜的場景,如果計算發生錯誤,可以重置計數器讓線程重新執行一次。
- CyclicBarrier中提供了很多有用的方法,比如:可以通過getNumberWaiting()方法擷取阻塞的線程數量,通過isBroken()方法判斷阻塞的線程是否被中斷。
示例代碼如下。
package io.binghe.concurrency.example.aqs;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@Slf4j
public class CyclicBarrierExample {
private static CyclicBarrier cyclicBarrier = new CyclicBarrier(5);
public static void main(String[] args) throws Exception {
ExecutorService executorService = Executors.newCachedThreadPool();
for (int i = 0; i < 10; i++){
final int threadNum = i;
Thread.sleep(1000);
executorService.execute(() -> {
try {
race(threadNum);
} catch (Exception e) {
e.printStackTrace();
}
});
}
executorService.shutdown();
}
private static void race(int threadNum) throws Exception{
Thread.sleep(1000);
log.info("{} is ready", threadNum);
cyclicBarrier.await();
log.info("{} continue", threadNum);
}
}
設定等待逾時示例代碼如下:
package io.binghe.concurrency.example.aqs;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.*;
@Slf4j
public class CyclicBarrierExample {
private static CyclicBarrier cyclicBarrier = new CyclicBarrier(5);
public static void main(String[] args) throws Exception {
ExecutorService executorService = Executors.newCachedThreadPool();
for (int i = 0; i < 10; i++){
final int threadNum = i;
Thread.sleep(1000);
executorService.execute(() -> {
try {
race(threadNum);
} catch (Exception e) {
e.printStackTrace();
}
});
}
executorService.shutdown();
}
private static void race(int threadNum) throws Exception{
Thread.sleep(1000);
log.info("{} is ready", threadNum);
try{
cyclicBarrier.await(2000, TimeUnit.MILLISECONDS);
}catch (BrokenBarrierException | TimeoutException e){
log.warn("BarrierException", e);
}
log.info("{} continue", threadNum);
}
}
在聲明CyclicBarrier的時候,還可以指定一個Runnable,當線程達到屏障的時候,可以優先執行Runnable中的方法。
package io.binghe.concurrency.example.aqs;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@Slf4j
public class CyclicBarrierExample {
private static CyclicBarrier cyclicBarrier = new CyclicBarrier(5, () -> {
log.info("callback is running");
});
public static void main(String[] args) throws Exception {
ExecutorService executorService = Executors.newCachedThreadPool();
for (int i = 0; i < 10; i++){
final int threadNum = i;
Thread.sleep(1000);
executorService.execute(() -> {
try {
race(threadNum);
} catch (Exception e) {
e.printStackTrace();
}
});
}
executorService.shutdown();
}
private static void race(int threadNum) throws Exception{
Thread.sleep(1000);
log.info("{} is ready", threadNum);
cyclicBarrier.await();
log.info("{} continue", threadNum);
}
}