天天看点

并发编程--Java的线程通信方式概念及使用示例1. 概念讲解2. 使用示例

1. 概念讲解

为了能够协调线程调度运行,需要线程间进行通信。线程通信使线程间能够互相发送信号(通知),使线程能够等待其他线程的信号或通知其它线程执行。

1.1. 线程通信方式

  1. synchronized 线程通信:  JDK1.5之前synchronized,Object的wait和notify/notify是与对象监视器配合完成线程间的等待/通知机制,是java底层提供的线程同步通信机制. 任意一个Java对象,都拥有一与之关联的唯一的监视器对象,为此Java为每个对象提供了一组监视器方法(定义在java.lang.Object上),主要包括wait()、wait(long timeout)、notify()以及notifyAll()方法,这些方法与synchronized同步关键字配合,可以实现等待/通知模式。 注意: 1. Object类中的wait、notify、notifyAll用于线程等待和唤醒的方法,都必须在同步代码中运行(必须用到关键字synchronized),否则抛出 IllegalMonitorStateException 异常  2. 线程唤醒的方法(notify、notifyAll)需要在等待的方法(wait)之后执行,等待中的线程才可能会被唤醒,否则无法唤醒
  2. 使用lock+Condition控制线程通信: JDK1.5开始,Lock可以代替synchronized 同步方法或同步代码块,Condition替代同步监视器的功能:lock + condition + await + signal + signalAll,是语言级别的,具有更高的可控制性和扩展性
  3. 使用阻塞队列(BlockingQueue)控制线程通信: BlockingQueue接口主要作为线程同步的工具。当生产者试图向BlockingQueue中放入元素,如果队列已满,则线程被阻塞;当消费者试图向BlockingQueue中取出元素时,若该队列已空,则线程被阻塞。这里通过共享一个队列的信息,实现生产者和消费者。
  4. 使用管道流PipedWriter/PipedReader:
  5. 使用JDK1.5提供的信号量Semaphore、CountDownLatch、CyclicBarrier等工具类
  6. volatile: volatile能保证所修饰的变量对于多个线程可见性,即只要被修改,其它线程读到的一定是最新的值。以此来实现线程通信,但是volatile不能保证操作原子性,是一种弱的同步机制。

1.2. JAVA的对象监视器Monitor

Monitor可以理解为一个同步工具或一种同步机制,通常被描述为一个对象。每一个Java对象就有一把看不见的锁,称为内部锁或者Monitor锁。

Monitor是线程私有的数据结构,每一个线程都有一个可用monitor record列表,同时还有一个全局的可用列表。监视器Monitor也是一个数据结构,里面维护着一系列等待队列、同步队列等

每一个被锁住的对象都会和一个monitor关联,同时monitor中有一个Owner字段存放拥有该锁的线程的唯一标识,表示该锁被这个线程占用。

1.3. Condition

Condition由ReentrantLock对象创建,并且可以同时创建多个,Condition接口在使用前必须先调用ReentrantLock的lock()方法获得锁,之后调用Condition接口的await()将释放锁,并且在该Condition上等待,直到有其他线程调用Condition的signal()方法唤醒线程,使用方式和wait()、notify()类似。

Condition可以和任意的锁对象结合,监视器方法不会再绑定到某个锁对象上。使用Lock锁之后,相当于Lock 替代了synchronized方法和语句的使用,Condition替代了Object监视器方法的使用。

在Condition中,Condition对象当中封装了监视器方法,并用await()替换wait(),用signal()替换notify(),用signalAll()替换notifyAll(),传统线程的通信方式,Condition都可以实现,这里注意,Condition是被绑定到Lock上的,要创建一个Lock的Condition必须用newCondition()方法。

  Condition的强大之处在于它可以为多个线程间建立不同的Condition,使用synchronized/wait()只有一个阻塞队列,notifyAll会唤起所有阻塞队列下的线程,而使用lock/condition,可以实现多个阻塞队列,signalAll只会唤起某个阻塞队列下的阻塞线程。

使用总结:

  1. 使用condition的步骤:创建condition对象,获取锁,然后调用condition的方法
  2. 一个ReentrantLock支持多个condition对象
  3. void await() throws InterruptedException;方法会释放锁,让当前线程等待,支持唤醒,支持线程中断
  4. void awaitUninterruptibly();方法会释放锁,让当前线程等待,支持唤醒,不支持线程中断
  5. long awaitNanos(longnanosTimeout) throws InterruptedException;参数为纳秒,此方法会释放锁,让当前线程等待,支持唤醒,支持中断。超时之后返回的,结果为负数;超时之前被唤醒返回的,结果为正数(表示返回时距离超时时间相差的纳秒数)
  6. boolean  await(longtime,TimeUnitunit) throws InterruptedException;方法会释放锁,让当前线程等待,支持唤醒,支持中断。超时之后返回的,结果为false;超时之前被唤醒返回的,结果为true
  7. boolean awaitUntil(Datedeadline) throws InterruptedException;参数表示超时的截止时间点,方法会释放锁,让当前线程等待,支持唤醒,支持中断。超时之后返回的,结果为false;超时之前被唤醒返回的,结果为true
  8. void signal();会唤醒一个等待中的线程,然后被唤醒的线程会被加入同步队列,去尝试获取锁
  9. void  signalAll();会唤醒所有等待中的线程,将所有等待中的线程加入同步队列,然后去尝试获取锁

1.4. 对象监视器Monitor与Condition的区别

对比项 Object 监视器方法 Condition
前置条件 获取对象的锁 调用Lock.lock获取锁,调用Lock.newCondition()获取Condition对象
调用方式 直接调用,如:object.wait() 直接调用,如:condition.await()
等待队列个数 一个 多个,使用多个condition实现
当前线程释放锁并进入等待状态 支持 支持
当前线程释放锁进入等待状态中不响应中断 不支持 支持
当前线程释放锁并进入超时等待状态 支持 支持
当前线程释放锁并进入等待状态到将来某个时间 不支持 支持
唤醒等待队列中的一个线程 支持 支持
唤醒等待队列中的全部线程 支持 支持

2. 使用示例

针对使用synchronized、信号量、volatile的例子太简单,就不提供示例了。

2.1. 同一个锁支持创建多个Condition

实现一个简单的阻塞队列

import java.util.LinkedList;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class BlockingQueueDemo<E> {
    int size;//阻塞队列最大容量
    ReentrantLock lock = new ReentrantLock();

    LinkedList<E> list = new LinkedList<>();//队列底层实现

    Condition notFull = lock.newCondition();//队列满时的等待条件
    Condition notEmpty = lock.newCondition();//队列空时的等待条件

    public BlockingQueueDemo(int size) {
        this.size = size;
    }

    public void enqueue(E e) throws InterruptedException {
        lock.lock();
        try {
            while (list.size() == size)//队列已满,在notFull条件上等待
                notFull.await();
            list.add(e);//入队:加入链表末尾
            notEmpty.signal(); //通知在notEmpty条件上等待的线程
        } finally {
            lock.unlock();
        }
    }

    public E dequeue() throws InterruptedException {
        E e;
        lock.lock();
        try {
            while (list.size() == 0)//队列为空,在notEmpty条件上等待
                notEmpty.await();
            e = list.removeFirst();//出队:移除链表首元素
            notFull.signal();//通知在notFull条件上等待的线程
            return e;
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) {
        BlockingQueueDemo<Integer> queue = new BlockingQueueDemo<>(2);
        for (int i = 0; i < 10; i++) {
            int data = i;
            new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        queue.enqueue(data);
                    } catch (InterruptedException e) {
                    }
                }
            }).start();
        }
        for (int i = 0; i < 10; i++) {
            new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        Integer data = queue.dequeue();
                        System.out.println(data);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }).start();
        }
    }
}
           

2.2. 使用管道流

对于Piped类型的流,必须先要进行绑定,即调用connect方法,如果没有绑定,那么将会抛出异常。管倒流通信类似于聊天室。用的比较少

import java.io.IOException;
import java.io.PipedReader;
import java.io.PipedWriter;

public class PipeTest {
    public static void main(String[] args) throws IOException {
        PipedWriter pipedWriter = new PipedWriter();
        PipedReader pipedReader = new PipedReader();
        pipedWriter.connect(pipedReader);

        Thread printThread = new Thread(new Print(pipedReader), "PrintThread");
        printThread.start();
        int receive = 0;
        try {
            while ((receive = System.in.read()) != -1) {
                pipedWriter.write(receive);
            }
        } finally {
            pipedWriter.close();
        }
    }


    static class Print implements Runnable {
        private PipedReader in;
        public Print(PipedReader in) {
            this.in = in;
        }

        @Override
        public void run() {
            int receive = 0;
            try {
                while ((receive = in.read()) != -1) {
                    System.out.print((char) receive);
                }
            } catch (IOException e) {

            }
        }
    }
}