天天看点

Java并发编程一Condition初使用

推荐:​​Java并发编程汇总​​

Java并发编程一Condition初使用

Condition是什么?

​Condition​

​​是在​

​Java1.5​

​​中才出现的,它用来替代传统​

​Object​

​​中的​

​wait()​

​​、​

​notify()​

​​,实现线程间的协作,相比使用​

​Object​

​​中的​

​wait()​

​​、​

​notify()​

​​,使用​

​Condition​

​​的​

​await()​

​​、​

​signal()​

​​这种方式实现线程间协作更加安全和高效。因此通常来说比较推荐使用​

​Condition​

​​,阻塞队列实际上是使用了​

​Condition​

​来模拟线程间协作。

​Condition​

​​是个接口,基本的方法就是​

​await()​

​​和​

​signal()​

​​方法。

​​

​Condition​

​​依赖于​

​Lock​

​​接口,生成一个​

​Condition​

​​的基本代码是​

​lock.newCondition()​

​​(假如​

​lock​

​​为​

​ReentrantLock​

​​的实例,​

​ReentrantLock​

​​是​

​Lock​

​​的实现类)。

调用​​

​Condition​

​​的​

​await()​

​​和​

​signal()​

​​方法,都必须在​

​lock​

​​保护之内,就是说必须在​

​lock.lock()​

​​和​

​lock.unlock()​

​​之间才可以使用,这和在​

​synchronized​

​​同步代码块或者同步方法中使用​

​Object​

​​中的​

​wait()​

​​、​

​notify()​

​类似。

  • ​Conditon​

    ​​中的​

    ​await()​

    ​​对应​

    ​Object​

    ​中的wait()。
  • ​Condition​

    ​​中的​

    ​signal()​

    ​​对应​

    ​Object​

    ​​中的​

    ​notify()​

    ​。
  • ​Condition​

    ​​中的​

    ​signalAll()​

    ​​对应​

    ​Object​

    ​​中的​

    ​notifyAll()​

    ​。

我们来使用一下它吧。

代码:

package flowcontrol.condition;

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class ConditionDemo1 {
    
    private ReentrantLock lock = new ReentrantLock();
    private Condition condition = lock.newCondition();

    void method1() throws InterruptedException {
        lock.lock();
        try{
            System.out.println("条件不满足,开始await");
            condition.await();
            System.out.println("条件满足了,开始执行后续的任务");
        }finally {
            lock.unlock();
        }
    }

    void method2() {
        lock.lock();
        try{
            System.out.println("准备工作完成,唤醒其他的线程");
            condition.signal();
        }finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ConditionDemo1 conditionDemo1 = new ConditionDemo1();
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    Thread.sleep(1000);
                    conditionDemo1.method2();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();
        conditionDemo1.method1();
    }
}      

输出:

条件不满足,开始await
准备工作完成,唤醒其他的线程
条件满足了,开始执行后续的任务      

是不是很像​

​synchronized​

​​同步代码块或者同步方法中调用的​

​wait()​

​​和​

​notify()​

​。

来看看​

​Condition​

​ 的源码。

package java.util.concurrent.locks;
import java.util.concurrent.TimeUnit;
import java.util.Date;

public interface Condition {

    void await() throws InterruptedException;

    void awaitUninterruptibly();

    long awaitNanos(long nanosTimeout) throws InterruptedException;

    boolean await(long time, TimeUnit unit) throws InterruptedException;

    boolean awaitUntil(Date deadline) throws InterruptedException;

    void signal();

    void signalAll();
}      
  • ​await()​

    ​:造成当前线程在接到信号或被中断之前一直处于等待状态。
  • ​awaitUninterruptibly()​

    ​:造成当前线程在接到信号之前一直处于等待状态(该方法不响应中断)。
  • ​awaitNanos(long nanosTimeout)​

    ​​:造成当前线程在接到信号、被中断或到达指定等待时间之前一直处于等待状态。返回值表示剩余时间,如果在​

    ​nanosTimesout​

    ​​之前唤醒,那么返回值为​

    ​nanosTimeout​

    ​​ - 消耗时间,如果返回值​

    ​<= 0​

    ​,则可以认定它已经超时了。
  • ​await(long time, TimeUnit unit)​

    ​:造成当前线程在接到信号、被中断或到达指定等待时间之前一直处于等待状态。
  • ​awaitUntil(Date deadline)​

    ​​:造成当前线程在接到信号、被中断或到达指定最后期限之前一直处于等待状态。如果没有到指定时间就被通知,则返回​

    ​true​

    ​​,否则表示到了指定时间,返回​

    ​false​

    ​。
  • ​signal()​

    ​​:唤醒一个等待线程。该线程从等待方法返回前必须获得与​

    ​Condition​

    ​相关的锁。
  • ​signalAll()​

    ​​:唤醒所有等待线程。能够从等待方法返回的线程必须获得与​

    ​Condition​

    ​相关的锁。

说了​

​Condition​

​​中的方法,就可以将​

​Condition​

​​中的方法和​

​Object​

​中的方法进行对比了,如下图所示:

Java并发编程一Condition初使用

最后我们用​

​Lock​

​​、​

​Condition​

​和队列来实现一个生产者-消费者模式。

代码:

package flowcontrol.condition;

import java.util.PriorityQueue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class ConditionDemo2 {

    private int queueSize = 10;
    private PriorityQueue<Integer> queue = new PriorityQueue<Integer>(queueSize);
    private Lock lock = new ReentrantLock();
    private Condition notFull = lock.newCondition();
    private Condition notEmpty = lock.newCondition();

    public static void main(String[] args) {
        ConditionDemo2 conditionDemo2 = new ConditionDemo2();
        Producer producer = conditionDemo2.new Producer();
        Consumer consumer = conditionDemo2.new Consumer();
        producer.start();
        consumer.start();
    }

    class Consumer extends Thread {

        @Override
        public void run() {
            consume();
        }

        private void consume() {
            while (true) {
                lock.lock();
                try {
                    while (queue.size() == 0) {
                        System.out.println("队列空,等待数据");
                        try {
                            notEmpty.await();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                    queue.poll();
                    notFull.signalAll();
                    System.out.println("从队列里取走了一个数据,队列剩余" + queue.size() + "个元素");
                } finally {
                    lock.unlock();
                }
            }
        }
    }

    class Producer extends Thread {

        @Override
        public void run() {
            produce();
        }

        private void produce() {
            while (true) {
                lock.lock();
                try {
                    while (queue.size() == queueSize) {
                        System.out.println("队列满,等待有空余");
                        try {
                            notFull.await();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                    queue.offer(1);
                    notEmpty.signal();
                    System.out.println("向队列插入了一个元素,队列剩余空间" + (queueSize - queue.size()));
                } finally {
                    lock.unlock();
                }
            }
        }
    }
}      

输出:

向队列插入了一个元素,队列剩余空间9
向队列插入了一个元素,队列剩余空间8
向队列插入了一个元素,队列剩余空间7
向队列插入了一个元素,队列剩余空间6
向队列插入了一个元素,队列剩余空间5
向队列插入了一个元素,队列剩余空间4
向队列插入了一个元素,队列剩余空间3
向队列插入了一个元素,队列剩余空间2
向队列插入了一个元素,队列剩余空间1
向队列插入了一个元素,队列剩余空间0
队列满,等待有空余
从队列里取走了一个数据,队列剩余9个元素
从队列里取走了一个数据,队列剩余8个元素
从队列里取走了一个数据,队列剩余7个元素
从队列里取走了一个数据,队列剩余6个元素
从队列里取走了一个数据,队列剩余5个元素
从队列里取走了一个数据,队列剩余4个元素
从队列里取走了一个数据,队列剩余3个元素
从队列里取走了一个数据,队列剩余2个元素
从队列里取走了一个数据,队列剩余1个元素
从队列里取走了一个数据,队列剩余0个元素
队列空,等待数据      

输出我只粘贴了一部分,但从输出结果看,这符合我们的预期。

我们上面用用​

​Lock​

​​、​

​Condition​

​​和队列来实现的生产者-消费者模式类似于​

​ArrayBlockingQueue​

​​中的生产者-消费者模式的实现,只不过​

​ArrayBlockingQueue​

​用的是数组。

Java并发编程一Condition初使用