1.等待多線程完成的CountDownLatch
CountDownLatch允許一個或多個線程等待其他線程完成操作。即他可以實作與join()方法相同的功能,而且比join的功能更多。可以在初始化CountDownLatch時傳入一個int參數來設定初始計數值,任何在CountDownLatch對象上調用wait()的方法都将被阻塞,直到這個CountDownLatch對象的計數值為0。CountDownLatch被設計為隻能觸發一次,計數值不能被重置。
當我們調用CountDownLatch的countDown方法時,計數值N就會減1,CountDownLatch的await方法 會阻塞目前線程,直到N變成零。由于countDown方法可以用在任何地方,是以這裡說的N個 點,可以是N個線程,也可以是1個線程裡的N個執行步驟。用在多個線程時,隻需要把這個 CountDownLatch的引用傳遞到線程裡即可。
注意:計數器必須大于等于0,隻是等于0時候,計數器就是零,則此時調用await方法時不會阻塞目前線程。CountDownLatch不可能重新初始化或者修改CountDownLatch對象的内部計數器的值。一個線程調用countDown方法發生之前,另外一個線程調用await方法。一個線程調用countDown方法并不會被阻塞,隻有調用await()方法的線程才會被阻塞。
public class TestCountDownLatch {
static CountDownLatch c=new CountDownLatch(8);
public static void main(String[] args) throws InterruptedException {
for(int i=1;i<=8;i++){
Thread t=new Thread(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(1000);
System.out.println(Thread.currentThread().getName()+"完成");
c.countDown();//計數器減1
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
});
t.start();
}
c.await();//主線程會在此處阻塞,直到CountDownLatch的計數器為0才會恢複
System.out.println("完成所有準備任務");
System.out.println("主程式開始執行");
}
}
2.同步屏障CyclicBarrier
CyclicBarrier的字面意思是可循環使用(Cyclic)的屏障(Barrier)。它要做的事情是,讓一組線程到達一個屏障(也可以叫同步點)時被阻塞,直到最後一個線程到達屏障時,屏障才會開門,所有被屏障攔截的線程才會繼續運作。
CyclicBarrier預設的構造方法是CyclicBarrier(int parties),其參數表示屏障攔截的線程數量,每個線程調用await方法告訴CyclicBarrier我已經到達了屏障,然後目前線程被阻塞。
public class TestCyclicBarrier {
static CyclicBarrier c=new CyclicBarrier(2);
public static void main(String[] args) {
new Thread(new Runnable() {
@Override
public void run() {
try {
c.await();//在此處阻塞,等待剩餘的所有任務進入阻塞(到達屏障)即可解除阻塞
} catch (Exception e) {
}
System.out.println(1);
}
}).start();
try {
c.await();//在此處阻塞,等待剩餘的所有任務進入阻塞(到達屏障)即可解除阻塞
} catch (Exception e) {
}
System.out.println(2);
}
}
注意:如果把new CyclicBarrier(2)修改成new CyclicBarrier(3),則主線程和子線程會永遠等待, 因為沒有第三個線程執行await方法,即沒有第三個線程到達屏障,是以之前到達屏障的兩個線程都不會繼續執行。
CyclicBarrier還提供一個更進階的構造函數CyclicBarrier(int parties,Runnable barrierAction),用于線上程到達屏障時,保證會優先執行barrierAction,友善處理更複雜的業務場景。
public class TestCyclicBarrier1 {
static CyclicBarrier c = new CyclicBarrier(2, new A());
public static void main(String[] args) throws InterruptedException, BrokenBarrierException {
new Thread(new Runnable() {
@Override
public void run() {
// TODO Auto-generated method stub
try {
c.await();
System.out.println(2);
} catch (Exception e) {
// TODO: handle exception
}
}
}).start();
c.await();
System.out.println(1);
}
static class A implements Runnable {
@Override
public void run() {
System.out.println(3);
}
}
}
/*
* 輸出結果:
* 3
* 2
* 1
*/
因為CyclicBarrier設定了攔截線程的數量是2,是以必須等代碼中的第一個線程和線程A 都執行完之後,才會繼續執行主線程,是以輸出結果為3 2 1。那麼此時有一個問題,如果阻塞的線程數大于CyclicBarrier的計數器會怎樣?
public class TestCyclicBarrier {
static CyclicBarrier c=new CyclicBarrier(2);
public static void main(String[] args) {
for(int i=1;i<=3;i++){
new Thread(new Runnable() {
@Override
public void run() {
try {
c.await();//在此處阻塞,等待剩餘的所有任務進入阻塞(到達屏障)
} catch (Exception e) {
}
System.out.println(Thread.currentThread().getName());
}
}).start();
}
try {
c.await();//在此處阻塞,等待剩餘的所有任務進入阻塞(到達屏障)
} catch (Exception e) {
}
System.out.println(2);
}
}
根據結果可以知道,CyclicBarrier可以自動重置計數器數量,當攔截線程數量為2時會把從阻塞隊列中任意取出兩個解除阻塞并執行,如果還有剩餘的阻塞隊列則會重置計數器,如果剩餘阻塞隊列數量小于計數器則會阻塞運作,也就是說,如果有阻塞隊列數X與計數器N,X%N==0,那麼所有線程都會執行,如果X%N!=0,那麼會有部分線程處于阻塞狀态無法執行。也可以手動調用 reset()方法來進行重置計數器。
CyclicBarrier還提供其他有用的方法,比如getNumberWaiting方法可以獲得Cyclic-Barrier 阻塞的線程數量。isBroken()方法用來了解阻塞的線程是否被中斷。
public class TestCyclicBarrier {
static CyclicBarrier c=new CyclicBarrier(2);
public static void main(String[] args) {
Thread t=new Thread(new Runnable() {
@Override
public void run() {
try {
c.await();//在此處阻塞,等待剩餘的所有任務進入阻塞(到達屏障)
} catch (Exception e) {
}
System.out.println(Thread.currentThread().getName());
}
});
t.start();
t.interrupt();
try {
c.await();//在此處阻塞,等待剩餘的所有任務進入阻塞(到達屏障)
System.out.println(2);
} catch (BrokenBarrierException|InterruptedException e) {
System.out.println(c.isBroken());
}
}
}
/*
true
*/
3.控制并發線程數的Semaphore
Semaphore(信号量)是用來控制同時通路特定資源的線程數量,它通過協調各個線程,以保證合理的使用公共資源。換句話說,鎖(Lock鎖或synchronized鎖)在任何時刻隻允許一個任務通路被加鎖的資源,而計數信号量允許n個任務同時通路這個資源,還可以将信号量看作是向外分發使用資源的“許可證”,盡管内部沒有這種許可證對象。
“許可證”的數量是有限的,是以當執有“許可證”的線程數量與“許可證”數量相同時,就會阻止其他線程對共享資源的使用,如果某一個或多個線程使用完共享資源後,就會歸還“許可證”,此時Semaphore(信号量)就會将這些歸還的“許可證”再次分發給阻塞中的線程。通過這種方式就實作了控制線程并發數。
3.1 API
- Semaphore(int permits):構造器,接受一個整型的數字,表示可用的許可證數量。
- acquire():線程調用該方法擷取一個許可證來擷取使用共享資源的資格。
- release():線程使用完共享資源之後調用方法歸還許可證。
- tryAcquire():線程調用該方法嘗試擷取許可證。
- int availablePermits():傳回此信号量中目前可用的許可證數。
- int getQueueLength():傳回正在等待擷取許可證的線程數。
- boolean hasQueuedThreads():是否有線程正在等待擷取許可證。
- void reducePermits(int reduction):減少reduction個許可證,是個protected方法。
- Collection getQueuedThreads():傳回所有等待擷取許可證的線程集合,是個protected方 法。
3.2 應用
Semaphore可以用于做流量控制,特别是公用資源有限的應用場景,比如資料庫連接配接。假如有一個需求,要讀取幾萬個檔案的資料,因為都是IO密集型任務,我們可以啟動幾十個線程 并發地讀取,但是如果讀到記憶體後,還需要存儲到資料庫中,而資料庫的連接配接數隻有10個,這 時我們必須控制隻有10個線程同時擷取資料庫連接配接儲存資料,否則會報錯無法擷取資料庫連 接。這個時候,就可以使用Semaphore來做流量控制。
public class TestSemaphore {
static Semaphore s=new Semaphore(10);
static int threadNum=30;
public static void main(String[] args) {
for(int i=1;i<=30;i++){
new Thread(new Runnable() {
@Override
public void run() {
try {
s.acquire();
System.out.println(Thread.currentThread().getName()+"do some work");
s.release();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}).start();
}
}
}
4. 線程間交換資料的Exchanger
Exchanger(交換者)是一個用于線程間協作的工具類。Exchanger用于進行線程間的資料交換。它提供一個同步點,在這個同步點,兩個線程可以交換彼此的資料。這兩個線程通過exchange方法交換資料,如果第一個線程先執行exchange()方法,它會一直等待第二個線程也 執行exchange方法,當兩個線程都到達同步點時,這兩個線程就可以交換資料,将本線程生産出來的資料傳遞給對方。是以由此可見,Exchanger将會與 生産者-消費者模型相關。
其應用場景有:Exchanger可以用于遺傳算法,遺傳算法裡需要選出兩個人作為交配對象,這時候會交換 兩人的資料,并使用交叉規則得出2個交配結果。Exchanger也可以用于校對工作,比如我們需 要将紙制銀行流水通過人工的方式錄入成電子銀行流水,為了避免錯誤,采用AB崗兩人進行 錄入,錄入到Excel之後,系統需要加載這兩個Excel,并對兩個Excel資料進行校對,看看是否錄入一緻。如果兩個線程有一個沒有執行exchange()方法,則會一直等待,如果擔心有特殊情況發 生,避免一直等待,可以使用exchange(V x,longtimeout,TimeUnit unit)設定最大等待時長。
public class TestExchanger {
static Exchanger<String> ex=new Exchanger<String>();
static ExecutorService service=Executors.newFixedThreadPool(2);
public static void main(String[] args) {
service.execute(new Runnable() {
@Override
public void run() {
String a="A";
try {
ex.exchange(a);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
});
service.execute(new Runnable() {
@Override
public void run() {
String b="B";
try {
String a=ex.exchange(b);
System.out.println("a錄入的是"+a);
System.out.println("b錄入的是"+b);
System.out.println("a與b是否一緻:"+a.equalsIgnoreCase(b));
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
});
service.shutdown();
}
}