一、ConcurrentLinkedQueue簡介
ConcurrentLinkedQueue是JDK1.5時随着J.U.C一起引入的一個支援并發環境的隊列。從名字就可以看出來,ConcurrentLinkedQueue底層是基于連結清單實作的。
Doug Lea在實作ConcurrentLinkedQueue時,并沒有利用鎖或底層同步原語,而是完全基于自旋+CAS的方式實作了該隊列。回想一下AQS,AQS内部的CLH等待隊列也是利用了這種方式。
由于是完全基于無鎖算法實作的,是以當出現多個線程同時進行修改隊列的操作(比如同時入隊),很可能出現CAS修改失敗的情況,那麼失敗的線程會進入下一次自旋,再嘗試入隊操作,直到成功。是以,在并發量适中的情況下,ConcurrentLinkedQueue一般具有較好的性能。
二、ConcurrentLinkedQueue原理
隊列結構
我們來看下ConcurrentLinkedQueue的内部結構,:
public class ConcurrentLinkedQueue extends AbstractQueue
implements Queue, java.io.Serializable {
private transient volatile Node head;
private transient volatile Node tail;
// Unsafe mechanics
private static final sun.misc.Unsafe UNSAFE;
private static final long headOffset;
private static final long tailOffset;
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class> k = ConcurrentLinkedQueue.class;
headOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("head"));
tailOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("tail"));
} catch (Exception e) {
throw new Error(e);
}
}
private static class Node {
volatile E item; // 元素值
volatile Node next; // 後驅指針
Node(E item) {
UNSAFE.putObject(this, itemOffset, item);
}
boolean casItem(E cmp, E val) {
return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
}
void lazySetNext(Node val) {
UNSAFE.putOrderedObject(this, nextOffset, val);
}
boolean casNext(Node cmp, Node val) {
return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
}
// Unsafe mechanics
private static final sun.misc.Unsafe UNSAFE;
private static final long itemOffset;
private static final long nextOffset;
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class> k = Node.class;
itemOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("item"));
nextOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("next"));
} catch (Exception e) {
throw new Error(e);
}
}
}
//...
}
可以看到,ConcurrentLinkedQueue内部就是一個簡單的單連結清單結構,每入隊一個元素就是插入一個Node類型的結點。字段head指向隊列頭,tail指向隊列尾,通過Unsafe來CAS操作字段值以及Node對象的字段值。
構造器定義
ConcurrentLinkedQueue包含兩種構造器:
public ConcurrentLinkedQueue() {
head = tail = new Node(null);
}
public ConcurrentLinkedQueue(Collection extends E> c) {
Node h = null, t = null;
for (E e : c) {
checkNotNull(e);
Node newNode = new Node(e);
if (h == null)
h = t = newNode;
else {
t.lazySetNext(newNode);
t = newNode;
}
}
if (h == null)
h = t = new Node(null);
head = h;
tail = t;
}
我們重點看下空構造器,通過空構造器建立的ConcurrentLinkedQueue對象,其head和tail指針并非指向null,而是指向一個item值為null的Node結點——哨兵結點,如下圖:
入隊操作
元素的入隊是在隊尾插入元素,關于隊列的操作,如果讀者不熟悉,可以參考《Algorithms 4th》或我的這篇博文:https://www.jianshu.com/p/f9b...
ConcurrentLinkedQueue的入隊代碼很簡單,卻非常精妙:
public boolean add(E e) {
return offer(e);
}
public boolean offer(E e) {
checkNotNull(e);
final Node newNode = new Node(e);
for (Node t = tail, p = t; ; ) { // 自旋, 直到插入結點成功
Node q = p.next;
if (q == null) { // CASE1: 正常情況下, 新結點直接插入到隊尾
if (p.casNext(null, newNode)) {
// CAS競争插入成功
if (p != t) // CAS競争失敗的線程會在下一次自旋中進入該邏輯
casTail(t, newNode); // 重新設定隊尾指針tail
return true;
}
// CAS競争插入失敗,則進入下一次自旋
} else if (p == q) // CASE2: 發生了出隊操作
p = (t != (t = tail)) ? t : head;
else
// 将p重新指向隊尾結點
p = (p != t && t != (t = tail)) ? t : q;
}
}
我們來分析下上面offer方法的實作。單線程的情況下,元素入隊比較好了解,直接線性地在隊首插入元素即可。現在我們假設有兩個線程ThreadA和ThreadB同時進行入隊操作:
①ThreadA先單獨入隊兩個元素9、2
此時隊列的結構如下:
②ThreadA入隊元素“10”,ThreadB入隊元素“25”
此時ThreadA和ThreadB若并發執行,我們看下會發生什麼:
1、ThreadA和ThreadB同時進入自旋中的以下代碼塊:
if (q == null) { // CASE1: 正常情況下, 新結點直接插入到隊尾
if (p.casNext(null, newNode)) {
// CAS競争插入成功
if (p != t) // CAS競争失敗的線程會在下一次自旋中進入該邏輯
casTail(t, newNode); // 重新設定隊尾指針tail
return true;
}
// CAS競争插入失敗,則進入下一次自旋
}
2、ThreadA執行cas操作(p.casNext)成功,插入新結點“10”
ThreadA執行完成後,直接傳回true,隊列結構如下:
3、ThreadB執行cas操作(p.casNext)失敗
由于CAS操作同時修改隊尾元素,導緻ThreadB操作失敗,則ThreadB進入下一次自旋;
在下一次自旋中,進入以下代碼塊:
else
// 将p重新指向隊尾結點
p = (p != t && t != (t = tail)) ? t : q;
上述分支的作用就是讓p指針重新定位到隊尾結點,此時隊列結構如下:
然後ThreadB會繼續下一次自旋,并再次進入以下代碼塊:
if (q == null) { // CASE1: 正常情況下, 新結點直接插入到隊尾
if (p.casNext(null, newNode)) {
// CAS競争插入成功
if (p != t) // CAS競争失敗的線程會在下一次自旋中進入該邏輯
casTail(t, newNode); // 重新設定隊尾指針tail
return true;
}
// CAS競争插入失敗,則進入下一次自旋
}
此時,CAS操作成功,隊列結構如下:
由于此時p!=t ,是以會調用casTail方法重新設定隊尾指針:
casTail(t, newNode); // 重新設定隊尾指針tail
最終隊列如下:
從上面的分析過程可以看到,由于入隊元素一定是要連結到隊尾的,但并發情況下隊尾結點可能随時變化,是以就需要指針定位最新的隊尾結點,并在入隊時判斷隊尾結點是否改變了,如果改變了,就需要重新設定定位指針,然後在下一次自旋中繼續嘗試入隊操作。
上面整個執行步驟有一段分支還沒有覆寫到:
else if (p == q) // CASE2: 發生了出隊操作
p = (t != (t = tail)) ? t : head;
這個分支隻有在元素入隊的同時,針對該元素也發生了“出隊”操作才會執行,我們後面會分析元素的“出隊”,了解了“出隊”操作再回頭來看這個分支就容易了解很多了。
出隊操作
隊列中元素的“出隊”是從隊首移除元素,我們來看下ConcurrentLinkedQueue是如何實作出隊的:
public E poll() {
restartFromHead:
for (; ; ) {
for (Node h = head, p = h, q; ; ) {
E item = p.item;
if (item != null && p.casItem(item, null)) { // CASE2: 隊首是非哨兵結點(item!=null)
if (p != h) // hop two nodes at a time
updateHead(h, ((q = p.next) != null) ? q : p);
return item;
} else if ((q = p.next) == null) { // CASE1: 隊首是一個哨兵結點(item==null)
updateHead(h, p);
return null;
} else if (p == q)
continue restartFromHead;
else
p = q;
}
}
}
還是通過示例來看,假設初始的隊列結構如下:
①ThreadA先單獨進行出隊操作
由于head所指的是item==null的結點,是以ThreadA會執行以下分支:
else
p = q;
然後進入下一次自旋,在自旋中執行以下分支,如果CAS操作成功,則移除首個有效元素,并重新設定頭指針:
if (item != null && p.casItem(item, null)) { // CASE2: 隊首是非哨兵結點(item!=null)
if (p != h) // hop two nodes at a time
updateHead(h, ((q = p.next) != null) ? q : p);
return item;
}
此時隊列的結構如下:
如果ThreadA的CAS操作失敗呢?
CAS操作失敗則會進入以下分支,并重新開始自旋:
else if (p == q)
continue restartFromHead;
最終前面兩個null結點會被GC回收,隊列結構如下:
②ThreadA繼續進行出隊操作
ThreadA繼續執行“出隊”操作,還是執行以下分支:
if (item != null && p.casItem(item, null)) { // CASE2: 隊首是非哨兵結點(item!=null)
if (p != h) // hop two nodes at a time
updateHead(h, ((q = p.next) != null) ? q : p);
return item;
}
但是此時p==h,是以僅将頭結點置null,這其實是一種“懶删除”的政策。
出隊元素“2”:
出隊元素“10”:
最終隊列結果如下:
③ThreadA進行出隊,其它線程進行入隊
這是最特殊的一種情況,當隊列中隻剩下一個元素時,如果同時發生出隊和入隊操作,會導緻隊列出現下面這種結構:(假設ThreadA進行出隊元素“25”,ThreadB進行入隊元素“11”)
此時tail.next=tail自身,是以ThreadB在執行入隊時,會進入到offer方法的以下分支:
else if (p == q) // CASE2: 發生了出隊操作
p = (t != (t = tail)) ? t : head;
三、總結
ConcurrentLinkedQueue使用了自旋+CAS的非阻塞算法來保證線程并發通路時的資料一緻性。由于隊列本身是一種連結清單結構,是以雖然算法看起來很簡單,但其實需要考慮各種并發的情況,實作複雜度較高,并且ConcurrentLinkedQueue不具備實時的資料一緻性,實際運用中,隊列一般在生産者-消費者的場景下使用得較多,是以ConcurrentLinkedQueue的使用場景并不如阻塞隊列那麼多。
另外,關于ConcurrentLinkedQueue還有以下需要注意的幾點:
ConcurrentLinkedQueue的疊代器是弱一緻性的,這在并發容器中是比較普遍的現象,主要是指在一個線程在周遊隊列結點而另一個線程嘗試對某個隊列結點進行修改的話不會抛出ConcurrentModificationException,這也就造成在周遊某個尚未被修改的結點時,在next方法傳回時可以看到該結點的修改,但在周遊後再對該結點修改時就看不到這種變化。
size方法需要周遊連結清單,是以在并發情況下,其結果不一定是準确的,隻能供參考。