天天看点

从零开始java多线程并发---锁(九):Condition详解一:Condition详解二:Condition的await()三:Condition的await(long time, TimeUnit unit)四:awaitUninterruptibly()五:signal()六:signalAll()七:总结

一:Condition详解

  Conditon提供了线程协作的更多的更丰富的API帮助,其本身是对Object的await()、notitify()方法的扩展。

  首先我们来看一下Condition的方法:

从零开始java多线程并发---锁(九):Condition详解一:Condition详解二:Condition的await()三:Condition的await(long time, TimeUnit unit)四:awaitUninterruptibly()五:signal()六:signalAll()七:总结

COndititon是实现AQS中的ConditonObject的。

从零开始java多线程并发---锁(九):Condition详解一:Condition详解二:Condition的await()三:Condition的await(long time, TimeUnit unit)四:awaitUninterruptibly()五:signal()六:signalAll()七:总结

二:Condition的await()

  该方法设置获取倒锁的线程从运行状态转变为等待状态,其主要的流程如下:

  • 将当前线程包装成Node节点,加入FIFO等待队列
  • 释放当前线程锁

 (1)await()方法

public final void await() throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
            
            //将当前线程包装成节点,插入等待队列    
            Node node = addConditionWaiter();
            //释放锁
            int savedState = fullyRelease(node);
            int interruptMode = 0;
            //判断当前线程节点是否在等待队列
            while (!isOnSyncQueue(node)) {
                LockSupport.park(this);
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
            }
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            if (node.nextWaiter != null) // clean up if cancelled
                unlinkCancelledWaiters();
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
        }
           

(2)节点包装

private Node addConditionWaiter() {
            Node t = lastWaiter;
            // 如果最后一个节点不存在,则清空等待队列
            if (t != null && t.waitStatus != Node.CONDITION) {
                unlinkCancelledWaiters();
                t = lastWaiter;
            }

            //封装新的节点,并加入当前队列        
            Node node = new Node(Thread.currentThread(), Node.CONDITION);
            
            if (t == null)
                firstWaiter = node;
            else
                t.nextWaiter = node;
            lastWaiter = node;
            return node;
        }
           

(3)释放锁

final int fullyRelease(Node node) {
        boolean failed = true;
        try {
            
            //获取到同步状态 
            int savedState = getState();

            //释放锁
            if (release(savedState)) {
                failed = false;
                return savedState;
            } else {
                throw new IllegalMonitorStateException();
            }
        } finally {
            
            //释放失败,将当前节点状态设置为取消,然后回自动从等待队列中移除
            if (failed)
                node.waitStatus = Node.CANCELLED;
        }
    }
           

三:Condition的await(long time, TimeUnit unit)

  限时等待队列,直到等待结束,其主要流程如下:

  • 包装当前节点,加入等待队列
  • 释放当前线程锁
  • 判断等待时间的比较
public final boolean await(long time, TimeUnit unit)
                throws InterruptedException {

            //等待时间
            long nanosTimeout = unit.toNanos(time);

            //线程响应中断
            if (Thread.interrupted())
                throw new InterruptedException();

            //包装节点,加入等待队列
            Node node = addConditionWaiter();

            //释放锁
            int savedState = fullyRelease(node);

            //绝对等待时间
            final long deadline = System.nanoTime() + nanosTimeout;
            boolean timedout = false;
            int interruptMode = 0;
          
            //如果不在等待队列    
            while (!isOnSyncQueue(node)) {
                
                //如果等待时间 小于 0 ,移除等待队列
                if (nanosTimeout <= 0L) {
                    timedout = transferAfterCancelledWait(node);
                    break;
                }

                //如果等待时间小于 1000L,直接进入自旋等待
                if (nanosTimeout >= spinForTimeoutThreshold)
                    LockSupport.parkNanos(this, nanosTimeout);
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
                nanosTimeout = deadline - System.nanoTime();
            }
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            if (node.nextWaiter != null)
                unlinkCancelledWaiters();
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
            return !timedout;
        }
           

   针对超时控制,程序首先记录唤醒时间deadline ,deadline = System.nanoTime() + nanosTimeout(时间间隔)。如果获取同步状态失败,则需要计算出需要休眠的时间间隔nanosTimeout(= deadline - System.nanoTime()),如果nanosTimeout <= 0 表示已经超时了,返回false,如果大于spinForTimeoutThreshold(1000L)则需要休眠nanosTimeout ,如果nanosTimeout <= spinForTimeoutThreshold ,就不需要休眠了,直接进入快速自旋的过程。原因在于 spinForTimeoutThreshold 已经非常小了,非常短的时间等待无法做到十分精确,如果这时再次进行超时等待,相反会让nanosTimeout 的超时从整体上面表现得不是那么精确,所以在超时非常短的场景中,AQS会进行无条件的快速自旋。

四:awaitUninterruptibly()

 这个方法其实也同上面的差不多,主要是上面的await()方法,如果响应中断会使线程从等待队列移除,而此方法不会。

五:signal()

此方法用于唤醒等待队列中的方法执行,主要的流程有:

  • 判断当前线程是否有加锁(通过对象的头文件判断)
  • 唤醒等待队列的首节点 
public final void signal() {
            //判断加锁
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            
            //唤醒首节点
            Node first = firstWaiter;
            if (first != null)
                doSignal(first);
        }
           

六:signalAll()

用于唤醒所有的等待队列中的方法,

public final void signalAll() {
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            Node first = firstWaiter;
            if (first != null)
                doSignalAll(first);
        }
           
private void doSignalAll(Node first) {
            lastWaiter = firstWaiter = null;
            do {
                Node next = first.nextWaiter;
                first.nextWaiter = null;
                transferForSignal(first);
                first = next;
            } while (first != null);
        }
           

七:总结

  • Condition依赖于Lock锁,就像Object的wait()方法依赖于Synchronized一样
  • 在Lock的实现类中都是通过new Conditon方法创建一个该Lock的Condition

继续阅读