天天看点

ConcurrentLinkedQueue详解与源码分析

作者:四季更新221789896

ConcurrentLinkedQueue详解

ConcurrentLinkedQueue是Java中的一个线程安全的队列,它是基于链表实现的。作为一个线程安全的队列,ConcurrentLinkedQueue支持并发地插入、删除和检查元素,而不需要使用显式的锁来保护共享资源。这样,ConcurrentLinkedQueue可以在并发环境中提供更好的性能和扩展性。

ConcurrentLinkedQueue的实现是基于单向链表的,每个节点都包含了一个元素和指向下一个节点的引用。由于队列的头和尾部可能同时被多个线程访问,因此ConcurrentLinkedQueue使用了两个节点(head和tail)来分别表示队列的头和尾,并使用volatile类型的变量来保证它们的可见性。

ConcurrentLinkedQueue提供了以下方法:

  1. add()和offer()方法:用于将元素添加到队列的尾部。
  2. poll()和peek()方法:用于获取并删除队列的头部元素,或者只是获取头部元素而不删除它。
  3. size()方法:用于获取队列中的元素数量。
  4. isEmpty()方法:用于判断队列是否为空。
  5. iterator()方法:用于获取一个迭代器,可以遍历队列中的元素。

需要注意的是,ConcurrentLinkedQueue并不保证元素的顺序是FIFO(先进先出),因为多个线程可以同时对队列的头和尾进行操作。如果需要保证严格的FIFO顺序,可以考虑使用其他的数据结构,例如ArrayBlockingQueue或LinkedBlockingQueue。

总之,ConcurrentLinkedQueue是一个线程安全的队列,它提供了高效的并发操作,可以在并发环境中提供更好的性能和扩展性。但是需要注意的是,它并不保证元素的顺序是严格的FIFO顺序。

ConcurrentLinkedQueue源码分析

以下是ConcurrentLinkedQueue的源码分析:

ConcurrentLinkedQueue是基于单向链表实现的,每个节点包含了一个元素和指向下一个节点的引用。由于队列的头和尾部可能同时被多个线程访问,因此ConcurrentLinkedQueue使用了两个节点(head和tail)来分别表示队列的头和尾,并使用volatile类型的变量来保证它们的可见性。

ConcurrentLinkedQueue的构造方法:

public ConcurrentLinkedQueue() {
    head = tail = new Node<E>(null);
}           

在构造方法中,初始化了head和tail节点,并将它们的元素设置为null。这样,head和tail节点都是一个空节点,不包含任何元素。

ConcurrentLinkedQueue的add()方法:

public boolean add(E e) {
    return offer(e);
}           

add()方法实际上是调用了offer()方法,将元素添加到队列的尾部。

ConcurrentLinkedQueue的offer()方法:

public boolean offer(E e) {
    checkNotNull(e);
    final Node<E> newNode = new Node<E>(e);
    for (Node<E> t = tail, p = t;;) {
        Node<E> q = p.next;
        if (q == null) {
            // p is last node
            if (p.casNext(null, newNode)) {
                // Successful CAS is the linearization point
                // for e to become an element of this queue,
                // and for newNode to become "live".
                if (p != t) // hop two nodes at a time
                    casTail(t, newNode);  // Failure is OK.
                return true;
            }
            // Lost CAS race to another thread; re-read next
        }
        else if (p == q)
            // We have fallen off list.  If tail is unchanged, it
            // will also be off-list, in which case we need to
            // jump to head, from which all live nodes are always
            // reachable.  Else the new tail is a better bet.
            p = (t != (t = tail)) ? t : head;
        else
            // Check for tail updates after two hops.
            p = (p != t && t != (t = tail)) ? t : q;
    }
}           

offer()方法的实现比较复杂,它使用了一个while循环来不断地尝试将新节点添加到队列的尾部。具体来说,它会先创建一个新的节点newNode,并使用CAS(compare-and-swap)操作将它添加到tail节点的后面。如果CAS操作成功,就表示新节点已经成功添加到队列的尾部,并返回true。如果CAS操作失败,表示有其他线程已经先一步将新节点添加到队列的尾部了,此时offer()方法会重新获取tail节点的引用,然后继续尝试添加新节点。

需要注意的是,offer()方法中的for循环使用了一个局部变量p来表示当前节点,它的初始值为tail节点。在每次循环中,p会先获取它的下一个节点q,然后判断q是否为空,如果为空,就表示p是队列的最后一个节点,此时就可以将新节点添加到p的后面了。如果q不为空,就表示p不是队列的最后一个节点,此时offer()方法需要更新p的值,使它指向下一个节点q。

如果在更新p的过程中,发现p和tail节点不一致,就表示tail节点已经被其他线程更新了,此时offer()方法需要重新获取tail节点的引用,然后继续尝试添加新节点。如果p和tail节点一致,就表示p仍然是队列的有效节点,此时offer()方法可以将p的值更新为q,然后继续循环。

需要注意的是,offer()方法中的CAS操作是一个乐观锁的实现,它可以避免竞态条件的发生,从而提高了并发性能。但是,由于CAS操作可能会失败,因此offer()方法需要不断地尝试添加新节点,直到成功为止。

ConcurrentLinkedQueue的poll()方法:

public E poll() {
    restartFromHead:
    for (;;) {
        for (Node<E> h = head, p = h, q;;) {
            E item = p.item;
            if (item != null && p.casItem(item, null)) {
                // Successful CAS is the linearization point
                // for item to be removed from this queue.
                if (p != h) // hop two nodes at a time
                    updateHead(h, ((q = p.next) != null) ? q : p);
                return item;
            }
            else if ((q = p.next) == null) {
                updateHead(h, p);
                return null;
            }
            else if (p == q)
                continue restartFromHead;
            else
                p = q;
        }
    }
}           

poll()方法用于获取并删除队列的头部元素,它的实现比offer()方法更为复杂。它使用了一个while(true)循环和一个标签(restartFromHead),来确保能够从头部开始遍历队列并找到有效元素。具体来说,poll()方法会先获取head节点的引用,并将其赋值给p和h两个变量。

在循环中,poll()方法会先获取p节点的元素item,如果item不为空,就表示p节点是一个有效节点,此时poll()方法可以使用CAS操作将item设置为null,并返回原来的item值。如果CAS操作成功,就表示item已经被成功删除了。然后,poll()方法会检查p节点是否已经被其他线程删除了,如果是,就需要重新从头部开始遍历队列。

如果p节点的元素item为空,就表示p节点不是一个有效节点,此时poll()方法会将p节点的下一个节点赋值给q变量。如果q为空,就表示p是队列的最后一个节点,此时poll()方法会将head节点更新为p节点,并返回null。如果q不为空,就表示p不是队列的最后一个节点,此时poll()方法会将p的值更新为q,并继续循环。

需要注意的是,poll()方法中的CAS操作也是一个乐观锁的实现,它可以避免竞态条件的发生,从而提高了并发性能。但是,由于CAS操作可能会失败,因此poll()方法需要不断地尝试删除元素,直到成功为止。

ConcurrentLinkedQueue的其他方法:

除了add()和poll()方法之外,ConcurrentLinkedQueue还提供了一些其他的方法,如peek()、isEmpty()、size()等等。这些方法的实现比较简单,不再赘述。需要注意的是,由于ConcurrentLinkedQueue是一个非阻塞队列,因此它的方法都是非阻塞的,不会阻塞线程的执行。同时,由于ConcurrentLinkedQueue是一个线程安全的队列,因此多个线程可以同时访问它,而不会出现竞态条件的问题。

手写单向链表

好的,以下是一个单向链表的实现代码,带有中文注释:

/**
 * 单向链表节点类
 */
class Node {
    int val; // 节点的值
    Node next; // 指向下一个节点的指针

    public Node(int val) {
        this.val = val;
        this.next = null;
    }
}

/**
 * 单向链表类
 */
class LinkedList {
    private Node head; // 链表头节点

    /**
     * 在链表末尾添加一个节点
     * @param val 要添加的节点的值
     */
    public void add(int val) {
        Node newNode = new Node(val); // 创建新节点
        if (head == null) { // 如果链表为空,将新节点设为头节点
            head = newNode;
        } else {
            Node cur = head;
            while (cur.next != null) { // 遍历链表,找到最后一个节点
                cur = cur.next;
            }
            cur.next = newNode; // 将新节点添加到链表末尾
        }
    }

    /**
     * 在链表中删除一个节点
     * @param val 要删除的节点的值
     * @return 是否成功删除节点
     */
    public boolean remove(int val) {
        if (head == null) { // 如果链表为空,返回false
            return false;
        }
        if (head.val == val) { // 如果头节点就是要删除的节点,将头节点指向下一个节点
            head = head.next;
            return true;
        }
        Node prev = head;
        Node cur = head.next;
        while (cur != null) { // 遍历链表,找到要删除的节点
            if (cur.val == val) {
                prev.next = cur.next; // 将前一个节点指向下一个节点,跳过当前节点
                return true;
            }
            prev = cur;
            cur = cur.next;
        }
        return false; // 没有找到要删除的节点,返回false
    }

    /**
     * 在链表中查找一个节点
     * @param val 要查找的节点的值
     * @return 是否找到节点
     */
    public boolean contains(int val) {
        Node cur = head;
        while (cur != null) { // 遍历链表,查找节点
            if (cur.val == val) {
                return true;
            }
            cur = cur.next;
        }
        return false; // 没有找到节点,返回false
    }

    /**
     * 获取链表中的所有节点的值
     * @return 所有节点的值的列表
     */
    public List<Integer> getAll() {
        List<Integer> list = new ArrayList<>();
        Node cur = head;
        while (cur != null) { // 遍历链表,将所有节点的值添加到列表中
            list.add(cur.val);
            cur = cur.next;
        }
        return list;
    }
}           

这个单向链表实现包含了添加节点、删除节点、查找节点和获取所有节点的值等基本操作。其中,每个节点包含了一个值和指向下一个节点的指针。链表类中有一个头节点,可以通过它来遍历整个链表。在添加节点时,如果链表为空,将新节点设为头节点;否则,遍历链表,找到最后一个节点,将新节点添加到链表末尾。在删除节点时,遍历链表,找到要删除的节点,将前一个节点指向下一个节点,跳过当前节点。在查找节点时,遍历链表,查找节点的值。在获取所有节点的值时,遍历链表,将所有节点的值添加到一个列表中,并返回该列表。

ConcurrentLinkedQueue如何保证线程安全的

除了使用原子操作外,ConcurrentLinkedQueue还使用了一些其他的技术来保证线程安全,这里列举几个:

1. 非阻塞算法:ConcurrentLinkedQueue使用了一种非阻塞算法,即不需要线程阻塞即可完成队列操作。这个算法的核心思想是,如果一个线程的操作被其他线程干扰了,它会重新尝试这个操作,直到成功为止。

2. 无锁设计:ConcurrentLinkedQueue不使用锁来保证线程安全,因为锁会带来一些性能上的开销。相反,它使用了一些基于CAS操作的无锁设计来保证线程安全。

3. 分离头尾节点:ConcurrentLinkedQueue使用了两个节点,分别表示队列的头和尾。这样,入队和出队操作可以同时进行,而不会影响彼此。

综上所述,ConcurrentLinkedQueue通过使用多种技术来保证线程安全,这些技术包括原子操作、非阻塞算法、无锁设计和分离头尾节点等。这使得ConcurrentLinkedQueue成为了一个高效而且线程安全的队列实现。

继续阅读