1. 概念讲解
为了能够协调线程调度运行,需要线程间进行通信。线程通信使线程间能够互相发送信号(通知),使线程能够等待其他线程的信号或通知其它线程执行。
1.1. 线程通信方式
- synchronized 线程通信: JDK1.5之前synchronized,Object的wait和notify/notify是与对象监视器配合完成线程间的等待/通知机制,是java底层提供的线程同步通信机制. 任意一个Java对象,都拥有一与之关联的唯一的监视器对象,为此Java为每个对象提供了一组监视器方法(定义在java.lang.Object上),主要包括wait()、wait(long timeout)、notify()以及notifyAll()方法,这些方法与synchronized同步关键字配合,可以实现等待/通知模式。 注意: 1. Object类中的wait、notify、notifyAll用于线程等待和唤醒的方法,都必须在同步代码中运行(必须用到关键字synchronized),否则抛出 IllegalMonitorStateException 异常 2. 线程唤醒的方法(notify、notifyAll)需要在等待的方法(wait)之后执行,等待中的线程才可能会被唤醒,否则无法唤醒
- 使用lock+Condition控制线程通信: JDK1.5开始,Lock可以代替synchronized 同步方法或同步代码块,Condition替代同步监视器的功能:lock + condition + await + signal + signalAll,是语言级别的,具有更高的可控制性和扩展性
- 使用阻塞队列(BlockingQueue)控制线程通信: BlockingQueue接口主要作为线程同步的工具。当生产者试图向BlockingQueue中放入元素,如果队列已满,则线程被阻塞;当消费者试图向BlockingQueue中取出元素时,若该队列已空,则线程被阻塞。这里通过共享一个队列的信息,实现生产者和消费者。
- 使用管道流PipedWriter/PipedReader:
- 使用JDK1.5提供的信号量Semaphore、CountDownLatch、CyclicBarrier等工具类
- volatile: volatile能保证所修饰的变量对于多个线程可见性,即只要被修改,其它线程读到的一定是最新的值。以此来实现线程通信,但是volatile不能保证操作原子性,是一种弱的同步机制。
1.2. JAVA的对象监视器Monitor
Monitor可以理解为一个同步工具或一种同步机制,通常被描述为一个对象。每一个Java对象就有一把看不见的锁,称为内部锁或者Monitor锁。
Monitor是线程私有的数据结构,每一个线程都有一个可用monitor record列表,同时还有一个全局的可用列表。监视器Monitor也是一个数据结构,里面维护着一系列等待队列、同步队列等
每一个被锁住的对象都会和一个monitor关联,同时monitor中有一个Owner字段存放拥有该锁的线程的唯一标识,表示该锁被这个线程占用。
1.3. Condition
Condition由ReentrantLock对象创建,并且可以同时创建多个,Condition接口在使用前必须先调用ReentrantLock的lock()方法获得锁,之后调用Condition接口的await()将释放锁,并且在该Condition上等待,直到有其他线程调用Condition的signal()方法唤醒线程,使用方式和wait()、notify()类似。
Condition可以和任意的锁对象结合,监视器方法不会再绑定到某个锁对象上。使用Lock锁之后,相当于Lock 替代了synchronized方法和语句的使用,Condition替代了Object监视器方法的使用。
在Condition中,Condition对象当中封装了监视器方法,并用await()替换wait(),用signal()替换notify(),用signalAll()替换notifyAll(),传统线程的通信方式,Condition都可以实现,这里注意,Condition是被绑定到Lock上的,要创建一个Lock的Condition必须用newCondition()方法。
Condition的强大之处在于它可以为多个线程间建立不同的Condition,使用synchronized/wait()只有一个阻塞队列,notifyAll会唤起所有阻塞队列下的线程,而使用lock/condition,可以实现多个阻塞队列,signalAll只会唤起某个阻塞队列下的阻塞线程。
使用总结:
- 使用condition的步骤:创建condition对象,获取锁,然后调用condition的方法
- 一个ReentrantLock支持多个condition对象
- void await() throws InterruptedException;方法会释放锁,让当前线程等待,支持唤醒,支持线程中断
- void awaitUninterruptibly();方法会释放锁,让当前线程等待,支持唤醒,不支持线程中断
- long awaitNanos(longnanosTimeout) throws InterruptedException;参数为纳秒,此方法会释放锁,让当前线程等待,支持唤醒,支持中断。超时之后返回的,结果为负数;超时之前被唤醒返回的,结果为正数(表示返回时距离超时时间相差的纳秒数)
- boolean await(longtime,TimeUnitunit) throws InterruptedException;方法会释放锁,让当前线程等待,支持唤醒,支持中断。超时之后返回的,结果为false;超时之前被唤醒返回的,结果为true
- boolean awaitUntil(Datedeadline) throws InterruptedException;参数表示超时的截止时间点,方法会释放锁,让当前线程等待,支持唤醒,支持中断。超时之后返回的,结果为false;超时之前被唤醒返回的,结果为true
- void signal();会唤醒一个等待中的线程,然后被唤醒的线程会被加入同步队列,去尝试获取锁
- void signalAll();会唤醒所有等待中的线程,将所有等待中的线程加入同步队列,然后去尝试获取锁
1.4. 对象监视器Monitor与Condition的区别
对比项 | Object 监视器方法 | Condition |
---|---|---|
前置条件 | 获取对象的锁 | 调用Lock.lock获取锁,调用Lock.newCondition()获取Condition对象 |
调用方式 | 直接调用,如:object.wait() | 直接调用,如:condition.await() |
等待队列个数 | 一个 | 多个,使用多个condition实现 |
当前线程释放锁并进入等待状态 | 支持 | 支持 |
当前线程释放锁进入等待状态中不响应中断 | 不支持 | 支持 |
当前线程释放锁并进入超时等待状态 | 支持 | 支持 |
当前线程释放锁并进入等待状态到将来某个时间 | 不支持 | 支持 |
唤醒等待队列中的一个线程 | 支持 | 支持 |
唤醒等待队列中的全部线程 | 支持 | 支持 |
2. 使用示例
针对使用synchronized、信号量、volatile的例子太简单,就不提供示例了。
2.1. 同一个锁支持创建多个Condition
实现一个简单的阻塞队列
import java.util.LinkedList;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
public class BlockingQueueDemo<E> {
int size;//阻塞队列最大容量
ReentrantLock lock = new ReentrantLock();
LinkedList<E> list = new LinkedList<>();//队列底层实现
Condition notFull = lock.newCondition();//队列满时的等待条件
Condition notEmpty = lock.newCondition();//队列空时的等待条件
public BlockingQueueDemo(int size) {
this.size = size;
}
public void enqueue(E e) throws InterruptedException {
lock.lock();
try {
while (list.size() == size)//队列已满,在notFull条件上等待
notFull.await();
list.add(e);//入队:加入链表末尾
notEmpty.signal(); //通知在notEmpty条件上等待的线程
} finally {
lock.unlock();
}
}
public E dequeue() throws InterruptedException {
E e;
lock.lock();
try {
while (list.size() == 0)//队列为空,在notEmpty条件上等待
notEmpty.await();
e = list.removeFirst();//出队:移除链表首元素
notFull.signal();//通知在notFull条件上等待的线程
return e;
} finally {
lock.unlock();
}
}
public static void main(String[] args) {
BlockingQueueDemo<Integer> queue = new BlockingQueueDemo<>(2);
for (int i = 0; i < 10; i++) {
int data = i;
new Thread(new Runnable() {
@Override
public void run() {
try {
queue.enqueue(data);
} catch (InterruptedException e) {
}
}
}).start();
}
for (int i = 0; i < 10; i++) {
new Thread(new Runnable() {
@Override
public void run() {
try {
Integer data = queue.dequeue();
System.out.println(data);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
}
}
}
2.2. 使用管道流
对于Piped类型的流,必须先要进行绑定,即调用connect方法,如果没有绑定,那么将会抛出异常。管倒流通信类似于聊天室。用的比较少
import java.io.IOException;
import java.io.PipedReader;
import java.io.PipedWriter;
public class PipeTest {
public static void main(String[] args) throws IOException {
PipedWriter pipedWriter = new PipedWriter();
PipedReader pipedReader = new PipedReader();
pipedWriter.connect(pipedReader);
Thread printThread = new Thread(new Print(pipedReader), "PrintThread");
printThread.start();
int receive = 0;
try {
while ((receive = System.in.read()) != -1) {
pipedWriter.write(receive);
}
} finally {
pipedWriter.close();
}
}
static class Print implements Runnable {
private PipedReader in;
public Print(PipedReader in) {
this.in = in;
}
@Override
public void run() {
int receive = 0;
try {
while ((receive = in.read()) != -1) {
System.out.print((char) receive);
}
} catch (IOException e) {
}
}
}
}