天天看點

JAVA并發容器-ConcurrentLinkedQueue 源碼分析

在并發程式設計中,有時候需要使用線程安全的隊列。如果要實作一個線程安全的隊列有兩 種方式:一種是使用阻塞算法,另一種是使用非阻塞算法。使用阻塞算法的隊列可以用一個鎖 (入隊和出隊用同一把鎖)或兩個鎖(入隊和出隊用不同的鎖)等方式來實作。非阻塞的實作方式則可以使用循環CAS的方式來實作。

ConcurrentLinkedQueue是一個非阻塞的基于連結清單節點的無界線程安全隊列,它遵循先進先出的原則,并采用了“wait-free”算法(即CAS算法)來實作。

類關系圖

JAVA并發容器-ConcurrentLinkedQueue 源碼分析

核心屬性

// 頭結點指針(不一定指向頭,緩更新,非阻塞的,是以沒法保證)
private volatile Node<E> head;

// 尾節點指針(不一定指向尾,緩更新,非阻塞的,是以沒法保證)
private volatile Node<E> tail;

// 連結清單節點
private static class Node<E> {
    // 節點元素(存資料)
    volatile E item;
    // 下一個節點
    volatile Node<E> next;
...
}      

ConcurrentLinkedQueue由head節點和tail節點組成,每個節點(Node)由節點元素(item)和 指向下一個節點(next)的引用組成,節點與節點之間就是通過這個next關聯起來,進而組成一 張連結清單結構的隊列。預設情況下head節點存儲的元素為空,tail節點等于head節點。

注意:
  • 核心屬性都是用volatile來修飾來保證資料的可見性。
  • 預設情況下頭結點和尾節點是相等的對應節點的​

    ​item​

    ​​為​

    ​null​

  • 頭和尾節點都是緩更新,是以他們隻能代表一個邏輯上的位置,不是實際的頭尾節點

offer() 入隊方法

public boolean offer(E e) {
    // 檢查元素是否為NULL,這裡不允許存NULL值
    checkNotNull(e);
    final Node<E> newNode = new Node<E>(e);
    // 自旋,t是tail節點的引用,p表示尾節點,預設情況下p=t的
    for (Node<E> t = tail, p = t; ; ) {
        // q是p的下一個節點
        Node<E> q = p.next;
        // q==null表示p是尾節點,否則需要更新p進入下一次循環
        if (q == null) {
            // 使用CAS将新節點設定為p的next節點
            if (p.casNext(null, newNode)) {
                // 不是每次添加新節點都會更新尾節點指針,每新增兩個節點才會更新尾節點指針。
                // 這樣做的好處是減少了更新尾節點的競争,但是增加了尋找真正尾節點的代價。
                if (p != t) // hop two nodes at a time
                    // 即使設定失敗也沒有問題,說明其他線程已經更新了尾節點的值
                    casTail(t, newNode);

                // 永遠傳回true
                return true;
            }
        }
        // 定位尾節點
        // 删除節點的時候,會把删除的節點指向自己(自引用),就會發生p==q. 當添加第一個節點後tail也會發生自引用
        else if (p == q)
            // t != tail表示尾節點指針已經被移動過,這時p直接指向新的尾節點指針tail,否則p直接指向頭結點指針,
            // 因為這時p已經被移出了連結清單,是以不能将p指向p.next
            p = (t != (t = tail)) ? t : head;
        else
            // t != tail表示尾節點指針已經被移動過,是以p直接指向新的尾節點指針(tail),否則p指向p.next
            p = (p  != t && t != (t = tail)) ? t : q;
    }
}      

入隊過程:

  1. 檢查元素是否為NULL,如果是NULL直接抛出異常
  2. 初始化新節點​

    ​newNode​

  3. 找到尾節點指針```t``
  4. 根據尾節點指針​

    ​t​

    ​​找到真正的尾節點​

    ​p​

  5. CAS将​

    ​p​

    ​​的​

    ​next​

    ​​指向​

    ​newNode​

  6. 判斷是否更新尾節點指針的位置
注意:
  • ConcurrentLinkedQueue 是不允許存NULL值的,NULL有特殊含義,表示已經從連結清單中移除(出隊了)
  • 入隊過程大緻分為三步,第一步找到真正的尾節點​

    ​p​

    ​​;第二步CAS更新​

    ​p.next​

    ​​;第三步CAS更新尾節點指針的位置​

    ​casTail(t, newNode);​

  • ​t != tail​

    ​表示尾節點指針已經被移動過
  • ​p == q​

    ​​表示​

    ​p​

    ​節點已經從連結清單中移除,并發生了自引用
  • 每新增兩個節點才會更新尾節點指針, 這樣能減少了更新尾節點的競争,但是增加了尋找真正尾節點的代價。從本質上來 看它通過增加對volatile變量的讀操作來減少對volatile變量的寫操作,而對volatile變量的寫操作開銷要遠遠大于讀操作,是以入隊效率會有所提升。

入隊快照圖

JAVA并發容器-ConcurrentLinkedQueue 源碼分析

入隊debug圖

JAVA并發容器-ConcurrentLinkedQueue 源碼分析

poll()出隊方法

public E poll() {
    //  外循環标記
    restartFromHead:
    for (; ; ) {
        // h是頭結點指針,p頭結點,q是p的下一個節點
        for (Node<E> h = head, p = h, q; ; ) {
            // 儲存頭節點的值
            E item = p.item;
            // CAS替換頭結點的item值為null,如果item為null表示該節點已經從連結清單中移除了
            if (item != null && p.casItem(item, null)) {
                // 每移除兩個元素,頭節點指針才開始移動
                if (p != h) // hop two nodes at a time
                    updateHead(h, ((q = p.next) != null) ? q : p);
                return item;
            }
            // 目前隊列為NULL
            else if ((q = p.next) == null) {
                updateHead(h, p);
                return null;
            }
            // 發生了自引用,重頭再來
            else if (p == q)
                continue restartFromHead;
            else
                p = q;
        }
    }
}      

出隊過程:

  1. 根據頭結點指針找到頭結點
  2. 暫存頭結點的值
  3. 判斷節點内容​

    ​item​

    ​​是否為​

    ​NULL​

    ​​,如果為​

    ​NULL​

    ​表示該節點已經出隊了
  4. 如果​

    ​item​

    ​​不為​

    ​NULL​

    ​​則使用CAS替換為​

    ​NULL​

    ​,如果成功判斷是否需要更新頭結點指針
  5. 如果失敗表示該節點已經被其他線程出隊了,尋找新的頭結點,繼續第二步
注意:
  • 每删除兩個節點才會更新頭點指針, 這樣能減少了更新頭點的競争。
  • 當節點的​

    ​item==null​

    ​時表示節點已經出隊了

測試代碼

public class ConcurrentLinkedQueueTset {
    public static void main(String[] args) {
        ConcurrentLinkedQueue<Integer> linkedQueue = new ConcurrentLinkedQueue<>();

        linkedQueue.offer(1);
        linkedQueue.offer(2);
        linkedQueue.poll();
        linkedQueue.offer(3);
        linkedQueue.offer(4);
        linkedQueue.poll();
        linkedQueue.poll();
    }
}      

參考

《java并發程式設計的藝術》

源碼

​​https://github.com/wyh-spring-ecosystem-student/spring-boot-student/tree/releases​​

layering-cache

繼續閱讀