天天看點

Java多線程 -- JUC包源碼分析5 -- Condition/ArrayBlockingQueue/LinkedBlockingQueue/Deque/PriorityBlockingQueueLinkedBlockingQueue

本人新書出版,對技術感興趣的朋友請關注:

Java多線程 -- JUC包源碼分析5 -- Condition/ArrayBlockingQueue/LinkedBlockingQueue/Deque/PriorityBlockingQueueLinkedBlockingQueue

https://mp.weixin.qq.com/s/uq2cw2Lgf-s4nPHJ4WH4aw

#await – signal – signalAll

以下代碼,分别展示了wait/notify, 和Condition的await/signal的用法

Object o = new Object();
synchronized(o)    //線程1
{
   ...
   o.wait();   //内部,會先釋放鎖。被其他線程notify之後,會再次拿鎖。
   ...
}

synchronized(o)   //線程2
{
   ...
   o.notify();
   ...
}
           
ReentrantLock l = new ReentrantLock();
Condition c1 = l.newCondition();
Condition c2 = l.newCondition();


l.lock;               //線程1
try
{
  ...
  c1.await();  //内部,會先釋放鎖。被其他線程signal之後,會再次拿鎖。
  ...
  c2.signal();
  ...
}finally
{
  l.unlock();
}


l.lock;               //線程2
try
{
  ...
  c2.await();  //内部,會先釋放鎖。被其他線程notify之後,會再次拿鎖。
  ...
  c1.signal();
  ...
}finally
{
  l.unlock();
}
           

通過以上代碼,明确3點:

(1)Condition必須與鎖協同使用:對應synchronized來說,wait()的object必須是synchronized對應的同步對象;對應ReentrantLock來說,Condition是通過ReentrantLock.newCondition()得到的。

(2)wait()/await()的時候,會先釋放鎖,然後進入阻塞,然後被notify/signal喚醒之後,會再去拿鎖!也就是其内部有3個環節:

//釋放鎖

//進入阻塞

//被喚醒,拿鎖,執行後續代碼

(3)await/signal在使用上,比wait/notify更加靈活:

wait/notify隻能附屬在一個條件上,所有的阻塞線程都在這1個條件上;

而Lock可以建立多個condition,每個condition都有wait/notify,每個condition都有一個自己的阻塞線程隊列。

後面所講的BlockingQueue,将很好的展示condition的這個優點。

#Condition源碼分析

從上面的第(2)條可以看出,await()的時候,線程要進入阻塞。是以每個Condition内部,都維護了一個連結清單,或者說隊列,存儲所有阻塞在這個條件上的線程。

以下代碼,展示了Condition的内部結構:

//ConditionObject是AQS的一個内部類,實作了Condition接口

    public class ConditionObject implements Condition, java.io.Serializable {
        。。。
        /** First node of condition queue. */
        private transient Node firstWaiter;
        /** Last node of condition queue. */
        private transient Node lastWaiter;     //阻塞隊列

       。。。
           

下面看一下Condition.await()的源碼:

//ReentrantLock
    public Condition newCondition() {
        return sync.newCondition();
    }

   //ReentrantLock的Sync内部類
   final ConditionObject newCondition() {
            return new ConditionObject();
   }
 
   
   //AQS的ConditionObject内部類
  public final void await() throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();  //有人置了中段标志位,先響應中斷
            Node node = addConditionWaiter();  //把線程加入該condition的阻塞隊列
            int savedState = fullyRelease(node); //釋放鎖
            int interruptMode = 0;
            while (!isOnSyncQueue(node)) {
                LockSupport.park(this);   //開始阻塞
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)  //被中斷喚醒,跳出阻塞
                    break;
            }
            if (acquireQueued(node, savedState) &&  interruptMode != THROW_IE)  //被喚醒之後,重新拿鎖
                interruptMode = REINTERRUPT;
            if (node.nextWaiter != null) // clean up if cancelled
                unlinkCancelledWaiters();
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
        }
           

由上述代碼可以看出,await()是會響應中斷的。下面看一下屏蔽中斷的await(),即awaitUninterruptibly()

public final void awaitUninterruptibly() {
            Node node = addConditionWaiter();  //加入阻塞隊列
            int savedState = fullyRelease(node); //釋放鎖
            boolean interrupted = false;
            while (!isOnSyncQueue(node)) {
                LockSupport.park(this);   //進入阻塞
                if (Thread.interrupted())  //被中斷喚醒,沒有break,繼續循環, 再次進入阻塞
                    interrupted = true;
            }
            if (acquireQueued(node, savedState) || interrupted)  //阻塞出來,拿鎖
                selfInterrupt();  //此時,再響應中斷
        }
           

下面看一下signal()的源碼:

public final void signal() {
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            Node first = firstWaiter;
            if (first != null)
                doSignal(first);  //喚醒隊列裡面第1個
        }
        
        private void doSignal(Node first) {
            do {
                if ( (firstWaiter = first.nextWaiter) == null)
                    lastWaiter = null;
                first.nextWaiter = null;
            } while (!transferForSignal(first) &&
                     (first = firstWaiter) != null);
       }

    final boolean transferForSignal(Node node) {

        if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
            return false;

        Node p = enq(node);
        int ws = p.waitStatus;
        if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
            LockSupport.unpark(node.thread);  //喚醒
        return true;
    }
           

關鍵點:無論await(), 還是signal(),都是在拿到鎖之後執行的,是以其内部的入隊/出隊,都不需要加鎖!

#ArrayBlockingQueue

通常的Queue,一邊是生産者,一邊是消費者。一邊進,一邊出,有一個判空函數,一個判滿函數。

而所謂的BlockingQueue,就是指當為空的時候,阻塞消費者線程;當為滿的時候,阻塞生産者線程。

以下是ArrayBlockingQueue的核心結構:

public class ArrayBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {

    。。。
    /** The queued items */
    final Object[] items;

    /** items index for next take, poll, peek or remove */
    int takeIndex;

    /** items index for next put, offer, or add */
    int putIndex;

    /** Number of elements in the queue */
    int count;

    //其核心就是1把鎖 + 2個條件
    final ReentrantLock lock;
    private final Condition notEmpty;
    private final Condition notFull;
 
    。。。
}
           

以下為其主要的構造函數:

public ArrayBlockingQueue(int capacity, boolean fair) {
        if (capacity <= 0)
            throw new IllegalArgumentException();
        this.items = new Object[capacity];
        lock = new ReentrantLock(fair);
        notEmpty = lock.newCondition();
        notFull =  lock.newCondition();
    }
           

以下為其put()/take()源代碼

public void put(E e) throws InterruptedException {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == items.length)
                notFull.await();  //put的時候,隊列滿了,阻塞
            insert(e);
        } finally {
            lock.unlock();
        }
    }

    private void insert(E x) {
        items[putIndex] = x;
        putIndex = inc(putIndex);
        ++count;
        notEmpty.signal(); //put進去之後,通知非空條件
    }

    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == 0)
                notEmpty.await(); //take的時候,隊列為空,阻塞
            return extract();
        } finally {
            lock.unlock();
        }
    }

    private E extract() {
        final Object[] items = this.items;
        E x = this.<E>cast(items[takeIndex]);
        items[takeIndex] = null;
        takeIndex = inc(takeIndex);
        --count;
        notFull.signal();  //take完了,通知非滿條件
        return x;
    }
           

順便說一句:上述2個函數,都是響應中斷,并且阻塞的。

另外還有不響應中斷的,不阻塞的成員函數,在此就不再詳述了。

LinkedBlockingQueue

LinkedBlockingQueue是一種基于單向連結清單的阻塞隊列。因為頭和尾是2個指針分開操作的,是以用了2把鎖 + 2個條件,同時一個AtomicInteger的原子變量記錄count數。

private final AtomicInteger count = new AtomicInteger(0);  
    
    /** Head of linked list */  
    private transient Node<E> head;  
    /** Tail of linked list */  
    private transient Node<E> last;  
    
    /** Lock held by take, poll, etc */  
    private final ReentrantLock takeLock = new ReentrantLock();  
    /** Wait queue for waiting takes */  
    private final Condition notEmpty = takeLock.newCondition();  
    
    /** Lock held by put, offer, etc */  
    private final ReentrantLock putLock = new ReentrantLock();  
    /** Wait queue for waiting puts */  
    private final Condition notFull = putLock.newCondition();  
           

#LinkedBlockingDeque

其原理和ArrayBlockingQueue是一樣的,也是1把鎖 + 2個條件。隻是其資料結構不是數組,而是一個雙向連結清單。

有一個小細節:連結清單不是無限長嗎,怎麼會滿呢?這裡是人為設定了一個最大長度:

public LinkedBlockingDeque() {
        this(Integer.MAX_VALUE);   //最大長度是整數的最大值
    }
           

下面是其主要結構:

public class LinkedBlockingDeque<E>
    extends AbstractQueue<E>
    implements BlockingDeque<E>,  java.io.Serializable {

    ...
    
    //雙向連結清單的Node
    static final class Node<E> {
        E item;
        Node<E> prev;
        Node<E> next;

        Node(E x) {
            item = x;
        }
    }
    
    //隊列的頭,尾
    transient Node<E> first;
    transient Node<E> last;


    private transient int count;
    private final int capacity;

    
    //1把鎖 + 2個條件 
    final ReentrantLock lock = new ReentrantLock();
    private final Condition notEmpty = lock.newCondition();
    private final Condition notFull = lock.newCondition();

   。。。
}
           

下面是其put/take函數,其原理和ArrayBlockQueue的put/take類似:

public void put(E e) throws InterruptedException {
        putLast(e);
    }
    public void putLast(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        Node<E> node = new Node<E>(e);
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            while (!linkLast(node))
                notFull.await();
        } finally {
            lock.unlock();
        }
    }
    private boolean linkLast(Node<E> node) {
        // assert lock.isHeldByCurrentThread();
        if (count >= capacity)
            return false;
        Node<E> l = last;
        node.prev = l;
        last = node;
        if (first == null)
            first = node;
        else
            l.next = node;
        ++count;
        notEmpty.signal();
        return true;
    }


    public E take() throws InterruptedException {
        return takeFirst();
    }
    public E takeFirst() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            E x;
            while ( (x = unlinkFirst()) == null)
                notEmpty.await();
            return x;
        } finally {
            lock.unlock();
        }
    }
    private E unlinkFirst() {
        // assert lock.isHeldByCurrentThread();
        Node<E> f = first;
        if (f == null)
            return null;
        Node<E> n = f.next;
        E item = f.item;
        f.item = null;
        f.next = f; // help GC
        first = n;
        if (n == null)
            last = null;
        else
            n.prev = null;
        --count;
        notFull.signal();
        return item;
    }

           

#PriorityBlockingQueue

和上面的BlockingQueue有2個差別:

(1)是無界的,是以隻有notEmpty一個條件。put不會阻塞,隻有take會阻塞

(2)通過2叉堆,實作Priority

public class PriorityBlockingQueue<E> extends AbstractQueue<E>
    implements BlockingQueue<E>, java.io.Serializable {
    ...
    
    private transient Object[] queue;  //2叉堆實作

    //1把鎖 + 1個條件
    private final ReentrantLock lock;
    private final Condition notEmpty;
   
    ...
 }
           

#SynchronousQueue

SynchronousQueue是一種特殊隊列,内部不是用Lock + Condition實作的。後續會單獨用一篇專門闡述。

繼續閱讀