天天看點

并發程式設計01

并發工具類

  • CountDownLatch :

允許一個或多個線程等待,直到在其他線程完成工作等待的線程再執行(增強版Join).

場景:初始化線程執行後其他線程才可執行。

public class UseCountDownLatch {

    //聲明CountDownLatch 計數總值 6
	static CountDownLatch latch = new CountDownLatch(6);

	//初始化線程
    private static class InitThread implements Runnable{

        @Override
        public void run() {
        	System.out.println(Thread.currentThread().getName() +" InitThread init ...");
        	latch.countDown();//初始化線程完成工作了,countDown方法隻扣減一次;
            System.out.println(Thread.currentThread().getName() +" ...InitThread work"); //繼續執行
        }
    }
    
    //業務線程
    private static class BusiThread implements Runnable{

        @Override
        public void run() {
        	try {
				latch.await(); //業務線程等待  countDown 計數值扣減完 才能繼續執行
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
            System.out.println(Thread.currentThread().getName() +" BusiThread business");
        }
    }

    public static void main(String[] args) throws InterruptedException {
    	//單獨的初始化線程 countDown 2次
        new Thread(new Runnable() {
            @Override
            public void run() {
            	SleepTools.ms(1);
                System.out.println(Thread.currentThread().getName() +"  ready init work step 1st......");
                latch.countDown();//每完成一步初始化工作,扣減一次
                System.out.println("begin step 2nd.......");
                SleepTools.ms(1); //休眠1毫秒
                System.out.println(Thread.currentThread().getName() +" ready init work step 2nd......");
                latch.countDown();//每完成一步初始化工作,扣減一次
            }
        },"匿名内部類線程").start();

        new Thread(new BusiThread()).start(); //業務線程

        for(int i=0;i<4;i++){ //啟動4個初始化線程  countDown 4次
            Thread thread = new Thread(new InitThread()); //初始化線程
            thread.start();
        }

        latch.await(); //主線程等待  countDown 計數值扣減完
        System.out.println("Main do ites work........");
    }
}
           
并發程式設計01
  • Semaphore :

    場景:控制線程信号量

    信号量Semaphore是一個并發工具類,用來控制可同時并發的線程數,每次線程執行操作時先通過acquire方法獲得許可,執行完畢再通過release方法釋放許可。如果無可用許可,那麼acquire方法将一直阻塞,直到其它線程釋放許可。

線程池用來控制實際工作的線程數量,通過線程複用的方式來減小記憶體開銷。線程池可同時工作的線程數量是一定的,超過該數量的線程需進入線程隊列等待,直到有可用的工作線程來執行任務。

使用Semphore,你建立了多少線程,實際就會有多少線程進行執行,隻是可同時執行的線程數量會受到限制。但使用線程池,你建立的線程隻是作為任務送出給線程池執行,實際工作的線程由線程池建立,并且實際工作的線程數量由線程池自己管理。

簡單來說,線程池實際工作的線程是work線程,不是你自己建立的,是由線程池建立的,并由線程池自動控制實際并發的work線程數量。而Seamphore相當于一個信号燈,作用是對線程做限流,Seamphore可以對你自己建立的的線程做限流(也可以對線程池的work線程做限流),Seamphore的限流必須通過手動acquire和release來實作。

public class Test {
    //線程信号量  許可證2
    static Semaphore semaphore = new Semaphore(2);

    public static void main(String[] args) {

       Thread thread01 = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    semaphore.acquire(); //擷取許可證,如果擷取不到則阻塞(挂起)
                    System.out.println(Thread.currentThread().getName()+"run...");
                    Thread.sleep(2000);
                    semaphore.release(); //釋放許可證
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        },"thread01");

        Thread thread02 =new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    semaphore.acquire(); //擷取許可證,如果擷取不到則阻塞(挂起)
                    System.out.println(Thread.currentThread().getName()+"run...");
                    Thread.sleep(5000);
                    semaphore.release(); //釋放許可證
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        },"thread02");


        Thread thread03 =new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    semaphore.acquire();  //擷取許可證,如果擷取不到則阻塞(挂起)
                    System.out.println(Thread.currentThread().getName()+"run...");
                    semaphore.release();//釋放許可證
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        },"thread03");

        //啟動三個線程使用同一信号量
        thread01.start(); //休眠2秒
        thread02.start(); //休眠5秒
        thread03.start(); //隻能等到前兩個線程其中一個釋放許可證之後才執行
        //注意:線程池控制的是線程數,信号量不控制線程數隻控制并發量
    }
}
           
  • CyclicBarrier(線程計數器)

    場景:初始化線程執行後其他線程才可執行。

parties 線程數
當線程數同時或者狀态是await時才能繼續往下執行否則等待
public CyclicBarrier(int parties) 
           
parties 線程數
barrierAction 執行線程
當指定的線程數全部await時,barrierAction 線程執行
public CyclicBarrier(int parties, Runnable barrierAction)  有參構造
           
public class UseCyclicBarrier {
    //線程計數器 值5,工作線程 === 當其他線程同時await是CollectThread線程才可執行
	private static CyclicBarrier barrier = new CyclicBarrier(5,new CollectThread());
    //存放子線程工作結果的容器
    private static ConcurrentHashMap<String,Long> resultMap = new ConcurrentHashMap<>();

    public static void main(String[] args) {
        for(int i=0;i<=4;i++){
            Thread thread = new Thread(new SubThread()); //開啟5個初始化線程
            thread.start();
        }

    }

    //工作線程
    private static class CollectThread implements Runnable{

        @Override
        public void run() {
            StringBuilder result = new StringBuilder();
            for(Map.Entry<String,Long> workResult : resultMap.entrySet()){
            	result.append("["+workResult.getValue()+"]");
            }
            System.out.println(" the result = "+ result); //輸出線程ID
        }
    }

    //初始化線程
    private static class SubThread implements Runnable{

        @Override
        public void run() {
            Long thread_id = Thread.currentThread().getId(); //線程ID
            String thread_name = Thread.currentThread().getName();//線程本身的處理結果
            resultMap.put(thread_name,thread_id); //線程ID name 存入map
            Random r = new Random();//随機決定工作線程的是否睡眠
            try {
                if(r.nextBoolean()) { //随機決定工作線程的是否睡眠
                    System.out.println(thread_name+" ....do sleep 2 ");
                    Thread.sleep(2000); //休眠2秒
                }
                System.out.println(thread_name+"....is await");
                barrier.await();  //線程等待
            	Thread.sleep(1000); //線程休眠1秒
                System.out.println(thread_name+" ....do its business ");
            } catch (Exception e) {
                e.printStackTrace();
            }

        }
    }
}
           
并發程式設計01

注意:CyclicBarrier 和 CountDownLatch 的差別在于CyclicBarrier的線程計數器和線程數是相等的(一組線程全部或同時不分先後處于await狀态時,指定的線程或其他線程才可執行),CountDownLatch 線程計數器和線程數是不相等的(線程大于線程計數值并且線程計數值扣減完之後其他線程才可執行)

  • Exchanger (兩個線程間資料交換)
public class Test {
    //Exchanger 兩個線程間資料交換
    static Exchanger<HashSet> exchanger = new Exchanger<>();

    //線程A
    public static class A extends Thread {
        @Override
        public void run() {
            HashSet<String> setA = new HashSet<>();
            setA.add("A");
            try {
                HashSet exchange = exchanger.exchange(setA);  //擷取B線程資料
                Stream.of(exchange.toArray()).forEach((str)-> System.out.println("線程A 值:"+str));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    //線程B
    public static class B extends Thread {
        @Override
        public void run() {

            HashSet<String> setB = new HashSet<>();
            setB.add("B");
            try {
                HashSet exchange = exchanger.exchange(setB); //擷取A線程資料
                Stream.of(exchange.toArray()).forEach((str)-> System.out.println("線程B 值:"+str));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }



    public static void main(String[] args) throws InterruptedException {
        new Thread(new A(),"A").start();
        new Thread(new B(),"B").start();
    }
           
并發程式設計01

繼續閱讀