天天看点

阻塞与非阻塞队列

作者:大数据与人工智能分享

如何实现一个非阻塞队列?阻塞和非阻塞算法有什么区别?

当讨论阻塞和非阻塞算法时所使用的术语可能会令人困惑,因此让我们从并发领域的术语开始,通过以下图表对其进行梳理。

阻塞:

阻塞算法使用锁。线程A首先获取锁,而如果线程A在持有锁的情况下被挂起,则线程B可能会等待任意长的时间。这种算法可能会导致线程B挨饿。

非阻塞:

非阻塞算法允许线程A访问队列,但线程A必须在一定步骤内完成任务。其他线程,如线程B,可能会因被拒绝而挨饿。

这是阻塞和非阻塞算法的主要区别:阻塞算法会阻塞线程B,直到锁被释放。非阻塞算法会通知线程B访问被拒绝。

免饥饿:

线程饥饿指线程无法访问某些共享资源且无法继续执行。免饥饿指这种情况不会发生。

等待自由:

所有线程都可以在有限的步骤内完成任务。

- = - + -

➡️ 非阻塞队列实现

我们可以使用比较和交换(CAS)来实现非阻塞队列。下面的图表说明了算法。

阻塞与非阻塞队列

➡️ 好处

1.无线程挂起。线程B可以立即获得响应,然后决定下一步该做什么。这样,线程的延迟大大降低了。2.无死锁。线程A和B不等待锁释放,因此不可能发生死锁。

➡️ 代码示例

import java.util.concurrent.atomic.AtomicReference;

public class NonBlockingQueue<T> {
    private static class Node<T> {
        final T value;
        AtomicReference<Node<T>> next;

        public Node(T value) {
            this.value = value;
            this.next = new AtomicReference<>(null);
        }
    }

    private AtomicReference<Node<T>> head, tail;

    public NonBlockingQueue() {
        Node<T> dummy = new Node<>(null);
        head = new AtomicReference<>(dummy);
        tail = new AtomicReference<>(dummy);
    }

    public boolean enqueue(T value) {
        Node<T> node = new Node<>(value);
        while (true) {
            Node<T> last = tail.get(), next = last.next.get();
            if (last == tail.get()) {
                if (next == null) {
                    if (last.next.compareAndSet(null, node)) {
                        tail.compareAndSet(last, node);
                        return true;
                    }
                } else {
                    tail.compareAndSet(last, next);
                }
            }
        }
    }

    public T dequeue() {
        while (true) {
            Node<T> first = head.get(), last = tail.get(), next = first.next.get();
            if (first == head.get()) {
                if (first == last) {
                    if (next == null) {
                        return null;
                    }
                    tail.compareAndSet(last, next);
                } else {
                    T value = next.value;
                    if (head.compareAndSet(first, next)) {
                        return value;
                    }
                }
            }
        }
    }
}           

在这个实现中,我们使用了一个Node类表示队列中的一个节点。每个节点保存了一个值,以及一个AtomicReference类型的next变量,表示这个节点的下一个节点。

•enqueue

我们使用AtomicReference类型的head和tail来分别指向队列的头和尾。在enqueue()方法中,我们首先创建一个新的节点,并不断循环尝试将其添加到队列尾部。在每次循环中,我们首先获取当前队列的最后一个节点last以及它的下一个节点next,然后判断是否有其他线程在同时修改队列,如果没有,则通过compareAndSet()方法将新节点添加到last的next中,如果成功,则通过compareAndSet()方法将tail指向新节点,并返回true,表示添加成功。如果失败,则尝试将tail指向当前队列的最后一个节点。

•dequeue

在dequeue()方法中,我们首先不断循环尝试获取队列头和尾,以及队列头的下一个节点。然后,我们判断队列是否为空,如果为空则返回null。否则,我们返回队列头的下一个节点的值,并通过compareAndSet()方法将head指向下一个节点,完成出队操作。如果失败,则继续循环。

这个实现保证了非阻塞特性,因为在enqueue()和dequeue()方法中,每个线程都可以通过CAS操作进行无锁操作。

继续阅读