天天看点

Java线程池队列SynchronousQueue的详细原理分析-刘宇一、什么是SynchronousQueue?二、SynchronousQueue类的结构图三、SynchronousQueue的小Demo四、SynchronousQueue源码分析

Java线程池队列SynchronousQueue的详细原理分析-刘宇

  • 一、什么是SynchronousQueue?
  • 二、SynchronousQueue类的结构图
  • 三、SynchronousQueue的小Demo
  • 四、SynchronousQueue源码分析
    • 1、构造方法
    • 2、put方法
    • 3、take方法
    • 4、栈结构
      • 4.1、常量讲解
      • 4.2、TransferStack讲解
        • 4.2.1、前期代码
        • 4.2.2、核心代码
        • 4.2.3、线程阻塞的实现
        • 4.2.4、不公平策略下队列图解
      • 4.3、栈结构小总结
    • 5、队列结构
      • 5.1、前期代码
      • 5.2、核心代码
      • 5.3、线程阻塞的实现
      • 5.4、公平策略下队列图解
      • 5.5、出队代码
      • 5.5、队列结构总结

CSDN博客地址:https://blog.csdn.net/liuyu973971883

文章来源:转载,原文地址:https://blog.csdn.net/weixin_41622183/article/details/89283085,感谢这位老哥的辛勤付出,写的非常棒,各位看完别忘了给这位老哥点个赞啊。如有侵权,请联系删除。

一、什么是SynchronousQueue?

  • SynchronousQueue作为阻塞队列的时候,对于每一个take的线程会阻塞直到有一个put的线程放入元素为止,反之亦然。
  • 在SynchronousQueue内部没有任何存放元素的能力,可以理解为容量为 0。所以类似peek操作或者迭代器操作也是无效的,元素只能通过put类操作或者take类操作才有效。
  • SynchronousQueue支持支持生产者和消费者等待的公平性策略。默认情况下:非公平。
  • 如果是公平锁的话可以保证当前第一个队首的线程是等待时间最长的线程,这时可以视SynchronousQueue为一个FIFO队列。
  • SynchronousQueue 提供两种实现方式,分别是栈和队列的方式实现。这两种实现方式中,栈是属于非公平的策略,队列是属于公平策略。

二、SynchronousQueue类的结构图

Java线程池队列SynchronousQueue的详细原理分析-刘宇一、什么是SynchronousQueue?二、SynchronousQueue类的结构图三、SynchronousQueue的小Demo四、SynchronousQueue源码分析

三、SynchronousQueue的小Demo

public class TestSynchronousQueue {
    public static void main(String[] args) throws Exception {
        //使用非公平策略
//        SynchronousQueue synchronousQueue= new SynchronousQueue();
        //使用公平策略
        SynchronousQueue synchronousQueue= new SynchronousQueue(true);
        new Thread(()-> {
            try {
                synchronousQueue.put("A");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();
        //休眠一下,让异步线程完成
        Thread.sleep(1000);
        new Thread(()-> {
            try {
                synchronousQueue.put("B");
            } catch (InterruptedException e1) {
                e1.printStackTrace();
            }
        }).start();
        //休眠一下,让异步线程完成
        Thread.sleep(1000);
        new Thread(()-> {
            try {
                Object take = synchronousQueue.take();
                System.out.println(take);
            } catch (InterruptedException e1) {
                e1.printStackTrace();
            }
        }).start();
        //休眠一下,让异步线程完成
        Thread.sleep(1000);
        //不管如何输出,都是 0
        System.out.println(synchronousQueue.size());
    }
}
           

公平策略结果:

Java线程池队列SynchronousQueue的详细原理分析-刘宇一、什么是SynchronousQueue?二、SynchronousQueue类的结构图三、SynchronousQueue的小Demo四、SynchronousQueue源码分析

非公平策略结果:

Java线程池队列SynchronousQueue的详细原理分析-刘宇一、什么是SynchronousQueue?二、SynchronousQueue类的结构图三、SynchronousQueue的小Demo四、SynchronousQueue源码分析

为什么会出现这种情况?我们这里先不解释,先继续学习。

四、SynchronousQueue源码分析

1、构造方法

//默认构造,false 为非公平策略
public SynchronousQueue() {
    this(false);
}
//可选策略。可以看出使用的是不同形式的实现。
public SynchronousQueue(boolean fair) {
    transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
}
           

2、put方法

入队方法,该方法为阻塞方法
public void put(E e) throws InterruptedException {
	//入队时判断是否传入元素为空
    if (e == null) throw new NullPointerException();
    //入队
    if (transferer.transfer(e, false, 0) == null) {
        Thread.interrupted();
        throw new InterruptedException();
    }
}
           

3、take方法

出队方法,该方法为阻塞方法
public E take() throws InterruptedException {
	//出队穿个 null 就说明这个是出队
    E e = transferer.transfer(null, false, 0);
    if (e != null)
        return e;
    Thread.interrupted();
    throw new InterruptedException();
}
           

4、栈结构

4.1、常量讲解

  • REQUEST:表示这是个请求节点,从队列中取数据的标识(方法有 take,poll)
  • DATA:表示这个是数据节点,插入数据到队列中的标识(方法有 offer,put)
  • FULFILLING:这个表示配对成功,只有一消费者和生产者进行配对成功后,才会更改为该状态
/** 表示这是个请求节点 */
static final int REQUEST    = 0;
/** 数据节点 */
static final int DATA       = 1;
/** 匹配成功后设置的节点 */
static final int FULFILLING = 2;
           

4.2、TransferStack讲解

4.2.1、前期代码

static final class TransferStack<E> extends Transferer<E> {
      	/** 表示这是个请求节点 */
        static final int REQUEST    = 0;
        /** 数据节点 */
        static final int DATA       = 1;
        /** 匹配成功后设置的节点 */
        static final int FULFILLING = 2;
        
        /** 内部维护的 SNode 类 */
        static final class SNode {
            volatile SNode next;        // next node in stack
            volatile SNode match;       // the node matched to this
            volatile Thread waiter;     // to control park/unpark
            Object item;                // data; or null for REQUESTs
            int mode;  //0表示请求节点,1表示数据节点
            SNode(Object item) {
                this.item = item;
            }
        //栈顶部指针
        volatile SNode head;
           

4.2.2、核心代码

由于入队出队没太大区别。代码都是调用同一个方法,唯一的不同是传入的值是否为空。下面看看该 transfer 方法:

E transfer(E e, boolean timed, long nanos) {
	//空节点
	SNode s = null; // constructed/reused as needed
	//判断是请求节点还是数据节点
	int mode = (e == null) ? REQUEST : DATA;
	
	for (;;) {
		//拿到头指针
	    SNode h = head;
	    //头指针为空,h.mode 默认是0,判断是否一致
	    if (h == null || h.mode == mode) {  // empty or same-mode
	    	//判断是否为有设置超时时间
	        if (timed && nanos <= 0) {      // can't wait
	        	//
	            if (h != null && h.isCancelled())
	                casHead(h, h.next);     // pop cancelled node
	            else
	                return null;
	        // 将当前节点压入栈
	        } else if (casHead(h, s = snode(s, e, h, mode))) {
	        	//进行线程堵塞
	            SNode m = awaitFulfill(s, timed, nanos);
	            if (m == s) {               // wait was cancelled
	            	//清除该节点
	                clean(s);
	                return null;
	            }
	            /**
	            * 线程池被唤醒后,这里需要 cas 设置一下头指针,配合出队线程
	            * 这里难理解,下面会用图解的形式解析
	            */
	            if ((h = head) != null && h.next == s)
	                casHead(h, s.next);     // help s's fulfiller
	            //返回配对的值
	            return (E) ((mode == REQUEST) ? m.item : s.item);
	        }
	     /**
	     * 这里表示的是出队操作
	     * 如果是入队操作走到这一步,说明压栈失败,继续 CAS 吧
	     */
	    } else if (!isFulfilling(h.mode)) { // try to fulfill
	    	//判断线程是否中断
	        if (h.isCancelled())            // already cancelled
	            casHead(h, h.next);         // pop and retry
	        //将当前节点压入栈
	        else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) {
	            for (;;) { // loop until matched or waiters disappear
	            	//拿到原来的栈头指针指向的节点
	                SNode m = s.next;       // m is s's match
	                //如果为空,说明被其他线程抢走了,重新 CAS 吧
	                if (m == null) {        // all waiters are gone
	                    casHead(s, null);   // pop fulfill node
	                    s = null;           // use new node next time
	                    break;              // restart main loop
	                }
	                //这里拿到配对的数据节点
	                SNode mn = m.next;
	                //尝试去匹配
	                if (m.tryMatch(s)) {
	                    casHead(s, mn);     // pop both s and m
	                    //匹配成功返回数据节点的值
	                    return (E) ((mode == REQUEST) ? m.item : s.item);
	               //匹配失败
	                } else                  // lost match
	                    s.casNext(m, mn);   // help unlink
	            }
	        }
	    //有其他线程在配对
	    } else {                            // help a fulfiller
	        SNode m = h.next;               // m is h's match
	        if (m == null)                  // waiter is gone
	            casHead(h, null);           // pop fulfilling node
	        else {
	            SNode mn = m.next;	
	            //尝试和其它线程竞争匹配		
	            if (m.tryMatch(h))          // help match
	            //配对成功就一起离开
	                casHead(h, mn);         // pop both h and m
	            else                        // lost match
	            	//匹配失败,肯定要擦屁股,将链接置空吧
	                h.casNext(m, mn);       // help unlink
	        }
	    }
	}
}
           

4.2.3、线程阻塞的实现

SNode awaitFulfill(SNode s, boolean timed, long nanos) {

	final long deadline = timed ? System.nanoTime() + nanos : 0L;
	Thread w = Thread.currentThread();
	//计算自旋次数
	int spins = (shouldSpin(s) ?
	             (timed ? maxTimedSpins : maxUntimedSpins) : 0);
	for (;;) {
		//判断是否中断
	    if (w.isInterrupted())
	        s.tryCancel();
	    //拿到匹配的节点
	    SNode m = s.match;
	    if (m != null)
	        return m;
	    if (timed) {
	        nanos = deadline - System.nanoTime();
	        if (nanos <= 0L) {
	            s.tryCancel();
	            continue;
	        }
	    }
	    //判断自旋次数是否大于 0 了
	    if (spins > 0)
	        spins = shouldSpin(s) ? (spins-1) : 0;
	    //如果没有匹配的节点,就保存当前阻塞的线程
	    else if (s.waiter == null)
	        s.waiter = w; // establish waiter so can park next iter
	    else if (!timed)
	    	//阻塞当前线程
	        LockSupport.park(this);
	    else if (nanos > spinForTimeoutThreshold)
	        LockSupport.parkNanos(this, nanos);
	}
}
           

4.2.4、不公平策略下队列图解

队列的初始状态:初始状态下,头部指针指向空。

Java线程池队列SynchronousQueue的详细原理分析-刘宇一、什么是SynchronousQueue?二、SynchronousQueue类的结构图三、SynchronousQueue的小Demo四、SynchronousQueue源码分析

假设来了一条线程 A,数据也是 A,头指针指向位置为空且我们使用是不会超时的 put 方法,那么会将数据压入栈中,并将指针指向栈顶,并将线程阻塞,如下图:

Java线程池队列SynchronousQueue的详细原理分析-刘宇一、什么是SynchronousQueue?二、SynchronousQueue类的结构图三、SynchronousQueue的小Demo四、SynchronousQueue源码分析

接下来再来一条线程 B,带的数据也是 B。那么还是会将数据压入栈中,并将该节点的 next 节点指向最先入栈的节点,并将头指针指向栈顶(也就是我们的数据 B),最后将线程阻塞。

Java线程池队列SynchronousQueue的详细原理分析-刘宇一、什么是SynchronousQueue?二、SynchronousQueue类的结构图三、SynchronousQueue的小Demo四、SynchronousQueue源码分析

如果我们在来一条线程 C,C 是出队线程。操作是一样的,这时,我们会将该取出节点压入栈中,将头指针指向出队的那个节点。结果图如下:

Java线程池队列SynchronousQueue的详细原理分析-刘宇一、什么是SynchronousQueue?二、SynchronousQueue类的结构图三、SynchronousQueue的小Demo四、SynchronousQueue源码分析

接下来,就是线程 C 要进行匹配了。这时,非公平策略的 坑爹之处 就出来了。它会和待匹配的线程进行匹配,但是我们知道栈的数据结构是先进后出,所以它就找了数据 B,为匹配对象,唤醒线程 B,线程 B 唤醒后继续走如下代码:

if ((h = head) != null && h.next == s)

   casHead(h, s.next); // help s’s fulfiller

   return (E) ((mode == REQUEST) ? m.item : s.item);

而出队的线程 C 则走如下代码:

if (m.tryMatch(s)) {

   casHead(s, mn); // 将两个节点弹出栈

   return (E) ((mode == REQUEST) ? m.item : s.item);

}

线程 C 拿到数据直接返回,线程 B 也直接返回。上述中 **casHead(s, mn); ** 就将头指针指向栈顶,也是就我们的线程 A。而线程 B 和线程 C 就 携手双飞 了,一起弹出栈。最后数据结果如下:

Java线程池队列SynchronousQueue的详细原理分析-刘宇一、什么是SynchronousQueue?二、SynchronousQueue类的结构图三、SynchronousQueue的小Demo四、SynchronousQueue源码分析

4.3、栈结构小总结

队列中非公平策略坑爹有一点,假设没有和它配对,或者每一次来一个配对对象时,都被另一个节点抢了,那就很悲催,一直呆在最底层没人要。

5、队列结构

5.1、前期代码

static final class TransferQueue<E> extends Transferer<E> {
    /** Node class for TransferQueue. */
    static final class QNode {
    	//下一个节点
        volatile QNode next;          // next node in queue
        //数据
        volatile Object item;         // CAS'ed to or from null
        //等待线程
        volatile Thread waiter;       // to control park/unpark
        //判断数据类型
        final boolean isData;

        QNode(Object item, boolean isData) {
            this.item = item;
            this.isData = isData;
        }

    /** 头指针 */
    transient volatile QNode head;
    /** 尾指针 */
    transient volatile QNode tail;

    transient volatile QNode cleanMe;
	//初始化时就构建了一个空节点
    TransferQueue() {
        QNode h = new QNode(null, false); // initialize to dummy node.
        head = h;
        tail = h;
    }
           

5.2、核心代码

队列结构的实现方式和栈结构的有很大不同,下面我们来看看具体实现

E transfer(E e, boolean timed, long nanos) {
	
	QNode s = null; // constructed/reused as needed
	boolean isData = (e != null);
	
	for (;;) {
		//尾节点
	    QNode t = tail;
	    //头结点
	    QNode h = head;
	    //没有初始化时都为空
	    if (t == null || h == null)         // saw uninitialized value
	        continue;                       // spin
		//判断节点类型,头节点为空或是入队类型才可入
	    if (h == t || t.isData == isData) { // empty or same-mode
	        QNode tn = t.next;
	        //判断尾节点是否有改变(可能有其他线程操作)
	        if (t != tail)                  // inconsistent read
	            continue;
	        //判断是否有其他线程添加了新节点
	        if (tn != null) {               // lagging tail
	        	//如果有就要设置一下尾节点指针
	            advanceTail(t, tn);
	            continue;
	        }
	        //时间的,这里可以不理会
	        if (timed && nanos <= 0)        // can't wait
	            return null;
	        //构建一个节点
	        if (s == null)
	            s = new QNode(e, isData);
	        //将节点加入队列中
	        if (!t.casNext(null, s))        // failed to link in
	            continue;
			//设置尾节点
	        advanceTail(t, s);              // swing tail and wait
	        //线程阻塞
	        Object x = awaitFulfill(s, e, timed, nanos);
	        //判断线程有没被中断,中断就清除节点
	        if (x == s) {                   // wait was cancelled
	            clean(t, s);
	            return null;
	        }
			//判断节点是否已经离开队列
	        if (!s.isOffList()) {           // not already unlinked
	        	//设置头指针
	            advanceHead(t, s);          // unlink if head
	            if (x != null)              // and forget fields
	                s.item = s;
	            //阻塞线程置空
	            s.waiter = null;
	        }
	        //返回存储的值
	        return (x != null) ? (E)x : e;
		//这里就是出队操作
	    } else {                            // complementary-mode
	        QNode m = h.next;               // node to fulfill
	        if (t != tail || m == null || h != head)
	            continue;                   // inconsistent read
			//拿到头节点下的下一个节点
	        Object x = m.item;
	        //判断是否被另一个线程匹配过了
	        if (isData == (x != null) ||    // m already fulfilled
	            x == m ||                   // m cancelled
	            !m.casItem(x, e)) {         // lost CAS
	            advanceHead(h, m);          // dequeue and retry
	            continue;
	        }
			//匹配成功就重新设置头指针
	        advanceHead(h, m);              // successfully fulfilled
	        //线程唤醒
	        LockSupport.unpark(m.waiter);
	        //返回值
	        return (x != null) ? (E)x : e;
	    }
	}
}
           

5.3、线程阻塞的实现

Object awaitFulfill(QNode s, E e, boolean timed, long nanos) {
	/* Same idea as TransferStack.awaitFulfill */
	final long deadline = timed ? System.nanoTime() + nanos : 0L;
	Thread w = Thread.currentThread();
	//计算要自旋的次数
	int spins = ((head.next == s) ?
	             (timed ? maxTimedSpins : maxUntimedSpins) : 0);
	for (;;) {
	    if (w.isInterrupted())
	        s.tryCancel(e);
	    Object x = s.item;
	    if (x != e)
	        return x;
	    if (timed) {
	        nanos = deadline - System.nanoTime();
	        if (nanos <= 0L) {
	            s.tryCancel(e);
	            continue;
	        }
	    }
	    //计算自旋次数
	    if (spins > 0)
	        --spins;
	        //保存阻塞线程
	    else if (s.waiter == null)
	        s.waiter = w;
	    else if (!timed)
	    	//线程阻塞
	        LockSupport.park(this);
	    else if (nanos > spinForTimeoutThreshold)
	        LockSupport.parkNanos(this, nanos);
	}
}
           

5.4、公平策略下队列图解

首先,我们在 new SynchronousQueue 队列使用公平策略的时候就已经构建了一个空节点。

Java线程池队列SynchronousQueue的详细原理分析-刘宇一、什么是SynchronousQueue?二、SynchronousQueue类的结构图三、SynchronousQueue的小Demo四、SynchronousQueue源码分析

对应的代码如下:

TransferQueue() {

  QNode h = new QNode(null, false); // initialize to dummy node.

  head = h;

  tail = h;

}

接下来我们来了线程 A,线程 A 携带的数据也是 A。这时我们调用 put 方法入队,经过一大堆的判断,我们将数据入队成功,将尾节点指针指向数据 A 的节点,并且将线程阻塞,等待被消费,结果如下:

Java线程池队列SynchronousQueue的详细原理分析-刘宇一、什么是SynchronousQueue?二、SynchronousQueue类的结构图三、SynchronousQueue的小Demo四、SynchronousQueue源码分析

我们可以看到头节点一直为一个空节点,目的是为了等待消费线程进来后,和最先入队的元素进行匹配。接下来线程 B 带着数据 B 又进来了,经过一大堆的判断,我们将数据入队成功,数据 A 的 next 指向数据 B,将尾节点指针指向数据 B 的节点,并且将线程阻塞,等待被消费,结果如下:

Java线程池队列SynchronousQueue的详细原理分析-刘宇一、什么是SynchronousQueue?二、SynchronousQueue类的结构图三、SynchronousQueue的小Demo四、SynchronousQueue源码分析

这样,我们就入队了两个节点了,接下来来了一条出队线程 C,C 来了却不是入队了,而是找最先入队的那个节点。找到入队节点后,会尝试进行配对,假设配对成功,那会将配对成功的线程 A 出队,返回数据 A,被唤醒的线程 A 会继续走余下的代码,上面解释过,原理差不多就不解释了。最后结果图如下:

Java线程池队列SynchronousQueue的详细原理分析-刘宇一、什么是SynchronousQueue?二、SynchronousQueue类的结构图三、SynchronousQueue的小Demo四、SynchronousQueue源码分析

5.5、出队代码

{                            // complementary-mode
    QNode m = h.next;               // node to fulfill
    if (t != tail || m == null || h != head)
        continue;                   // inconsistent read
	//拿到头节点下的下一个节点
    Object x = m.item;
    //判断是否被另一个线程匹配过了
    if (isData == (x != null) ||    // m already fulfilled
        x == m ||                   // m cancelled
        !m.casItem(x, e)) {         // lost CAS
        advanceHead(h, m);          // dequeue and retry
        continue;
    }
	//匹配成功就重新设置头指针
    advanceHead(h, m);              // successfully fulfilled
    //线程唤醒
    LockSupport.unpark(m.waiter);
    //返回值
    return (x != null) ? (E)x : e;
}
           

5.5、队列结构总结

队列结构和栈结构是类似的,都是需要配对进行离开,基本原理差不多。不同之处就是现进先出和先进后出的问题。