


  • 1.資料結構
    • 1.等待隊列:waitQueue
      • 1.公平隊列:FifoWaitQueue
      • 2.非公平隊列:LifoWaitQueue
    • 2.資料傳輸方式:Transferer
      • 1.TransferStack
      • 2.TransferQueue
  • 2.方法說明
    • 1.轉換(重點):transfer
      • 1.區分操作模式:mode
      • 2.頭節點和目前是相同模式
      • 3.頭結點和目前目前不是相同模式
      • 4.目前節點是輔助模式(可能從上一步來)
      • 5.圖解
    • 2. 擷取一個元素:take
    • 3.加入隊列:put
  • 3.總結




    private WaitQueue waitingProducers;
    private WaitQueue waitingConsumers;



static class FifoWaitQueue extends WaitQueue {
        private static final long serialVersionUID = -3623113410248163686L;



static class LifoWaitQueue extends WaitQueue {
        private static final long serialVersionUID = -3633113410248163686L;



Performs a put or take.


     * Shared internal API for dual stacks and queues.
    abstract static class Transferer<E> {
         * Performs a put or take.
         * @param e if non-null, the item to be handed to a consumer;
         *          if null, requests that transfer return an item
         *          offered by producer.
         * @param timed if this operation should timeout
         * @param nanos the timeout, in nanoseconds
         * @return if non-null, the item provided or received; if null,
         *         the operation failed due to timeout or interrupt --
         *         the caller can distinguish which of these occurred
         *         by checking Thread.interrupted.
        abstract E transfer(E e, boolean timed, long nanos);




static final class SNode {
            volatile SNode next;        // next node in stack
            volatile SNode match;       // the node matched to this
            volatile Thread waiter;     // to control park/unpark
            Object item;                // data; or null for REQUESTs
            int mode;



TransferQueue() {
            QNode h = new QNode(null, false); // initialize to dummy node.
            head = h;
            tail = h;

        static final class QNode {
            volatile QNode next;          // next node in queue
            volatile Object item;         // CAS'ed to or from null
            volatile Thread waiter;       // to control park/unpark
            final boolean isData;






E transfer(E e, boolean timed, long nanos) {
            // constructed/reused as needed
            SNode s = null;
            int mode = (e == null) ? REQUEST : DATA;

            for (; ; ) {
                SNode h = head;
                // empty or same-mode
                if (h == null || h.mode == mode) {
                    // can't wait
                    if (timed && nanos <= 0) {
                        // 如果頭節點不為空且是取消狀态
                        if (h != null && h.isCancelled()) {
                            // pop cancelled node
                            casHead(h, h.next);
                        } else {
                            return null;
                    else if (casHead(h, s = snode(s, e, h, mode))) {
                        SNode m = awaitFulfill(s, timed, nanos);
                        if (m == s) {
                            // wait was cancelled
                            return null;
                        if ((h = head) != null && h.next == s) {
                            // help s's fulfiller
                            casHead(h, s.next);
                        return (E) ((mode == REQUEST) ? m.item : s.item);
                    // try to fulfill
                else if (!isFulfilling(h.mode)) {
                    // already cancelled
                    if (h.isCancelled()) {
                        // pop and retry
                        casHead(h, h.next);
                    //這邊應該是頭結點 不是cancelled的,s入棧變成頭結點
                    else if (casHead(h, s = snode(s, e, h, FULFILLING | mode))) {
                        // loop until matched or waiters disappear
                        for (; ; ) {
                            // m is s's match
                            SNode m = s.next;
                            // all waiters are gone
                            if (m == null) {
                                // pop fulfill node
                                casHead(s, null);
                                // use new node next time
                                s = null;
                                // restart main loop
                            SNode mn = m.next;
                            if (m.tryMatch(s)) {
                                // pop both s and m
                                casHead(s, mn);
                                return (E) ((mode == REQUEST) ? m.item : s.item);
                                // lost match
                            } else {
                                // help unlink
                                s.casNext(m, mn);
                    // help a fulfiller
                else {
                    // m is h's match
                    SNode m = h.next;
                    // waiter is gone
                    if (m == null) {
                        // pop fulfilling node
                        casHead(h, null);
                    } else {
                        SNode mn = m.next;
                        // help match
                        if (m.tryMatch(h)) {
                            // pop both h and m
                            casHead(h, mn);
                        // lost match
                        else {
                            // help unlink
                            h.casNext(m, mn);



int mode = (e == null) ? REQUEST : DATA;




        先判斷是否逾時,如果逾時的話,判斷頭部節點不為空,并且被取消(比對的節點是自己,也就是說沒有比對到可以進行交換的節點),通過cas方式把頭部換成下一個節點。如果頭部節點為空, 說明沒有隊列了,直接傳回null;



         if (timed && nanos <= 0) {
             // 如果頭節點不為空且是取消狀态
             if (h != null && h.isCancelled()) {
                 // pop cancelled node
                 casHead(h, h.next);
             } else {
                 return null;
         else if (casHead(h, s = snode(s, e, h, mode))) {
             SNode m = awaitFulfill(s, timed, nanos);
             if (m == s) {
                 // wait was cancelled
                 return null;
             if ((h = head) != null && h.next == s) {
                 // help s's fulfiller
                 casHead(h, s.next);
             return (E) ((mode == REQUEST) ? m.item : s.item);
         // try to fulfill



        然後進入死循環,出口隻有一個就是 match!=null的時候,傳回match值。





SNode awaitFulfill(SNode s, boolean timed, long nanos) {
            final long deadline = timed ? System.nanoTime() + nanos : 0L;
            Thread w = Thread.currentThread();
            int spins = (shouldSpin(s) ? (timed ? maxTimedSpins : maxUntimedSpins) : 0);
            for (; ; ) {
                if (w.isInterrupted()){
                    // Tries to cancel a wait by matching node to itself.
                    //1 - 嘗試通過将節點與自身比對來取消等待。
                SNode m = s.match;
                if (m != null){
                    return m;
                if (timed) {
                    nanos = deadline - System.nanoTime();
                    if (nanos <= 0L) {
                        //2 - 嘗試通過将節點與自身比對來取消等待。

                if (spins > 0){
                    spins = shouldSpin(s) ? (spins - 1) : 0;
                else if (s.waiter == null){
                    // establish waiter so can park next iter
                    s.waiter = w;
                else if (!timed){
                else if (nanos > spinForTimeoutThreshold){
                    LockSupport.parkNanos(this, nanos);








// already cancelled
            if (h.isCancelled()) {
                // pop and retry
                casHead(h, h.next);
            //這邊應該是頭結點 不是cancelled的,s入棧變成頭結點
            else if (casHead(h, s = snode(s, e, h, FULFILLING | mode))) {
                // loop until matched or waiters disappear
                for (; ; ) {
                    // m is s's match
                    SNode m = s.next;
                    // all waiters are gone
                    if (m == null) {
                        // pop fulfill node
                        casHead(s, null);
                        // use new node next time
                        s = null;
                        // restart main loop
                    SNode mn = m.next;
                    if (m.tryMatch(s)) {
                        // pop both s and m
                        casHead(s, mn);
                        return (E) ((mode == REQUEST) ? m.item : s.item);
                        // lost match
                    } else {
                        // help unlink
                        s.casNext(m, mn);
            // help a fulfiller



boolean tryMatch(SNode s) {
                if (match == null && UNSAFE.compareAndSwapObject(this, matchOffset, null, s)) {
                    Thread w = waiter;
                    // waiters need at most one unpark
                    if (w != null) {
                        waiter = null;
                    // 傳回true
                    return true;
                return match == s;






                    // m is h's match
                    SNode m = h.next;
                    // waiter is gone
                    if (m == null) {
                        // pop fulfilling node
                        casHead(h, null);
                    } else {
                        SNode mn = m.next;
                        // help match
                        if (m.tryMatch(h)) {
                            // pop both h and m
                            casHead(h, mn);
                        // lost match
                        else {
                            // help unlink
                            h.casNext(m, mn);



2. 擷取一個元素:take

public E take() throws InterruptedException {
        E e = transferer.transfer(null, false, 0);
        if (e != null){
            return e;
        throw new InterruptedException();




public void put(E e) throws InterruptedException {
        if (e == null) {
            throw new NullPointerException();
        if (transferer.transfer(e, false, 0) == null) {
            throw new InterruptedException();


