天天看點

【Java并發工具類】CountDownLatch和CyclicBarrier

前言

下面介紹協調讓多線程步調一緻的兩個工具類:

CountDownLatch

CyclicBarrier

CountDownLatch和CyclicBarrier的用途介紹

CountDownLatch

// API
 void		await(); // 使目前線程在閉鎖計數器到零之前一直等待,除非線程被中斷。
 boolean	await(long timeout, TimeUnit unit); // 使目前線程在閉鎖計數器至零之前一直等待,除非線程被中斷或超出了指定的等待時間。
 void		countDown(); // 遞減閉鎖計數器,如果計數到達零,則釋放所有等待的線程。
 long		getCount(); // 傳回目前計數。
 String		toString(); // 傳回辨別此閉鎖及其狀态的字元串。
           

CountDownLatch

是一個同步工具類,在完成一組正在其他線程中執行的操作之前,它允許一個或多個線程一直等待。可以指定計數初始化CountDownLatch,當調用

countDown()

方法後,在目前計數到達零之前,

await()

方法會一直受阻塞。計數到達零之後,所有被阻塞的線程都會被釋放,

await()

的所有後續調用都會立即傳回。CountDownLatch的計數隻能被使用一次,如果需要重複計數使用,則要考慮使用

CyclicBarrier

CountDownLatch的用途有很多。将計數為1初始化的CountDownLatch可用作一個簡單的開/關或入口:在通過調用countDown()的線程打開入口前,所有調用await()的線程都一直在入口出等待。而用N初始化CountDownLatch可以使一個線程在N個線程完成某項操作之前一直等待,或者使其在某項操作完成N次之前一直等待。

COuntDownLatch的記憶體一緻性語義:線程中調用

countDown()

之前的操作 Happens-Before緊跟在從另一個線程中對應

await()

成功傳回的操作。

CyclicBarrier

// API
 int		await(); // 線程将一直等待直到所有參與者都在此 barrier 上調用 await 方法
 int		await(long timeout, TimeUnit unit); // 線程将一直等待直到所有參與者都在此 barrier 上調用 await 方法, 或者超出了指定的等待時間。
 int		getNumberWaiting(); // 傳回目前在屏障處等待的參與者數目。
 int		getParties(); // 傳回要求啟動此 barrier 的參與者數目。
 boolean	isBroken(); // 查詢此屏障是否處于損壞狀态。
 void		reset(); // 将屏障重置為其初始狀态。
           

CyclicBarrier

是一個同步輔助類,它允許一組線程互相等待,直到到達某個公共屏障點(barrier也可被翻譯為栅欄) (common barrier point)。 CyclicBarrier 适用于在涉及一組固定大小的線程的程式中,這些線程必須不時地互相等待的情況。即所有線程都必須到達屏障位置後,下面的程式才能繼續執行,适于在疊代算法中使用。因為 barrier 在釋放等待線程後可以計數器會被重置可繼續使用,是以稱它為循環 的 barrier。

CyclicBarrier支援一個可選的

Runnable

指令(也就是可以傳入一個線程執行其他操作),在一組線程中的最後一個線程到達之後(但在釋放所有線程之前),該指令将隻在每個 barrier point 運作一次。這對所有參與線程繼續運作之前更新它們的共享狀态将十分有用。

CyclicBarrier的記憶體一緻性語義:線程中調用

await()

之前的操作 Happens-Before 那些是屏障操作的一部份的操作,後者依次 Happens-Before 緊跟在從另一個線程中對應

await()

Actions in a thread prior to calling await() happen-before actions that are part of the barrier action, which in turn happen-before actions following a successful return from the corresponding await() in other threads.
           

在對賬系統中使用CountDownLatch和CyclicBarrier

對賬系統流程圖如下:

【Java并發工具類】CountDownLatch和CyclicBarrier

目前對賬系統的處理流程是:先查詢訂單,然後查詢派送單,之後對比訂單和派送單,将差異寫入差異庫。對賬系統的代碼抽象後如下:

while(存在未對賬訂單){
    // 查詢未對賬訂單
    pos = getPOrders();
    // 查詢派送單
    dos = getDOrders();
    // 執行對賬操作
    diff = check(pos, dos);
    // 差異寫入差異庫
    save(diff);
}
           

利用并行優化對賬系統

目前的對賬系統,由于訂單量和派送單量巨大,是以查詢未對賬訂單

getPOrder()

和查詢派送單

getDOrder()

都相對比較慢。目前對賬系統是單線程執行的,示意圖如下(圖來自參考[1]):

【Java并發工具類】CountDownLatch和CyclicBarrier

對于串行化的系統,優化性能首先想到的就是能否利用多線程并行處理。

如果我們能将getPOrders()和getDOrders()這兩個操作并行處理,那麼将會提升效率很多。因為這兩個操作并沒有先後順序的依賴,是以,我們可以并行處理這兩個耗時的操作。

并行後的示意圖如下(圖來自參考[1]):

【Java并發工具類】CountDownLatch和CyclicBarrier

對比單線程的執行示意圖,我們發現在同等時間裡,并行執行的吞吐量近乎單線程的2倍,優化效果還是相對明顯的。

優化後的代碼如下:

while(存在未對賬訂單){
    // 查詢未對賬訂單
    Thread T1 = new Thread(()->{
        pos = getPOrders();
    });
    T1.start();

    // 查詢派送單
    Thread T2 = new Thread(()->{
        dos = getDOrders();
    });
    T2.start();

    // 要等待線程T1和T2執行完才能執行check()和save()這兩個操作
    // 通過調用T1.join()和T2.join()來實作等待
    // 當T2和T2線程退出時,調用T1.jion()和T2.join()的主線程就會從阻塞态被喚醒,進而執行check()和save()
    T1.join();
    T2.join();

    // 執行對賬操作
    diff = check(pos, dos);
    // 差異寫入差異庫
    save(diff);
}
           

使用CountDownLatch實作線程等待

上面的解決方案美中不足的地方在于:每一次while循環都會建立新的線程,而線程的建立是一個耗時操作。是以,最好能使建立出來的線程能夠循環使用。一個自然而然的方案便是線程池。

// 建立 2 個線程的線程池
Executor executor =Executors.newFixedThreadPool(2);
while(存在未對賬訂單){
    // 查詢未對賬訂單
    executor.execute(()-> {
        pos = getPOrders();
    });

    // 查詢派送單
    executor.execute(()-> {
        dos = getDOrders();
    });

    /* ??如何實作等待??*/

    // 執行對賬操作
    diff = check(pos, dos);
    // 差異寫入差異庫
    save(diff);
}   
           

于是我們就建立兩個固定大小為2的線程池,之後在while循環裡重複利用。

但是問題也出來了:主線程如何得知getPOrders()和getDOrders()這兩個操作什麼時候執完?

前面主線程通過調用線程T1和T2的

join()

方法來等待T1和T2退出,但是線上程池的方案裡,線程根本就不會退出,是以,join()方法不可取。

這時我們就可以使用CountDownLatch工具類,将其初始計數值設定為2。當執行完

pos = getPOrders();

後,将計數器減一,執行完

dos = getDOrders();

後也将計數器減一。當計數器為0時,被阻塞的主線程就可以繼續執行了。

// 建立 2 個線程的線程池
Executor executor = Executors.newFixedThreadPool(2);

while(存在未對賬訂單){
    // 計數器初始化為 2
    CountDownLatch latch = new CountDownLatch(2);
    // 查詢未對賬訂單
    executor.execute(()-> {
        pos = getPOrders();
        latch.countDown();    // 實作對計數器減1
    });

    // 查詢派送單
    executor.execute(()-> {
        dos = getDOrders();
        latch.countDown();    // 實作對計數器減1
    });

    // 等待兩個查詢操作結束
    latch.await(); // 在await()傳回之前,主線程會一直被阻塞

    // 執行對賬操作
    diff = check(pos, dos);
    // 差異寫入差異庫
    save(diff);
}
           

使用CyclicBarrier進一步優化對賬系統

除了getPOrders()和getDOrders()這兩個操作可以并行,這兩個查詢操作和

check()

save()

這兩個對賬操作之間也可以并行。

【Java并發工具類】CountDownLatch和CyclicBarrier

兩次查詢操作和對賬操作并行,對賬操作還依賴查詢操作的結果,有點像生産者-消費者的意思,兩次查詢操作是生産者,對賬操作是消費者。那麼,我們就需要一個隊列,來儲存生産者生産的資料,而消費者則從這個隊列消費資料。

不過,針對對賬系統,可以設計兩個隊列,并且這兩個隊列之間還有對應關系。訂單查詢操作将訂單查詢結果插入訂單隊列,派送單查詢操作将派送單插入派送單隊列,這兩個隊列的元素之間是有一一對應關系。這樣的好處在于:對賬操作可以每次從訂單隊列出一個元素和從派送單隊列出一個元素,然後對這兩個元素執行對賬操作,這樣資料一定不會亂掉。

【Java并發工具類】CountDownLatch和CyclicBarrier

如何使兩個隊列實作完全的并行?

兩個查詢操作所需時間并不相同,那麼一個簡單的想法便是,一個線程T1執行訂單的查詢工程,一個線程T2執行派送單的查詢工作,僅當線程T1和T2各自都生産完1條資料的時候,通知線程T3執行對賬操作。

【Java并發工具類】CountDownLatch和CyclicBarrier

先查詢完的一方需要在設定的屏障點等待另一方,直到雙方都到達屏障點,才開始繼續下一步任務。

于是我們可以使用CyclicBarrier來實作這個功能。建立一個計數器初始值為2的CyclicBarrier,同時傳入一個回調函數,當計數器減為0的時候,便調用這個函數。

Vector<P> pos; // 訂單隊列
Vector<D> dos; // 派送單隊列
// 執行回調的線程池 
// 固定線程數量為1是因為隻有單線程取擷取兩個隊列中的資料才不會出現資料比對不一緻問題
Executor executor = Executors.newFixedThreadPool(1); 
// 建立CyclicBarrier的計數器為2,傳入一個線程另外執行對賬操作
// 當計數器為0時,會運作傳入線程執行對賬操作
final CyclicBarrier barrier = new CyclicBarrier(2, ()->{
        							executor.execute(()->check());
   							 });
void check(){
    P p = pos.remove(0); // 從訂單隊列中擷取訂單
    D d = dos.remove(0); // 從派送單隊列中擷取派送單
    // 執行對賬操作
    diff = check(p, d);
    // 差異寫入差異庫
    save(diff);
}

void checkAll(){
    // 循環查詢訂單庫
    Thread T1 = new Thread(()->{
        while(存在未對賬訂單){
            pos.add(getPOrders()); // 查詢訂單庫
            barrier.await(); // 将計數器減一并等待直到計數器為0
        }
    });
    T1.start();  
    // 循環查詢運單庫
    Thread T2 = new Thread(()->{
        while(存在未對賬訂單){
            dos.add(getDOrders()); // 查詢運單庫
            barrier.await(); // 将計數器減一并等待直到計數器為0
        }
    });
    T2.start();
}
           

線程T1負責查詢訂單,當查出一條時,調用

barrier.await()

來将計數器減1,同時等待計數器變為0;線程T2負責查詢派送訂單,當查出一條時,也調用

barrier.await()

來将計數器減1,同時等待計數器變為0;當T1和T2都調用barrier.await()時,計數器就會減到0,此時T1和T2就可以執行下一條語句了,同時會調用barrier的回調函數來執行對賬操作。

CyclicBarrier的計數器有自動重置的功能,當減到0時,會自動重置你設定的初始值。于是,我們便可以重複使用CyclicBarrier。

小結

CountDownLatch

CyclicBarrier

是Java并發包提供的兩個非常易用的線程同步工具類。它們的差別在于:CountDownLatch主要用來解決一個線程等待多個線程的場景(計數器一旦減到0,再有線程調用await(),該線程會直接通過,計數器不會被重置);CyclicBarrier是一組線程之間的互相等待(計數器可以重用,減到0會重置為設定的初始值),還可以傳入回調函數,當計數器為0時,執行回調函數。

參考:

[1] 極客時間專欄王寶令《Java并發程式設計實戰》

[2] Brian Goetz.Tim Peierls. et al.Java并發程式設計實戰[M].北京:機械工業出版社,2016

[3] Oracle Java API.https://docs.oracle.com/javase/8/docs/api/index.html?overview-summary.html

每天進步一點點,不要停止前進的腳步~