天天看點

Java并發程式設計一Condition初使用

推薦:​​Java并發程式設計彙總​​

Java并發程式設計一Condition初使用

Condition是什麼?

​Condition​

​​是在​

​Java1.5​

​​中才出現的,它用來替代傳統​

​Object​

​​中的​

​wait()​

​​、​

​notify()​

​​,實作線程間的協作,相比使用​

​Object​

​​中的​

​wait()​

​​、​

​notify()​

​​,使用​

​Condition​

​​的​

​await()​

​​、​

​signal()​

​​這種方式實作線程間協作更加安全和高效。是以通常來說比較推薦使用​

​Condition​

​​,阻塞隊列實際上是使用了​

​Condition​

​來模拟線程間協作。

​Condition​

​​是個接口,基本的方法就是​

​await()​

​​和​

​signal()​

​​方法。

​​

​Condition​

​​依賴于​

​Lock​

​​接口,生成一個​

​Condition​

​​的基本代碼是​

​lock.newCondition()​

​​(假如​

​lock​

​​為​

​ReentrantLock​

​​的執行個體,​

​ReentrantLock​

​​是​

​Lock​

​​的實作類)。

調用​​

​Condition​

​​的​

​await()​

​​和​

​signal()​

​​方法,都必須在​

​lock​

​​保護之内,就是說必須在​

​lock.lock()​

​​和​

​lock.unlock()​

​​之間才可以使用,這和在​

​synchronized​

​​同步代碼塊或者同步方法中使用​

​Object​

​​中的​

​wait()​

​​、​

​notify()​

​類似。

  • ​Conditon​

    ​​中的​

    ​await()​

    ​​對應​

    ​Object​

    ​中的wait()。
  • ​Condition​

    ​​中的​

    ​signal()​

    ​​對應​

    ​Object​

    ​​中的​

    ​notify()​

    ​。
  • ​Condition​

    ​​中的​

    ​signalAll()​

    ​​對應​

    ​Object​

    ​​中的​

    ​notifyAll()​

    ​。

我們來使用一下它吧。

代碼:

package flowcontrol.condition;

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class ConditionDemo1 {
    
    private ReentrantLock lock = new ReentrantLock();
    private Condition condition = lock.newCondition();

    void method1() throws InterruptedException {
        lock.lock();
        try{
            System.out.println("條件不滿足,開始await");
            condition.await();
            System.out.println("條件滿足了,開始執行後續的任務");
        }finally {
            lock.unlock();
        }
    }

    void method2() {
        lock.lock();
        try{
            System.out.println("準備工作完成,喚醒其他的線程");
            condition.signal();
        }finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ConditionDemo1 conditionDemo1 = new ConditionDemo1();
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    Thread.sleep(1000);
                    conditionDemo1.method2();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();
        conditionDemo1.method1();
    }
}      

輸出:

條件不滿足,開始await
準備工作完成,喚醒其他的線程
條件滿足了,開始執行後續的任務      

是不是很像​

​synchronized​

​​同步代碼塊或者同步方法中調用的​

​wait()​

​​和​

​notify()​

​。

來看看​

​Condition​

​ 的源碼。

package java.util.concurrent.locks;
import java.util.concurrent.TimeUnit;
import java.util.Date;

public interface Condition {

    void await() throws InterruptedException;

    void awaitUninterruptibly();

    long awaitNanos(long nanosTimeout) throws InterruptedException;

    boolean await(long time, TimeUnit unit) throws InterruptedException;

    boolean awaitUntil(Date deadline) throws InterruptedException;

    void signal();

    void signalAll();
}      
  • ​await()​

    ​:造成目前線程在接到信号或被中斷之前一直處于等待狀态。
  • ​awaitUninterruptibly()​

    ​:造成目前線程在接到信号之前一直處于等待狀态(該方法不響應中斷)。
  • ​awaitNanos(long nanosTimeout)​

    ​​:造成目前線程在接到信号、被中斷或到達指定等待時間之前一直處于等待狀态。傳回值表示剩餘時間,如果在​

    ​nanosTimesout​

    ​​之前喚醒,那麼傳回值為​

    ​nanosTimeout​

    ​​ - 消耗時間,如果傳回值​

    ​<= 0​

    ​,則可以認定它已經逾時了。
  • ​await(long time, TimeUnit unit)​

    ​:造成目前線程在接到信号、被中斷或到達指定等待時間之前一直處于等待狀态。
  • ​awaitUntil(Date deadline)​

    ​​:造成目前線程在接到信号、被中斷或到達指定最後期限之前一直處于等待狀态。如果沒有到指定時間就被通知,則傳回​

    ​true​

    ​​,否則表示到了指定時間,傳回​

    ​false​

    ​。
  • ​signal()​

    ​​:喚醒一個等待線程。該線程從等待方法傳回前必須獲得與​

    ​Condition​

    ​相關的鎖。
  • ​signalAll()​

    ​​:喚醒所有等待線程。能夠從等待方法傳回的線程必須獲得與​

    ​Condition​

    ​相關的鎖。

說了​

​Condition​

​​中的方法,就可以将​

​Condition​

​​中的方法和​

​Object​

​中的方法進行對比了,如下圖所示:

Java并發程式設計一Condition初使用

最後我們用​

​Lock​

​​、​

​Condition​

​和隊列來實作一個生産者-消費者模式。

代碼:

package flowcontrol.condition;

import java.util.PriorityQueue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class ConditionDemo2 {

    private int queueSize = 10;
    private PriorityQueue<Integer> queue = new PriorityQueue<Integer>(queueSize);
    private Lock lock = new ReentrantLock();
    private Condition notFull = lock.newCondition();
    private Condition notEmpty = lock.newCondition();

    public static void main(String[] args) {
        ConditionDemo2 conditionDemo2 = new ConditionDemo2();
        Producer producer = conditionDemo2.new Producer();
        Consumer consumer = conditionDemo2.new Consumer();
        producer.start();
        consumer.start();
    }

    class Consumer extends Thread {

        @Override
        public void run() {
            consume();
        }

        private void consume() {
            while (true) {
                lock.lock();
                try {
                    while (queue.size() == 0) {
                        System.out.println("隊列空,等待資料");
                        try {
                            notEmpty.await();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                    queue.poll();
                    notFull.signalAll();
                    System.out.println("從隊列裡取走了一個資料,隊列剩餘" + queue.size() + "個元素");
                } finally {
                    lock.unlock();
                }
            }
        }
    }

    class Producer extends Thread {

        @Override
        public void run() {
            produce();
        }

        private void produce() {
            while (true) {
                lock.lock();
                try {
                    while (queue.size() == queueSize) {
                        System.out.println("隊列滿,等待有空餘");
                        try {
                            notFull.await();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                    queue.offer(1);
                    notEmpty.signal();
                    System.out.println("向隊列插入了一個元素,隊列剩餘空間" + (queueSize - queue.size()));
                } finally {
                    lock.unlock();
                }
            }
        }
    }
}      

輸出:

向隊列插入了一個元素,隊列剩餘空間9
向隊列插入了一個元素,隊列剩餘空間8
向隊列插入了一個元素,隊列剩餘空間7
向隊列插入了一個元素,隊列剩餘空間6
向隊列插入了一個元素,隊列剩餘空間5
向隊列插入了一個元素,隊列剩餘空間4
向隊列插入了一個元素,隊列剩餘空間3
向隊列插入了一個元素,隊列剩餘空間2
向隊列插入了一個元素,隊列剩餘空間1
向隊列插入了一個元素,隊列剩餘空間0
隊列滿,等待有空餘
從隊列裡取走了一個資料,隊列剩餘9個元素
從隊列裡取走了一個資料,隊列剩餘8個元素
從隊列裡取走了一個資料,隊列剩餘7個元素
從隊列裡取走了一個資料,隊列剩餘6個元素
從隊列裡取走了一個資料,隊列剩餘5個元素
從隊列裡取走了一個資料,隊列剩餘4個元素
從隊列裡取走了一個資料,隊列剩餘3個元素
從隊列裡取走了一個資料,隊列剩餘2個元素
從隊列裡取走了一個資料,隊列剩餘1個元素
從隊列裡取走了一個資料,隊列剩餘0個元素
隊列空,等待資料      

輸出我隻粘貼了一部分,但從輸出結果看,這符合我們的預期。

我們上面用用​

​Lock​

​​、​

​Condition​

​​和隊列來實作的生産者-消費者模式類似于​

​ArrayBlockingQueue​

​​中的生産者-消費者模式的實作,隻不過​

​ArrayBlockingQueue​

​用的是數組。

Java并發程式設計一Condition初使用