在并發程式設計中,有時候需要使用線程安全的隊列。如果要實作一個線程安全的隊列有兩 種方式:一種是使用阻塞算法,另一種是使用非阻塞算法。使用阻塞算法的隊列可以用一個鎖 (入隊和出隊用同一把鎖)或兩個鎖(入隊和出隊用不同的鎖)等方式來實作。非阻塞的實作方式則可以使用循環CAS的方式來實作。
ConcurrentLinkedQueue是一個非阻塞的基于連結清單節點的無界線程安全隊列,它遵循先進先出的原則,并采用了“wait-free”算法(即CAS算法)來實作。
類關系圖
核心屬性
// 頭結點指針(不一定指向頭,緩更新,非阻塞的,是以沒法保證)
private volatile Node<E> head;
// 尾節點指針(不一定指向尾,緩更新,非阻塞的,是以沒法保證)
private volatile Node<E> tail;
// 連結清單節點
private static class Node<E> {
// 節點元素(存資料)
volatile E item;
// 下一個節點
volatile Node<E> next;
...
}
ConcurrentLinkedQueue由head節點和tail節點組成,每個節點(Node)由節點元素(item)和 指向下一個節點(next)的引用組成,節點與節點之間就是通過這個next關聯起來,進而組成一 張連結清單結構的隊列。預設情況下head節點存儲的元素為空,tail節點等于head節點。
注意:
- 核心屬性都是用volatile來修飾來保證資料的可見性。
- 預設情況下頭結點和尾節點是相等的對應節點的
為
item
null
- 頭和尾節點都是緩更新,是以他們隻能代表一個邏輯上的位置,不是實際的頭尾節點
offer() 入隊方法
public boolean offer(E e) {
// 檢查元素是否為NULL,這裡不允許存NULL值
checkNotNull(e);
final Node<E> newNode = new Node<E>(e);
// 自旋,t是tail節點的引用,p表示尾節點,預設情況下p=t的
for (Node<E> t = tail, p = t; ; ) {
// q是p的下一個節點
Node<E> q = p.next;
// q==null表示p是尾節點,否則需要更新p進入下一次循環
if (q == null) {
// 使用CAS将新節點設定為p的next節點
if (p.casNext(null, newNode)) {
// 不是每次添加新節點都會更新尾節點指針,每新增兩個節點才會更新尾節點指針。
// 這樣做的好處是減少了更新尾節點的競争,但是增加了尋找真正尾節點的代價。
if (p != t) // hop two nodes at a time
// 即使設定失敗也沒有問題,說明其他線程已經更新了尾節點的值
casTail(t, newNode);
// 永遠傳回true
return true;
}
}
// 定位尾節點
// 删除節點的時候,會把删除的節點指向自己(自引用),就會發生p==q. 當添加第一個節點後tail也會發生自引用
else if (p == q)
// t != tail表示尾節點指針已經被移動過,這時p直接指向新的尾節點指針tail,否則p直接指向頭結點指針,
// 因為這時p已經被移出了連結清單,是以不能将p指向p.next
p = (t != (t = tail)) ? t : head;
else
// t != tail表示尾節點指針已經被移動過,是以p直接指向新的尾節點指針(tail),否則p指向p.next
p = (p != t && t != (t = tail)) ? t : q;
}
}
入隊過程:
- 檢查元素是否為NULL,如果是NULL直接抛出異常
- 初始化新節點
newNode
- 找到尾節點指針```t``
- 根據尾節點指針
找到真正的尾節點t
p
- CAS将
的p
指向next
newNode
- 判斷是否更新尾節點指針的位置
注意:
- ConcurrentLinkedQueue 是不允許存NULL值的,NULL有特殊含義,表示已經從連結清單中移除(出隊了)
- 入隊過程大緻分為三步,第一步找到真正的尾節點
;第二步CAS更新
p
;第三步CAS更新尾節點指針的位置
p.next
casTail(t, newNode);
-
表示尾節點指針已經被移動過
t != tail
-
表示
p == q
節點已經從連結清單中移除,并發生了自引用
p
- 每新增兩個節點才會更新尾節點指針, 這樣能減少了更新尾節點的競争,但是增加了尋找真正尾節點的代價。從本質上來 看它通過增加對volatile變量的讀操作來減少對volatile變量的寫操作,而對volatile變量的寫操作開銷要遠遠大于讀操作,是以入隊效率會有所提升。
入隊快照圖
入隊debug圖
poll()出隊方法
public E poll() {
// 外循環标記
restartFromHead:
for (; ; ) {
// h是頭結點指針,p頭結點,q是p的下一個節點
for (Node<E> h = head, p = h, q; ; ) {
// 儲存頭節點的值
E item = p.item;
// CAS替換頭結點的item值為null,如果item為null表示該節點已經從連結清單中移除了
if (item != null && p.casItem(item, null)) {
// 每移除兩個元素,頭節點指針才開始移動
if (p != h) // hop two nodes at a time
updateHead(h, ((q = p.next) != null) ? q : p);
return item;
}
// 目前隊列為NULL
else if ((q = p.next) == null) {
updateHead(h, p);
return null;
}
// 發生了自引用,重頭再來
else if (p == q)
continue restartFromHead;
else
p = q;
}
}
}
出隊過程:
- 根據頭結點指針找到頭結點
- 暫存頭結點的值
- 判斷節點内容
是否為item
,如果為NULL
表示該節點已經出隊了NULL
- 如果
不為item
則使用CAS替換為NULL
,如果成功判斷是否需要更新頭結點指針NULL
- 如果失敗表示該節點已經被其他線程出隊了,尋找新的頭結點,繼續第二步
注意:
- 每删除兩個節點才會更新頭點指針, 這樣能減少了更新頭點的競争。
- 當節點的
時表示節點已經出隊了
item==null
測試代碼
public class ConcurrentLinkedQueueTset {
public static void main(String[] args) {
ConcurrentLinkedQueue<Integer> linkedQueue = new ConcurrentLinkedQueue<>();
linkedQueue.offer(1);
linkedQueue.offer(2);
linkedQueue.poll();
linkedQueue.offer(3);
linkedQueue.offer(4);
linkedQueue.poll();
linkedQueue.poll();
}
}
參考
《java并發程式設計的藝術》
源碼
https://github.com/wyh-spring-ecosystem-student/spring-boot-student/tree/releases