天天看點

Java并發程式設計進階内容介紹

計數器:CountDownLatch

CountDownLatch類似于一個計數器,和Atomic類比較相近,操作是原子的,即多個線程同時隻能有一個可以去操作。CountDownLatch對象設定一個初始的數字作為計數值,任何調用這個對象上的await()方法都會阻塞,直到這個計數器的計數值被其他的線程調用countDown()減為0為止。典型的應用場景就是:有一個任務想要往下執行,但必須要等到其他的任務執行完畢後才可以繼續往下執行。例如在Zookeeper的使用過程中,由于用戶端與伺服器建立連接配接是異步調用的,是以主線程需要await()阻塞直至異步回調countDown()完成。

代碼示例:

public class CountDownLatchTest {


    public static void main(String[] args) {

        final CountDownLatch countDownLatch = new CountDownLatch(2);

        Thread work1 = new Thread(new Runnable() {
            @Override
            public void run() {

                System.out.println(Thread.currentThread() + " doing work...start");

                try {
                    Thread.sleep(200);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

                System.out.println(Thread.currentThread() + " doing work...end ");
                countDownLatch.countDown();
            }
        },"work1");


        Thread work2 = new Thread(new Runnable() {
            @Override
            public void run() {

                System.out.println(Thread.currentThread().getName() + " doing work...start");

                try {
                    Thread.sleep(200);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

                System.out.println(Thread.currentThread().getName() + " doing work...end ");
                countDownLatch.countDown();
            }
        },"work2");

        work1.start();
        work2.start();

        try {
            countDownLatch.await();
            System.out.println("all workers finish ");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }


    }


}      

齊步走:CyclicBarrier

Barrier的意思是栅欄,就是讓一組線程互相等待,直至所有線程都到齊了,那麼就可以齊步走。Cyclic是循環的意思,就是說Barrier可以循環使用。CyclicBarrier主要的方法就是await(),較CountDownLatch的await()雖然都是阻塞,但是CyclicBarrier.await()有傳回值int,即目前線程是第幾個到達這個Barrier的線程。

構造CyclicBarrier時指定計數值,await() 方法每被調用一次,計數便會減少1,并阻塞住目前線程。當計數減至0時,阻塞解除,所有在此 CyclicBarrier 上面阻塞的線程開始運作。在這之後,如果再次調用 await() 方法,計數就又會變成 N-1,新一輪重新開始。在構造方法上還可以傳遞一個Runnable對象,阻塞解除時這個Runnable會得到運作。

CyclicBarrier有點“不見不散”的味道,想一想,如果某個成員因某種原因來不了Barrier這個地方,那麼我們一直等待嗎?實際中,如果來不了理應通知其他成員,别等了,回家吧!注意到CyclicBarrier.await()獨有的BrokenBarrierException異常

Java并發程式設計進階内容介紹
public class CyclicBarrierTest {

    public static void main(String[] args) {

        final CyclicBarrier cyclicBarrier = new CyclicBarrier(2, new Runnable() {
            @Override
            public void run() {
                System.out.println("都準備好啦!");
            }
        });

        Thread runman1 = new Thread(new Runnable() {
            @Override
            public void run() {

                try {
                    Thread.sleep(200);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

                try {
                    cyclicBarrier.await();
                    System.out.println(Thread.currentThread().getName() + "i am ok");

                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (BrokenBarrierException e) {
                    e.printStackTrace();
                }
            }
        },"runman1");


        Thread runman2 = new Thread(new Runnable() {
            @Override
            public void run() {

                try {
                    cyclicBarrier.await();
                    System.out.println(Thread.currentThread().getName() + "i am ok");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (BrokenBarrierException e) {
                    e.printStackTrace();
                }
            }
        },"runman2");

        runman1.start();
        runman2.start();

    }

}      

Callable And Future

在部落客以前的部落格《Java Future模式實作》中有介紹Future模式,Future模式非常适合在處理耗時很長的業務邏輯,可以有效的減少系統的響應時間,提高系統的吞吐量。JDK其實已經為我們提供了API實作,我們來看一段代碼即可:

public class FutureTest {


    public static void main(String[] args) {

        FutureTask<String> futureTask = new FutureTask<String>(new Callable<String>() {
            @Override
            public String call() throws Exception {

                Thread.sleep(2000);
                return "ok";
            }
        });

        ExecutorService es = Executors.newFixedThreadPool(1);
        es.submit(futureTask);
        System.out.println("開啟線程去異步處理,主線程繼續往下執行!");
        try {

            System.out.println("取得異步處理結果:" + futureTask.get());

        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }


    }

}      

注意到線程池執行任務,可以利用2個方法:

Java并發程式設計進階内容介紹
Java并發程式設計進階内容介紹
Java并發程式設計進階内容介紹

submit和execute有什麼差別呢?從入參和結果類型就知道了。

信号量:Semaphore

Semaphore實作的功能就類似廁所有5個坑,假如有10個人要上廁所,那麼顯然同時隻能有5個人占用廁所,當5個人中的任何一個人讓開後,其中等待的另外5個人中又有一個人可以占用了。另外等待的5個人中可以是随機獲得優先機會,也可以是按照先來後到的順序獲得機會,這取決于構造Semaphore對象時傳入的fair參數選項。

Semaphore可以控制某個資源可被同時通路的個數(構造方法傳入),通過 acquire() 擷取一個許可,如果沒有就等待,而 release() 釋放一個許可。

public static void main(String[] args) {

    final Semaphore semaphore = new Semaphore(5);

    for(int i = 0 ; i < 6 ; i++){

        new Thread(new Runnable() {
            @Override
            public void run() {

                try {
                    semaphore.acquire();

                    System.out.println(Thread.currentThread().getName() + " 運作...");

                    Thread.sleep(1000);

                    semaphore.release();

                    System.out.println(Thread.currentThread().getName() + " 結束...");


                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

            }
        },String.valueOf(i)).start();


    }

}      

Condition

JDK由原始的synchronized發展到Lock,以類的方式提供鎖機制,發展出重入鎖、讀寫鎖,以類的形式存在自然功能更加強大靈活,比如可以tryLock進行鎖的嗅探。在synchronized代碼塊中我們可以使用wait/notify/notifyAll來進行線程的協同工作,那麼JDK也發展了這一塊,即Condition。Condition.await類似于wait,Condition.signal/signalAll類似于notify/nofityAll。下面我們簡單實作一個Condition版的生産者/消費者。

處理核心:Handler

public class Handler {

    //容器
    private LinkedList<String> linkedList = new LinkedList<String>();

    //限制
    private int MAX_SIZE = 3;

    //鎖
    private Lock lock = new ReentrantLock();

    //condition  實際上,可以new多個condition,這裡暫且隻是用給一個
    private Condition condition = lock.newCondition();

    public void put(String bread){

        try{
            lock.lock();

            if(linkedList.size() == MAX_SIZE){
                System.out.println("容器已滿");
                condition.await();
            }

            linkedList.add(bread);
            System.out.println("放入面包" + bread);
            condition.signalAll();
        }catch (Exception e){
            e.printStackTrace();
        }finally {
            lock.unlock();
        }

    }

    public void eat(){

        try{
            lock.lock();

            if(linkedList.size() == 0){
                System.out.println("容器為空");
                condition.await();
            }

            String bread = linkedList.removeFirst();
            System.out.println("吃掉一個面包" + bread);
            condition.signalAll();

        }catch(Exception e){

            e.printStackTrace();
        }finally {
            lock.unlock();
        }


    }

}      

生産者:Produce

public class Produce implements Runnable{

    private Handler handler;

    public Produce(Handler handler) {
        this.handler = handler;
    }

    @Override
    public void run() {

        for(int i = 0 ; i < 10 ; i++){
            try {
                Thread.sleep(new Random().nextInt(1000));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            handler.put(String.valueOf(i));
        }

    }
}      

消費者:Consume

public class Consume implements Runnable{

    private Handler handler;

    public Consume(Handler handler) {
        this.handler = handler;
    }

    @Override
    public void run() {
        for (int i = 0 ; i < 10 ; i++){
            try {
                Thread.sleep(new Random().nextInt(1000));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            handler.eat();
        }
    }
}      

Main:

public class Main {

    public static void main(String[] args) {

        Handler handler = new Handler();

        Produce produce = new Produce(handler);
        Consume consume = new Consume(handler);

        new Thread(consume).start();
        new Thread(produce).start();
        new Thread(produce).start();
    }
}