基于AQS的前世今生,來學習并發工具類CountDownLatch。本文将從CountDownLatch的應用場景、源碼原了解析來學習這個并發工具類。
1、 應用場景
CountDownLatch是并發包中用來控制一個或者多個線程等待其他線程完成操作的并發工具類。現以工作中的一個場景來描述下CountDownLatch的應用,代碼如下:
/*
模拟工作中的一個需求場景:
使用者會選擇多個算法來計算費用,最後會将所有算法計算出的費用做一個權重求平均數,這個平均數是最終的費用。
每個算法的複雜度都不一樣,打算每個線程負責一個算法的實作,所有的線程執行完成,最後再求平均數。
1、為每個算法建立一個線程,每個線程負責一個算法的實作
2、通過CountDownLatch來控制所有算法線程的同步
3、全部計算完成後再求平均數
*/
public class CountDownLatchTask {
public static void main(String[] args) {
CountDownLatchTask countDownLatchTask = new CountDownLatchTask();
countDownLatchTask.startThreads(5);
}
//根據線程數和選擇的算法 排程算法對應的實作
private void startThreads(int threadNumber) {
CountDownLatch countDownLatch = new CountDownLatch(threadNumber);
for (int i = 0; i < threadNumber; i++) {
new Thread(new Runnable() {
@Override
public void run() {
System.out.println("線程算法實作:" + Thread.currentThread().getName());
countDownLatch.countDown();
}
}).start();
}
try {
countDownLatch.await();
System.out.println("權重求平均數");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
在分析原理實作前,總結下CountDownLatch的作用就是阻塞其他線程直到條件允許後才釋放該阻塞,除了上述這個小案例,實際工作中還有很多可以使用CountDownLatch的場景,比如解析Excel檔案時可以同時解析多個Sheet頁,所有的Sheet解析完成才算完成了Excel檔案的解析。從這個代碼中也可以看到CountDownLatch的主要方法就是await和countDown,下面将以這兩個方法來分析下CountDownLatch的原理實作。
2、 源碼原了解析
2.1 await方法
調用await方法會阻塞目前線程直到計數器的數值為0,方法如下:
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1); //共享式擷取AQS的同步狀态
}
調用的是AQS的acquireSharedInterruptibly方法:
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())//線程中斷 說明閉鎖對線程中斷敏感
throw new InterruptedException();
if (tryAcquireShared(arg) < 0) //閉鎖未使用完成 線程進入同步隊列自旋等待
doAcquireSharedInterruptibly(arg);
}
其中tryAcquireShared依賴的是Sync的實作,和之前的ReentrantLock、ReentrantReadWriteLock及Semaphore相比,CountDownLatch的Sync隻提供了一種方式,代碼如下:
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1; //AQS的同步狀态為0則閉鎖結束 可以進行下一步操作
}
doAcquireSharedInterruptibly方法就不再贅述,和之前Semaphore的實作是一緻的,本質上仍然是AQS同步隊列的入隊自旋等待。
2.2 countDown方法
調用countDown方法會将計數器的數值減1直到計數器為0,方法如下:
public void countDown() {
sync.releaseShared(1);
}
和Semaphore一樣,調用的是AQS的releaseShared方法:
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {//減少閉鎖的計數器
doReleaseShared();//喚醒後續線程節點
return true;
}
return false;
}
其中tryReleaseShared依賴的是Sync的實作,和之前的ReentrantLock、ReentrantReadWriteLock及Semaphore相比,CountDownLatch的Sync隻提供了一種方式,代碼如下:
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false; //計數器已經是0了
int nextc = c-1; //計數器減1
if (compareAndSetState(c, nextc)) //CAS更新同步狀态
return nextc == 0;
}
}
喚醒後續線程節點的doReleaseShared也不再贅述,和之前Semaphore的實作是一緻的。
總結:CountDownLatch類使用AQS同步狀态來表示計數。在await時,所有的線程進入同步隊列自旋等待,在countDown時,擷取閉鎖成功的線程會減少閉鎖的計數器,同時喚醒後續線程取擷取閉鎖,直到await中的計數器為0,擷取到閉鎖的線程才可以通過,執行下一步操作。
參考資料:
https://github.com/lingjiango/ConcurrentProgramPractice