天天看点

并发编程01

并发工具类

  • CountDownLatch :

允许一个或多个线程等待,直到在其他线程完成工作等待的线程再执行(增强版Join).

场景:初始化线程执行后其他线程才可执行。

public class UseCountDownLatch {

    //声明CountDownLatch 计数总值 6
	static CountDownLatch latch = new CountDownLatch(6);

	//初始化线程
    private static class InitThread implements Runnable{

        @Override
        public void run() {
        	System.out.println(Thread.currentThread().getName() +" InitThread init ...");
        	latch.countDown();//初始化线程完成工作了,countDown方法只扣减一次;
            System.out.println(Thread.currentThread().getName() +" ...InitThread work"); //继续执行
        }
    }
    
    //业务线程
    private static class BusiThread implements Runnable{

        @Override
        public void run() {
        	try {
				latch.await(); //业务线程等待  countDown 计数值扣减完 才能继续执行
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
            System.out.println(Thread.currentThread().getName() +" BusiThread business");
        }
    }

    public static void main(String[] args) throws InterruptedException {
    	//单独的初始化线程 countDown 2次
        new Thread(new Runnable() {
            @Override
            public void run() {
            	SleepTools.ms(1);
                System.out.println(Thread.currentThread().getName() +"  ready init work step 1st......");
                latch.countDown();//每完成一步初始化工作,扣减一次
                System.out.println("begin step 2nd.......");
                SleepTools.ms(1); //休眠1毫秒
                System.out.println(Thread.currentThread().getName() +" ready init work step 2nd......");
                latch.countDown();//每完成一步初始化工作,扣减一次
            }
        },"匿名内部类线程").start();

        new Thread(new BusiThread()).start(); //业务线程

        for(int i=0;i<4;i++){ //启动4个初始化线程  countDown 4次
            Thread thread = new Thread(new InitThread()); //初始化线程
            thread.start();
        }

        latch.await(); //主线程等待  countDown 计数值扣减完
        System.out.println("Main do ites work........");
    }
}
           
并发编程01
  • Semaphore :

    场景:控制线程信号量

    信号量Semaphore是一个并发工具类,用来控制可同时并发的线程数,每次线程执行操作时先通过acquire方法获得许可,执行完毕再通过release方法释放许可。如果无可用许可,那么acquire方法将一直阻塞,直到其它线程释放许可。

线程池用来控制实际工作的线程数量,通过线程复用的方式来减小内存开销。线程池可同时工作的线程数量是一定的,超过该数量的线程需进入线程队列等待,直到有可用的工作线程来执行任务。

使用Semphore,你创建了多少线程,实际就会有多少线程进行执行,只是可同时执行的线程数量会受到限制。但使用线程池,你创建的线程只是作为任务提交给线程池执行,实际工作的线程由线程池创建,并且实际工作的线程数量由线程池自己管理。

简单来说,线程池实际工作的线程是work线程,不是你自己创建的,是由线程池创建的,并由线程池自动控制实际并发的work线程数量。而Seamphore相当于一个信号灯,作用是对线程做限流,Seamphore可以对你自己创建的的线程做限流(也可以对线程池的work线程做限流),Seamphore的限流必须通过手动acquire和release来实现。

public class Test {
    //线程信号量  许可证2
    static Semaphore semaphore = new Semaphore(2);

    public static void main(String[] args) {

       Thread thread01 = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    semaphore.acquire(); //获取许可证,如果获取不到则阻塞(挂起)
                    System.out.println(Thread.currentThread().getName()+"run...");
                    Thread.sleep(2000);
                    semaphore.release(); //释放许可证
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        },"thread01");

        Thread thread02 =new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    semaphore.acquire(); //获取许可证,如果获取不到则阻塞(挂起)
                    System.out.println(Thread.currentThread().getName()+"run...");
                    Thread.sleep(5000);
                    semaphore.release(); //释放许可证
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        },"thread02");


        Thread thread03 =new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    semaphore.acquire();  //获取许可证,如果获取不到则阻塞(挂起)
                    System.out.println(Thread.currentThread().getName()+"run...");
                    semaphore.release();//释放许可证
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        },"thread03");

        //启动三个线程使用同一信号量
        thread01.start(); //休眠2秒
        thread02.start(); //休眠5秒
        thread03.start(); //只能等到前两个线程其中一个释放许可证之后才执行
        //注意:线程池控制的是线程数,信号量不控制线程数只控制并发量
    }
}
           
  • CyclicBarrier(线程计数器)

    场景:初始化线程执行后其他线程才可执行。

parties 线程数
当线程数同时或者状态是await时才能继续往下执行否则等待
public CyclicBarrier(int parties) 
           
parties 线程数
barrierAction 执行线程
当指定的线程数全部await时,barrierAction 线程执行
public CyclicBarrier(int parties, Runnable barrierAction)  有参构造
           
public class UseCyclicBarrier {
    //线程计数器 值5,工作线程 === 当其他线程同时await是CollectThread线程才可执行
	private static CyclicBarrier barrier = new CyclicBarrier(5,new CollectThread());
    //存放子线程工作结果的容器
    private static ConcurrentHashMap<String,Long> resultMap = new ConcurrentHashMap<>();

    public static void main(String[] args) {
        for(int i=0;i<=4;i++){
            Thread thread = new Thread(new SubThread()); //开启5个初始化线程
            thread.start();
        }

    }

    //工作线程
    private static class CollectThread implements Runnable{

        @Override
        public void run() {
            StringBuilder result = new StringBuilder();
            for(Map.Entry<String,Long> workResult : resultMap.entrySet()){
            	result.append("["+workResult.getValue()+"]");
            }
            System.out.println(" the result = "+ result); //输出线程ID
        }
    }

    //初始化线程
    private static class SubThread implements Runnable{

        @Override
        public void run() {
            Long thread_id = Thread.currentThread().getId(); //线程ID
            String thread_name = Thread.currentThread().getName();//线程本身的处理结果
            resultMap.put(thread_name,thread_id); //线程ID name 存入map
            Random r = new Random();//随机决定工作线程的是否睡眠
            try {
                if(r.nextBoolean()) { //随机决定工作线程的是否睡眠
                    System.out.println(thread_name+" ....do sleep 2 ");
                    Thread.sleep(2000); //休眠2秒
                }
                System.out.println(thread_name+"....is await");
                barrier.await();  //线程等待
            	Thread.sleep(1000); //线程休眠1秒
                System.out.println(thread_name+" ....do its business ");
            } catch (Exception e) {
                e.printStackTrace();
            }

        }
    }
}
           
并发编程01

注意:CyclicBarrier 和 CountDownLatch 的区别在于CyclicBarrier的线程计数器和线程数是相等的(一组线程全部或同时不分先后处于await状态时,指定的线程或其他线程才可执行),CountDownLatch 线程计数器和线程数是不相等的(线程大于线程计数值并且线程计数值扣减完之后其他线程才可执行)

  • Exchanger (两个线程间数据交换)
public class Test {
    //Exchanger 两个线程间数据交换
    static Exchanger<HashSet> exchanger = new Exchanger<>();

    //线程A
    public static class A extends Thread {
        @Override
        public void run() {
            HashSet<String> setA = new HashSet<>();
            setA.add("A");
            try {
                HashSet exchange = exchanger.exchange(setA);  //获取B线程数据
                Stream.of(exchange.toArray()).forEach((str)-> System.out.println("线程A 值:"+str));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    //线程B
    public static class B extends Thread {
        @Override
        public void run() {

            HashSet<String> setB = new HashSet<>();
            setB.add("B");
            try {
                HashSet exchange = exchanger.exchange(setB); //获取A线程数据
                Stream.of(exchange.toArray()).forEach((str)-> System.out.println("线程B 值:"+str));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }



    public static void main(String[] args) throws InterruptedException {
        new Thread(new A(),"A").start();
        new Thread(new B(),"B").start();
    }
           
并发编程01

继续阅读