天天看点

ReentrantLock—Condition源码解读

          本文是在学习中的总结,欢迎转载但请注明出处:http://blog.csdn.net/pistolove/article/details/105031259

1、前言

Condition实现关键:等待队列。

  • 等待队列是一个FIFO的队列,队列中每个节点包含一个线程引用,该线程就是在Conditon对象上等待的线程。一个Condition对象包含一个等待队列,Condition拥有首节点(firstWaiter)和尾结点(lastWaiter)。当线程调用Conditon.await()方法,将会以当前线程构造节点,并将节点从尾部加入到等待队列。
ReentrantLock—Condition源码解读

Object和Condition对比:

  • Object是java底层级别的;Condition是语言级别的,具有更高的可控制性和扩展性。
  • Object的wait和notify/notify是与对象监视器配合完成线程间的等待/通知机制;Condition与Lock配合完成等待通知机制。
  • Condition能够支持不响应中断;Object方式不支持;
  • Condition能够支持多个等待队列(new 多个Condition对象);Object方式只能支持一个;
  • Condition能够支持超时时间的设置;Object方式不支持。

2、Condition代码示例

  • 基于Condition生产者消费者代码实现如下所示,先了解一下Condition是如何使用的。
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/**
 * 基于Condition生产者消费者代码实现
 *
 * @param <T>
 */
public class ConditionDemo<T> {

    private ReentrantLock reentrantLock = new ReentrantLock();

    private Condition consumerCondition = reentrantLock.newCondition();

    private Condition producerCondition = reentrantLock.newCondition();

    private int count, addIndex, removeIndex;

    private Object[] goods;

    public ConditionDemo(int size) {
        goods = new Object[size];
    }

    /**
     * 生产商品
     *
     * @param t
     */
    public void produce(T t) {
        reentrantLock.lock();
        try {
            while (count == goods.length)
                producerCondition.await();

            goods[addIndex] = t;
            if (++addIndex == goods.length) {
                addIndex = 0;
            }
            ++count;
            System.out.println("produce: " + t);
            consumerCondition.signal();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            reentrantLock.unlock();
        }
    }

    /**
     * 消费产品
     *
     * @return
     */
    public T consume() {
        reentrantLock.lock();
        try {
            while (count == 0)
                consumerCondition.await();
            Object x = goods[removeIndex];
            if (++removeIndex == goods.length) {
                removeIndex = 0;
            }
            --count;
            producerCondition.signal();

            System.out.println("consume: " + x);
            return (T) x;
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            reentrantLock.unlock();
        }
        return null;
    }

    public static void main(String[] args) {
        ConditionDemo<String> conditionDemo = new ConditionDemo(10);

        new Thread() {
            @Override
            public void run() {
                int i = 1;
                while (i != 100) {
                    conditionDemo.produce("" + i++);
                }
            }
        }.start();

        new Thread() {
            @Override
            public void run() {
                while (true) {
                    conditionDemo.consume();
                }
            }
        }.start();
    }
}
           

3、类关系图

  • new Condition()其实是创建了一个ConditionObject,ConditionObject实现了Condition接口和Serializable接口。

4、源码解读

  • await()方法源码解读
    • 使当前线程进入等待队列,并释放锁,同时线程状态变为等待状态;相当于同步队列的首节点移到了Condition的等待队列中。
public final void await() throws InterruptedException {
        // 线程中断则抛出异常
        if (Thread.interrupted())
            throw new InterruptedException();
        
        // 当前线程加入Conditon等待队列
        Node node = addConditionWaiter();
        
        // 释放同步状态,即释放同步锁;唤醒当前线程的节点(头节点)的后继节点
        int savedState = fullyRelease(node);
        
        int interruptMode = 0;
        
        // 如果节点不在同步队列中
        while (!isOnSyncQueue(node)) {
            // 阻塞当前线程
            LockSupport.park(this);
            
            // THROW_IE if interrupted before signalled
            // REINTERRUPT if after signalled
            // 如果中断被唤醒,循环停止 
            // 判断此次线程的唤醒是否因为线程被中断, 
            // 若是被中断, 则会在checkInterruptWhileWaiting的transferAfterCancelledWait进行节点的转移; 
            // 返回值interruptMode != 0
            if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                // 通过线程中断的方式进行唤醒, 并且已经进行node的转移, 转移到 Sync Queue里面
                break;
        }
        
        // 调用 acquireQueued在Sync Queue里面进行独占锁的获取, 返回值表明在获取的过程中有没有被中断过
        if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
            interruptMode = REINTERRUPT;
        
        // 判断线程的唤醒是中断还是signal
        // 如通过中断唤醒的话, 此刻代表线程的Node在Condition Queue与Sync Queue里面都会存在
        if (node.nextWaiter != null) // clean up if cancelled
            // cancelled节点的清除
            unlinkCancelledWaiters();
        
        // 通过中断的方式唤醒线程
        if (interruptMode != 0)
            // 根据 interruptMode 的类型决定是抛出异常, 还是自己中断一下
            reportInterruptAfterWait(interruptMode);
    }

           
// 加入到等待队列
    private Node addConditionWaiter() {
        // 等待队列最后一个节点
        Node t = lastWaiter;
        // If lastWaiter is cancelled, clean out.
        // 如果最后一个节点不是空节点,但是状态不是条件等待状态
        if (t != null && t.waitStatus != Node.CONDITION) {
            // 移除状态为已取消的节点
            unlinkCancelledWaiters();
            t = lastWaiter;
        }
        
        // 将当前线程放入新创建的节点中
        Node node = new Node(Thread.currentThread(), Node.CONDITION);
        
        // 如果等待队列为空,等待队列队头置为当前新建节点
        if (t == null)
            firstWaiter = node;
        else
            // 否则将新创建节点放在队列尾部
            t.nextWaiter = node;
        lastWaiter = node;
        return node;
    }

    // 移除状态为已取消的节点                                  
    private void unlinkCancelledWaiters() {
        Node t = firstWaiter;
        Node trail = null;
        while (t != null) {
            Node next = t.nextWaiter;
            // 如果状态不是条件等待
            if (t.waitStatus != Node.CONDITION) {
                //删除next引用
                t.nextWaiter = null;
                if (trail == null)
                    firstWaiter = next;
                else
                    trail.nextWaiter = next;
                if (next == null)
                    lastWaiter = trail;
            }
            else
                trail = t;
            t = next;
        }
    }
    

           
// 释放锁
    final int fullyRelease(Node node) {
        boolean failed = true;
        try {
            // 获取当前同步状态
            int savedState = getState();
            
            //释放锁
            if (release(savedState)) {
                // 释放锁成功,返回同步状态
                failed = false;
                return savedState;
            } else {
                throw new IllegalMonitorStateException();
            }
        } finally {
            // 释放锁失败,状态置为CANCELLED
            if (failed)
                node.waitStatus = Node.CANCELLED;
        }
    }
    
    // 释放完成锁之后,需要唤醒后继节点
    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

           
// 获取锁
    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

           
// 节点是否在阻塞队列中
    final boolean isOnSyncQueue(Node node) {
        // 如果状态为等待状态,或者前驱节点为空,不在阻塞队列
        if (node.waitStatus == Node.CONDITION || node.prev == null)
            return false;
        
        // 后继节点不为空,则在阻塞队列中
        if (node.next != null) // If has successor, it must be on queue
            return true;
        /*
         * node.prev can be non-null, but not yet on queue because
         * the CAS to place it on queue can fail. So we have to
         * traverse from tail to make sure it actually made it.  It
         * will always be near the tail in calls to this method, and
         * unless the CAS failed (which is unlikely), it will be
         * there, so we hardly ever traverse much.
         */
        return findNodeFromTail(node);
    }

    // 从阻塞队列尾部往前找,判断当前节点是否在阻塞队列中
    private boolean findNodeFromTail(Node node) {
        Node t = tail;
        for (;;) {
            // 当前节点为尾节点,在阻塞队列
            if (t == node)
                return true;
            // 尾节点为空,不在阻塞队列
            if (t == null)
                return false;
            // 尾部节点前移
            t = t.prev;
        }
    }
           
  • signal()方法源码解读
    • 将等待队列中等待时间最长的节点移动到同步队列中,使得该节点能够有机会获得lock。等待队列是先进先出(FIFO),等待队列头节点必然是等待时间最长的节点,每次调用condition的signal方法是将头节点移动到同步队列中。
public final void signal() {
        // 当前线程必须获取锁才行
        if (!isHeldExclusively())
            throw new IllegalMonitorStateException();
            
        // 唤醒等待队列中的第一个节点
        Node first = firstWaiter;
        if (first != null)
            doSignal(first);
    }

    // 节点从等待队列移到同步队列
    private void doSignal(Node first) {
        do {
            // 如果等待队列值有一个节点
            if ( (firstWaiter = first.nextWaiter) == null)
                // lastWaiter置为空
                lastWaiter = null;
            
            // first节点从等待队列中出来
            // 判断Node从Condition queue转移到Sync Queue里是通过signal还是timeout/interrupt
            first.nextWaiter = null;
        } while (!transferForSignal(first) &&
                 (first = firstWaiter) != null);
           // 调用transferForSignal将first转移到Sync Queue里
           // 不成功, 则将firstWaiter重新赋值给first
    }

    // 加入到阻塞队列
    final boolean transferForSignal(Node node) {
        /*
         * If cannot change waitStatus, the node has been cancelled.
         */
         // 如果无法更新同步状态,节点状态为cancelled,返回失败
        if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
            return false;

        /*
         * Splice onto queue and try to set waitStatus of predecessor to
         * indicate that thread is (probably) waiting. If cancelled or
         * attempt to set waitStatus fails, wake up to resync (in which
         * case the waitStatus can be transiently and harmlessly wrong).
         */
        // 节点入阻塞队列,返回入队之前的队尾节点
        Node p = enq(node);
        
        // 入队前队尾节点的状态
        int ws = p.waitStatus;
        // 如果前驱节点状态为超时或者中断,或者CAS设置唤醒状态失败
        if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
            // 唤醒当前线程
            LockSupport.unpark(node.thread);
        return true;
    }
           

继续阅读