天天看點

ConcurrentLinkedQueue詳解與源碼分析

作者:四季更新221789896

ConcurrentLinkedQueue詳解

ConcurrentLinkedQueue是Java中的一個線程安全的隊列,它是基于連結清單實作的。作為一個線程安全的隊列,ConcurrentLinkedQueue支援并發地插入、删除和檢查元素,而不需要使用顯式的鎖來保護共享資源。這樣,ConcurrentLinkedQueue可以在并發環境中提供更好的性能和擴充性。

ConcurrentLinkedQueue的實作是基于單向連結清單的,每個節點都包含了一個元素和指向下一個節點的引用。由于隊列的頭和尾部可能同時被多個線程通路,是以ConcurrentLinkedQueue使用了兩個節點(head和tail)來分别表示隊列的頭和尾,并使用volatile類型的變量來保證它們的可見性。

ConcurrentLinkedQueue提供了以下方法:

  1. add()和offer()方法:用于将元素添加到隊列的尾部。
  2. poll()和peek()方法:用于擷取并删除隊列的頭部元素,或者隻是擷取頭部元素而不删除它。
  3. size()方法:用于擷取隊列中的元素數量。
  4. isEmpty()方法:用于判斷隊列是否為空。
  5. iterator()方法:用于擷取一個疊代器,可以周遊隊列中的元素。

需要注意的是,ConcurrentLinkedQueue并不保證元素的順序是FIFO(先進先出),因為多個線程可以同時對隊列的頭和尾進行操作。如果需要保證嚴格的FIFO順序,可以考慮使用其他的資料結構,例如ArrayBlockingQueue或LinkedBlockingQueue。

總之,ConcurrentLinkedQueue是一個線程安全的隊列,它提供了高效的并發操作,可以在并發環境中提供更好的性能和擴充性。但是需要注意的是,它并不保證元素的順序是嚴格的FIFO順序。

ConcurrentLinkedQueue源碼分析

以下是ConcurrentLinkedQueue的源碼分析:

ConcurrentLinkedQueue是基于單向連結清單實作的,每個節點包含了一個元素和指向下一個節點的引用。由于隊列的頭和尾部可能同時被多個線程通路,是以ConcurrentLinkedQueue使用了兩個節點(head和tail)來分别表示隊列的頭和尾,并使用volatile類型的變量來保證它們的可見性。

ConcurrentLinkedQueue的構造方法:

public ConcurrentLinkedQueue() {
    head = tail = new Node<E>(null);
}           

在構造方法中,初始化了head和tail節點,并将它們的元素設定為null。這樣,head和tail節點都是一個空節點,不包含任何元素。

ConcurrentLinkedQueue的add()方法:

public boolean add(E e) {
    return offer(e);
}           

add()方法實際上是調用了offer()方法,将元素添加到隊列的尾部。

ConcurrentLinkedQueue的offer()方法:

public boolean offer(E e) {
    checkNotNull(e);
    final Node<E> newNode = new Node<E>(e);
    for (Node<E> t = tail, p = t;;) {
        Node<E> q = p.next;
        if (q == null) {
            // p is last node
            if (p.casNext(null, newNode)) {
                // Successful CAS is the linearization point
                // for e to become an element of this queue,
                // and for newNode to become "live".
                if (p != t) // hop two nodes at a time
                    casTail(t, newNode);  // Failure is OK.
                return true;
            }
            // Lost CAS race to another thread; re-read next
        }
        else if (p == q)
            // We have fallen off list.  If tail is unchanged, it
            // will also be off-list, in which case we need to
            // jump to head, from which all live nodes are always
            // reachable.  Else the new tail is a better bet.
            p = (t != (t = tail)) ? t : head;
        else
            // Check for tail updates after two hops.
            p = (p != t && t != (t = tail)) ? t : q;
    }
}           

offer()方法的實作比較複雜,它使用了一個while循環來不斷地嘗試将新節點添加到隊列的尾部。具體來說,它會先建立一個新的節點newNode,并使用CAS(compare-and-swap)操作将它添加到tail節點的後面。如果CAS操作成功,就表示新節點已經成功添加到隊列的尾部,并傳回true。如果CAS操作失敗,表示有其他線程已經先一步将新節點添加到隊列的尾部了,此時offer()方法會重新擷取tail節點的引用,然後繼續嘗試添加新節點。

需要注意的是,offer()方法中的for循環使用了一個局部變量p來表示目前節點,它的初始值為tail節點。在每次循環中,p會先擷取它的下一個節點q,然後判斷q是否為空,如果為空,就表示p是隊列的最後一個節點,此時就可以将新節點添加到p的後面了。如果q不為空,就表示p不是隊列的最後一個節點,此時offer()方法需要更新p的值,使它指向下一個節點q。

如果在更新p的過程中,發現p和tail節點不一緻,就表示tail節點已經被其他線程更新了,此時offer()方法需要重新擷取tail節點的引用,然後繼續嘗試添加新節點。如果p和tail節點一緻,就表示p仍然是隊列的有效節點,此時offer()方法可以将p的值更新為q,然後繼續循環。

需要注意的是,offer()方法中的CAS操作是一個樂觀鎖的實作,它可以避免競态條件的發生,進而提高了并發性能。但是,由于CAS操作可能會失敗,是以offer()方法需要不斷地嘗試添加新節點,直到成功為止。

ConcurrentLinkedQueue的poll()方法:

public E poll() {
    restartFromHead:
    for (;;) {
        for (Node<E> h = head, p = h, q;;) {
            E item = p.item;
            if (item != null && p.casItem(item, null)) {
                // Successful CAS is the linearization point
                // for item to be removed from this queue.
                if (p != h) // hop two nodes at a time
                    updateHead(h, ((q = p.next) != null) ? q : p);
                return item;
            }
            else if ((q = p.next) == null) {
                updateHead(h, p);
                return null;
            }
            else if (p == q)
                continue restartFromHead;
            else
                p = q;
        }
    }
}           

poll()方法用于擷取并删除隊列的頭部元素,它的實作比offer()方法更為複雜。它使用了一個while(true)循環和一個标簽(restartFromHead),來確定能夠從頭部開始周遊隊列并找到有效元素。具體來說,poll()方法會先擷取head節點的引用,并将其指派給p和h兩個變量。

在循環中,poll()方法會先擷取p節點的元素item,如果item不為空,就表示p節點是一個有效節點,此時poll()方法可以使用CAS操作将item設定為null,并傳回原來的item值。如果CAS操作成功,就表示item已經被成功删除了。然後,poll()方法會檢查p節點是否已經被其他線程删除了,如果是,就需要重新從頭部開始周遊隊列。

如果p節點的元素item為空,就表示p節點不是一個有效節點,此時poll()方法會将p節點的下一個節點指派給q變量。如果q為空,就表示p是隊列的最後一個節點,此時poll()方法會将head節點更新為p節點,并傳回null。如果q不為空,就表示p不是隊列的最後一個節點,此時poll()方法會将p的值更新為q,并繼續循環。

需要注意的是,poll()方法中的CAS操作也是一個樂觀鎖的實作,它可以避免競态條件的發生,進而提高了并發性能。但是,由于CAS操作可能會失敗,是以poll()方法需要不斷地嘗試删除元素,直到成功為止。

ConcurrentLinkedQueue的其他方法:

除了add()和poll()方法之外,ConcurrentLinkedQueue還提供了一些其他的方法,如peek()、isEmpty()、size()等等。這些方法的實作比較簡單,不再贅述。需要注意的是,由于ConcurrentLinkedQueue是一個非阻塞隊列,是以它的方法都是非阻塞的,不會阻塞線程的執行。同時,由于ConcurrentLinkedQueue是一個線程安全的隊列,是以多個線程可以同時通路它,而不會出現競态條件的問題。

手寫單向連結清單

好的,以下是一個單向連結清單的實作代碼,帶有中文注釋:

/**
 * 單向連結清單節點類
 */
class Node {
    int val; // 節點的值
    Node next; // 指向下一個節點的指針

    public Node(int val) {
        this.val = val;
        this.next = null;
    }
}

/**
 * 單向連結清單類
 */
class LinkedList {
    private Node head; // 連結清單頭節點

    /**
     * 在連結清單末尾添加一個節點
     * @param val 要添加的節點的值
     */
    public void add(int val) {
        Node newNode = new Node(val); // 建立新節點
        if (head == null) { // 如果連結清單為空,将新節點設為頭節點
            head = newNode;
        } else {
            Node cur = head;
            while (cur.next != null) { // 周遊連結清單,找到最後一個節點
                cur = cur.next;
            }
            cur.next = newNode; // 将新節點添加到連結清單末尾
        }
    }

    /**
     * 在連結清單中删除一個節點
     * @param val 要删除的節點的值
     * @return 是否成功删除節點
     */
    public boolean remove(int val) {
        if (head == null) { // 如果連結清單為空,傳回false
            return false;
        }
        if (head.val == val) { // 如果頭節點就是要删除的節點,将頭節點指向下一個節點
            head = head.next;
            return true;
        }
        Node prev = head;
        Node cur = head.next;
        while (cur != null) { // 周遊連結清單,找到要删除的節點
            if (cur.val == val) {
                prev.next = cur.next; // 将前一個節點指向下一個節點,跳過目前節點
                return true;
            }
            prev = cur;
            cur = cur.next;
        }
        return false; // 沒有找到要删除的節點,傳回false
    }

    /**
     * 在連結清單中查找一個節點
     * @param val 要查找的節點的值
     * @return 是否找到節點
     */
    public boolean contains(int val) {
        Node cur = head;
        while (cur != null) { // 周遊連結清單,查找節點
            if (cur.val == val) {
                return true;
            }
            cur = cur.next;
        }
        return false; // 沒有找到節點,傳回false
    }

    /**
     * 擷取連結清單中的所有節點的值
     * @return 所有節點的值的清單
     */
    public List<Integer> getAll() {
        List<Integer> list = new ArrayList<>();
        Node cur = head;
        while (cur != null) { // 周遊連結清單,将所有節點的值添加到清單中
            list.add(cur.val);
            cur = cur.next;
        }
        return list;
    }
}           

這個單向連結清單實作包含了添加節點、删除節點、查找節點和擷取所有節點的值等基本操作。其中,每個節點包含了一個值和指向下一個節點的指針。連結清單類中有一個頭節點,可以通過它來周遊整個連結清單。在添加節點時,如果連結清單為空,将新節點設為頭節點;否則,周遊連結清單,找到最後一個節點,将新節點添加到連結清單末尾。在删除節點時,周遊連結清單,找到要删除的節點,将前一個節點指向下一個節點,跳過目前節點。在查找節點時,周遊連結清單,查找節點的值。在擷取所有節點的值時,周遊連結清單,将所有節點的值添加到一個清單中,并傳回該清單。

ConcurrentLinkedQueue如何保證線程安全的

除了使用原子操作外,ConcurrentLinkedQueue還使用了一些其他的技術來保證線程安全,這裡列舉幾個:

1. 非阻塞算法:ConcurrentLinkedQueue使用了一種非阻塞算法,即不需要線程阻塞即可完成隊列操作。這個算法的核心思想是,如果一個線程的操作被其他線程幹擾了,它會重新嘗試這個操作,直到成功為止。

2. 無鎖設計:ConcurrentLinkedQueue不使用鎖來保證線程安全,因為鎖會帶來一些性能上的開銷。相反,它使用了一些基于CAS操作的無鎖設計來保證線程安全。

3. 分離頭尾節點:ConcurrentLinkedQueue使用了兩個節點,分别表示隊列的頭和尾。這樣,入隊和出隊操作可以同時進行,而不會影響彼此。

綜上所述,ConcurrentLinkedQueue通過使用多種技術來保證線程安全,這些技術包括原子操作、非阻塞算法、無鎖設計和分離頭尾節點等。這使得ConcurrentLinkedQueue成為了一個高效而且線程安全的隊列實作。

繼續閱讀