并发工具类
- CountDownLatch :
允许一个或多个线程等待,直到在其他线程完成工作等待的线程再执行(增强版Join).
场景:初始化线程执行后其他线程才可执行。
public class UseCountDownLatch {
//声明CountDownLatch 计数总值 6
static CountDownLatch latch = new CountDownLatch(6);
//初始化线程
private static class InitThread implements Runnable{
@Override
public void run() {
System.out.println(Thread.currentThread().getName() +" InitThread init ...");
latch.countDown();//初始化线程完成工作了,countDown方法只扣减一次;
System.out.println(Thread.currentThread().getName() +" ...InitThread work"); //继续执行
}
}
//业务线程
private static class BusiThread implements Runnable{
@Override
public void run() {
try {
latch.await(); //业务线程等待 countDown 计数值扣减完 才能继续执行
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() +" BusiThread business");
}
}
public static void main(String[] args) throws InterruptedException {
//单独的初始化线程 countDown 2次
new Thread(new Runnable() {
@Override
public void run() {
SleepTools.ms(1);
System.out.println(Thread.currentThread().getName() +" ready init work step 1st......");
latch.countDown();//每完成一步初始化工作,扣减一次
System.out.println("begin step 2nd.......");
SleepTools.ms(1); //休眠1毫秒
System.out.println(Thread.currentThread().getName() +" ready init work step 2nd......");
latch.countDown();//每完成一步初始化工作,扣减一次
}
},"匿名内部类线程").start();
new Thread(new BusiThread()).start(); //业务线程
for(int i=0;i<4;i++){ //启动4个初始化线程 countDown 4次
Thread thread = new Thread(new InitThread()); //初始化线程
thread.start();
}
latch.await(); //主线程等待 countDown 计数值扣减完
System.out.println("Main do ites work........");
}
}

-
Semaphore :
场景:控制线程信号量
信号量Semaphore是一个并发工具类,用来控制可同时并发的线程数,每次线程执行操作时先通过acquire方法获得许可,执行完毕再通过release方法释放许可。如果无可用许可,那么acquire方法将一直阻塞,直到其它线程释放许可。
线程池用来控制实际工作的线程数量,通过线程复用的方式来减小内存开销。线程池可同时工作的线程数量是一定的,超过该数量的线程需进入线程队列等待,直到有可用的工作线程来执行任务。
使用Semphore,你创建了多少线程,实际就会有多少线程进行执行,只是可同时执行的线程数量会受到限制。但使用线程池,你创建的线程只是作为任务提交给线程池执行,实际工作的线程由线程池创建,并且实际工作的线程数量由线程池自己管理。
简单来说,线程池实际工作的线程是work线程,不是你自己创建的,是由线程池创建的,并由线程池自动控制实际并发的work线程数量。而Seamphore相当于一个信号灯,作用是对线程做限流,Seamphore可以对你自己创建的的线程做限流(也可以对线程池的work线程做限流),Seamphore的限流必须通过手动acquire和release来实现。
public class Test {
//线程信号量 许可证2
static Semaphore semaphore = new Semaphore(2);
public static void main(String[] args) {
Thread thread01 = new Thread(new Runnable() {
@Override
public void run() {
try {
semaphore.acquire(); //获取许可证,如果获取不到则阻塞(挂起)
System.out.println(Thread.currentThread().getName()+"run...");
Thread.sleep(2000);
semaphore.release(); //释放许可证
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"thread01");
Thread thread02 =new Thread(new Runnable() {
@Override
public void run() {
try {
semaphore.acquire(); //获取许可证,如果获取不到则阻塞(挂起)
System.out.println(Thread.currentThread().getName()+"run...");
Thread.sleep(5000);
semaphore.release(); //释放许可证
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"thread02");
Thread thread03 =new Thread(new Runnable() {
@Override
public void run() {
try {
semaphore.acquire(); //获取许可证,如果获取不到则阻塞(挂起)
System.out.println(Thread.currentThread().getName()+"run...");
semaphore.release();//释放许可证
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"thread03");
//启动三个线程使用同一信号量
thread01.start(); //休眠2秒
thread02.start(); //休眠5秒
thread03.start(); //只能等到前两个线程其中一个释放许可证之后才执行
//注意:线程池控制的是线程数,信号量不控制线程数只控制并发量
}
}
-
CyclicBarrier(线程计数器)
场景:初始化线程执行后其他线程才可执行。
parties 线程数
当线程数同时或者状态是await时才能继续往下执行否则等待
public CyclicBarrier(int parties)
parties 线程数
barrierAction 执行线程
当指定的线程数全部await时,barrierAction 线程执行
public CyclicBarrier(int parties, Runnable barrierAction) 有参构造
public class UseCyclicBarrier {
//线程计数器 值5,工作线程 === 当其他线程同时await是CollectThread线程才可执行
private static CyclicBarrier barrier = new CyclicBarrier(5,new CollectThread());
//存放子线程工作结果的容器
private static ConcurrentHashMap<String,Long> resultMap = new ConcurrentHashMap<>();
public static void main(String[] args) {
for(int i=0;i<=4;i++){
Thread thread = new Thread(new SubThread()); //开启5个初始化线程
thread.start();
}
}
//工作线程
private static class CollectThread implements Runnable{
@Override
public void run() {
StringBuilder result = new StringBuilder();
for(Map.Entry<String,Long> workResult : resultMap.entrySet()){
result.append("["+workResult.getValue()+"]");
}
System.out.println(" the result = "+ result); //输出线程ID
}
}
//初始化线程
private static class SubThread implements Runnable{
@Override
public void run() {
Long thread_id = Thread.currentThread().getId(); //线程ID
String thread_name = Thread.currentThread().getName();//线程本身的处理结果
resultMap.put(thread_name,thread_id); //线程ID name 存入map
Random r = new Random();//随机决定工作线程的是否睡眠
try {
if(r.nextBoolean()) { //随机决定工作线程的是否睡眠
System.out.println(thread_name+" ....do sleep 2 ");
Thread.sleep(2000); //休眠2秒
}
System.out.println(thread_name+"....is await");
barrier.await(); //线程等待
Thread.sleep(1000); //线程休眠1秒
System.out.println(thread_name+" ....do its business ");
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
注意:CyclicBarrier 和 CountDownLatch 的区别在于CyclicBarrier的线程计数器和线程数是相等的(一组线程全部或同时不分先后处于await状态时,指定的线程或其他线程才可执行),CountDownLatch 线程计数器和线程数是不相等的(线程大于线程计数值并且线程计数值扣减完之后其他线程才可执行)
- Exchanger (两个线程间数据交换)
public class Test {
//Exchanger 两个线程间数据交换
static Exchanger<HashSet> exchanger = new Exchanger<>();
//线程A
public static class A extends Thread {
@Override
public void run() {
HashSet<String> setA = new HashSet<>();
setA.add("A");
try {
HashSet exchange = exchanger.exchange(setA); //获取B线程数据
Stream.of(exchange.toArray()).forEach((str)-> System.out.println("线程A 值:"+str));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
//线程B
public static class B extends Thread {
@Override
public void run() {
HashSet<String> setB = new HashSet<>();
setB.add("B");
try {
HashSet exchange = exchanger.exchange(setB); //获取A线程数据
Stream.of(exchange.toArray()).forEach((str)-> System.out.println("线程B 值:"+str));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) throws InterruptedException {
new Thread(new A(),"A").start();
new Thread(new B(),"B").start();
}