天天看點

[Java] Java 并發包中并發原理剖析之ConcurrentLinkedQueue

[Java] Java 并發包中并發原理剖析之ConcurrentLinkedQueue

ConcurrentLinkedQueue

是線程安全的無界非阻塞隊列,其底層資料結構使用單向連結清單實作,對于入隊和出隊操作使用CAS來實作線程安全。

類圖結構

[Java] Java 并發包中并發原理剖析之ConcurrentLinkedQueue

ConcurrentLinkedQueue

内部的隊列使用單向連結清單方式實作,其中有兩個volatile類型的Node節點分别用來存放隊列的首、尾節點。從下面的無參構造函數可知,預設的頭、尾節點都是指向item為null的哨兵節點。新元素會被插入隊列末尾,出隊時從隊列頭部擷取一個元素。

/**
 * Creates a {@code ConcurrentLinkedQueue} that is initially empty.
 */
public ConcurrentLinkedQueue() {
  head = tail = new Node<E>(null);
}
           

在Node節點内部維護一個使用volatile修飾的變量item,用來存放節點的值;next用來存放連結清單的下一個節點,進而連結為一個單向無界連結清單。其内部則使用UNSafe工具類提供的CAS算法來保證出入隊時操作連結清單的原子性。

private static class Node<E> {
    volatile E item;
    volatile Node<E> next;

    /**
     * Constructs a new node.  Uses relaxed write because item can
     * only be seen after publication via casNext.
     */
    Node(E item) {
        UNSAFE.putObject(this, itemOffset, item);
    }

    boolean casItem(E cmp, E val) {
        return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
    }

    void lazySetNext(Node<E> val) {
        UNSAFE.putOrderedObject(this, nextOffset, val);
    }

    boolean casNext(Node<E> cmp, Node<E> val) {
        return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
    }

    // Unsafe mechanics

    private static final sun.misc.Unsafe UNSAFE;
    private static final long itemOffset;
    private static final long nextOffset;

    static {
        try {
            UNSAFE = sun.misc.Unsafe.getUnsafe();
            Class<?> k = Node.class;
            itemOffset = UNSAFE.objectFieldOffset
                (k.getDeclaredField("item"));
            nextOffset = UNSAFE.objectFieldOffset
                (k.getDeclaredField("next"));
        } catch (Exception e) {
            throw new Error(e);
        }
    }
}
           

注意,像這個包中大多數的非阻塞算法一樣,這個實作依賴于,在垃圾收集系統,沒有ABA的可能性問題的事實。由于回收節點,是以沒有必要使用“數指針”或在版本中使用“ non-GC”設定的相關技術。

基本保持不變的特性有:

  • 假如正好有一個(最後一個)引用為空的節點next,在排隊時使用CAS算法。最後一個節點可以在O(1)時間内從tail到達,但tail隻是一個優化——它也總是可以在O(N)時間内從head到達。
  • 隊列中包含的元素是從head通路的非空節點。通過CAS将節點的引用指向null,自動的會将它從隊列中移除。來自head的所有元素的可達性必須保持true,即使在導緻head前進的并發修改的情況下也是如此。由于建立了一個疊代器,或者是一個丢失了時間片的poll()操作,一個離開隊列的節點可能會無限期地繼續使用。

一個基于連結節點的無界線程安全隊列。此隊列按照 FIFO(先進先出)原則對元素進行排序。隊列的頭部 是隊列中時間最長的元素。隊列的尾部 是隊列中時間最短的元素。新的元素插入到隊列的尾部,隊列擷取操作從隊列頭部獲得元素。當多個線程共享通路一個公共 collection 時,

ConcurrentLinkedQueue

是一個恰當的選擇。此隊列不允許使用

null

元素。

此實作采用了有效的“無等待 (wait-free)”算法,該算法基于 Maged M. Michael 和 Michael L. Scott 合著的 Simple, Fast, and Practical Non-Blocking and Blocking Concurrent Queue Algorithms 中描述的算法。适用于垃圾收集環境,支援内部節點删除(以支援删除(對象))。

需要小心的是,與大多數 collection 不同,

size

方法不是 一個固定時間操作。由于這些隊列的異步特性,确定目前元素的數量需要周遊這些元素。

此類及其疊代器實作了

Collection

Iterator

接口的所有可選 方法。

記憶體一緻性效果:當存在其他并發 collection 時,将對象放入

ConcurrentLinkedQueue

之前的線程中的操作 happen-before 随後通過另一線程從

ConcurrentLinkedQueue

通路或移除該元素的操作。

此類是 Java Collections Framework 的成員。

方法摘要
傳回值 方法

boolean

add(E e)

将指定元素插入此隊列的尾部。

boolean

contains(Object o)

如果此隊列包含指定元素,則傳回

true

boolean

isEmpty()

如果此隊列不包含任何元素,則傳回

true

Iterator<E>

iterator()

傳回在此隊列元素上以恰當順序進行疊代的疊代器。

boolean

offer(E e)

E

peek()

擷取但不移除此隊列的頭;如果此隊列為空,則傳回

null

E

poll()

擷取并移除此隊列的頭,如果此隊列為空,則傳回

null

boolean

remove(Object o)

從隊列中移除指定元素的單個執行個體(如果存在)。

int

size()

傳回此隊列中的元素數量。

Object[]

toArray()

傳回以恰當順序包含此隊列所有元素的數組。

<T> T[]

toArray(T[] a)

傳回以恰當順序包含此隊列所有元素的數組;傳回數組的運作時類型是指定數組的運作時類型。

ConcurrentLinkedQueue 原理介紹

offer
/**
     * Inserts the specified element at the tail of this queue.
     * As the queue is unbounded, this method will never return {@code false}.
     *
     * @return {@code true} (as specified by {@link Queue#offer})
     * @throws NullPointerException if the specified element is null
     */
    public boolean offer(E e) {
        //[1]抛出空指針異常
        checkNotNull(e);
      	//[2]構造Node節點,在構造函數内部調用unsafe.putObject
        final Node<E> newNode = new Node<E>(e);
		//[3]從尾節點進行插入
        for (Node<E> t = tail, p = t;;) {
            Node<E> q = p.next;
   			//[4]如果q==null說明p是尾節點,則執行插入
            if (q == null) {
                // p is last node
                //[5]使用CAS設定p節點的next節點
                if (p.casNext(null, newNode)) {
                    // Successful CAS is the linearization point
                    // for e to become an element of this queue,
                    // and for newNode to become "live".
                    //[6] CAS成功,則說明新增節點已經被放傳入連結表,然後設定目前尾節點(包含head,第
					//1, 3 , 5 . . .個節點為尾節點)
                    if (p != t) // hop two nodes at a time
                        casTail(t, newNode);  // Failure is OK.
                    return true;
                }
                // Lost CAS race to another thread; re-read next
            }
            else if (p == q)
                // We have fallen off list.  If tail is unchanged, it
                // will also be off-list, in which case we need to
                // jump to head, from which all live nodes are always
                // reachable.  Else the new tail is a better bet.
              	//[7]多線程操作時,由于poll操作移除元素後可能會把head變為自引用,也就是head的next變
				//成了 head,是以這裡需要重新找新的head
                p = (t != (t = tail)) ? t : head;
            else
                // Check for tail updates after two hops.
                //[8]尋找尾節點
                p = (p != t && t != (t = tail)) ? t : q;
        }
    }
           

offer 操作時在隊列末尾添加一個元素,如果該元素為null,則抛出NPE異常,否則由于ConcurrentLinkedQueue是無界隊列,該方法一直會傳回true。由于使用CAS無阻塞算法,是以該方法不會阻塞挂起調用的線程。

add

add操作是在連結清單末尾添加一個元素,其實在内部調用的還是offer操作。

/**
 * Inserts the specified element at the tail of this queue.
 * As the queue is unbounded, this method will never throw
 * {@link IllegalStateException} or return {@code false}.
 *
 * @return {@code true} (as specified by {@link Collection#add})
 * @throws NullPointerException if the specified element is null
 */
public boolean add(E e) {
    return offer(e);
}
           
poll

poll操作是在隊列頭部擷取并移除一個元素,如果隊列為空則傳回null。

public E poll() {
	  //[1]goto标記
    restartFromHead:
    //[2]無限循環
  	for (;;) {
        for (Node<E> h = head, p = h, q;;) {
            //[3]儲存目前節點值
          	E item = p.item;
			//[4]目前節點有值則用CAS變為null
            if (item != null && p.casItem(item, null)) {
                // Successful CAS is the linearization point
                // for item to be removed from this queue.
              	//[5]CAS成功則标記目前節點并從連結清單中删除
                if (p != h) // hop two nodes at a time
                    updateHead(h, ((q = p.next) != null) ? q : p);
                return item;
            }
          	//[6]目前隊列為空則傳回null
            else if ((q = p.next) == null) {
                updateHead(h, p);
                return null;
            }
          	//[7]如果目前節點被自引用了,則重新尋找新的隊列頭節點
            else if (p == q)
                continue restartFromHead;
            else
                p = q;
        }
    }
}
           
peek

擷取隊列頭部的一個元素(隻擷取不移除),如果隊列為空則傳回null。

public E peek() {
  	//[1]
    restartFromHead:
    for (;;) {
        for (Node<E> h = head, p = h, q;;) {
          	//[2]
            E item = p.item;
          	//[3]
            if (item != null || (q = p.next) == null) {
                updateHead(h, p);
                return item;
            }
            else if (p == q)
                continue restartFromHead;
            else
                p = q;
        }
    }
}
           

peek與poll操作類似,不同之處在于代碼[3]處少了castItem操作,其實這很正常,因為peek隻是擷取隊列頭元素,并不清空值。第一次調用peek操作的時候會删除哨兵節點,并讓隊列的head節點指向隊列的第一個元素或是null。

size

計算目前隊列元素的個數,在并發環境下不是很有用,因為CAS沒有加鎖,是以從調用size函數到傳回結果期間有可能增删元素,導緻統計的元素個數不精确。

/**
 * Returns the number of elements in this queue.  If this queue
 * contains more than {@code Integer.MAX_VALUE} elements, returns
 * {@code Integer.MAX_VALUE}.
 *
 * <p>Beware that, unlike in most collections, this method is
 * <em>NOT</em> a constant-time operation. Because of the
 * asynchronous nature of these queues, determining the current
 * number of elements requires an O(n) traversal.
 * Additionally, if elements are added or removed during execution
 * of this method, the returned result may be inaccurate.  Thus,
 * this method is typically not very useful in concurrent
 * applications.
 *
 * @return the number of elements in this queue
 */
public int size() {
    int count = 0;
    for (Node<E> p = first(); p != null; p = succ(p))
        if (p.item != null)
            // Collection.size() spec says to max out
            if (++count == Integer.MAX_VALUE)
                break;
    return count;
}

 /**
 * Returns the first live (non-deleted) node on list, or null if none.
 * This is yet another variant of poll/peek; here returning the
 * first node, not element.  We could make peek() a wrapper around
 * first(), but that would cost an extra volatile read of item,
 * and the need to add a retry loop to deal with the possibility
 * of losing a race to a concurrent poll().
 */
Node<E> first() {
  restartFromHead:
  for (;;) {
    for (Node<E> h = head, p = h, q;;) {
      boolean hasItem = (p.item != null);
      if (hasItem || (q = p.next) == null) {
        updateHead(h, p);
        return hasItem ? p : null;
      }
      else if (p == q)
        continue restartFromHead;
      else
        p = q;
    }
  }
}

/**
* Returns the successor of p, or the head node if p.next has been
* linked to self, which will only be true if traversing with a
* stale pointer that is now off the list.
*/
final Node<E> succ(Node<E> p) {
Node<E> next = p.next;
return (p == next) ? head : next;
}
           

first擷取第一個隊列的元素(哨兵元素不算),沒有則為null。

succ擷取目前節點的next元素,如果是自引入節點則傳回真正的頭節點

remove

如果隊列裡面存在該元素則删除該元素,如果存在多個則删除第一個,并傳回true,否則傳回false。

/**
 * Removes a single instance of the specified element from this queue,
 * if it is present.  More formally, removes an element {@code e} such
 * that {@code o.equals(e)}, if this queue contains one or more such
 * elements.
 * Returns {@code true} if this queue contained the specified element
 * (or equivalently, if this queue changed as a result of the call).
 *
 * @param o element to be removed from this queue, if present
 * @return {@code true} if this queue changed as a result of the call
 */
public boolean remove(Object o) {
  	//[1]
    if (o != null) {
        Node<E> next, pred = null;
        for (Node<E> p = first(); p != null; pred = p, p = next) {
            boolean removed = false;
            E item = p.item;
            //[2]相等則使用CAS設定為null,否則擷取next節點,繼續下一次循環查找是否有比對其他元素
          	//同時隻有一個線程可以操作成功
          	if (item != null) {
                if (!o.equals(item)) {
                    next = succ(p);
                    continue;
                }
                removed = p.casItem(item, null);
            }
			//[3]擷取next節點
            next = succ(p);
          	//[4]如果有前驅節點,并且next節點不為空則連結前驅節點到next
            if (pred != null && next != null) // unlink
                pred.casNext(p, next);
            if (removed)
                return true;
        }
    }
    return false;
}
           
contains

判斷隊列裡面是否含有指定對象,由于是周遊整個隊列,是以需要像size操作一樣結果也不是那麼精确,有可能調用該方法時元素還在隊列裡面,但是周遊過程中其他線程才把該元素删除了,那麼就會傳回false了。

/**
 * Returns {@code true} if this queue contains the specified element.
 * More formally, returns {@code true} if and only if this queue contains
 * at least one element {@code e} such that {@code o.equals(e)}.
 *
 * @param o object to be checked for containment in this queue
 * @return {@code true} if this queue contains the specified element
 */
public boolean contains(Object o) {
    if (o == null) return false;
    for (Node<E> p = first(); p != null; p = succ(p)) {
        E item = p.item;
        if (item != null && o.equals(item))
            return true;
    }
    return false;
}
           

小結

  • ConcurrentLinkedQueue

    的底層使用單向連結清單資料結構來儲存隊列元素,每個元素被包裝成一個 Node 節點。隊列是靠頭、尾節點來維護的,建立隊列時頭、尾節點指向-個 item 為 null 的哨兵節點。第一次執行 peek 或者first 操作時會把 head 指向第一個真正的隊 列元素。由于使用非阻塞 CAS 算法,沒有加鎖,是以在計算 size 時有可能進行了 offer 、poll 或者 remove 操作 , 導緻計算的元素個數不精确,是以在井發情況下 size 函數不是很 有用。
  • 入隊、出隊都是操作使用 volatile 修飾的 tail 、 head 節點,要保證在多線程下出入隊線程安全,隻需要保證這兩個 Node 操作的可見性和原子性即可。由于 volatile 本身可以保證可見性,是以隻需要保證對兩個變量操作的原子性即可。
[Java] Java 并發包中并發原理剖析之ConcurrentLinkedQueue
  • offer 操作是在 tail 後面添加元素,也就是調用 tail.casNext 方法,而這個方法使用的是CAS 操作,隻有一個線程會成功,然後失敗的線程會循環,重新擷取 tail , 再執行 casNext 方法。 poll 操作也通過類似 CAS 的算法保證出隊時移除節點操作的原子性。

REFERENCES

  • Java并發程式設計之美
  • JDK-API-DOCS