天天看点

JDK8系列:阻塞队列 之 SynchronousQueue(同步队列)源码分析1、SynchronousQueue 简介2、成员变量 和 构造方法3、方法介绍4、内部类

1、SynchronousQueue 简介

1. 整个 queue 没有容量, 表现为, 你每次进行put值进去时, 必须等待相应的 consumer 拿走数据后才可以再次 put 数据

2. queue 对应 peek, contains, clear, isEmpty ... 等方法其实是无效的

3. 整个 queue 分为 公平(TransferQueue FIFO)与非公平模式(TransferStack LIFO 默认) 

4. 若使用 TransferQueue, 则队列中永远会存在一个 dummy nodo

队列本身并不储存任何元素,非常适合传递性场合。

SynchronziedQueue 的吞吐量高于 LinkedBlockingQueue 和 ArrayBlockingQueue

SynchronousQueue相似于使用CSP和Ada算法(不知道怎么具体指什么算法),他非常适合做交换的工作,生产者的线程必须与消费者的线程同步以传递某些信息、事件或任务

2、成员变量 和 构造方法

public class SynchronousQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable {
    // 序列号
    private static final long serialVersionUID = -3223113410248163686L;

    static final int NCPUS = Runtime.getRuntime().availableProcessors();

    static final int maxTimedSpins = (NCPUS < 2) ? 0 : 32;

    static final int maxUntimedSpins = maxTimedSpins * 16;

    static final long spinForTimeoutThreshold = 1000L;

    private transient volatile Transferer<E> transferer;

    private ReentrantLock qlock;
    private WaitQueue waitingProducers;
    private WaitQueue waitingConsumers;

    static long objectFieldOffset(sun.misc.Unsafe UNSAFE,
                                  String field, Class<?> klazz) {
        try {
            return UNSAFE.objectFieldOffset(klazz.getDeclaredField(field));
        } catch (NoSuchFieldException e) {
            // Convert Exception to corresponding Error
            NoSuchFieldError error = new NoSuchFieldError(field);
            error.initCause(e);
            throw error;
        }
    }

/******************     下面是构造方法     ***********************/
    public SynchronousQueue() {
        this(false);
    }

    // 通过 fair 值来决定内部用 使用 queue 还是 stack 存储线程节点
    public SynchronousQueue(boolean fair) {
        transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
    }
}
           

关于 TransferQueue 和 TransferStack ,先看内部类那节

3、方法介绍

(1)导致那么多方法无意义的原因:SynchronizedQueue 本身不储存数据

(2)增加、删除数据 全部都是调用的 transferer.transfer(...) 方法

3.1、增加数据

offer(E e)

// 添加数据,实际上是调用 transferer.transfer(e, true, 0) 方法
public boolean offer(E e) {
    if (e == null) 
        throw new NullPointerException();
    return transferer.transfer(e, true, 0) != null;
}
           

offer(E e, long timeout, TimeUnit unit) 

// 实际上是调用 transferer.transfer(e, true, unit.toNanos(timeout)) 方法
public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {
    if (e == null) 
        throw new NullPointerException();
    if (transferer.transfer(e, true, unit.toNanos(timeout)) != null)
        return true;
    if (!Thread.interrupted())
        return false;
    throw new InterruptedException();
}
           

 put(E e)

// 实际上是调用 transferer.transfer(e, false, 0) 方法
public void put(E e) throws InterruptedException {
    if (e == null) 
        throw new NullPointerException();
    if (transferer.transfer(e, false, 0) == null) {
        Thread.interrupted();
        throw new InterruptedException();
    }
}
           

3.2、删除数据

poll()

// 实际上是调用 transferer.transfer(null, true, 0) 方法
public E poll() {
    return transferer.transfer(null, true, 0);
}
           

poll(long timeout, TimeUnit unit) 

// 实际上是调用 transferer.transfer() 方法
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
    E e = transferer.transfer(null, true, unit.toNanos(timeout));
    if (e != null || !Thread.interrupted())
        return e;
    throw new InterruptedException();
}
           

take() 

// 实际上是调用 transferer.transfer(null, false, 0) 方法
public E take() throws InterruptedException {
    E e = transferer.transfer(null, false, 0);
    if (e != null)
        return e;
    Thread.interrupted();
    throw new InterruptedException();
}
           

remove(Object o)     无意义

public boolean remove(Object o) {
    return false;
}
           

removeAll(Collection<?> c)      无意义

public boolean removeAll(Collection<?> c) {
    return false;
}
           

retainAll(Collection<?> c)      无意义

public boolean retainAll(Collection<?> c) {
    return false;
}
           

3.3、查询

isEmpty()     无意义

public boolean isEmpty() {
    return true;
}
           

size()      无意义

public int size() {
    return 0;
}
           

remainingCapacity()      无意义

public int remainingCapacity() {
    return 0;
}
           

contains(Object o)       无意义

public boolean contains(Object o) {
    return false;
}
           

containsAll(Collection<?> c)       无意义

public boolean containsAll(Collection<?> c) {
    return c.isEmpty();
}
           

peek()       无意义

public E peek() {
    return null;
}
           

3.4、将queue中数据输出到其他数据结构

toArray()     无意义

public Object[] toArray() {
    return new Object[0];
}
           

toArray(T[] a)      无意义

public <T> T[] toArray(T[] a) {
    if (a.length > 0)
        a[0] = null;
    return a;
}
           

drainTo(Collection<? super E> c)       

public int drainTo(Collection<? super E> c) {
    if (c == null)
        throw new NullPointerException();
    if (c == this)
        throw new IllegalArgumentException();
    int n = 0;
    for (E e; (e = poll()) != null;) {
        c.add(e);
        ++n;
    }
    return n;
}
           

drainTo(Collection<? super E> c, int maxElements) 

public int drainTo(Collection<? super E> c, int maxElements) {
    if (c == null)
        throw new NullPointerException();
    if (c == this)
        throw new IllegalArgumentException();
    int n = 0;
    for (E e; n < maxElements && (e = poll()) != null;) {
        c.add(e);
        ++n;
    }
    return n;
}
           

其他方法

clear()     无意义

// 清空
public void clear() {}
           

iterator()      无意义

public Iterator<E> iterator() {
    return Collections.emptyIterator();
}
           

spliterator()       无意义

public Spliterator<E> spliterator() {
    return Spliterators.emptySpliterator();
}
           

writeObject(java.io.ObjectOutputStream s)  

// 用于序列化
private void writeObject(java.io.ObjectOutputStream s) throws java.io.IOException {
    boolean fair = transferer instanceof TransferQueue;
    if (fair) {
        qlock = new ReentrantLock(true);
        waitingProducers = new FifoWaitQueue();
        waitingConsumers = new FifoWaitQueue();
    }
    else {
        qlock = new ReentrantLock();
        waitingProducers = new LifoWaitQueue();
        waitingConsumers = new LifoWaitQueue();
    }
    s.defaultWriteObject();
}
           

readObject(java.io.ObjectInputStream s)  

// 用于 反序列化
private void readObject(java.io.ObjectInputStream s) throws java.io.IOException, ClassNotFoundException {
    s.defaultReadObject();
    if (waitingProducers instanceof FifoWaitQueue)
        transferer = new TransferQueue<E>();
    else
        transferer = new TransferStack<E>();
}
           

4、内部类

4.1、Transferer 抽象类

abstract static class Transferer<E> {
    // 表现为put 或者 take
    abstract E transfer(E e, boolean timed, long nanos);
}
           

4.2、TransferQueue 类

4.2.1、内部类 QNode

static final class QNode {
    volatile QNode next;          // 队列中的下一个节点
    volatile Object item;         // CAS'ed to or from null
    volatile Thread waiter;       // 控制 park/unpark
    final boolean isData;         // 生产消费类型

    QNode(Object item, boolean isData) {
        this.item = item;
        this.isData = isData;
    }

    // 设置节点的next,item,采用到CAS
    boolean casNext(QNode cmp, QNode val) {
        return next == cmp &&     //next不对就不需要cas了,避免依次cas。
            // 即使这里被打断了,即另外线程修改了next也不要紧,下面的CAS会失败
            //如果执行到这里让出时间片,别的线程入队没有出队,head没改,tail改了,
            //此时h是对的,t是错的,t还是之前的tail,cas的时候说t的next应该是null,
            //此时不是了(现在的t也就是之前的tail已经有后续节点了),所以不进行cas,继续continue
        UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
    }

    boolean casItem(Object cmp, Object val) {
        return item == cmp && UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
    }

    // 节点被取消了,item就是自己,超时和线程中断
    void tryCancel(Object cmp) {
        UNSAFE.compareAndSwapObject(this, itemOffset, cmp, this);
    }

    boolean isCancelled() {
        return item == this;
    }

    boolean isOffList() {
        return next == this;
    }

    private static final sun.misc.Unsafe UNSAFE;
    private static final long itemOffset;
    private static final long nextOffset;

    static {
        try {
            UNSAFE = sun.misc.Unsafe.getUnsafe();
            Class<?> k = QNode.class;
            itemOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("item"));
            nextOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("next"));
        } catch (Exception e) {
            throw new Error(e);
        }
    }
}
           

4.2.2、成员变量 和 构造方法

大家知道 删除一个节点 ,直接 A.CASNext(B, B.next) 就可以,但是当  节点 B 是整个队列中的末尾元素时,   一个线程删除节点B, 一个线程在节点B之后插入节点 这样操作容易致使插入的节点丢失, 这个cleanMe很像 ConcurrentSkipListMap 中的 删除添加的 marker 节点, 他们都是起着相同的作用。

在清除 队列最尾端节点时, 不直接删除这个节点, 而是间删除节点的前继节点标示为 cleanMe 节点, 为下次删除做准备, 队列里面只能全是生产者或者消费者,不可能生产和消费者共存。然后消费者过来移除节点。(队列即可能是生产者队列也可能是消费者队列)

static final class TransferQueue<E> extends Transferer<E> {

         // 1. 整个队列有 head, tail 两个节点
         // 2. 队列初始化时会有个 dummy 节点
         // 3. 这个队列的头节点是个 dummy 节点/ 或 哨兵节点, 所以操作的总是队列中的第二个节点(AQS的设计中也是这也)

        // 队列的头节点
        transient volatile QNode head;
        // 队列的尾节点
        transient volatile QNode tail;
        
        // 要删除的节点的前一个节点,cleanMe的下一个节点是因中断或超时需要删除的节点
        // 这个节点所在线程被中断了,或者节点超时了,放在上面不要紧,取节点时候x==m,跳过即可。
        transient volatile QNode cleanMe;

        TransferQueue() {
            QNode h = new QNode(null, false); //  dummy节点
            head = h;
            tail = h;
        }
}
           

4.2.3、方法

advanceHead(QNode h, QNode nh)

// 推进 head 节点
void advanceHead(QNode h, QNode nh) {
    if (h == head && 
            UNSAFE.compareAndSwapObject(this, headOffset, h, nh))
        h.next = h; // 丢弃头节点 help gc
}
           

 advanceTail(QNode t, QNode nt)  

// 更新新的 tail 节点
void advanceTail(QNode t, QNode nt) {
    if (tail == t)
        UNSAFE.compareAndSwapObject(this, tailOffset, t, nt);
}
           

casCleanMe(QNode cmp, QNode val) 

// CAS 设置 cleamMe 节点
boolean casCleanMe(QNode cmp, QNode val) {
    return cleanMe == cmp &&
        UNSAFE.compareAndSwapObject(this, cleanMeOffset, cmp, val);
}
           

从代码中我们知道, TransferQueue 是个 dual queue, 初始化时默认会个一个 dummy node;

而最特别的是 cleanMeNode, cleanMeNode是一个标记节点, cleanMeNode.next 节点是因中断或超时需要删除的节点,是在清除 队列最尾端节点时, 不直接删除这个节点, 而是间删除节点的前继节点标示为 cleanMe 节点, 为下次删除做准备, 功能和 ConcurrentSkipListMap 中的 marker 节点差不多, 都是防止在同一地点插入节点的同时因删除节点而造成节点的丢失, 不明白的可以看 ConcurrentSkipListMap。

transfer(E e, boolean timed, long nanos) 

看不懂看不懂看不懂看不懂看不懂看不懂看不懂看不懂

 1. 若队列为空 / 队列中的尾节点和自己的 类型相同, 则添加 node 到队列中, 直到 timeout/interrupt/其他线程和这个线程匹配

   timeout/interrupt awaitFulfill方法返回的是 node 本身匹配成功的话, 要么返回 null (producer返回的), 或正真的传递值 (consumer 返回的)

2. 队列不为空, 且队列的 head.next 节点是当前节点匹配的节点,进行数据的传递匹配, 并且通过 advanceHead 方法帮助 先前 block 的节点 dequeue

E transfer(E e, boolean timed, long nanos) {

    QNode s = null; // constructed/reused as needed
    // 1.判断 e != null 用于区分 producer 与 consumer
    boolean isData = (e != null);

    for (;;) {
        QNode t = tail;
        QNode h = head;

        // 2. 数据未初始化, continue 重来
        if (t == null || h == null)         
            continue;                       

        // 3. 队列为空, 或队列尾节点和自己相同 (注意这里是和尾节点比价, 下面进行匹配时是和 head.next 进行比较)
        if (h == t || t.isData == isData) {
            QNode tn = t.next;

            // 4. tail 改变了, 重新再来
            if (t != tail)                 
                continue;

            // 5. 其他线程添加了 tail.next, 所以帮助推进 tail,重新再来
            if (tn != null) {               
                advanceTail(t, tn);
                continue;
            }

            // 6. 调用的方法的 wait 类型的, 并且 超时了, 直接返回 null;直接见 SynchronousQueue.poll()
            // 方法说明此 poll 的调用只有当前队列中正好有一个与之匹配的线程在等待被【匹配才有返回值
            if (timed && nanos <= 0)        // can't wait
                return null;

            // 7. 构建节点 QNode
            if (s == null)
                s = new QNode(e, isData);

            // 8. 将 新建的节点加入到 队列中;如果假如失败,就再来
            if (!t.casNext(null, s))        
                continue;
            // 到这里,说明 新建的节点已经加到了队列中了
    
            // 9. 帮助推进 tail 节点
            advanceTail(t, s);              

            // 10. 调用awaitFulfill, 若节点是 head.next, 则进行一些自旋, 
            // 若不是的话, 直接 block, 知道有其他线程 与之匹配, 或它自己进行线程的中断
            Object x = awaitFulfill(s, e, timed, nanos);

            // 11. 若 (x == s)节点s 对应的线程 wait 超时 或线程中断, 
            // 不然的话 x == null (s 是 producer) 或 是正真的传递值(s 是 consumer)
            if (x == s) {              
// 12. 对接点 s 进行清除, 若 s 不是链表的最后一个节点, 则直接 CAS 进行 节点的删除, 
// 若 s 是链表的最后一个节点, 则 要么清除以前的 cleamMe 节点(cleamMe != null), 
// 然后将 s.prev 设置为 cleanMe 节点, 下次进行删除 或直接将 s.prev 设置为cleanMe。     
                clean(t, s);
                return null;
            }

            // 13. 节点 s 没有 offlist
            if (!s.isOffList()) {           
// 14. 推进head节点, 下次就调用s.next节点进行匹配(这里调用的是advanceHead, 因为代码能执行到这边说明s已经是head.next节点了)
                advanceHead(t, s);         
                if (x != null)              // and forget fields
                    s.item = s;
                s.waiter = null;   // 15. 释放线程 ref
            }
            return (x != null) ? (E)x : e;

        } else {  
// 16. 进行线程的匹配操作, 匹配操作是从 head.next 开始匹配 (注意 队列刚开始构建时 有个 dummy node, 
// 而且 head 节点永远是个 dummy node 这个和 AQS 中一样的)
            QNode m = h.next;               // 17. 获取 head.next 准备开始匹配                          
            QNode m = h.next;               // node to fulfill
            if (t != tail || m == null || h != head)
                continue;    // 18. 不一致读取, 有其他线程改变了队列的结构inconsistent read

            /** producer 和 consumer 匹配操作
             *  1. 获取 m的 item (注意这里的m是head的next节点
             *  2. 判断 isData 与x的模式是否匹配, 只有produce与consumer才能配成一对
             *  3. x == m 判断是否 节点m 是否已经进行取消了, 具体看(QNOde#tryCancel)
             *  4. m.casItem 将producer与consumer的数据进行交换 (这里存在并发时可能cas操作失败的情况)
             *  5. 若 cas操作成功则将h节点dequeue
             *
             *  疑惑: 为什么将h进行 dequeue, 而不是 m节点
             *  答案: 因为每次进行配对时, 都是将 h 是个 dummy node, 正真的数据节点 是 head.next
             */
            

            Object x = m.item;
            if (isData == (x != null) ||   // 19. 两者的模式是否匹配 (因为并发环境下 有可能其他的线程强走了匹配的节点)
                    x == m ||              // 20. m 节点 线程中断或者 wait 超时了
                    !m.casItem(x, e)) {    // 21.进行CAS操作 更改等待线程的item值(等待的有可能是 concumer / producer)
                advanceHead(h, m);         // 22.推进 head 节点 重试 (尤其 21 操作失败)
                continue;
            }

            advanceHead(h, m);             // 23. producer consumer 交换数据成功, 推进 head 节点
            // 24. 换线等待中的 m 节点, 而在 awaitFulfill 方法中 因为 item 改变了,  所以 x != e 成立, 返回
            LockSupport.unpark(m.waiter);
            //25.操作到这里若是producer, 则x!=null, 返回x;若是consumer,则x ==null,返回 producer(其实就是节点m) 的e
            return (x != null) ? (E)x : e;
        }
    }
}
           

awaitFulfill(QNode s, E e, boolean timed, long nanos) 

Object awaitFulfill(QNode s, E e, boolean timed, long nanos) {
    /* Same idea as TransferStack.awaitFulfill */
    final long deadline = timed ? System.nanoTime() + nanos : 0L;
    Thread w = Thread.currentThread();
    int spins = ((head.next == s) ?
                            (timed ? maxTimedSpins : maxUntimedSpins) : 0);
    for (;;) {
        if (w.isInterrupted())
            s.tryCancel(e);
        Object x = s.item;
        if (x != e)
            return x;
        if (timed) {
            nanos = deadline - System.nanoTime();
            if (nanos <= 0L) {
                s.tryCancel(e);
                continue;
            }
        }
        if (spins > 0)
            --spins;
        else if (s.waiter == null)
            s.waiter = w;
        else if (!timed)
            LockSupport.park(this);
        else if (nanos > spinForTimeoutThreshold)
            LockSupport.parkNanos(this, nanos);
    }
}
           

clean(QNode pred, QNode s) 

void clean(QNode pred, QNode s) {
    s.waiter = null; // forget thread

    while (pred.next == s) { // Return early if already unlinked
        QNode h = head;
        QNode hn = h.next;   // Absorb cancelled first node as head
        if (hn != null && hn.isCancelled()) {
            advanceHead(h, hn);
            continue;
        }
        QNode t = tail;      // Ensure consistent read for tail
        if (t == h)
            return;
        QNode tn = t.next;
        if (t != tail)
            continue;
        if (tn != null) {
            advanceTail(t, tn);
            continue;
        }
        if (s != t) {        // If not tail, try to unsplice
            QNode sn = s.next;
            if (sn == s || pred.casNext(s, sn))
                return;
        }
        QNode dp = cleanMe;
        if (dp != null) {    // Try unlinking previous cancelled node
            QNode d = dp.next;
            QNode dn;
            if (d == null ||               // d is gone or
                    d == dp ||                 // d is off list or
                    !d.isCancelled() ||        // d not cancelled or
                    (d != t &&                 // d not tail and
                    (dn = d.next) != null &&  //   has successor
                    dn != d &&                //   that is on list
                    dp.casNext(d, dn)))       // d unspliced
                casCleanMe(dp, null);
            if (dp == pred)
                return;      // s is already saved node
        } else if (casCleanMe(null, pred))
            return;          // Postpone cleaning s
    }
}
           

4.3、TransferStack 类

4.3.1、SNode内部类

static final class SNode {
    volatile SNode next;        // next node in stack
    volatile SNode match;       // the node matched to this
    volatile Thread waiter;     // to control park/unpark
    Object item;                // data; or null for REQUESTs
    int mode;

    SNode(Object item) {
        this.item = item;
    }

    boolean casNext(SNode cmp, SNode val) {
        return cmp == next &&
                        UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
    }

    boolean tryMatch(SNode s) {
        if (match == null &&
                UNSAFE.compareAndSwapObject(this, matchOffset, null, s)) {
            Thread w = waiter;
            if (w != null) {    // waiters need at most one unpark
                waiter = null;
                LockSupport.unpark(w);
            }
            return true;
        }
        return match == s;
    }

            /**
             * Tries to cancel a wait by matching node to itself.
             */
    void tryCancel() {
        UNSAFE.compareAndSwapObject(this, matchOffset, null, this);
    }

    boolean isCancelled() {
        return match == this;
    }

            // Unsafe mechanics
    private static final sun.misc.Unsafe UNSAFE;
    private static final long matchOffset;
    private static final long nextOffset;

    static {
        try {
            UNSAFE = sun.misc.Unsafe.getUnsafe();
            Class<?> k = SNode.class;
            matchOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("match"));
            nextOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("next"));
        } catch (Exception e) {
            throw new Error(e);
        }
    }
}
           

4.3.2、成员变量 和 构造方法

static final class TransferStack<E> extends Transferer<E> {

        /* Modes for SNodes, ORed together in node fields */
        /** Node represents an unfulfilled consumer */
    static final int REQUEST    = 0;
        /** Node represents an unfulfilled producer */
    static final int DATA       = 1;
        /** Node is fulfilling another unfulfilled DATA or REQUEST */
    static final int FULFILLING = 2;
    
    /** The head (top) of the stack */
    volatile SNode head;

    // Unsafe mechanics
    private static final sun.misc.Unsafe UNSAFE;
    private static final long headOffset;
    static {
        try {
            UNSAFE = sun.misc.Unsafe.getUnsafe();
            Class<?> k = TransferStack.class;
            headOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("head"));
        } catch (Exception e) {
            throw new Error(e);
        }
    }
}
           

4.3.3、方法

isFulfilling(int m)

/** Returns true if m has fulfilling bit set. */
static boolean isFulfilling(int m) {
    return (m & FULFILLING) != 0; 
}
           

casHead(SNode h, SNode nh) 

boolean casHead(SNode h, SNode nh) {
    return h == head &&
                UNSAFE.compareAndSwapObject(this, headOffset, h, nh);
}
           

 snode(SNode s, Object e, SNode next, int mode) 

static SNode snode(SNode s, Object e, SNode next, int mode) {
    if (s == null) 
        s = new SNode(e);
    s.mode = mode;
    s.next = next;
    return s;
}
           

transfer(E e, boolean timed, long nanos) 

E transfer(E e, boolean timed, long nanos) {
            /*
             * Basic algorithm is to loop trying one of three actions:
             *
             * 1. If apparently empty or already containing nodes of same
             *    mode, try to push node on stack and wait for a match,
             *    returning it, or null if cancelled.
             *
             * 2. If apparently containing node of complementary mode,
             *    try to push a fulfilling node on to stack, match
             *    with corresponding waiting node, pop both from
             *    stack, and return matched item. The matching or
             *    unlinking might not actually be necessary because of
             *    other threads performing action 3:
             *
             * 3. If top of stack already holds another fulfilling node,
             *    help it out by doing its match and/or pop
             *    operations, and then continue. The code for helping
             *    is essentially the same as for fulfilling, except
             *    that it doesn't return the item.
             */

            SNode s = null; // constructed/reused as needed
            int mode = (e == null) ? REQUEST : DATA;

            for (;;) {
                SNode h = head;
                if (h == null || h.mode == mode) {  // empty or same-mode
                    if (timed && nanos <= 0) {      // can't wait
                        if (h != null && h.isCancelled())
                            casHead(h, h.next);     // pop cancelled node
                        else
                            return null;
                    } else if (casHead(h, s = snode(s, e, h, mode))) {
                        SNode m = awaitFulfill(s, timed, nanos);
                        if (m == s) {               // wait was cancelled
                            clean(s);
                            return null;
                        }
                        if ((h = head) != null && h.next == s)
                            casHead(h, s.next);     // help s's fulfiller
                        return (E) ((mode == REQUEST) ? m.item : s.item);
                    }
                } else if (!isFulfilling(h.mode)) { // try to fulfill
                    if (h.isCancelled())            // already cancelled
                        casHead(h, h.next);         // pop and retry
                    else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) {
                        for (;;) { // loop until matched or waiters disappear
                            SNode m = s.next;       // m is s's match
                            if (m == null) {        // all waiters are gone
                                casHead(s, null);   // pop fulfill node
                                s = null;           // use new node next time
                                break;              // restart main loop
                            }
                            SNode mn = m.next;
                            if (m.tryMatch(s)) {
                                casHead(s, mn);     // pop both s and m
                                return (E) ((mode == REQUEST) ? m.item : s.item);
                            } else                  // lost match
                                s.casNext(m, mn);   // help unlink
                        }
                    }
                } else {                            // help a fulfiller
                    SNode m = h.next;               // m is h's match
                    if (m == null)                  // waiter is gone
                        casHead(h, null);           // pop fulfilling node
                    else {
                        SNode mn = m.next;
                        if (m.tryMatch(h))          // help match
                            casHead(h, mn);         // pop both h and m
                        else                        // lost match
                            h.casNext(m, mn);       // help unlink
                    }
                }
            }
        }
           

awaitFulfill(SNode s, boolean timed, long nanos) 

SNode awaitFulfill(SNode s, boolean timed, long nanos) {
            /*
             * When a node/thread is about to block, it sets its waiter
             * field and then rechecks state at least one more time
             * before actually parking, thus covering race vs
             * fulfiller noticing that waiter is non-null so should be
             * woken.
             *
             * When invoked by nodes that appear at the point of call
             * to be at the head of the stack, calls to park are
             * preceded by spins to avoid blocking when producers and
             * consumers are arriving very close in time.  This can
             * happen enough to bother only on multiprocessors.
             *
             * The order of checks for returning out of main loop
             * reflects fact that interrupts have precedence over
             * normal returns, which have precedence over
             * timeouts. (So, on timeout, one last check for match is
             * done before giving up.) Except that calls from untimed
             * SynchronousQueue.{poll/offer} don't check interrupts
             * and don't wait at all, so are trapped in transfer
             * method rather than calling awaitFulfill.
             */
            final long deadline = timed ? System.nanoTime() + nanos : 0L;
            Thread w = Thread.currentThread();
            int spins = (shouldSpin(s) ?
                         (timed ? maxTimedSpins : maxUntimedSpins) : 0);
            for (;;) {
                if (w.isInterrupted())
                    s.tryCancel();
                SNode m = s.match;
                if (m != null)
                    return m;
                if (timed) {
                    nanos = deadline - System.nanoTime();
                    if (nanos <= 0L) {
                        s.tryCancel();
                        continue;
                    }
                }
                if (spins > 0)
                    spins = shouldSpin(s) ? (spins-1) : 0;
                else if (s.waiter == null)
                    s.waiter = w; // establish waiter so can park next iter
                else if (!timed)
                    LockSupport.park(this);
                else if (nanos > spinForTimeoutThreshold)
                    LockSupport.parkNanos(this, nanos);
            }
        }
           

shouldSpin(SNode s) 

boolean shouldSpin(SNode s) {
    SNode h = head;
    return (h == s || h == null || isFulfilling(h.mode));
}
           

clean(SNode s) 

void clean(SNode s) {
            s.item = null;   // forget item
            s.waiter = null; // forget thread

            /*
             * At worst we may need to traverse entire stack to unlink
             * s. If there are multiple concurrent calls to clean, we
             * might not see s if another thread has already removed
             * it. But we can stop when we see any node known to
             * follow s. We use s.next unless it too is cancelled, in
             * which case we try the node one past. We don't check any
             * further because we don't want to doubly traverse just to
             * find sentinel.
             */

            SNode past = s.next;
            if (past != null && past.isCancelled())
                past = past.next;

            // Absorb cancelled nodes at head
            SNode p;
            while ((p = head) != null && p != past && p.isCancelled())
                casHead(p, p.next);

            // Unsplice embedded nodes
            while (p != null && p != past) {
                SNode n = p.next;
                if (n != null && n.isCancelled())
                    p.casNext(n, n.next);
                else
                    p = n;
            }
        }
           

4.4、WaitQueue 类

4.5、LifoWaitQueue 类

4.6、FifoWaitQueue 类

继续阅读