天天看点

并发队列ConcurrentLinkedQueue与LinkedBlockingQueue源码分析与对比

目录

  • 前言
  • ConcurrentLinkedQueue
    • 使用方法
    • 存储结构
    • 初始化
    • 入队
    • 出队
    • 获取容器元素数量
  • LinkedBlockingQueue
  • ConcurrentLinkedQueue与LinkedBlockingQueue比较
    • 简单比较
    • 性能比较测试
  • 参考链接

之前在项目中使用到了并发队列,场景为多写多读,查阅资料推荐使用ConcurretLinkedQueue,但不知道为什么。这里对并发队列ConcurrentLinkedQueue与LinkedBlockingQueue的源码做一个简单分析,比较一下两者差别,并测试在不同并发请求下读写的性能差异。使用的JDK版本为1.8。

使用方法很简单,该类实现了Queue接口,提供了offer()、poll()等入队和出队的操作接口。

多线程环境下的使用如下:

// 无界并发队列
ConcurrentLinkedQueue<Integer> queue = new ConcurrentLinkedQueue<>();
// 模拟n个线程竞争环境
int n = 100;
CountDownLatch countDownLatch = new CountDownLatch(n);
for (int i = 0; i < n; i++) {
    int finalI = i;
    new Thread(()->{
        // 进行10000次的写操作
        for (int j = 0; j < 10000; j++) {
            queue.add(j);
        }
        // 进行10000次的读操作
        for (int j = 0; j < 10000; j++) {
            queue.poll();
        }
        // 该线程结束读写请求
        System.out.println("Thread-"+ finalI +"结束");
        countDownLatch.countDown();
    }).start();
}
// 直到所有线程结束读写
countDownLatch.await();
// 验证并发队列中元素是否清空
System.out.println("队列已清空:"+queue.isEmpty());
           

输出结果如下:

Thread-0结束
...........
Thread-55结束
队列已清空:true
           

该类使用了Node类来表示队列中的节点,包含一个volatile修饰的类型为传入泛型的item成员(节点存储的值)和volatile修饰的next指针。同时引入了Unsafe组件,使用了其CAS方法来替换item和next。其中lazySetNext()方法保证了volatile的语义,该次修改对下次读是可见的。

private static class Node<E> {
    volatile E item;
    volatile Node<E> next;
    Node(E item) {
        UNSAFE.putObject(this, itemOffset, item);
    }
    // CAS替换节点的值,返回是否成功
    boolean casItem(E cmp, E val) {
        return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
    }
    // 给next引用赋值,这个方法保证了volatile的语义,即该修改对next读取是可见的
    void lazySetNext(Node<E> val) {
        UNSAFE.putOrderedObject(this, nextOffset, val);
    }
    // CAS替换next引用,返回是否成功
    boolean casNext(Node<E> cmp, Node<E> val) {
        return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
    }
    // unsafe类引入和相关静态代码
    ...
}
           

默认初始化方法如下:

public ConcurrentLinkedQueue() {
    // 创建空的头尾节点
    head = tail = new Node<E>(null);
}
           

还有一个基于已有集合的初始化方法,大致流程为:依次取出集合元素;检查是否为null;构建新节点;采用尾插法插入到链表尾部。

public ConcurrentLinkedQueue(Collection<? extends E> c) {
    Node<E> h = null, t = null;
    for (E e : c) {
        // 检查元素是否为null
        checkNotNull(e);
        // 基于集合中的元素构建新节点
        Node<E> newNode = new Node<E>(e);
        // 第一个元素设置为头尾结点
        if (h == null)          
            h = t = newNode;
        else {     // 其余元素采用尾插法插入      
            t.lazySetNext(newNode);
            t = newNode;
        }
    }
    // 集合为空集合时,新建值为nul的头尾节点
    if (h == null)
        h = t = new Node<E>(null);
    head = h;
    tail = t;
}
           

public boolean offer(E e) {
    // 确保元素非null,为null时抛出NullPointer异常
    checkNotNull(e);
    // 基于传入值构造新节点
    final Node<E> newNode = new Node<E>(e);

    // 自旋,直到入队成功
    for (Node<E> t = tail, p = t;;) {
        Node<E> q = p.next;
        // case1:此时p为队尾节点,q=null
        if (q == null) {
            // 通过cas的方式设置新节点为p的后继节点
            // 如果失败,说明此时p已不再是队尾结点,继续进行自旋
            // 如果成功,尝试修改tail后返回true
            if (p.casNext(null, newNode)) {

                // p != t代表此时p和第一次循环时相比已经向后移动了,此时就通过CAS的方式将tail节点修改为newNode
                // 失败了也没关系,代表有其他线程已经修改了tail
                if (p != t) // hop two nodes at a time
                    casTail(t, newNode);
                return true;
            }
        }
        // case2:p=q,表示是删除的节点
        else if (p == q)

            // t != (t = tail) 说明t!=tail,tail节点已经更新过,此时就使用tail赋值给p,然后继续自旋
            // 否则说明tail没有更新过,指向出队的节点。这时就使用head赋值给p,然后继续自旋
            p = (t != (t = tail)) ? t : head;
        // case3:p不是队尾节点,也没有出队。就更新p,然后继续自旋
        else
            // case3.1:p!=t且t!=tail时,说明tail节点更新过,让p重新指向tail节点
            // case3.2:否则,p往后移动一位,指向q
            p = (p != t && t != (t = tail)) ? t : q;
    }
}
           

入队的逻辑看起来比较复杂,其核心思想就是自旋+cas的方式将新节点插入到队尾节点的后面。

这里就按第一次入队和第二次入队两种情况分析一下:

  • 第一次入队

首先检查非空,然后构造新节点。

t和p都指向tail节点,q为null。此时进入case1:尝试CAS设置p.next为newNode。

成功的话,说明节点入队成功了。然后直接返回true

失败的话,说明p.next!=null,p不是队尾节点了,这时就自旋,q=p.next,然后会进入case3.2的逻辑,更新p。再次自旋,q=p.next,然后会进入case1的逻辑,然后重复上面一样的操作,直到CAS设置成功。

  • 第二次入队

tail节点指向倒数第二个节点,t和p指向tail,q指向最后一个节点。此时进入case3:,执行case3.2的逻辑,p = q。

然后自旋后,q=p.next,进入case1,然后CAS设置p.next为newNode。成功了的话,会发现p!=t,执行重置tail节点的操作,该操作失败了说明有其他线程重置了,所以也ok。之后返回true。

// 将原head(h指向head节点)更新为p
// 并将原head节点next指向自己,表示当前节点已经出队
final void updateHead(Node<E> h, Node<E> p) {
    if (h != p && casHead(h, p))    // 将head通过CAS的方式更新为p
        h.lazySetNext(h);   // 将h节点的next指向自己,表示出队
}

public E poll() {
    restartFromHead:
    // 大循环
    for (;;) {
        // 自旋
        for (Node<E> h = head, p = h, q;;) {
            E item = p.item;

            // case1:p指向节点为第一个有元素节点(实质上要出队的节点)
            // cas的方式设置item,失败了的话说明有其他线程将该接节点出队了,会再次自旋
            if (item != null && p.casItem(item, null)) {
                // Successful CAS is the linearization point
                // for item to be removed from this queue.
                // p!=h,表示p已经向后移动了。此时
                if (p != h) // hop two nodes at a time
                    updateHead(h, ((q = p.next) != null) ? q : p);
                return item;
            }
            // case2:如果p的后继节点为null,表示p已经是最后一个节点,无节点可出队了
            else if ((q = p.next) == null) {
                // 更新头节点为p,然后返回null
                updateHead(h, p);
                return null;
            }
            // case3:p=q,表示p和q指向的节点已经出队,通过p和q已无法找到头节点,这时需要重新去获取head节点
            else if (p == q)
                // 回到大循环中重新开始小循环自旋
                continue restartFromHead;
            // case4:将p指向q,实质上是q往后移动一位
            else
                p = q;
        }
    }
}
           

出队的核心思想就是找到头节点,CAS将其item设置为null。如果成功的话,就可以出队了,如果失败了,就自旋再次寻找头结点。

这里也分析一下出队执行步骤:

最开始的时候,head节点的item应该是null(queue初始化方法创建的节点)。第一次循环,h和p指向head节点。

如果此时队列中没有元素,会进入case2,直接更新head节点后返回null。

如果队列中有元素,会进入case4,将q向后移动,然后再次自旋,进行case1的判断。如果case1中item!=null且cas设置成功,则表示出队成功,返回出队元素。如果cas设置失败,则继续自旋寻找头结点出队。直至出队成功,同时如果p!=h,会更新下头结点。在自旋的过程中,如果当前节点已经被出队了,会进入case3,然后回到大循环重新寻找head节点。

// 返回p的后继节点,如果p已经出队(next指向自身),则返回head节点
final Node<E> succ(Node<E> p) {
    Node<E> next = p.next;
    // 当一个节点从队列删除后,其next指针会指向自己。此时就返回head节点
    return (p == next) ? head : next;
}

// 获取队首节点
Node<E> first() {
    restartFromHead:
    for (;;) {
        for (Node<E> h = head, p = h, q;;) {
            boolean hasItem = (p.item != null);
            // p节点有元素,或者p节点为最后一个节点
            if (hasItem || (q = p.next) == null) {
                // 更新头结点
                updateHead(h, p);
                // p节点有元素返回p,无元素代表p是最后一个节点,返回null
                return hasItem ? p : null;
            }
            // 如果p已经出队,重新回到大循环
            else if (p == q)
                continue restartFromHead;
            // p向后移动一位
            else
                p = q;
        }
    }
}

public int size() {
    int count = 0;
    // 获取首元素后,遍历后继节点的数量
    for (Node<E> p = first(); p != null; p = succ(p))
        if (p.item != null)
            // Collection.size() spec says to max out
            if (++count == Integer.MAX_VALUE)
                break;
    return count;
}

           

可以看到计算的大小不是非常准确的,从获取到首节点开始后,一直遍历到尾结点。期间增加的节点都能被统计进入,出队的节点则不计入数量。所以计算的数量>=计算完成时刻的实际数量。

LinkedBlockingQueue实现了Queue接口,也提供了offer和poll等方法。同时也提供了put和带时间参数的offer和pool方法。简单示例如下:

// 无界并发队列
LinkedBlockingQueue<Integer> queue = new LinkedBlockingQueue<>(3);
// 插入一个元素,容量满时会失败
queue.offer(1);
// 插入一个元素,容量满时最多等待2s
queue.offer(2, 2, TimeUnit.SECONDS);
// 插入一个元素,容量满时会一直等待,直到能够入队
queue.put(3);
// 取出一个元素,无元素时返回null
queue.poll();
// 取出一个元素,无元素时最多等待2s
queue.poll(2, TimeUnit.SECONDS);
           

使用了Node节点存储元素,不过没有UNSAFE组件,没有CAS操作。后面也可以看到,使用了可重入锁(独占锁),所以不需要考虑多线程同时修改属性的情况。

static class Node<E> {
    E item;

    Node<E> next;

    Node(E x) { item = x; }
}
           

使用了head和last表示队列的头部和尾部节点,使用了入队锁和出队锁两个锁来实现同一时刻只有一个元素入队,同一时刻只有一个元素出队。使用了AotomicInteger类来表示队列中的元素个数。

transient Node<E> head;

private transient Node<E> last;

private final int capacity;

private final ReentrantLock takeLock = new ReentrantLock();

/** Wait queue for waiting takes */
private final Condition notEmpty = takeLock.newCondition();

/** Lock held by put, offer, etc */
private final ReentrantLock putLock = new ReentrantLock();

/** Wait queue for waiting puts */
private final Condition notFull = putLock.newCondition();
           

默认初始化方法,设置容量为Integer.MAX_VALUE

public LinkedBlockingQueue() {
    this(Integer.MAX_VALUE);
}

public LinkedBlockingQueue(int capacity) {
    if (capacity <= 0) throw new IllegalArgumentException();
    this.capacity = capacity;
    last = head = new Node<E>(null);
}
           

还有一个基于已有集合的初始化方法,大致思路为:

1.加上putLock入队锁;

2.遍历集合的所有元素,然后依次添加到队列中。

3.解锁。

由于使用了ReentrantLock,同一时刻只有单个线程入队,所以不用考虑并发问题。新增一个节点,然后将该节点添加到last节点后,最后更新last节点即可。

offer方法源码解析如下:需要注意,当入队时容量达到最大容量,会入队失败。

public boolean offer(E e) {
    if (e == null) throw new NullPointerException();
    final AtomicInteger count = this.count;
    // 当前容量已满时,直接返回false
    if (count.get() == capacity)
        return false;
    int c = -1;
    // 构建新节点
    Node<E> node = new Node<E>(e);
    final ReentrantLock putLock = this.putLock;
    // 入队锁加锁,已经被其它线程加锁时,当前线程会park挂起
    putLock.lock();
    try {
        // 只有当前元素个数<capacity,才能入队
        if (count.get() < capacity) {
            // 执行入队操作
            enqueue(node);
            // count数量+1
            c = count.getAndIncrement();
            // 如果当前元素个数<capacity,表示还可以继续入队
            if (c + 1 < capacity)
                // 唤醒一个在notFull的条件等待队列中的线程
                notFull.signal();
        }
    } finally {
        // 入队锁解锁
        putLock.unlock();
    }
    // 如果此时元素数量为1,表示可以出队
    if (c == 0)
        // 唤醒一个在notEmpty的条件等待队列中的线程
        signalNotEmpty();
    // c>=表示入队成功,返回true,反之入队失败,返回false
    return c >= 0;
}

// 节点入队,加到队尾节点,然后更新last
private void enqueue(Node<E> node) {
    // assert putLock.isHeldByCurrentThread();
    // assert last.next == null;
    last = last.next = node;
}
           

put方法相对于offer方法,多了一个等待逻辑,当元素数量达到最大容量时,会一直等待,直到能够入队。

putLock.lockInterruptibly();
try {
    // 多了一个等待的过程
    // 如果容量已满,当前线程park并进入notFull的条件等待队列
    while (count.get() == capacity) {
        notFull.await();
    }
    enqueue(node);
    c = count.getAndIncrement();
    if (c + 1 < capacity)
        notFull.signal();
} finally {
    putLock.unlock();
}
           

同一时刻只有单个线程出队,所以不用考虑并发问题。

public E poll() {
    final AtomicInteger count = this.count;
    if (count.get() == 0)
        return null;
    E x = null;
    int c = -1;
    final ReentrantLock takeLock = this.takeLock;
    // 出队锁加锁
    takeLock.lock();
    try {
        // 只有数量>0时才能出队
        if (count.get() > 0) {
            // 执行出队操作
            x = dequeue();
            // 容器数量-1
            c = count.getAndDecrement();
            // 当容器数量>=1时,唤醒notEmpty条件队列中等待的一个线程
            if (c > 1)
                notEmpty.signal();
        }
    } finally {
        // 出队锁释放
        takeLock.unlock();
    }
    // 表示当前数量<capacity时,容器未满,唤醒notFull条件队列中等待的一个线程
    if (c == capacity)
        signalNotFull();
    return x;
}
// 节点出队操作
 private E dequeue() {
    // 获取队首节点以及下一个节点(队首节点值都是null,下一个节点才是真正有元素的节点)
    Node<E> h = head;
    Node<E> first = h.next;
    // h节点next指向自身,表示出队
    h.next = h;
    // 更新head节点
    head = first;
    // 返回第一个实际节点的值并重置为null(head节点的item都是null)
    E x = first.item;
    first.item = null;
    return x;
}
           

take方法相比于poll,多了一个等待逻辑,当元素数量=0时,会一直等待,直到能够入队。

takeLock.lockInterruptibly();
    try {
        // 多了一个等待的过程
        // 如果数量=0,当前线程park并进入notEmpty的条件等待队列
        while (count.get() == 0) {
            notEmpty.await();
        }
        x = dequeue();
        c = count.getAndDecrement();
        if (c > 1)
            notEmpty.signal();
    } finally {
        takeLock.unlock();
    }
           

直接获取原子变量capacity的值即可。由于入队和出队对数量大小的修改都是原子的,所以获取的数量大小是十分准确的,为当前时刻容器元素数量。

public int size() {
    return count.get();
}
           

通过之前的介绍,可以发现

  1. ConcurrentLinkedQueue是一个无界队列,最大长度为Integer.MAX_VALUE;LinkedBlockingQueue是一个有界队列(不设置长度时为Integer.MAX_VALUE),在达到最大容量后添加元素有可能会失败(使用offer方法入队会失败,put方法入队会一直等待)。
  2. ConcurrentLinkedQueue全程是没有线程阻塞的,通过自旋+CAS的方式入队和出队(不达目的不罢休);而LinkedBlockingQueue同一时刻只能有一个线程执行入队操作或出队操作,通过入队锁和出队锁实现(ReentrantLock+Condition)。

ConcurrentLinkedQueue全程是无锁的,而LinkedBlockingQueue多线程出入队时会有挂起和唤醒线程的操作,会进行线程的上下文切换,相对来说更耗时。

这里设置了几组不同的线程数量和并发读取次数,来测试各自的完成时间,每组数据测试5次,取平均数据。使用了同一台机器(4核CPU)进行测试。

代码设计如下:

// 无界并发队列
ConcurrentLinkedQueue<Integer> queue = new ConcurrentLinkedQueue<>();

long startTime = System.currentTimeMillis();
// 模拟n个线程竞争环境,各自完成m次插入和查找操作,计算最终完成时间
int n = 10;
// 读写次数
int m = 10000;
// 线程执行完成的计数器
CountDownLatch countDownLatch = new CountDownLatch(n);
// 控制所有线程同时运行
CyclicBarrier cyclicBarrier = new CyclicBarrier(n);
for (int i = 0; i < n; i++) {
    int finalI = i;
    new Thread(()->{
        // 等待信号量的改变
        try {
            cyclicBarrier.await();
        } catch (InterruptedException | BrokenBarrierException e) {
            e.printStackTrace();
        }
        // 进行100000次的写操作
        for (int j = 0; j < m; j++) {
            queue.add(j);
        }
        // 进行1000000次的读操作
        for (int j = 0; j < m; j++) {
            queue.poll();
        }
        // 该线程结束读写请求
        System.out.println("Thread-"+ finalI +"结束");
        countDownLatch.countDown();
    }).start();
}

// 直到所有线程结束读写,计算时间
countDownLatch.await();
long endTime = System.currentTimeMillis();
long costTime = endTime - startTime;
System.out.println("所用时间:" + costTime + "ms");
// 验证并发队列中元素是否清空
System.out.println("队列已清空:"+queue.isEmpty());

该次运行结果:
Thread-9结束
...
Thread-8结束
所用时间:78ms
队列已清空:true
           

最终测试得到结果:

LinkedBlockingQueue测试结果(ms):

线程数量\读取次数 10000 50000 100000
10 94 125 187
50 167 800 3109
100 266 1332 6168
200 503 5374 11365
78 156 249
172 594 1375
250 828 3343
437 1656 6300