天天看點

Java并發程式設計一CountDownLatch、CyclicBarrier、Semaphore初使用

推薦:​​Java并發程式設計彙總​​

Java并發程式設計一CountDownLatch、CyclicBarrier、Semaphore初使用

​CountDownLatch​

​​、​

​CyclicBarrier​

​​、​

​Semaphore​

​​這些線程協作工具類是基于​

​AQS​

​的,看完這篇部落格後可以去看下面這篇部落格,了解它們是如何實作的。

​​Java并發之AQS詳解​​

CountDownLatch

​CountDownLatch​

​可以實作一個線程等待多個線程、多個線程等待一個線程、多個線程等待多個線程(這裡不涉及)。

我們首先來看看怎麼實作一個線程等待多個線程吧。

Java并發程式設計一CountDownLatch、CyclicBarrier、Semaphore初使用

工廠中,對産品需要進行質檢,​

​5​

​​個勞工進行檢查,所有人都認為通過,這個産品才算通過。

代碼:

package flowcontrol.countdownlatch;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CountDownLatchDemo1 {

    public static void main(String[] args) throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(5);
        ExecutorService service = Executors.newFixedThreadPool(5);
        for (int i = 0; i < 5; i++) {
            final int no = i + 1;
            Runnable runnable = new Runnable() {

                @Override
                public void run() {
                    try {
                        Thread.sleep((long) (Math.random() * 10000));
                        System.out.println("No." + no + "完成了檢查。");
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } finally {
                        latch.countDown();
                    }
                }
            };
            service.submit(runnable);
        }
        System.out.println("等待5個人檢查完.....");
        latch.await();
        System.out.println("所有人都完成了工作,進入下一個環節。");
    }
}      

輸出:

等待5個人檢查完.....
No.4完成了檢查。
No.3完成了檢查。
No.2完成了檢查。
No.1完成了檢查。
No.5完成了檢查。
所有人都完成了工作,進入下一個環節。      

輸出也符合預期。

我們來看一看程式中使用的​

​CountDownLatch​

​構造器源碼。

public CountDownLatch(int count) {
        if (count < 0) throw new IllegalArgumentException("count < 0");
        this.sync = new Sync(count);
    }      

進入​

​Sync(count)​

​​方法,​

​Sync​

​​是​

​CountDownLatch​

​​的内部類,它繼承了​

​AbstractQueuedSynchronizer​

​​,也就是傳說中​

​AQS​

​​,我們調用的​

​CountDownLatch​

​​構造器傳入的​

​count​

​​其實對應于​

​AQS​

​​的​

​state​

​​,是以現在隻需要知道怎麼用​

​CountDownLatch​

​​,知道用法後再去看底層就會更加容易了解,這裡就不多說​

​AQS​

​了。

private static final class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 4982264981922014374L;

        Sync(int count) {
            setState(count);
        }

        int getCount() {
            return getState();
        }

        protected int tryAcquireShared(int acquires) {
            return (getState() == 0) ? 1 : -1;
        }

        protected boolean tryReleaseShared(int releases) {
            // Decrement count; signal when transition to zero
            for (;;) {
                int c = getState();
                if (c == 0)
                    return false;
                int nextc = c-1;
                if (compareAndSetState(c, nextc))
                    return nextc == 0;
            }
        }
    }      

再來實作一下多個線程等待一個線程。

模拟100米跑步,5名選手都準備好了,隻等裁判員一聲令下,所有人同時開始跑步。

代碼:

package flowcontrol.countdownlatch;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CountDownLatchDemo2 {

    public static void main(String[] args) throws InterruptedException {
        CountDownLatch begin = new CountDownLatch(1);
        ExecutorService service = Executors.newFixedThreadPool(5);
        for (int i = 0; i < 5; i++) {
            final int no = i + 1;
            Runnable runnable = new Runnable() {
                @Override
                public void run() {
                    System.out.println("No." + no + "準備完畢,等待發令槍");
                    try {
                        begin.await();
                        System.out.println("No." + no + "開始跑步了");
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            };
            service.submit(runnable);
        }
        //裁判員檢查發令槍...
        Thread.sleep(5000);
        System.out.println("發令槍響,比賽開始!");
        begin.countDown();
    }
}      

輸出:

No.1準備完畢,等待發令槍
No.4準備完畢,等待發令槍
No.3準備完畢,等待發令槍
No.2準備完畢,等待發令槍
No.5準備完畢,等待發令槍
發令槍響,比賽開始!
No.1開始跑步了
No.4開始跑步了
No.3開始跑步了
No.2開始跑步了
No.5開始跑步了      

輸出也符合預期。

結合前面兩種實作,我們來實作一個複合版的。

模拟100米跑步,5名選手都準備好了,隻等裁判員一聲令下,所有人同時開始跑步。當所有人都到終點後,比賽結束。

代碼:

package flowcontrol.countdownlatch;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CountDownLatchDemo1And2 {

    public static void main(String[] args) throws InterruptedException {
        CountDownLatch begin = new CountDownLatch(1);

        CountDownLatch end = new CountDownLatch(5);
        ExecutorService service = Executors.newFixedThreadPool(5);
        for (int i = 0; i < 5; i++) {
            final int no = i + 1;
            Runnable runnable = new Runnable() {
                @Override
                public void run() {
                    System.out.println("No." + no + "準備完畢,等待發令槍");
                    try {
                        begin.await();
                        System.out.println("No." + no + "開始跑步了");
                        Thread.sleep((long) (Math.random() * 10000));
                        System.out.println("No." + no + "跑到終點了");
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } finally {
                        end.countDown();
                    }
                }
            };
            service.submit(runnable);
        }
        //裁判員檢查發令槍...
        Thread.sleep(5000);
        System.out.println("發令槍響,比賽開始!");
        begin.countDown();

        end.await();
        System.out.println("所有人到達終點,比賽結束");
    }
}      

輸出:

No.1準備完畢,等待發令槍
No.4準備完畢,等待發令槍
No.3準備完畢,等待發令槍
No.2準備完畢,等待發令槍
No.5準備完畢,等待發令槍
發令槍響,比賽開始!
No.1開始跑步了
No.4開始跑步了
No.3開始跑步了
No.2開始跑步了
No.5開始跑步了
No.1跑到終點了
No.4跑到終點了
No.5跑到終點了
No.3跑到終點了
No.2跑到終點了
所有人到達終點,比賽結束      

輸出也符合預期。

CyclicBarrier

​CyclicBarrier​

​可以讓多個線程一起互相等待(阻塞),當互相等待的線程數量達到我們指定的數量時,它們就可以不用再阻塞了,可以接着執行了,不過在它們接着執行之前會觸發一個我們指定的線程,具體看代碼吧。

代碼:

package flowcontrol.cyclicbarrier;

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

public class CyclicBarrierDemo {
    public static void main(String[] args) {
        CyclicBarrier cyclicBarrier = new CyclicBarrier(5, new Runnable() {
            @Override
            public void run() {
                System.out.println("所有人都到場了, 大家統一出發!");
            }
        });
        for (int i = 0; i < 10; i++) {
            new Thread(new Task(i, cyclicBarrier)).start();
        }
    }

    static class Task implements Runnable{
        private int id;
        private CyclicBarrier cyclicBarrier;

        public Task(int id, CyclicBarrier cyclicBarrier) {
            this.id = id;
            this.cyclicBarrier = cyclicBarrier;
        }

        @Override
        public void run() {
            System.out.println("線程" + id + "現在前往集合地點");
            try {
                Thread.sleep((long) (Math.random()*10000));
                System.out.println("線程"+id+"到了集合地點,開始等待其他人到達");
                cyclicBarrier.await();
                System.out.println("線程"+id+"出發了");
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (BrokenBarrierException e) {
                e.printStackTrace();
            }
        }
    }
}      

輸出:

線程0現在前往集合地點
線程2現在前往集合地點
線程5現在前往集合地點
線程3現在前往集合地點
線程4現在前往集合地點
線程9現在前往集合地點
線程1現在前往集合地點
線程8現在前往集合地點
線程7現在前往集合地點
線程6現在前往集合地點
線程6到了集合地點,開始等待其他人到達
線程2到了集合地點,開始等待其他人到達
線程4到了集合地點,開始等待其他人到達
線程0到了集合地點,開始等待其他人到達
線程7到了集合地點,開始等待其他人到達
所有人都到場了, 大家統一出發!
線程7出發了
線程6出發了
線程2出發了
線程4出發了
線程0出發了
線程5到了集合地點,開始等待其他人到達
線程3到了集合地點,開始等待其他人到達
線程8到了集合地點,開始等待其他人到達
線程9到了集合地點,開始等待其他人到達
線程1到了集合地點,開始等待其他人到達
所有人都到場了, 大家統一出發!
線程1出發了
線程5出發了
線程3出發了
線程8出發了
線程9出發了      

輸出符合預期。

Semaphore

​Semaphore​

​​可以控制一起執行線程的數量,也就是并發數,當某個操作很耗資源時,我們不允許太大的并發量時,我們可以使用​

​Semaphore​

​來進行限制并發量,如下圖所示。

Java并發程式設計一CountDownLatch、CyclicBarrier、Semaphore初使用

這是通過信号量來實作的,也對應于​

​AQS​

​​的​

​state​

​,我們給某個服務設定一定數量的信号量,當一個線程來擷取資源時,信号量就減一些(數量由我們自己指定,根據業務需求),沒有信号量的線程就得阻塞。

Java并發程式設計一CountDownLatch、CyclicBarrier、Semaphore初使用

當線程成功擷取資源後,也就是線程可以結束了,該線程會返還使用的信号量,以供其他線程使用。

Java并發程式設計一CountDownLatch、CyclicBarrier、Semaphore初使用
Java并發程式設計一CountDownLatch、CyclicBarrier、Semaphore初使用

代碼:

package flowcontrol.semaphore;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;

public class SemaphoreDemo {

    static Semaphore semaphore = new Semaphore(4, true);

    public static void main(String[] args) {
        ExecutorService service = Executors.newFixedThreadPool(50);
        for (int i = 0; i < 100; i++) {
            service.submit(new Task());
        }
        service.shutdown();
    }

    static class Task implements Runnable {

        @Override
        public void run() {
            try {
                semaphore.acquire(2);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName() + "拿到了許可證");
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName() + "釋放了許可證");
            semaphore.release(2);
        }
    }
}      
pool-1-thread-1拿到了許可證
pool-1-thread-2拿到了許可證
pool-1-thread-1釋放了許可證
pool-1-thread-2釋放了許可證
pool-1-thread-5拿到了許可證
pool-1-thread-6拿到了許可證
pool-1-thread-5釋放了許可證
pool-1-thread-6釋放了許可證
pool-1-thread-3拿到了許可證
pool-1-thread-4拿到了許可證
pool-1-thread-3釋放了許可證
pool-1-thread-4釋放了許可證
pool-1-thread-7拿到了許可證
pool-1-thread-8拿到了許可證
pool-1-thread-8釋放了許可證
pool-1-thread-7釋放了許可證