天天看点

什么是线程间通信?Java 中有哪些方式实现线程间通信?

作者:JAVA一问一答

线程间通信指的是多个线程之间共享数据或资源,并协调各自的执行顺序,以完成某个任务的过程。

Java 中有以下几种方式实现线程间通信

wait() 和 notify()/notifyAll() 方法:

通过共享对象上的锁来实现线程之间的等待和通知。当一个线程执行 wait() 方法时,它会释放掉持有的锁并进入等待状态,直到其他线程调用 notify()/notifyAll() 方法唤醒它。这种方式需要注意避免死锁,以及在等待和通知时对共享对象的状态进行正确性检查,避免虚假唤醒。

Condition 接口

与 wait() 和 notify()/notifyAll() 方法相似,但是更加灵活,可以实现更加复杂的等待和通知机制。通过 Lock 对象的 newCondition() 方法创建 Condition 对象,并通过 await() 和 signal()/signalAll() 方法实现线程的等待和通知。

在使用Java中的Condition接口前,需要先使用Lock接口获取一个锁对象。然后,可以使用Condition接口的await()方法将当前线程阻塞,等待其他线程调用signal()或signalAll()方法唤醒。具体使用方法如下:

1、创建锁对象

Lock lock = new ReentrantLock();

2、获取条件变量对象

Condition condition = lock.newCondition();

3、在需要等待的地方调用await()方法

lock.lock();

try {

while (condition不成立) {

condition.await();

}

// 执行需要等待的操作

} finally {

lock.unlock();

}

4、在条件成立的地方调用signal()或signalAll()方法

lock.lock();

try {

// 执行满足条件的操作

condition.signal(); // 或者condition.signalAll();

} finally {

lock.unlock();

}

需要注意的是,await()方法会释放当前线程持有的锁,因此在调用await()方法前需要先获取锁;而signal()或signalAll()方法只会唤醒等待同一个条件变量的线程,因此在调用这两个方法前需要先获取锁。同时,要注意使用带try-finally语句块的锁机制,确保锁的及时释放。

管道(PipedInputStream 和 PipedOutputStream)

在Java中,可以使用PipedInputStream和PipedOutputStream来实现管道通信。PipedInputStream和PipedOutputStream是Java IO库中提供的两个类,分别用于从管道中读取数据和向管道中写入数据。

具体实现时,首先需要创建一个PipedInputStream和一个PipedOutputStream,并将它们连接起来,然后可以在一个线程中使用PipedOutputStream向管道中写入数据,在另一个线程中使用PipedInputStream从管道中读取数据。

下面是一个简单的例子:

import java.io.IOException;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
public class PipeExample {
public static void main(String[] args) throws IOException {
PipedInputStream in = new PipedInputStream();
PipedOutputStream out = new PipedOutputStream(in);
Thread t1 = new Thread(() -> {
try {
out.write("hello world".getBytes());
out.close();
} catch (IOException e) {
e.printStackTrace();
}
});
Thread t2 = new Thread(() -> {
try {
int data;
while ((data = in.read()) != -1) {
System.out.print((char) data);
}
in.close();
} catch (IOException e) {
e.printStackTrace();
}
});
t1.start();
t2.start();
}
}           

在这个例子中,我们创建了一个PipedInputStream和一个PipedOutputStream,并将它们连接起来。然后我们启动了两个线程,一个线程使用PipedOutputStream向管道中写入数据,另一个线程使用PipedInputStream从管道中读取数据。在写入数据的线程中,我们使用write方法将字符串“hello world”写入管道,然后关闭输出流。在读取数据的线程中,我们使用read方法从管道中读取数据,并将其打印到控制台上,直到读取到流的结尾,然后关闭输入流。

需要注意的是,使用PipedInputStream和PipedOutputStream进行管道通信时,如果没有正确地关闭流,可能会导致线程阻塞或内存泄漏等问题。因此,建议在使用完PipedInputStream和PipedOutputStream后,调用close方法来关闭流。

CountDownLatch 类

CountDownLatch是Java并发包提供的一个同步工具类,通过它可以实现类似于计数器的功能,使得一个或多个线程等待其他线程执行完毕后再执行。使用CountDownLatch类,需要使用以下步骤:

1、创建一个CountDownLatch对象,指定计数器的数量,即有多少个线程需要等待。

2、在需要等待的线程中,调用CountDownLatch对象的await()方法,使得该线程等待。

3、在其他线程执行完毕后,调用CountDownLatch对象的countDown()方法,将计数器的数量减1。

4、当计数器的数量减为0时,所有等待的线程都会被唤醒,继续执行。

下面是一个简单的示例代码,演示如何使用CountDownLatch类:

import java.util.concurrent.CountDownLatch;
public class CountDownLatchDemo {
public static void main(String[] args) throws InterruptedException {
int threadCount = 5;
CountDownLatch latch = new CountDownLatch(threadCount);
for (int i = 0; i < threadCount; i++) {
Thread t = new Thread(new Worker(latch));
t.start();
}
// 等待所有线程执行完毕
latch.await();
System.out.println("All workers have finished their work.");
}
static class Worker implements Runnable {
private CountDownLatch latch;
public Worker(CountDownLatch latch) {
this.latch = latch;
}
@Override
public void run() {
// 模拟耗时操作
try {
Thread.sleep((long) (Math.random() * 1000));
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Worker " + Thread.currentThread().getName() + " has finished its work.");
latch.countDown(); // 计数器减1
}
}
}           

在这个示例代码中,我们首先创建了一个CountDownLatch对象,指定计数器的数量为5。然后创建5个线程,每个线程会模拟一个耗时操作,并在操作完成后调用CountDownLatch对象的countDown()方法,将计数器减1。最后,主线程调用CountDownLatch对象的await()方法,等待所有线程执行完毕。当所有线程都执行完毕后,主线程会被唤醒,输出一条消息。

CyclicBarrier 类

CyclicBarrier类是Java并发包提供的一个线程同步工具类,它可以协调一组线程在某个点上同步,当所有线程都到达这个点时,才能继续执行后面的操作。下面是使用CyclicBarrier类的基本步骤:

1、创建CyclicBarrier对象,并指定参与同步的线程数和到达同步点后要执行的操作(可选)。

CyclicBarrier barrier = new CyclicBarrier(parties, action);

2、在各个线程中调用await()方法,等待其他线程到达同步点。

barrier.await();

3、当所有线程都到达同步点后,执行指定的操作(如果有),然后继续执行后面的代码。

在使用CyclicBarrier时需要注意,所有线程都必须调用await()方法,否则会导致死锁。另外,CyclicBarrier只能重置一次,如果需要重复利用,需要重新创建新的对象。

下面是一个简单的示例代码,演示了如何使用CyclicBarrier实现线程同步:

import java.util.concurrent.CyclicBarrier;
public class CyclicBarrierDemo {
public static void main(String[] args) {
int parties = 3;
CyclicBarrier barrier = new CyclicBarrier(parties, () -> {
System.out.println("All parties have arrived, let's continue!");
});
new Thread(() -> {
try {
System.out.println("Thread 1 is on the way...");
Thread.sleep(3000);
System.out.println("Thread 1 has arrived.");
barrier.await();
System.out.println("Thread 1 continues to run.");
} catch (Exception e) {
e.printStackTrace();
}
}).start();
new Thread(() -> {
try {
System.out.println("Thread 2 is on the way...");
Thread.sleep(2000);
System.out.println("Thread 2 has arrived.");
barrier.await();
System.out.println("Thread 2 continues to run.");
} catch (Exception e) {
e.printStackTrace();
}
}).start();
new Thread(() -> {
try {
System.out.println("Thread 3 is on the way...");
Thread.sleep(1000);
System.out.println("Thread 3 has arrived.");
barrier.await();
System.out.println("Thread 3 continues to run.");
} catch (Exception e) {
e.printStackTrace();
}
}).start();
}
}           

Semaphore 类

Semaphore是Java并发包中的一个类,用于控制对共享资源的访问。Semaphore维护了一个许可证集合,线程可以通过调用Semaphore的acquire()方法获取许可证,如果没有许可证可用,则该线程将会被阻塞,直到有一个许可证可用。线程可以通过调用Semaphore的release()方法释放许可证,从而允许其他等待线程获取许可证。

在Java中使用Semaphore类,可以通过以下步骤实现:

1、创建Semaphore对象,指定许可证数量。

2、在需要访问共享资源的代码块前调用Semaphore的acquire()方法获取许可证。

3、执行共享资源的代码。

4、在代码块结束时,调用Semaphore的release()方法释放许可证。

下面是一个简单的例子,演示了如何使用Semaphore类控制对共享资源的访问:

import java.util.concurrent.Semaphore;
public class SemaphoreExample {
private static final int THREAD_COUNT = 10;
private static Semaphore semaphore = new Semaphore(5);
public static void main(String[] args) {
for (int i = 0; i < THREAD_COUNT; i++) {
new Thread(new Worker()).start();
}
}
private static class Worker implements Runnable {
@Override
public void run() {
try {
semaphore.acquire();
System.out.println(Thread.currentThread().getName() + " acquired a permit.");
Thread.sleep(1000);
System.out.println(Thread.currentThread().getName() + " released a permit.");
semaphore.release();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}           

在上面的例子中,我们创建了一个Semaphore对象,初始许可证数量为5。然后创建了10个线程,每个线程在执行之前都会调用Semaphore的acquire()方法获取一个许可证,如果没有许可证可用,则线程会被阻塞。当线程获取到许可证后,会执行一些共享资源的代码,然后调用Semaphore的release()方法释放许可证,从而允许其他等待线程获取许可证。

什么是线程间通信?Java 中有哪些方式实现线程间通信?

继续阅读