
孤獨,所有人都是孤獨的,沒有人能獨自超脫這一切。
——瑪娅·安傑格
0 前言
LinkedBlockingQueue - 單連結清單實作的阻塞隊列。該隊列按 FIFO(先進先出)排序元素,新元素從隊列尾部插入,從隊首擷取元素.是深入并發程式設計的基礎資料結構.
1 繼承體系
- Queue 作為最基礎的接口,定義了隊列的三大類基本操作:
LinkedBlockingQueue 核心源碼解析0 前言1 繼承體系2 屬性3 構造方法4 新增5 取出6 總結 - BlockingQueue 即在 Queue 的基礎上加上了阻塞的概念,比如
- 一直阻塞
- 阻塞一定時間,傳回特殊值
remove 方法,BlockingQueue 類注釋中定義的是抛異常,但 LinkedBlockingQueue 中 remove 方法實際是傳回 false。
2 屬性
2.1 鍊式存儲
- 節點的資料結構
LinkedBlockingQueue 核心源碼解析0 前言1 繼承體系2 屬性3 構造方法4 新增5 取出6 總結 - next 為目前元素的下一個,為 null 表示目前節點是最後一個
- 連結清單容量 預設大小為 Integer.MAX_VALUE !顯然太大了!禁用!
LinkedBlockingQueue 核心源碼解析0 前言1 繼承體系2 屬性3 構造方法4 新增5 取出6 總結 LinkedBlockingQueue 核心源碼解析0 前言1 繼承體系2 屬性3 構造方法4 新增5 取出6 總結 -
連結清單已有元素大小
使用了 AtomicInteger,線程安全
LinkedBlockingQueue 核心源碼解析0 前言1 繼承體系2 屬性3 構造方法4 新增5 取出6 總結 - 連結清單頭、尾
LinkedBlockingQueue 核心源碼解析0 前言1 繼承體系2 屬性3 構造方法4 新增5 取出6 總結 LinkedBlockingQueue 核心源碼解析0 前言1 繼承體系2 屬性3 構造方法4 新增5 取出6 總結
2.2 鎖
- take 時的鎖
LinkedBlockingQueue 核心源碼解析0 前言1 繼承體系2 屬性3 構造方法4 新增5 取出6 總結 - take 的條件隊列,condition 可以簡單了解為基于 ASQ 建立的條件隊列
LinkedBlockingQueue 核心源碼解析0 前言1 繼承體系2 屬性3 構造方法4 新增5 取出6 總結 - put 時的鎖
LinkedBlockingQueue 核心源碼解析0 前言1 繼承體系2 屬性3 構造方法4 新增5 取出6 總結 - put 的條件隊列
LinkedBlockingQueue 核心源碼解析0 前言1 繼承體系2 屬性3 構造方法4 新增5 取出6 總結
鎖有 take 鎖和 put 鎖,是為了保證隊列操作時的線程安全,設計兩種鎖,是為了 take 和 put 兩種操作可以同時進行,而互不影響。
2.3 疊代器
- 實作了自己的疊代器
LinkedBlockingQueue 核心源碼解析0 前言1 繼承體系2 屬性3 構造方法4 新增5 取出6 總結
3 構造方法
3.1 無參
- 預設為 Integer 的最大值
LinkedBlockingQueue 核心源碼解析0 前言1 繼承體系2 屬性3 構造方法4 新增5 取出6 總結
3.2 有參
- 指定容量大小.連結清單頭尾相等,節點值(item)都是 null
LinkedBlockingQueue 核心源碼解析0 前言1 繼承體系2 屬性3 構造方法4 新增5 取出6 總結 -
已有集合資料public LinkedBlockingQueue(Collection<? extends E> c) {
this(Integer.MAX_VALUE);
final ReentrantLock putLock = this.putLock;
putLock.lock(); // Never contended, but necessary for visibility
try {
int n = 0;
for (E e : c) {
// 集合元素不能為 null
if (e == null)
throw new NullPointerException();
// capacity 代表連結清單的大小,在這為 Integer.MAX_VALUE
// 若集合大小 > IntegerMAX_VALUE,直接抛異常.
if (n == capacity)
throw new IllegalStateException("Queue full");
enqueue(new Node<E>(e));
++n;
}
count.set(n);
} finally {
putLock.unlock();
}
}
4 新增
以 offer 方法為例.
将元素E添加到隊列的末尾.
public boolean offer(E e) {
if (e == null) throw new NullPointerException();
// 如果“隊列已滿”,則傳回false,表示插入失敗。
final AtomicInteger count = this.count;
if (count.get() == capacity) return false;
int c = -1;
// 建立“節點e”
Node<E> node = new Node(e);
final ReentrantLock putLock = this.putLock;
// 擷取“插入鎖putLock”
putLock.lock();
try {
// 再次對“隊列是不是滿”的進行判斷。
// 若“隊列未滿”,則插入節點。
if (count.get() < capacity) {
// 插入節點 enqueue(node);
// 将“目前節點數量”+1,并傳回“原始的數量”
c = count.getAndIncrement();
// 如果在插入元素之後,隊列仍然未滿,則喚醒notFull上的等待線程。
if (c + 1 < capacity) notFull.signal();
}
} finally {
// 釋放“插入鎖putLock”
putLock.unlock();
}
// 如果在插入節點前,隊列為空;則插入節點後,喚醒notEmpty上的等待線程
if (c == 0)
signalNotEmpty();
return c >= 0;
}
複制
enqueue
- 将node添加到隊列末尾,并設定node為新的尾節點
LinkedBlockingQueue 核心源碼解析0 前言1 繼承體系2 屬性3 構造方法4 新增5 取出6 總結
signalNotEmpty
- 發出等待信号。 僅能從put/offer調用(否則通常不鎖定takeLock),喚醒notEmpty上的等待線程.
LinkedBlockingQueue 核心源碼解析0 前言1 繼承體系2 屬性3 構造方法4 新增5 取出6 總結
新增資料成功後,在适當時機,會喚起 put 的等待線程(隊列不滿時),或者 take 的等待線程(隊列不為空時),這樣保證隊列一旦滿足 put 或者 take 條件時,立馬就能喚起阻塞線程,繼續運作,保證了喚起的時機不被浪費。
5 取出
以take()為例.取出并傳回隊列的頭。若隊列為空,則一直等待。
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
// 擷取“取出鎖”,若目前線程是中斷狀态,則抛出InterruptedException異常
takeLock.lockInterruptibly();
try {
// 若“隊列為空”,則一直等待。
while (count.get() == 0) {
notEmpty.await();
}
// 取出元素
x = dequeue();
// 取出元素之後,将“節點數量”-1;并傳回“原始的節點數量”。
c = count.getAndDecrement();
if (c > 1) notEmpty.signal();
} finally {
// 釋放“取出鎖”
takeLock.unlock();
}
// 如果在“取出元素之前”,隊列是滿的;則在取出元素之後,喚醒notFull上的等待線程。
if (c == capacity)
signalNotFull();
return x;
}
複制
dequeue
- 删除隊列的頭節點,并将表頭指向“原頭節點的下一個節點”。
LinkedBlockingQueue 核心源碼解析0 前言1 繼承體系2 屬性3 構造方法4 新增5 取出6 總結
signalNotFull
- 發出等待的信号。 僅能從take/poll調用.喚醒notFull上的等待線程.
LinkedBlockingQueue 核心源碼解析0 前言1 繼承體系2 屬性3 構造方法4 新增5 取出6 總結
6 總結
了解好阻塞隊列很重要,經常會被用線上程池的場景中.