天天看點

并發程式設計 -- condition

文章目錄

  • ​​一,condition簡介​​
  • ​​二,使用​​
  • ​​1.簡單使用​​
  • ​​2.基于condition的有界隊列demo​​
  • ​​三,實作分析​​
  • ​​1.await()​​
  • ​​2.signal()​​

一,condition簡介

任何一個java對象都擁有一對螢幕方法(定義在Object上)主要包括:wait()/wait(long timeOut),notify(),notifyAll(),這些方法和synchronized配合實作等待/通知機制

Condition接口也提供了類似的螢幕方法,配合Lock實作等待/通知機制

二,使用

1.簡單使用

  • 調用lock.newCondition()獲得condition對象
  • 配合lock使用
  • 支援響應中斷和對中斷不敏感的進入等待狀态
  • 支援喚醒一個或多個線程
  • 支援目前線程釋放鎖并進入逾時等待狀态
public class ConditionDemo {

    private Lock lock = new ReentrantLock();
    private Condition condition = lock.newCondition();

    /**
     * 線程等待(對中斷敏感)
     * <h> condition.await() </h>
     * <p>
     *    等待:
     *          目前線程進入等待狀态直到被通知或被中斷
     *    喚醒:
     *          1.signal(),signalAll()
     *          2.其他線程調用interrupt()中斷該線程(對中斷敏感)
     * </p>
     */
    public void awaitThread(){
        //執行condition相關方法先擷取到鎖(lock)
        lock.lock();
        try{
            //執行condition相關方法
            condition.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }finally {
            //釋放鎖
            lock.unlock();
        }
    }

    /**
     * 線程等待(對中斷不敏感)
     * <h> condition.awaitUninterruptibly() </h>
     * <p>
     *    等待:
     *          目前線程進入等待狀态直到被通知
     *    喚醒:
     *          1.signal(),signalAll()
     * </p>
     */
    public void awaitUninterruptiblyThread(){
        //執行condition相關方法先擷取到鎖(lock)
        lock.lock();
        try{
            //執行condition相關方法
            condition.awaitUninterruptibly();
        } catch (Exception e) {
            e.printStackTrace();
        }finally {
            //釋放鎖
            lock.unlock();
        }
    }


    /**
     * 喚醒目前等待的線程
     * <h> condition.signal() </h>
     */
    public void signalThread(){
        lock.lock();
        try {
            condition.signal();
        }catch (Exception e){
            e.printStackTrace();
        }finally {
            lock.unlock();
        }
    }


    /**
     * 喚醒所有等待的線程
     * <h> condition.signalAll() </h>
     */
    public void signalAllThread(){
        lock.lock();
        try {
            condition.signalAll();
        }catch (Exception e){
            e.printStackTrace();
        }finally {
            lock.unlock();
        }
    }

}      

2.基于condition的有界隊列demo

public class BoundedQueue {

    //基于數組的簡單有界隊列
    private Integer[] array ;

    /**
     * <params>
     *     addIndex : 添加索引
     *     removeIndex : 移除索引
     *     count :目前隊列元素數量
     * </params>
     */
    private int addIndex,removeIndex,count;

    private Lock lock = new ReentrantLock();

    //當隊列滿時的等待/通知機制
    private Condition fullCondition = lock.newCondition();

    //當隊列空時的等待/通知機制
    private Condition emptyCondition = lock.newCondition();

    public BoundedQueue(int size){
        array = new Integer[size];
    }

    /**
     * 向隊列添加一個元素,當隊列滿時阻塞線程,不能添加
     * @param i
     * @throws InterruptedException
     */
    public void add(int i) throws InterruptedException {
        lock.lock();
        try{
            //判斷隊列是否已滿,有空位才能跳出循環完成添加
            while(count == array.length){
                fullCondition.await();
            }
            array[addIndex] = i;
            //如果剛好達到隊列滿時,重置
            if(++addIndex == array.length){
                addIndex = 0;
            }
            ++count;
            fullCondition.signal();
        }finally {
            lock.unlock();
        }
    }

    /**
     * 移除隊列元素,當隊列是空時,線程被阻塞
     * @return
     */
    public Integer remove(){
        lock.lock();
        try{
            while(count == 0){
                emptyCondition.awaitUninterruptibly();
            }
            Integer x = array[removeIndex];
            array[removeIndex] = null;
            if(++removeIndex == array.length){
                removeIndex = 0;
            }
            --count;
            emptyCondition.signal();
            return x;
        }finally {
            lock.unlock();
        }
    }
}      

三,實作分析

每個condition對象都包含一個等待隊列,這個等待隊列是實作等待/通知機制的重點

1.await()

源碼:

public final void await() throws InterruptedException {
            //不響應中斷
            if (Thread.interrupted())
                throw new InterruptedException();
            //構造節點,加入到等待隊列中
            Node node = addConditionWaiter();
            //釋放該節點對應線程的同步狀态
            int savedState = fullyRelease(node);
            int interruptMode = 0;
            //判斷節點是否還在同步隊列
            while (!isOnSyncQueue(node)) {
                //在同步隊列則阻塞該線程
                LockSupport.park(this);
                //如果中斷則跳出循環
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
            }
            //在被别的線程喚醒後, 将剛剛這個節點放到 AQS 隊列中.接下來就是那個節點的事情了,比如搶鎖.
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            if (node.nextWaiter != null) // clean up if cancelled
                unlinkCancelledWaiters();
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
        }      

2.signal()

public final void signal() {
            //檢查是否擷取了鎖
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            //從等待隊列移除首節點    
            Node first = firstWaiter;
            if (first != null)
                doSignal(first);
        }

        private void doSignal(Node first) {
            do {
                if ( (firstWaiter = first.nextWaiter) == null)
                    lastWaiter = null;
                    first.nextWaiter = null;
            } while (!transferForSignal(first) &&
                     (first = firstWaiter) != null);
        }

        final boolean transferForSignal(Node node) {
            //CAS設定節點狀态
            if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
                return false;
            //進入同步隊列    
            Node p = enq(node);
            int ws = p.waitStatus;
            //喚醒該節點對應線程
            if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
                LockSupport.unpark(node.thread);
            return true;
        }      

繼續閱讀