天天看點

Java并發程式設計的藝術——并發容器和架構之ConcurrentLinkedQueue

作者:一個即将被退役的碼農

目錄:

帶着BAT大廠的面試問題去了解

ConcurrentLinkedQueue資料結構

ConcurrentLinkedQueue源碼分析

ConcurrentLinkedQueue示例

HOPS(延遲更新的政策)的設計

ConcurrentLinkedQueue适合的場景

ConcurerntLinkedQueue一個基于連結節點的無界線程安全隊列。此隊列按照 FIFO(先進先出)原則對元素進行排序。隊列的頭部是隊列中時間最長的元素。隊列的尾部 是隊列中時間最短的元素。新的元素插入到隊列的尾部,隊列擷取操作從隊列頭部獲得元素。當多個線程共享通路一個公共 collection 時,ConcurrentLinkedQueue是一個恰當的選擇。此隊列不允許使用null元素。
  • JUC集合: ConcurrentLinkedQueue詳解 帶着BAT大廠的面試問題去了解 ConcurrentLinkedQueue資料結構 ConcurrentLinkedQueue源碼分析 類的繼承關系 類的内部類 類的屬性 類的構造函數 核心函數分析 offer函數 poll函數 remove函數 size函數 ConcurrentLinkedQueue示例 再深入了解 HOPS(延遲更新的政策)的設計 ConcurrentLinkedQueue适合的場景 參考文章

帶着BAT大廠的面試問題去了解

請帶着這些問題繼續後文,會很大程度上幫助你更好的了解相關知識點

  • 要想用線程安全的隊列有哪些選擇? Vector,Collections.synchronizedList(List<T> list), ConcurrentLinkedQueue等
  • ConcurrentLinkedQueue實作的資料結構?
  • ConcurrentLinkedQueue底層原理? 全程無鎖(CAS)
  • ConcurrentLinkedQueue的核心方法有哪些? offer(),poll(),peek(),isEmpty()等隊列常用方法
  • 說說ConcurrentLinkedQueue的HOPS(延遲更新的政策)的設計?
  • ConcurrentLinkedQueue适合什麼樣的使用場景?

ConcurrentLinkedQueue資料結構

通過源碼分析可知,ConcurrentLinkedQueue的資料結構與LinkedBlockingQueue的資料結構相同,都是使用的連結清單結構。ConcurrentLinkedQueue的資料結構如下:

Java并發程式設計的藝術——并發容器和架構之ConcurrentLinkedQueue

說明: ConcurrentLinkedQueue采用的連結清單結構,并且包含有一個頭節點和一個尾結點。

ConcurrentLinkedQueue源碼分析

類的繼承關系

public class ConcurrentLinkedQueue<E> extends AbstractQueue<E>
        implements Queue<E>, java.io.Serializable {}           

說明: ConcurrentLinkedQueue繼承了抽象類AbstractQueue,AbstractQueue定義了對隊列的基本操作;同時實作了Queue接口,Queue定義了對隊列的基本操作,同時,還實作了Serializable接口,表示可以被序列化。

類的内部類

private static class Node<E> {
    // 元素
    volatile E item;
    // next域
    volatile Node<E> next;

    /**
        * Constructs a new node.  Uses relaxed write because item can
        * only be seen after publication via casNext.
        */
    // 構造函數
    Node(E item) {
        // 設定item的值
        UNSAFE.putObject(this, itemOffset, item);
    }
    // 比較并替換item值
    boolean casItem(E cmp, E val) {
        return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
    }
    
    void lazySetNext(Node<E> val) {
        // 設定next域的值,并不會保證修改對其他線程立即可見
        UNSAFE.putOrderedObject(this, nextOffset, val);
    }
    // 比較并替換next域的值
    boolean casNext(Node<E> cmp, Node<E> val) {
        return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
    }

    // Unsafe mechanics
    // 反射機制
    private static final sun.misc.Unsafe UNSAFE;
    // item域的偏移量
    private static final long itemOffset;
    // next域的偏移量
    private static final long nextOffset;

    static {
        try {
            UNSAFE = sun.misc.Unsafe.getUnsafe();
            Class<?> k = Node.class;
            itemOffset = UNSAFE.objectFieldOffset
                (k.getDeclaredField("item"));
            nextOffset = UNSAFE.objectFieldOffset
                (k.getDeclaredField("next"));
        } catch (Exception e) {
            throw new Error(e);
        }
    }
}           

說明: Node類表示連結清單結點,用于存放元素,包含item域和next域,item域表示元素,next域表示下一個結點,其利用反射機制和CAS機制來更新item域和next域,保證原子性。

類的屬性

public class ConcurrentLinkedQueue<E> extends AbstractQueue<E>
        implements Queue<E>, java.io.Serializable {
    // 版本序列号        
    private static final long serialVersionUID = 196745693267521676L;
    // 反射機制
    private static final sun.misc.Unsafe UNSAFE;
    // head域的偏移量
    private static final long headOffset;
    // tail域的偏移量
    private static final long tailOffset;
    static {
        try {
            UNSAFE = sun.misc.Unsafe.getUnsafe();
            Class<?> k = ConcurrentLinkedQueue.class;
            headOffset = UNSAFE.objectFieldOffset
                (k.getDeclaredField("head"));
            tailOffset = UNSAFE.objectFieldOffset
                (k.getDeclaredField("tail"));
        } catch (Exception e) {
            throw new Error(e);
        }
    }
    
    // 頭節點
    private transient volatile Node<E> head;
    // 尾結點
    private transient volatile Node<E> tail;
}
               

說明: 屬性中包含了head域和tail域,表示連結清單的頭節點和尾結點,同時,ConcurrentLinkedQueue也使用了反射機制和CAS機制來更新頭節點和尾結點,保證原子性。

類的構造函數

  • ConcurrentLinkedQueue()型構造函數
public ConcurrentLinkedQueue() {
    // 初始化頭節點與尾結點
    head = tail = new Node<E>(null);
}
               

說明: 該構造函數用于建立一個最初為空的 ConcurrentLinkedQueue,頭節點與尾結點指向同一個結點,該結點的item域為null,next域也為null。

  • ConcurrentLinkedQueue(Collection<? extends E>)型構造函數
public ConcurrentLinkedQueue(Collection<? extends E> c) {
    Node<E> h = null, t = null;
    for (E e : c) { // 周遊c集合
        // 保證元素不為空
        checkNotNull(e);
        // 新生一個結點
        Node<E> newNode = new Node<E>(e);
        if (h == null) // 頭節點為null
            // 指派頭節點與尾結點
            h = t = newNode;
        else {
            // 直接頭節點的next域
            t.lazySetNext(newNode);
            // 重新指派頭節點
            t = newNode;
        }
    }
    if (h == null) // 頭節點為null
        // 新生頭節點與尾結點
        h = t = new Node<E>(null);
    // 指派頭節點
    head = h;
    // 指派尾結點
    tail = t;
}
               

說明: 該構造函數用于建立一個最初包含給定 collection 元素的 ConcurrentLinkedQueue,按照此 collection 疊代器的周遊順序來添加元素。

核心函數分析

offer函數

public boolean offer(E e) {
    // 元素不為null
    checkNotNull(e);
    // 新生一個結點
    final Node<E> newNode = new Node<E>(e);

    for (Node<E> t = tail, p = t;;) { // 無限循環
        // q為p結點的下一個結點
        Node<E> q = p.next;
        if (q == null) { // q結點為null
            // p is last node
            if (p.casNext(null, newNode)) { // 比較并進行替換p結點的next域
                // Successful CAS is the linearization point
                // for e to become an element of this queue,
                // and for newNode to become "live".
                if (p != t) // p不等于t結點,不一緻    // hop two nodes at a time
                    // 比較并替換尾結點
                    casTail(t, newNode);  // Failure is OK.
                // 傳回
                return true;
            }
            // Lost CAS race to another thread; re-read next
        }
        else if (p == q) // p結點等于q結點
            // We have fallen off list.  If tail is unchanged, it
            // will also be off-list, in which case we need to
            // jump to head, from which all live nodes are always
            // reachable.  Else the new tail is a better bet.
            // 原來的尾結點與現在的尾結點是否相等,若相等,則p指派為head,否則,指派為現在的尾結點
            p = (t != (t = tail)) ? t : head;
        else
            // Check for tail updates after two hops.
            // 重新指派p結點
            p = (p != t && t != (t = tail)) ? t : q;
    }
}
               

說明: offer函數用于将指定元素插入此隊列的尾部。下面模拟offer函數的操作,隊列狀态的變化(假設單線程添加元素,連續添加10、20兩個元素)。

Java并發程式設計的藝術——并發容器和架構之ConcurrentLinkedQueue
  • 若ConcurrentLinkedQueue的初始狀态如上圖所示,即隊列為空。單線程添加元素,此時,添加元素10,則狀态如下所示
Java并發程式設計的藝術——并發容器和架構之ConcurrentLinkedQueue
  • 如上圖所示,添加元素10後,tail沒有變化,還是指向之前的結點,繼續添加元素20,則狀态如下所示
Java并發程式設計的藝術——并發容器和架構之ConcurrentLinkedQueue
  • 如上圖所示,添加元素20後,tail指向了最新添加的結點。

poll函數

public E poll() {
    restartFromHead:
    for (;;) { // 無限循環
        for (Node<E> h = head, p = h, q;;) { // 儲存頭節點
            // item項
            E item = p.item;

            if (item != null && p.casItem(item, null)) { // item不為null并且比較并替換item成功
                // Successful CAS is the linearization point
                // for item to be removed from this queue.
                if (p != h) // p不等于h    // hop two nodes at a time
                    // 更新頭節點
                    updateHead(h, ((q = p.next) != null) ? q : p); 
                // 傳回item
                return item;
            }
            else if ((q = p.next) == null) { // q結點為null
                // 更新頭節點
                updateHead(h, p);
                return null;
            }
            else if (p == q) // p等于q
                // 繼續循環
                continue restartFromHead;
            else
                // p指派為q
                p = q;
        }
    }
}
               

說明: 此函數用于擷取并移除此隊列的頭,如果此隊列為空,則傳回null。下面模拟poll函數的操作,隊列狀态的變化(假設單線程操作,狀态為之前offer10、20後的狀态,poll兩次)。

Java并發程式設計的藝術——并發容器和架構之ConcurrentLinkedQueue
  • 隊列初始狀态如上圖所示,在poll操作後,隊列的狀态如下圖所示
Java并發程式設計的藝術——并發容器和架構之ConcurrentLinkedQueue
  • 如上圖可知,poll操作後,head改變了,并且head所指向的結點的item變為了null。再進行一次poll操作,隊列的狀态如下圖所示。
Java并發程式設計的藝術——并發容器和架構之ConcurrentLinkedQueue
  • 如上圖可知,poll操作後,head結點沒有變化,隻是訓示的結點的item域變成了null。

remove函數

public boolean remove(Object o) {
    // 元素為null,傳回
    if (o == null) return false;
    Node<E> pred = null;
    for (Node<E> p = first(); p != null; p = succ(p)) { // 擷取第一個存活的結點
        // 第一個存活結點的item值
        E item = p.item;
        if (item != null &&
            o.equals(item) &&
            p.casItem(item, null)) { // 找到item相等的結點,并且将該結點的item設定為null
            // p的後繼結點
            Node<E> next = succ(p);
            if (pred != null && next != null) // pred不為null并且next不為null
                // 比較并替換next域
                pred.casNext(p, next);
            return true;
        }
        // pred指派為p
        pred = p;
    }
    return false;
}
               

說明: 此函數用于從隊列中移除指定元素的單個執行個體(如果存在)。其中,會調用到first函數和succ函數,first函數的源碼如下

Node<E> first() {
    restartFromHead:
    for (;;) { // 無限循環,確定成功
        for (Node<E> h = head, p = h, q;;) {
            // p結點的item域是否為null
            boolean hasItem = (p.item != null);
            if (hasItem || (q = p.next) == null) { // item不為null或者next域為null
                // 更新頭節點
                updateHead(h, p);
                // 傳回結點
                return hasItem ? p : null;
            }
            else if (p == q) // p等于q
                // 繼續從頭節點開始
                continue restartFromHead;
            else
                // p指派為q
                p = q;
        }
    }
}
               

說明: first函數用于找到連結清單中第一個存活的結點。succ函數源碼如下

final Node<E> succ(Node<E> p) {
    // p結點的next域
    Node<E> next = p.next;
    // 如果next域為自身,則傳回頭節點,否則,傳回next
    return (p == next) ? head : next;
}
  
        @pdai: 代碼已經複制到剪貼闆
               

說明: succ用于擷取結點的下一個結點。如果結點的next域指向自身,則傳回head頭節點,否則,傳回next結點。下面模拟remove函數的操作,隊列狀态的變化(假設單線程操作,狀态為之前offer10、20後的狀态,執行remove(10)、remove(20)操作)。

Java并發程式設計的藝術——并發容器和架構之ConcurrentLinkedQueue
  • 如上圖所示,為ConcurrentLinkedQueue的初始狀态,remove(10)後的狀态如下圖所示
Java并發程式設計的藝術——并發容器和架構之ConcurrentLinkedQueue
  • 如上圖所示,當執行remove(10)後,head指向了head結點之前指向的結點的下一個結點,并且head結點的item域置為null。繼續執行remove(20),狀态如下圖所示
Java并發程式設計的藝術——并發容器和架構之ConcurrentLinkedQueue
  • 如上圖所示,執行remove(20)後,head與tail指向同一個結點,item域為null。

size函數

public int size() {
    // 計數
    int count = 0;
    for (Node<E> p = first(); p != null; p = succ(p)) // 從第一個存活的結點開始往後周遊
        if (p.item != null) // 結點的item域不為null
            // Collection.size() spec says to max out
            if (++count == Integer.MAX_VALUE) // 增加計數,若達到最大值,則跳出循環
                break;
    // 傳回大小
    return count;
}
               

說明: 此函數用于傳回ConcurrenLinkedQueue的大小,從第一個存活的結點(first)開始,往後周遊連結清單,當結點的item域不為null時,增加計數,之後傳回大小。

ConcurrentLinkedQueue示例

下面通過一個示例來了解ConcurrentLinkedQueue的使用

import java.util.concurrent.ConcurrentLinkedQueue;

class PutThread extends Thread {
    private ConcurrentLinkedQueue<Integer> clq;
    public PutThread(ConcurrentLinkedQueue<Integer> clq) {
        this.clq = clq;
    }
    
    public void run() {
        for (int i = 0; i < 10; i++) {
            try {
                System.out.println("add " + i);
                clq.add(i);
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

class GetThread extends Thread {
    private ConcurrentLinkedQueue<Integer> clq;
    public GetThread(ConcurrentLinkedQueue<Integer> clq) {
        this.clq = clq;
    }
    
    public void run() {
        for (int i = 0; i < 10; i++) {
            try {
                System.out.println("poll " + clq.poll());
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

public class ConcurrentLinkedQueueDemo {
    public static void main(String[] args) {
        ConcurrentLinkedQueue<Integer> clq = new ConcurrentLinkedQueue<Integer>();
        PutThread p1 = new PutThread(clq);
        GetThread g1 = new GetThread(clq);
        
        p1.start();
        g1.start();
        
    }
}
               

運作結果(某一次):

add 0
poll null
add 1
poll 0
add 2
poll 1
add 3
poll 2
add 4
poll 3
add 5
poll 4
poll 5
add 6
add 7
poll 6
poll 7
add 8
add 9
poll 8           

說明: GetThread線程不會因為ConcurrentLinkedQueue隊列為空而等待,而是直接傳回null,是以當實作隊列不空時,等待時,則需要使用者自己實作等待邏輯。

再深入了解

HOPS(延遲更新的政策)的設計

通過上面對offer和poll方法的分析,我們發現tail和head是延遲更新的,兩者更新觸發時機為:

  • tail更新觸發時機:當tail指向的節點的下一個節點不為null的時候,會執行定位隊列真正的隊尾節點的操作,找到隊尾節點後完成插入之後才會通過casTail進行tail更新;當tail指向的節點的下一個節點為null的時候,隻插入節點不更新tail。
  • head更新觸發時機:當head指向的節點的item域為null的時候,會執行定位隊列真正的隊頭節點的操作,找到隊頭節點後完成删除之後才會通過updateHead進行head更新;當head指向的節點的item域不為null的時候,隻删除節點不更新head。

并且在更新操作時,源碼中會有注釋為:hop two nodes at a time。是以這種延遲更新的政策就被叫做HOPS的大概原因是這個(猜的 ),從上面更新時的狀态圖可以看出,head和tail的更新是“跳着的”即中間總是間隔了一個。那麼這樣設計的意圖是什麼呢?

如果讓tail永遠作為隊列的隊尾節點,實作的代碼量會更少,而且邏輯更易懂。但是,這樣做有一個缺點,如果大量的入隊操作,每次都要執行CAS進行tail的更新,彙總起來對性能也會是大大的損耗。如果能減少CAS更新的操作,無疑可以大大提升入隊的操作效率,是以doug lea大師每間隔1次(tail和隊尾節點的距離為1)進行才利用CAS更新tail。對head的更新也是同樣的道理,雖然,這樣設計會多出在循環中定位隊尾節點,但總體來說讀的操作效率要遠遠高于寫的性能,是以,多出來的在循環中定位尾節點的操作的性能損耗相對而言是很小的。

ConcurrentLinkedQueue适合的場景

ConcurrentLinkedQueue通過無鎖來做到了更高的并發量,是個高性能的隊列,但是使用場景相對不如阻塞隊列常見,畢竟取資料也要不停的去循環,不如阻塞的邏輯好設計,但是在并發量特别大的情況下,是個不錯的選擇,性能上好很多,而且這個隊列的設計也是特别費力,尤其的使用的改良算法和對哨兵的處理。整體的思路都是比較嚴謹的,這個也是使用了無鎖造成的,我們自己使用無鎖的條件的話,這個隊列是個不錯的參考。

繼續閱讀