天天看点

4种同步工具类:在Java中提升并发性能

作者:程序员的秃头之路

CountDownLatch

CountDownLatch 是Java中的一个同步工具类,它允许一个或多个线程等待直到在其他线程中执行的一组操作完成。它是存在于 java.util.concurrent 包中的一个类。

CountDownLatch 在内部维护一个计数器,这个计数器由调用方在创建 CountDownLatch 对象时设定。每次调用 countDown 方法都会使这个计数器减一。主要的方法是 await 和 countDown。

CountDownLatch 主要提供了两个方法:

1、void await() :调用这个方法会将当前线程置入阻塞状态,直到计数器的值为0,除非线程被中断。 2、void countDown() :此方法减少计数器的值,直到达到0。

主要应用场景包括:

1、允许一个或多个线程等待直到在其他线程中执行的一组操作完成。例如,你想在一个并行计算算法中,在任何部分开始之前等待所有的部分都准备好,或者在服务开始之前等待至关重要的服务全部启动。 2、用作一个同步辅助,当线程到达某个状态,或者完成某项任务时,它可以通知其他线程继续进行。例如,你可能有一个大型并行计算问题,你想在所有的处理器都完成时将结果合并。

以下是一个使用 CountDownLatch 的例子:

import java.util.concurrent.CountDownLatch;

public class CountDownLatchDemo {
    public static void main(String[] args) throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(3);

        new Thread(() -> {
            System.out.println("Thread 1 is doing work");
            latch.countDown();
            System.out.println("Thread 1 has finished its work");
        }).start();

        new Thread(() -> {
            System.out.println("Thread 2 is doing work");
            latch.countDown();
            System.out.println("Thread 2 has finished its work");
        }).start();

        new Thread(() -> {
            System.out.println("Thread 3 is doing work");
            latch.countDown();
            System.out.println("Thread 3 has finished its work");
        }).start();

        latch.await();  // Main thread waits for the other threads to finish
        System.out.println("All threads have finished their work, proceeding in the main thread");
    }
}           

这个例子中,我们创建了一个 CountDownLatch 并给它的计数器赋值为3。然后我们启动了三个线程,每个线程在完成其工作后都会调用 countDown 方法将计数器减一。在主线程中,我们调用 await 方法来等待其他线程完成他们的工作。当计数器达到0时,主线程将继续执行,输出 "All threads have finished their work, proceeding in the main thread"。

CyclicBarrier

CyclicBarrier 是 java.util.concurrent 包下的一个类,它是一个同步辅助工具,允许一组线程互相等待直到所有线程都准备就绪。

CyclicBarrier 主要用在多线程计算中,当所有线程都达到一个点(即barrier)时,这个barrier就会打开,然后所有的线程都会被释放,而barrier也可以被重置以便下一次使用。

主要方法包括:

1、CyclicBarrier(int parties) :构造一个新的 CyclicBarrier,当给定数量的线程(即parties)正在等待时,它将会打开。 2、CyclicBarrier(int parties, Runnable barrierAction) :构造一个新的 CyclicBarrier,当给定数量的线程正在等待时,它将会打开,并且在打开barrier之前执行给定的动作。 3、int await() :在所有参与者都已经在这个barrier上调用 await 方法之前,将会一直等待。 4、int await(long timeout, TimeUnit unit) :在所有参与者都已经在这个barrier上调用 await 方法之前,将会一直等待,或者如果超过了给定的等待时间,就会抛出 TimeoutException。

下面是一个使用 CyclicBarrier 的示例:

import java.util.concurrent.CyclicBarrier;

public class CyclicBarrierDemo {
    public static void main(String[] args) {
        final int totalThread = 5;
        CyclicBarrier barrier = new CyclicBarrier(totalThread);

        for(int i=0;i<totalThread;i++){
            new Thread(() -> {
                System.out.println("Thread " + Thread.currentThread().getId() + " is ready");
                try {
                    barrier.await();
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
                System.out.println("Thread " + Thread.currentThread().getId() + " is proceeding");
            }).start();
        }
    }
}           

在这个例子中,我们创建了一个 CyclicBarrier,并设置了需要5个线程就绪才能打开barrier。然后我们启动了5个线程,每个线程在准备好后就会调用 await 方法。当所有线程都准备好后,barrier就会打开,然后所有线程都会继续执行。

Semaphore

Semaphore 是一个计数信号量,用来控制同时访问特定资源的线程数量。在 java.util.concurrent 包中提供的 Semaphore 类是一个计数信号量的实现。在概念上,信号量维护了一组许可证。如果有必要,每个 acquire() 都会阻塞,直到获得许可证,然后 acquire() 才能获取许可。每个 release() 添加一个许可证,从而可能释放一个正在阻塞的 acquire()。

Semaphore 的主要方法包括:

1、Semaphore(int permits):构造一个 Semaphore 实例,指定可用的许可证数量。 2、Semaphore(int permits, boolean fair):构造一个 Semaphore 实例,指定可用的许可证数量,并设置是否保证线程的调度顺序。 3、void acquire():获取一个许可,若无许可可获,则会阻塞直到有许可。 4、void release():释放一个许可,增加可获取的许可数量。

下面是一个使用 Semaphore 的示例:

import java.util.concurrent.Semaphore;

public class SemaphoreDemo {
    public static void main(String[] args) {
        final int totalThread = 10;
        Semaphore semaphore = new Semaphore(5); // Only 5 threads are allowed to execute concurrently

        for(int i=0;i<totalThread;i++){
            new Thread(() -> {
                try {
                    semaphore.acquire();
                    System.out.println("Thread " + Thread.currentThread().getId() + " is working");
                    Thread.sleep(1000);
                } catch (InterruptedException ex) {
                    ex.printStackTrace();
                } finally {
                    System.out.println("Thread " + Thread.currentThread().getId() + " has finished its work");
                    semaphore.release();
                }
            }).start();
        }
    }
}           

在这个示例中,我们创建了一个 Semaphore,并设置了同时允许5个线程执行。然后我们启动了10个线程,每个线程在开始工作前都会尝试获取许可证。如果获取到许可证,线程就开始工作;如果没有获取到许可证,线程就会等待,直到有其他线程释放许可证。当线程完成工作后,它会释放许可证,以便其他线程可以获取许可证并开始工作。

Exchanger

Exchanger是Java中的一个同步工具类,它提供了一种在两个线程之间交换数据的简单机制。这个工具类位于java.util.concurrent包中。

Exchanger是一种两个线程可以进行一对一的信息交换的工具。当一个线程调用Exchanger对象的exchange方法后,它会阻塞等待另一个线程也调用exchange方法,当两个线程都调用了exchange方法,它们的数据就会进行交换,然后两个线程都可以继续执行。

Exchanger主要提供的方法有:

1、Exchanger():构造一个新的Exchanger对象。 2、V exchange(V x):等待另一个线程到达此交换点(除非当前线程被中断),然后将给定的对象传送给它,并接收它的对象。 3、V exchange(V x, long timeout, TimeUnit unit):等待另一个线程到达此交换点(除非当前线程被中断或超时),然后将给定的对象传送给它,并接收它的对象。

以下是一个简单的示例,展示了如何使用Exchanger在两个线程之间交换数据:

import java.util.concurrent.Exchanger;

public class ExchangerDemo {
    public static void main(String[] args) {
        Exchanger<String> exchanger = new Exchanger<>();

        new Thread(() -> {
            try {
                String message = "Hello from Thread 1";
                message = exchanger.exchange(message);
                System.out.println("Thread 1 received: " + message);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }).start();

        new Thread(() -> {
            try {
                String message = "Hello from Thread 2";
                message = exchanger.exchange(message);
                System.out.println("Thread 2 received: " + message);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }).start();
    }
}           

在这个例子中,我们创建了一个 Exchanger 对象,并启动了两个线程。每个线程都调用 exchange 方法并传递一个字符串消息。线程阻塞,直到另一个线程也调用 exchange 方法。然后,两个线程都收到对方的消息,并打印出接收到的消息。

总结

这四种并发工具类(CountDownLatch、CyclicBarrier、Semaphore、Exchanger)各有特点和适用场景:

1、CountDownLatch:这是一个计数器闭锁。使用它可以让一个线程等待其他线程完成各自的工作后再执行。其核心方法是 countDown() 和 await()。通过 countDown() 方法可以使计数器减1,await() 方法则阻塞调用线程,直到计数器为0。

2、CyclicBarrier:这是一个循环屏障,它能够阻塞一组线程直到所有线程达到一个公共的屏障点。CyclicBarrier 是可重用的,当所有线程都到达屏障后,它就会恢复到初始状态,等待下一次重复使用。

3、Semaphore:这是一个计数信号量,常用于限制可以访问某些资源(物理或逻辑的)的线程数目。它维护了一个许可集。需要访问资源的线程必须首先获得许可,如果已经用完了所有的许可,那么许可获取将会被阻塞。

4、Exchanger:这是一个两方交换数据的同步点。当一个线程到达交换点,它会被阻塞,直到另一个线程也到达交换点,然后两个线程就可以交换数据。

总的来说,CountDownLatch 和 CyclicBarrier 都能实现线程间的等待,但是它们侧重点不同:CountDownLatch 是等待其他线程完成后才继续执行,而 CyclicBarrier 是等待所有线程都达到一个状态后再统一继续执行。Semaphore 主要是控制对共享资源的访问线程数量。而 Exchanger 则提供了一个线程间进行配对和信息交换的同步点。

继续阅读