天天看点

Java并发编程中的同步器

1. CyclicBarrier【参考文献】

  • 和 CountDownLatch 类似
  • 线程会等待,直到足够多线程达到了事先规定的数据。一旦触发条件,就可以进行下一步的操作
  • 适用于线程之间相互等待处理结果就绪的场景
  • CyclicBarrier

    可以构造一个集结点,当某一个线程执行完毕,它就会到集结点等待,直到所有线程都到了集结点,那么该栅栏就会被撤销,所有线程再统一出发,继续执行剩下的任务

用法一:等待所有人到达指定地点,再统一出发

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

public class CyclicBarrierDemo {

    public static void main(String[] args) {
        CyclicBarrier cyclicBarrier = new CyclicBarrier(5, new Runnable() {
            @Override
            public void run() {
                System.out.println("所有人都到了,大家统一出发");
            }
        });

        for (int i = 0; i < 5; i++) {
            new Thread(new Task(cyclicBarrier)).start();
        }
    }

    static class Task implements Runnable {

        private final CyclicBarrier cyclicBarrier;

        public Task(CyclicBarrier cyclicBarrier) {
            this.cyclicBarrier = cyclicBarrier;
        }

        @Override
        public void run() {
            try {
                System.out.println(Thread.currentThread().getName() + " 现在前往集合地点");
                Thread.sleep((long) (Math.random() * 10000));
                System.out.println(Thread.currentThread().getName() + " 已经到了集合地点,等待其他人到达");
                cyclicBarrier.await();
            } catch (InterruptedException | BrokenBarrierException e) {
                e.printStackTrace();
            }
        }
    }
}
           

用法二:👉 CyclicBarrier简单使用示例

2. CountDownLatch【参考文献】

  • 和 CyclicBarrier 类似,数量递减到0时,触发动作
  • 但是不可重复使用

主要方法:

  • CountDownLatch(int count) ,仅有一个构造函数,参数 count 为需要倒数的数值
  • await() ,调用 await() 方法的现场会被挂起,它会等待直到 count 值为0才继续执行
  • countDown() ,讲 count 值减1,直到为0时,等待的现场会被唤起

用法一:一个线程等待多个线程都执行完毕,再继续自己的工作(一等多)

import java.util.concurrent.*;

public class Test {

    public static void main(String[] args) throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(5);
        ExecutorService service = Executors.newFixedThreadPool(5);
        for (int i = 0; i < 5; i++) {
            final int no = i + 1;
            Runnable runnable = new Runnable() {
                @Override
                public void run() {
                    try {
                        Thread.sleep((long) (Math.random() * 10000));
                        System.out.println("Num:" + no);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } finally {
                        latch.countDown();
                    }
                }
            };
            service.submit(runnable);
        }
        System.out.println("等待5个线程执行完....");
        latch.await();
        System.out.println("所有线程都已经执行完,可以进入下一个环节了。");
    }
}
           

用法二:多个线程等待某一个线程的信号,然后同时开始执行(多等一)

import java.util.concurrent.*;

/**
 * 模拟跑步比赛,5名选手等待1名裁判发令,发令后所有人同时开始跑步
 */
public class Test {

    public static void main(String[] args) throws InterruptedException {
        // 1名裁判
        CountDownLatch latch = new CountDownLatch(1);
        // 5名选手
        ExecutorService service = Executors.newFixedThreadPool(5);
        for (int i = 0; i < 5; i++) {
            final int no = i + 1;
            Runnable runnable = new Runnable() {
                @Override
                public void run() {
                    try {
                        System.out.println("NO." + no + "号选手准备完毕,等待发令");
                        latch.await();
                        System.out.println("NO." + no + "开始跑步");
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            };
            service.submit(runnable);
        }
        System.out.println("裁判检查发令枪....");
        Thread.sleep(2000);
        System.out.println("裁判检查完毕,比赛开始....");
        latch.countDown();
    }
}
           

CyclicBarrier 和 CountDownLatch 的区别

作用不同:CyclicBarrier要等固定数量的线程都到达了栅栏位置才能继续执行,而CountDownLatch只需等待数字到0,也就是说,CountDownLatch用于事件,但是CyclicBarrier是用于线程的

可重用性不同:CountDownLatch在倒数到0并触发门闩打开后,就不能再次使用了,除非新建新的实例;而CyclicBarrier可以重复使用

3. Semaphore【参考文献】

  • 信号量,可以通过控制“许可证”的数量,来保证线程之间的配合
  • 线程只有在拿到“许可证”后才能继续运行,相比于其他的同步器,更灵活

主要方法:

  1. new Semaphore(int permits, boolean

    fair),这里可以设置是否要设置公平策略,如果传入true,那么Semaphore会把之前等待的线程放到FIFO的队列里,以便于当有了新的许可证,可以分发给之前等了最长时间的线程;

  2. acquire(),获取许可证,没有的话会陷入阻塞状态;
  3. acquireUninterruptibly(),同上,但是可以响应中断;
  4. tryAcquire(),看看现在有没有空闲的许可证,如果有的话就获取,没有的话,不用陷入阻塞状态,可以去做其它的事,过一会再来查看许可证的空闲情况
  5. tryAcquire(long timeout, TimeUnit unit),和 tryAcquire()

    一样,但是多了一个超时时间,比如“在3秒内获取不到许可证,就去做别的事情”

  6. release(),操作完成后,归还许可证
  7. 可以指定获取和释放的许可证数量,获取和释放的数量必须一致

用法:

import java.util.concurrent.*;

public class Test {

    static Semaphore semaphore = new Semaphore(3, true);

    public static void main(String[] args) throws InterruptedException {
        ExecutorService service = Executors.newFixedThreadPool(50);
        for (int i = 0; i < 100; i++) {
            service.submit(new Task());
        }
        service.shutdown();
    }

    static class Task implements Runnable {
        @Override
        public void run() {
            try {
                semaphore.acquire();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

            System.out.println(Thread.currentThread().getName() + " 拿到了许可证");

            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

            System.out.println(Thread.currentThread().getName() + " 释放了许可证");
            semaphore.release();
        }
    }
}
           

4. Condition【参考文献】

  • 控制线程的“等待”和“唤醒”,Object.wait()的升级版

实现生产者消费者

import java.util.PriorityQueue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class ConditionDemo {

    private final int queueSize = 10;
    private final PriorityQueue<Integer> queue = new PriorityQueue<>(queueSize);
    private final Lock lock = new ReentrantLock();
    private final Condition notFull = lock.newCondition();
    private final Condition notEmpty = lock.newCondition();

    public static void main(String[] args) {
        ConditionDemo demo = new ConditionDemo();
        Produce produce = demo.new Produce();
        Consume consume = demo.new Consume();
        produce.start();
        consume.start();
    }

    /**
     * 消费者
     */
    class Consume extends Thread {
        @Override
        public void run() {
            try {
                consume();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        private void consume() throws InterruptedException {
            while (true) {
                lock.lock();
                try {
                    while (queue.size() == 0) {
                        System.out.println("队列空,等待数据");
                        notEmpty.await();
                    }

                    // 从队列中取数据
                    queue.poll();
                    // 并且唤醒生产者
                    notFull.signal();

                    System.out.println("从队列中取走了一条数据,队列剩余:" + queue.size());

                } finally {
                    lock.unlock();
                }
            }
        }
    }

    /**
     * 生产者
     */
    class Produce extends Thread {
        @Override
        public void run() {
            try {
                produce();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }


        private void produce() throws InterruptedException {
            while (true) {
                lock.lock();
                try {
                    while (queue.size() == queueSize) {
                        System.out.println("队列满,等待消费者进行消费");
                        notFull.await();
                    }

                    // 开始生产数据
                    queue.offer(1);
                    // 并且唤醒消费者
                    notEmpty.signal();

                    System.out.println("向队列添加了一条数据,队列剩余:" + queue.size());

                } finally {
                    lock.unlock();
                }
            }
        }
    }
}
           

5. Phaser【参考文献 】

和 CyclicBarrier 类似

示例:4个线程执行完后做一次同步操作

import java.nio.file.Path;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Phaser;

public class PhaserTest {

    Phaser phaser = new Phaser();
    ExecutorService executorService = Executors.newCachedThreadPool();

    class Worker implements Runnable {
        @Override
        public void run() {
            phaser.register();
            while (true) {
                try {
                    Thread.sleep(500);
                    System.out.println("working:" + phaser.getPhase());
                    phaser.arriveAndAwaitAdvance();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    public void run() throws InterruptedException {
        phaser.register();
        executorService.execute(new Worker());
        executorService.execute(new Worker());
        executorService.execute(new Worker());
        executorService.execute(new Worker());
        while (true) {
            phaser.arriveAndAwaitAdvance();
            System.out.println("Sync..." + phaser.getPhase());
        }
    }

    public static void main(String[] args) throws InterruptedException {
        var test = new PhaserTest();
        test.run();
    }
}
           

6. Exchanger

让两个线程在合适时交换数据

适用场景:当两个线程工作在同一个类的不同实例上时,用于交换数据

解决了什么问题:线程间高效交换数据

最后来自小编的福利

小编整理了一份大厂真题的面试资料,以及2021最新Java核心技术整理的资料集锦,需要领取的小伙伴可以 私聊关注我 免费领取 ,编程的世界永远向所有热爱编程的人开放,这是一个自由,平等,共享的世界,我始终是这样坚信的。

喜欢小编的分享可以点赞关注哦,小编持续为你分享最新文章 和 福利领取哦