并發工具類
- CountDownLatch :
允許一個或多個線程等待,直到在其他線程完成工作等待的線程再執行(增強版Join).
場景:初始化線程執行後其他線程才可執行。
public class UseCountDownLatch {
//聲明CountDownLatch 計數總值 6
static CountDownLatch latch = new CountDownLatch(6);
//初始化線程
private static class InitThread implements Runnable{
@Override
public void run() {
System.out.println(Thread.currentThread().getName() +" InitThread init ...");
latch.countDown();//初始化線程完成工作了,countDown方法隻扣減一次;
System.out.println(Thread.currentThread().getName() +" ...InitThread work"); //繼續執行
}
}
//業務線程
private static class BusiThread implements Runnable{
@Override
public void run() {
try {
latch.await(); //業務線程等待 countDown 計數值扣減完 才能繼續執行
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() +" BusiThread business");
}
}
public static void main(String[] args) throws InterruptedException {
//單獨的初始化線程 countDown 2次
new Thread(new Runnable() {
@Override
public void run() {
SleepTools.ms(1);
System.out.println(Thread.currentThread().getName() +" ready init work step 1st......");
latch.countDown();//每完成一步初始化工作,扣減一次
System.out.println("begin step 2nd.......");
SleepTools.ms(1); //休眠1毫秒
System.out.println(Thread.currentThread().getName() +" ready init work step 2nd......");
latch.countDown();//每完成一步初始化工作,扣減一次
}
},"匿名内部類線程").start();
new Thread(new BusiThread()).start(); //業務線程
for(int i=0;i<4;i++){ //啟動4個初始化線程 countDown 4次
Thread thread = new Thread(new InitThread()); //初始化線程
thread.start();
}
latch.await(); //主線程等待 countDown 計數值扣減完
System.out.println("Main do ites work........");
}
}

-
Semaphore :
場景:控制線程信号量
信号量Semaphore是一個并發工具類,用來控制可同時并發的線程數,每次線程執行操作時先通過acquire方法獲得許可,執行完畢再通過release方法釋放許可。如果無可用許可,那麼acquire方法将一直阻塞,直到其它線程釋放許可。
線程池用來控制實際工作的線程數量,通過線程複用的方式來減小記憶體開銷。線程池可同時工作的線程數量是一定的,超過該數量的線程需進入線程隊列等待,直到有可用的工作線程來執行任務。
使用Semphore,你建立了多少線程,實際就會有多少線程進行執行,隻是可同時執行的線程數量會受到限制。但使用線程池,你建立的線程隻是作為任務送出給線程池執行,實際工作的線程由線程池建立,并且實際工作的線程數量由線程池自己管理。
簡單來說,線程池實際工作的線程是work線程,不是你自己建立的,是由線程池建立的,并由線程池自動控制實際并發的work線程數量。而Seamphore相當于一個信号燈,作用是對線程做限流,Seamphore可以對你自己建立的的線程做限流(也可以對線程池的work線程做限流),Seamphore的限流必須通過手動acquire和release來實作。
public class Test {
//線程信号量 許可證2
static Semaphore semaphore = new Semaphore(2);
public static void main(String[] args) {
Thread thread01 = new Thread(new Runnable() {
@Override
public void run() {
try {
semaphore.acquire(); //擷取許可證,如果擷取不到則阻塞(挂起)
System.out.println(Thread.currentThread().getName()+"run...");
Thread.sleep(2000);
semaphore.release(); //釋放許可證
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"thread01");
Thread thread02 =new Thread(new Runnable() {
@Override
public void run() {
try {
semaphore.acquire(); //擷取許可證,如果擷取不到則阻塞(挂起)
System.out.println(Thread.currentThread().getName()+"run...");
Thread.sleep(5000);
semaphore.release(); //釋放許可證
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"thread02");
Thread thread03 =new Thread(new Runnable() {
@Override
public void run() {
try {
semaphore.acquire(); //擷取許可證,如果擷取不到則阻塞(挂起)
System.out.println(Thread.currentThread().getName()+"run...");
semaphore.release();//釋放許可證
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"thread03");
//啟動三個線程使用同一信号量
thread01.start(); //休眠2秒
thread02.start(); //休眠5秒
thread03.start(); //隻能等到前兩個線程其中一個釋放許可證之後才執行
//注意:線程池控制的是線程數,信号量不控制線程數隻控制并發量
}
}
-
CyclicBarrier(線程計數器)
場景:初始化線程執行後其他線程才可執行。
parties 線程數
當線程數同時或者狀态是await時才能繼續往下執行否則等待
public CyclicBarrier(int parties)
parties 線程數
barrierAction 執行線程
當指定的線程數全部await時,barrierAction 線程執行
public CyclicBarrier(int parties, Runnable barrierAction) 有參構造
public class UseCyclicBarrier {
//線程計數器 值5,工作線程 === 當其他線程同時await是CollectThread線程才可執行
private static CyclicBarrier barrier = new CyclicBarrier(5,new CollectThread());
//存放子線程工作結果的容器
private static ConcurrentHashMap<String,Long> resultMap = new ConcurrentHashMap<>();
public static void main(String[] args) {
for(int i=0;i<=4;i++){
Thread thread = new Thread(new SubThread()); //開啟5個初始化線程
thread.start();
}
}
//工作線程
private static class CollectThread implements Runnable{
@Override
public void run() {
StringBuilder result = new StringBuilder();
for(Map.Entry<String,Long> workResult : resultMap.entrySet()){
result.append("["+workResult.getValue()+"]");
}
System.out.println(" the result = "+ result); //輸出線程ID
}
}
//初始化線程
private static class SubThread implements Runnable{
@Override
public void run() {
Long thread_id = Thread.currentThread().getId(); //線程ID
String thread_name = Thread.currentThread().getName();//線程本身的處理結果
resultMap.put(thread_name,thread_id); //線程ID name 存入map
Random r = new Random();//随機決定工作線程的是否睡眠
try {
if(r.nextBoolean()) { //随機決定工作線程的是否睡眠
System.out.println(thread_name+" ....do sleep 2 ");
Thread.sleep(2000); //休眠2秒
}
System.out.println(thread_name+"....is await");
barrier.await(); //線程等待
Thread.sleep(1000); //線程休眠1秒
System.out.println(thread_name+" ....do its business ");
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
注意:CyclicBarrier 和 CountDownLatch 的差別在于CyclicBarrier的線程計數器和線程數是相等的(一組線程全部或同時不分先後處于await狀态時,指定的線程或其他線程才可執行),CountDownLatch 線程計數器和線程數是不相等的(線程大于線程計數值并且線程計數值扣減完之後其他線程才可執行)
- Exchanger (兩個線程間資料交換)
public class Test {
//Exchanger 兩個線程間資料交換
static Exchanger<HashSet> exchanger = new Exchanger<>();
//線程A
public static class A extends Thread {
@Override
public void run() {
HashSet<String> setA = new HashSet<>();
setA.add("A");
try {
HashSet exchange = exchanger.exchange(setA); //擷取B線程資料
Stream.of(exchange.toArray()).forEach((str)-> System.out.println("線程A 值:"+str));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
//線程B
public static class B extends Thread {
@Override
public void run() {
HashSet<String> setB = new HashSet<>();
setB.add("B");
try {
HashSet exchange = exchanger.exchange(setB); //擷取A線程資料
Stream.of(exchange.toArray()).forEach((str)-> System.out.println("線程B 值:"+str));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) throws InterruptedException {
new Thread(new A(),"A").start();
new Thread(new B(),"B").start();
}