天天看點

Java并發包之閉鎖/栅欄/信号量

一、Java多線程總結:
  1. 描述線程的類:Runable和Thread都屬于java.lang包。
  2. 内置鎖synchronized屬于jvm關鍵字,内置條件隊列操作接口Object.wait()/notify()/notifyAll()屬于java.lang包。
  3. 提供記憶體可見性和防止指令重排的volatile屬于jvm關鍵字。
  4. 而java.util.concurrent包(J.U.C)中包含的是java并發程式設計中有用的一些工具類,包括幾個部分:
    • locks部分:包含在java.util.concurrent.locks包中,提供顯式鎖(互斥鎖和速寫鎖)相關功能。
    • atomic部分:包含在java.util.concurrent.atomic包中,提供原子變量類相關的功能,是建構非阻塞算法的基礎。
    • executor部分:散落在java.util.concurrent包中,提供線程池相關的功能。
    • collections部分:散落在java.util.concurrent包中,提供并發容器相關功能。
    • tools部分:散落在java.util.concurrent包中,提供同步工具類,如信号量、閉鎖、栅欄等功能。
      Java并發包之閉鎖/栅欄/信号量
二、同步工具類詳解

1、Semaphore信号量:跟鎖機制存在一定的相似性,semaphore也是一種鎖機制,所不同的是,reentrantLock是隻允許一個線程獲得鎖,而信号量持有多個許可(permits),允許多個線程獲得許可并執行。可以用來控制同時通路某個特定資源的操作數量,或者同時執行某個指定操作的數量。

示例代碼:

public class TIJ_semaphore {
      public static void main(String[] args) {
          ExecutorService exec = Executors.newCachedThreadPool();
          final Semaphore semp = new Semaphore(); // 5 permits
  
         for (int index = ; index < ; index++) {
             final int NO = index;
             Runnable run = new Runnable() {
                 public void run() {
                     try {
                           // if 1 permit avaliable, thread will get a permits and go; if no permit avaliable, thread will block until 1 avaliable
                         semp.acquire(); 
                         System.out.println("Accessing: " + NO);
                         Thread.sleep((long) ();
                         semp.release();
                     } catch (InterruptedException e) {
                     }
                 }
             };
             exec.execute(run);
         }
         exec.shutdown();
     }
           

2、CountDownLatch閉鎖:允許一個或多個線程一直等待,直到其他線程的操作執行完後再執行。CountDownLatch是通過一個計數器來實作的,計數器的初始值為線程的數量。每當一個線程完成了自己的任務後,計數器的值就會減1。當計數器值到達0時,它表示所有的線程已經完成了任務,然後在閉鎖上等待的線程就可以恢複執行任務。

主要方法:

1. CountDownLatch.await():将某個線程阻塞住,直到計數器count=0才恢複執行。

2. CountDownLatch.countDown():将計數器count減1。

使用場景:

1. 實作最大的并行性:有時我們想同時啟動多個線程,實作最大程度的并行性。例如,我們想測試一個單例類。如果我們建立一個初始計數為1的CountDownLatch,并讓所有線程都在這個鎖上等待,那麼我們可以很輕松地完成測試。我們隻需調用 一次countDown()方法就可以讓所有的等待線程同時恢複執行。

2. 開始執行前等待n個線程完成各自任務:例如應用程式啟動類要確定在處理使用者請求前,所有N個外部系統已經啟動和運作了。

3. 死鎖檢測:一個非常友善的使用場景是,你可以使用n個線程通路共享資源,在每次測試階段的線程數目是不同的,并嘗試産生死鎖。

4. 計算并發執行某個任務的耗時。

示例代碼:

public class CountDownLatchTest {  

    public void timeTasks(int nThreads, final Runnable task) throws InterruptedException{  
        final CountDownLatch startGate = new CountDownLatch();  
        final CountDownLatch endGate = new CountDownLatch(nThreads);  

        for(int i = ; i < nThreads; i++){  
            Thread t = new Thread(){  
                public void run(){  
                    try{  
                        startGate.await();  
                        try{  
                            task.run();  
                        }finally{  
                            endGate.countDown();  
                        }  
                    }catch(InterruptedException ignored){  

                    }  

                }  
            };  
            t.start();  
        }  

        long start = System.nanoTime();  
        System.out.println("打開閉鎖");  
        startGate.countDown();  
        endGate.await();  
        long end = System.nanoTime();  
        System.out.println("閉鎖退出,共耗時" + (end-start));  
    }  

    public static void main(String[] args) throws InterruptedException{  
        CountDownLatchTest test = new CountDownLatchTest();  
        test.timeTasks(, test.new RunnableTask());  
    }  

    class RunnableTask implements Runnable{  

        @Override  
        public void run() {  
            System.out.println("目前線程為:" + Thread.currentThread().getName());  

        }     
    }  

執行結果為:  
打開閉鎖  
目前線程為:Thread-  
目前線程為:Thread-  
目前線程為:Thread-  
目前線程為:Thread-  
目前線程為:Thread-  
閉鎖退出,共耗時  
           

3、CyclicBarrier栅欄:用于阻塞一組線程直到某個事件發生。所有線程必須同時到達栅欄位置才能繼續執行下一步操作,且能夠被重置以達到重複利用。而閉鎖是一次性對象,一旦進入終止狀态,就不能被重置。

示例代碼:

public class CyclicBarrierTest {
    private final CyclicBarrier barrier;
    private final Worker[] workers;

    public CyclicBarrierTest(){
        int count = Runtime.getRuntime().availableProcessors();
        this.barrier = new CyclicBarrier(count,
                new Runnable(){

                    @Override
                    public void run() {
                        System.out.println("所有線程均到達栅欄位置,開始下一輪計算");
                    }

        });
        this.workers = new Worker[count];
        for(int i = ; i< count;i++){
            workers[i] = new Worker(i);
        }
    }
    private class Worker implements Runnable{
        int i;

        public Worker(int i){
            this.i = i;
        }

        @Override
        public void run() {
            for(int index = ; index < ;index++){
                System.out.println("線程" + i + "第" + index + "次到達栅欄位置,等待其他線程到達");
                try {
                    //注意是await,而不是wait
                    barrier.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                    return;
                } catch (BrokenBarrierException e) {
                    e.printStackTrace();
                    return;
                }
            }
        }

    }

    public void start(){
        for(int i=;i<workers.length;i++){
            new Thread(workers[i]).start();
        }
    }

    public static void main(String[] args){
        new CyclicBarrierTest().start();
    }
}

執行結果為:  
線程第次到達栅欄位置,等待其他線程到達  
線程第次到達栅欄位置,等待其他線程到達  
線程第次到達栅欄位置,等待其他線程到達  
線程第次到達栅欄位置,等待其他線程到達  
所有線程均到達栅欄位置,開始下一輪計算  
線程第次到達栅欄位置,等待其他線程到達  
線程第次到達栅欄位置,等待其他線程到達  
線程第次到達栅欄位置,等待其他線程到達  
線程第次到達栅欄位置,等待其他線程到達  
所有線程均到達栅欄位置,開始下一輪計算 
           

繼續閱讀