目錄:
帶着BAT大廠的面試問題去了解
ConcurrentLinkedQueue資料結構
ConcurrentLinkedQueue源碼分析
ConcurrentLinkedQueue示例
HOPS(延遲更新的政策)的設計
ConcurrentLinkedQueue适合的場景
ConcurerntLinkedQueue一個基于連結節點的無界線程安全隊列。此隊列按照 FIFO(先進先出)原則對元素進行排序。隊列的頭部是隊列中時間最長的元素。隊列的尾部 是隊列中時間最短的元素。新的元素插入到隊列的尾部,隊列擷取操作從隊列頭部獲得元素。當多個線程共享通路一個公共 collection 時,ConcurrentLinkedQueue是一個恰當的選擇。此隊列不允許使用null元素。
- JUC集合: ConcurrentLinkedQueue詳解 帶着BAT大廠的面試問題去了解 ConcurrentLinkedQueue資料結構 ConcurrentLinkedQueue源碼分析 類的繼承關系 類的内部類 類的屬性 類的構造函數 核心函數分析 offer函數 poll函數 remove函數 size函數 ConcurrentLinkedQueue示例 再深入了解 HOPS(延遲更新的政策)的設計 ConcurrentLinkedQueue适合的場景 參考文章
帶着BAT大廠的面試問題去了解
請帶着這些問題繼續後文,會很大程度上幫助你更好的了解相關知識點
- 要想用線程安全的隊列有哪些選擇? Vector,Collections.synchronizedList(List<T> list), ConcurrentLinkedQueue等
- ConcurrentLinkedQueue實作的資料結構?
- ConcurrentLinkedQueue底層原理? 全程無鎖(CAS)
- ConcurrentLinkedQueue的核心方法有哪些? offer(),poll(),peek(),isEmpty()等隊列常用方法
- 說說ConcurrentLinkedQueue的HOPS(延遲更新的政策)的設計?
- ConcurrentLinkedQueue适合什麼樣的使用場景?
ConcurrentLinkedQueue資料結構
通過源碼分析可知,ConcurrentLinkedQueue的資料結構與LinkedBlockingQueue的資料結構相同,都是使用的連結清單結構。ConcurrentLinkedQueue的資料結構如下:
說明: ConcurrentLinkedQueue采用的連結清單結構,并且包含有一個頭節點和一個尾結點。
ConcurrentLinkedQueue源碼分析
類的繼承關系
public class ConcurrentLinkedQueue<E> extends AbstractQueue<E>
implements Queue<E>, java.io.Serializable {}
說明: ConcurrentLinkedQueue繼承了抽象類AbstractQueue,AbstractQueue定義了對隊列的基本操作;同時實作了Queue接口,Queue定義了對隊列的基本操作,同時,還實作了Serializable接口,表示可以被序列化。
類的内部類
private static class Node<E> {
// 元素
volatile E item;
// next域
volatile Node<E> next;
/**
* Constructs a new node. Uses relaxed write because item can
* only be seen after publication via casNext.
*/
// 構造函數
Node(E item) {
// 設定item的值
UNSAFE.putObject(this, itemOffset, item);
}
// 比較并替換item值
boolean casItem(E cmp, E val) {
return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
}
void lazySetNext(Node<E> val) {
// 設定next域的值,并不會保證修改對其他線程立即可見
UNSAFE.putOrderedObject(this, nextOffset, val);
}
// 比較并替換next域的值
boolean casNext(Node<E> cmp, Node<E> val) {
return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
}
// Unsafe mechanics
// 反射機制
private static final sun.misc.Unsafe UNSAFE;
// item域的偏移量
private static final long itemOffset;
// next域的偏移量
private static final long nextOffset;
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> k = Node.class;
itemOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("item"));
nextOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("next"));
} catch (Exception e) {
throw new Error(e);
}
}
}
說明: Node類表示連結清單結點,用于存放元素,包含item域和next域,item域表示元素,next域表示下一個結點,其利用反射機制和CAS機制來更新item域和next域,保證原子性。
類的屬性
public class ConcurrentLinkedQueue<E> extends AbstractQueue<E>
implements Queue<E>, java.io.Serializable {
// 版本序列号
private static final long serialVersionUID = 196745693267521676L;
// 反射機制
private static final sun.misc.Unsafe UNSAFE;
// head域的偏移量
private static final long headOffset;
// tail域的偏移量
private static final long tailOffset;
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> k = ConcurrentLinkedQueue.class;
headOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("head"));
tailOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("tail"));
} catch (Exception e) {
throw new Error(e);
}
}
// 頭節點
private transient volatile Node<E> head;
// 尾結點
private transient volatile Node<E> tail;
}
說明: 屬性中包含了head域和tail域,表示連結清單的頭節點和尾結點,同時,ConcurrentLinkedQueue也使用了反射機制和CAS機制來更新頭節點和尾結點,保證原子性。
類的構造函數
- ConcurrentLinkedQueue()型構造函數
public ConcurrentLinkedQueue() {
// 初始化頭節點與尾結點
head = tail = new Node<E>(null);
}
說明: 該構造函數用于建立一個最初為空的 ConcurrentLinkedQueue,頭節點與尾結點指向同一個結點,該結點的item域為null,next域也為null。
- ConcurrentLinkedQueue(Collection<? extends E>)型構造函數
public ConcurrentLinkedQueue(Collection<? extends E> c) {
Node<E> h = null, t = null;
for (E e : c) { // 周遊c集合
// 保證元素不為空
checkNotNull(e);
// 新生一個結點
Node<E> newNode = new Node<E>(e);
if (h == null) // 頭節點為null
// 指派頭節點與尾結點
h = t = newNode;
else {
// 直接頭節點的next域
t.lazySetNext(newNode);
// 重新指派頭節點
t = newNode;
}
}
if (h == null) // 頭節點為null
// 新生頭節點與尾結點
h = t = new Node<E>(null);
// 指派頭節點
head = h;
// 指派尾結點
tail = t;
}
說明: 該構造函數用于建立一個最初包含給定 collection 元素的 ConcurrentLinkedQueue,按照此 collection 疊代器的周遊順序來添加元素。
核心函數分析
offer函數
public boolean offer(E e) {
// 元素不為null
checkNotNull(e);
// 新生一個結點
final Node<E> newNode = new Node<E>(e);
for (Node<E> t = tail, p = t;;) { // 無限循環
// q為p結點的下一個結點
Node<E> q = p.next;
if (q == null) { // q結點為null
// p is last node
if (p.casNext(null, newNode)) { // 比較并進行替換p結點的next域
// Successful CAS is the linearization point
// for e to become an element of this queue,
// and for newNode to become "live".
if (p != t) // p不等于t結點,不一緻 // hop two nodes at a time
// 比較并替換尾結點
casTail(t, newNode); // Failure is OK.
// 傳回
return true;
}
// Lost CAS race to another thread; re-read next
}
else if (p == q) // p結點等于q結點
// We have fallen off list. If tail is unchanged, it
// will also be off-list, in which case we need to
// jump to head, from which all live nodes are always
// reachable. Else the new tail is a better bet.
// 原來的尾結點與現在的尾結點是否相等,若相等,則p指派為head,否則,指派為現在的尾結點
p = (t != (t = tail)) ? t : head;
else
// Check for tail updates after two hops.
// 重新指派p結點
p = (p != t && t != (t = tail)) ? t : q;
}
}
說明: offer函數用于将指定元素插入此隊列的尾部。下面模拟offer函數的操作,隊列狀态的變化(假設單線程添加元素,連續添加10、20兩個元素)。
- 若ConcurrentLinkedQueue的初始狀态如上圖所示,即隊列為空。單線程添加元素,此時,添加元素10,則狀态如下所示
- 如上圖所示,添加元素10後,tail沒有變化,還是指向之前的結點,繼續添加元素20,則狀态如下所示
- 如上圖所示,添加元素20後,tail指向了最新添加的結點。
poll函數
public E poll() {
restartFromHead:
for (;;) { // 無限循環
for (Node<E> h = head, p = h, q;;) { // 儲存頭節點
// item項
E item = p.item;
if (item != null && p.casItem(item, null)) { // item不為null并且比較并替換item成功
// Successful CAS is the linearization point
// for item to be removed from this queue.
if (p != h) // p不等于h // hop two nodes at a time
// 更新頭節點
updateHead(h, ((q = p.next) != null) ? q : p);
// 傳回item
return item;
}
else if ((q = p.next) == null) { // q結點為null
// 更新頭節點
updateHead(h, p);
return null;
}
else if (p == q) // p等于q
// 繼續循環
continue restartFromHead;
else
// p指派為q
p = q;
}
}
}
說明: 此函數用于擷取并移除此隊列的頭,如果此隊列為空,則傳回null。下面模拟poll函數的操作,隊列狀态的變化(假設單線程操作,狀态為之前offer10、20後的狀态,poll兩次)。
- 隊列初始狀态如上圖所示,在poll操作後,隊列的狀态如下圖所示
- 如上圖可知,poll操作後,head改變了,并且head所指向的結點的item變為了null。再進行一次poll操作,隊列的狀态如下圖所示。
- 如上圖可知,poll操作後,head結點沒有變化,隻是訓示的結點的item域變成了null。
remove函數
public boolean remove(Object o) {
// 元素為null,傳回
if (o == null) return false;
Node<E> pred = null;
for (Node<E> p = first(); p != null; p = succ(p)) { // 擷取第一個存活的結點
// 第一個存活結點的item值
E item = p.item;
if (item != null &&
o.equals(item) &&
p.casItem(item, null)) { // 找到item相等的結點,并且将該結點的item設定為null
// p的後繼結點
Node<E> next = succ(p);
if (pred != null && next != null) // pred不為null并且next不為null
// 比較并替換next域
pred.casNext(p, next);
return true;
}
// pred指派為p
pred = p;
}
return false;
}
說明: 此函數用于從隊列中移除指定元素的單個執行個體(如果存在)。其中,會調用到first函數和succ函數,first函數的源碼如下
Node<E> first() {
restartFromHead:
for (;;) { // 無限循環,確定成功
for (Node<E> h = head, p = h, q;;) {
// p結點的item域是否為null
boolean hasItem = (p.item != null);
if (hasItem || (q = p.next) == null) { // item不為null或者next域為null
// 更新頭節點
updateHead(h, p);
// 傳回結點
return hasItem ? p : null;
}
else if (p == q) // p等于q
// 繼續從頭節點開始
continue restartFromHead;
else
// p指派為q
p = q;
}
}
}
說明: first函數用于找到連結清單中第一個存活的結點。succ函數源碼如下
final Node<E> succ(Node<E> p) {
// p結點的next域
Node<E> next = p.next;
// 如果next域為自身,則傳回頭節點,否則,傳回next
return (p == next) ? head : next;
}
@pdai: 代碼已經複制到剪貼闆
說明: succ用于擷取結點的下一個結點。如果結點的next域指向自身,則傳回head頭節點,否則,傳回next結點。下面模拟remove函數的操作,隊列狀态的變化(假設單線程操作,狀态為之前offer10、20後的狀态,執行remove(10)、remove(20)操作)。
- 如上圖所示,為ConcurrentLinkedQueue的初始狀态,remove(10)後的狀态如下圖所示
- 如上圖所示,當執行remove(10)後,head指向了head結點之前指向的結點的下一個結點,并且head結點的item域置為null。繼續執行remove(20),狀态如下圖所示
- 如上圖所示,執行remove(20)後,head與tail指向同一個結點,item域為null。
size函數
public int size() {
// 計數
int count = 0;
for (Node<E> p = first(); p != null; p = succ(p)) // 從第一個存活的結點開始往後周遊
if (p.item != null) // 結點的item域不為null
// Collection.size() spec says to max out
if (++count == Integer.MAX_VALUE) // 增加計數,若達到最大值,則跳出循環
break;
// 傳回大小
return count;
}
說明: 此函數用于傳回ConcurrenLinkedQueue的大小,從第一個存活的結點(first)開始,往後周遊連結清單,當結點的item域不為null時,增加計數,之後傳回大小。
ConcurrentLinkedQueue示例
下面通過一個示例來了解ConcurrentLinkedQueue的使用
import java.util.concurrent.ConcurrentLinkedQueue;
class PutThread extends Thread {
private ConcurrentLinkedQueue<Integer> clq;
public PutThread(ConcurrentLinkedQueue<Integer> clq) {
this.clq = clq;
}
public void run() {
for (int i = 0; i < 10; i++) {
try {
System.out.println("add " + i);
clq.add(i);
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
class GetThread extends Thread {
private ConcurrentLinkedQueue<Integer> clq;
public GetThread(ConcurrentLinkedQueue<Integer> clq) {
this.clq = clq;
}
public void run() {
for (int i = 0; i < 10; i++) {
try {
System.out.println("poll " + clq.poll());
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
public class ConcurrentLinkedQueueDemo {
public static void main(String[] args) {
ConcurrentLinkedQueue<Integer> clq = new ConcurrentLinkedQueue<Integer>();
PutThread p1 = new PutThread(clq);
GetThread g1 = new GetThread(clq);
p1.start();
g1.start();
}
}
運作結果(某一次):
add 0
poll null
add 1
poll 0
add 2
poll 1
add 3
poll 2
add 4
poll 3
add 5
poll 4
poll 5
add 6
add 7
poll 6
poll 7
add 8
add 9
poll 8
說明: GetThread線程不會因為ConcurrentLinkedQueue隊列為空而等待,而是直接傳回null,是以當實作隊列不空時,等待時,則需要使用者自己實作等待邏輯。
再深入了解
HOPS(延遲更新的政策)的設計
通過上面對offer和poll方法的分析,我們發現tail和head是延遲更新的,兩者更新觸發時機為:
- tail更新觸發時機:當tail指向的節點的下一個節點不為null的時候,會執行定位隊列真正的隊尾節點的操作,找到隊尾節點後完成插入之後才會通過casTail進行tail更新;當tail指向的節點的下一個節點為null的時候,隻插入節點不更新tail。
- head更新觸發時機:當head指向的節點的item域為null的時候,會執行定位隊列真正的隊頭節點的操作,找到隊頭節點後完成删除之後才會通過updateHead進行head更新;當head指向的節點的item域不為null的時候,隻删除節點不更新head。
并且在更新操作時,源碼中會有注釋為:hop two nodes at a time。是以這種延遲更新的政策就被叫做HOPS的大概原因是這個(猜的 ),從上面更新時的狀态圖可以看出,head和tail的更新是“跳着的”即中間總是間隔了一個。那麼這樣設計的意圖是什麼呢?
如果讓tail永遠作為隊列的隊尾節點,實作的代碼量會更少,而且邏輯更易懂。但是,這樣做有一個缺點,如果大量的入隊操作,每次都要執行CAS進行tail的更新,彙總起來對性能也會是大大的損耗。如果能減少CAS更新的操作,無疑可以大大提升入隊的操作效率,是以doug lea大師每間隔1次(tail和隊尾節點的距離為1)進行才利用CAS更新tail。對head的更新也是同樣的道理,雖然,這樣設計會多出在循環中定位隊尾節點,但總體來說讀的操作效率要遠遠高于寫的性能,是以,多出來的在循環中定位尾節點的操作的性能損耗相對而言是很小的。
ConcurrentLinkedQueue适合的場景
ConcurrentLinkedQueue通過無鎖來做到了更高的并發量,是個高性能的隊列,但是使用場景相對不如阻塞隊列常見,畢竟取資料也要不停的去循環,不如阻塞的邏輯好設計,但是在并發量特别大的情況下,是個不錯的選擇,性能上好很多,而且這個隊列的設計也是特别費力,尤其的使用的改良算法和對哨兵的處理。整體的思路都是比較嚴謹的,這個也是使用了無鎖造成的,我們自己使用無鎖的條件的話,這個隊列是個不錯的參考。