文章目錄
- 1.資料結構
-
- 1.等待隊列:waitQueue
-
- 1.公平隊列:FifoWaitQueue
- 2.非公平隊列:LifoWaitQueue
- 2.資料傳輸方式:Transferer
-
- 1.TransferStack
- 2.TransferQueue
- 2.方法說明
-
- 1.轉換(重點):transfer
-
- 1.區分操作模式:mode
- 2.頭節點和目前是相同模式
- 3.頭結點和目前目前不是相同模式
- 4.目前節點是輔助模式(可能從上一步來)
- 5.圖解
- 2. 擷取一個元素:take
- 3.加入隊列:put
- 3.總結
1.資料結構
1.等待隊列:waitQueue
waitQueue是一個接口,在這個隊列當中會聲明兩個等待的隊列,分别是生産者和消費者
//生産者隊列
private WaitQueue waitingProducers;
//消費者隊列
private WaitQueue waitingConsumers;
1.公平隊列:FifoWaitQueue
公平隊列對應的是先進先出,對應的資料結構的queue
static class FifoWaitQueue extends WaitQueue {
private static final long serialVersionUID = -3623113410248163686L;
}
2.非公平隊列:LifoWaitQueue
非公平隊列對應的是後進先出,對應的資料結構是stack
static class LifoWaitQueue extends WaitQueue {
private static final long serialVersionUID = -3633113410248163686L;
}
2.資料傳輸方式:Transferer
資料傳輸方式,這邊定義了一個抽象方法transfer,
Performs a put or take.
,是用來處理拿出和放入操作的.
/**
* Shared internal API for dual stacks and queues.
*/
abstract static class Transferer<E> {
/**
* Performs a put or take.
*
* @param e if non-null, the item to be handed to a consumer;
* if null, requests that transfer return an item
* offered by producer.
* @param timed if this operation should timeout
* @param nanos the timeout, in nanoseconds
* @return if non-null, the item provided or received; if null,
* the operation failed due to timeout or interrupt --
* the caller can distinguish which of these occurred
* by checking Thread.interrupted.
*/
abstract E transfer(E e, boolean timed, long nanos);
}
這邊分别對應着兩種不同的傳輸方式:TransferStack、TransferQueue
1.TransferStack
TransferStack包含一個Snode節點,這邊不直接看方法,看到隊列操作之後再根據具體詳細分析
static final class SNode {
volatile SNode next; // next node in stack
volatile SNode match; // the node matched to this
volatile Thread waiter; // to control park/unpark
Object item; // data; or null for REQUESTs
int mode;
}
2.TransferQueue
TransferQueue包含一個Qnode節點,這邊不直接看方法,看到隊列操作之後再根據具體詳細分析
TransferQueue() {
QNode h = new QNode(null, false); // initialize to dummy node.
head = h;
tail = h;
}
static final class QNode {
volatile QNode next; // next node in queue
volatile Object item; // CAS'ed to or from null
volatile Thread waiter; // to control park/unpark
final boolean isData;
}
2.方法說明
1.轉換(重點):transfer
在這個類型隊列的所有操作中基本都用到了transfer方法,主要的差別是是否計時,後面會講到。這邊隻檢視stack的實作:
這一段代碼比較長,這邊我們分成四個部分來說,下面是代碼的原貌。後面我們一個一個部分來了解.
E transfer(E e, boolean timed, long nanos) {
// constructed/reused as needed
SNode s = null;
//當e==null的時候說明是消費者進來等待了(REQUEST),否則表示生産者(DATA)生産了等待消費
int mode = (e == null) ? REQUEST : DATA;
for (; ; ) {
//拿到頭結點
SNode h = head;
// empty or same-mode
//如果頭結點為空,或者也是相同的模式(比如都是消費者)
if (h == null || h.mode == mode) {
// can't wait
//時間到了
if (timed && nanos <= 0) {
// 如果頭節點不為空且是取消狀态
if (h != null && h.isCancelled()) {
// pop cancelled node
//把頭結點彈出
casHead(h, h.next);
} else {
return null;
}
}
//cas會進行入棧,這邊是入棧成功了
else if (casHead(h, s = snode(s, e, h, mode))) {
//睡眠,如果是逾時或者是中斷,會把match替換成this(自己)嘗試取消
SNode m = awaitFulfill(s, timed, nanos);
//傳回節點的match是自己,取消
if (m == s) {
// wait was cancelled
//清除目前節點
clean(s);
return null;
}
//比對到元素:因為從awaitFulfill()裡面出來要不被取消了要不就比對到了
//如果頭結點不為空,并且s是頭結點下一個節點,彈出head和s.
if ((h = head) != null && h.next == s) {
// help s's fulfiller
casHead(h, s.next);
}
//根據模式傳回生産者還是消費者的值
return (E) ((mode == REQUEST) ? m.item : s.item);
}
// try to fulfill
}
//如果h.mode不是FULFILLING,并且不是相同的模式
else if (!isFulfilling(h.mode)) {
// already cancelled
//節點取消
if (h.isCancelled()) {
// pop and retry
//頭結點換成下一個節點
casHead(h, h.next);
}
//這邊應該是頭結點 不是cancelled的,s入棧變成頭結點
else if (casHead(h, s = snode(s, e, h, FULFILLING | mode))) {
// loop until matched or waiters disappear
for (; ; ) {
// m is s's match
SNode m = s.next;
// all waiters are gone
//如果沒有下一個節點了,清空棧跳出循環繼續往下走
if (m == null) {
// pop fulfill node
casHead(s, null);
// use new node next time
s = null;
// restart main loop
break;
}
//如果頭結點的下一個節點不為空,往再下一個節點找
SNode mn = m.next;
//這個地方的條件是是兩個可以比對的節點(生産和消費)
if (m.tryMatch(s)) {
//比對成功,兩個節點都彈出
// pop both s and m
casHead(s, mn);
return (E) ((mode == REQUEST) ? m.item : s.item);
// lost match
} else {
//嘗試比對失敗,說明m已經先一步被其它線程比對了,清除m節點
// help unlink
s.casNext(m, mn);
}
}
}
// help a fulfiller
}
//如果不是相同模式,并且目前節點是FULFILLING的情況
else {
//mode在上面隻可能是生産或者消費,這邊的話說明都是FULFILLING的情況
// m is h's match
SNode m = h.next;
// waiter is gone
if (m == null) {
//頭結點的下一個節點為null
// pop fulfilling node
casHead(h, null);
} else {
//否則嘗試比對.
SNode mn = m.next;
// help match
//比對成功,兩個節點都彈出
if (m.tryMatch(h)) {
// pop both h and m
casHead(h, mn);
}
// lost match
else {
// help unlink
//嘗試比對失敗,說明m已經先一步被其它線程比對了,清除m節點
h.casNext(m, mn);
}
}
}
}
}
1.區分操作模式:mode
這邊說的模式是相對于線程操作來說的,這邊定義的三種模式:REQUEST(消費)、DATA(生産)、FULFILLING(交換),第一部分是怎麼判斷這個模式:
int mode = (e == null) ? REQUEST : DATA;
這邊的話,根據入參,如果傳入的是null就是要消費,可以看下面的take方法,如果傳入的不是null,就是要生産,可以看下面的put,這個應該很好了解。
2.頭節點和目前是相同模式
這邊的操作步驟分别如下:
先判斷是否逾時,如果逾時的話,判斷頭部節點不為空,并且被取消(比對的節點是自己,也就是說沒有比對到可以進行交換的節點),通過cas方式把頭部換成下一個節點。如果頭部節點為空, 說明沒有隊列了,直接傳回null;
如果沒有逾時一說,先把目前節點添加到棧中,這邊的棧的入棧和以前的入棧可能不一樣(後入的節點添加到head,然後出棧的時候出的也是head,實作後進先出),然後下一步調用awaitFulfill方法,這個方法的作用是找到比對的節點(不同模式),如果沒找到自旋結束挂起等待被喚醒,match如果是自己則說明比對失敗(被cancel)
往後判斷一下比對的節點是不是自己,如果是則清除目前節點傳回null。否則就是比對到了節點(因為awaitFulfill方法隻有一個出口,出口必定攜帶比對的節點值),比對到之後設定頭部節點,根據mode傳回相應的節點值(item)
//時間到了
if (timed && nanos <= 0) {
// 如果頭節點不為空且是取消狀态
if (h != null && h.isCancelled()) {
// pop cancelled node
//把頭結點彈出
casHead(h, h.next);
} else {
return null;
}
}
//cas會進行入棧,這邊是入棧成功了
else if (casHead(h, s = snode(s, e, h, mode))) {
//睡眠,如果是逾時或者是中斷,會把match替換成this(自己)嘗試取消
SNode m = awaitFulfill(s, timed, nanos);
//傳回節點的match是自己,取消
if (m == s) {
// wait was cancelled
//清除目前節點
clean(s);
return null;
}
//比對到元素:因為從awaitFulfill()裡面出來要不被取消了要不就比對到了
//如果頭結點不為空,并且s是頭結點下一個節點,彈出head和s.
if ((h = head) != null && h.next == s) {
// help s's fulfiller
casHead(h, s.next);
}
//根據模式傳回生産者還是消費者的值
return (E) ((mode == REQUEST) ? m.item : s.item);
}
// try to fulfill
看一下awaitFulfill方法:
這邊先是會計算一下循環的次數
然後進入死循環,出口隻有一個就是 match!=null的時候,傳回match值。
如果被中斷和逾時的情況下會調用trycancel方法會将目前節點的match值設定為自己。然後下一個循環開始的時候,match不為null就傳回了
如果是沒有計時的情況,可以看到
park
的操作,這邊的park是等待下一個操作進來的時候去喚醒這個節點。我們可以往後去看下面的方法
SNode awaitFulfill(SNode s, boolean timed, long nanos) {
final long deadline = timed ? System.nanoTime() + nanos : 0L;
Thread w = Thread.currentThread();
//計算循環的次數,這邊會分别根據有計時和沒計時進行判斷
int spins = (shouldSpin(s) ? (timed ? maxTimedSpins : maxUntimedSpins) : 0);
for (; ; ) {
//被中斷,嘗試取消這個節點
if (w.isInterrupted()){
// Tries to cancel a wait by matching node to itself.
//1 - 嘗試通過将節點與自身比對來取消等待。
s.tryCancel();
}
//比對的節點不為空傳回
SNode m = s.match;
if (m != null){
return m;
}
//有計時的操作,到達時間取消節點
if (timed) {
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
//2 - 嘗試通過将節點與自身比對來取消等待。
s.tryCancel();
continue;
}
}
//還有自旋次數,計算自旋次數
if (spins > 0){
spins = shouldSpin(s) ? (spins - 1) : 0;
}
//沒有自旋次數,設定waiter為目前線程
else if (s.waiter == null){
// establish waiter so can park next iter
s.waiter = w;
}
//沒有自旋次數并且不計數,進行park休眠
else if (!timed){
LockSupport.park(this);
}
//如果有計時,則休眠對應時間
else if (nanos > spinForTimeoutThreshold){
LockSupport.parkNanos(this, nanos);
}
}
}
3.頭結點和目前目前不是相同模式
如果頭節點和目前節點不是相同的模式,這邊還有一個條件就是不是輔助模式。也就是說可能進來的節點是消費(take),而頭結點是生産(put),那麼這兩個模式不一樣,正好可以抵消了。是以就進入這個判斷:
這邊會先判斷節點是否被取消(就是比對的等于自己,這個是操作基本是中斷和逾時導緻),頭結點被取消就往下一個節點走
否則把目前節點添加到頭部(入棧),并且修改狀态為交換,開始死循環
如果沒有下一個節點,設定頭結點為null,跳出循環結束,下一步就會再走到上面的2.頭節點和目前是相同模式,可能被挂起等待或者逾時結束。
如果下一個節點不為空的情況,進行tryMatch比對,如果能比對到出棧(這個時候是兩個節點),傳回對應的元素
如果比對不上,說明可能被另外一個節點比對了,就要往在下一個節點去比對。
// already cancelled
//節點取消
if (h.isCancelled()) {
// pop and retry
//頭結點換成下一個節點
casHead(h, h.next);
}
//這邊應該是頭結點 不是cancelled的,s入棧變成頭結點
else if (casHead(h, s = snode(s, e, h, FULFILLING | mode))) {
// loop until matched or waiters disappear
for (; ; ) {
// m is s's match
SNode m = s.next;
// all waiters are gone
//如果沒有下一個節點了,清空棧跳出循環繼續往下走
if (m == null) {
// pop fulfill node
casHead(s, null);
// use new node next time
s = null;
// restart main loop
break;
}
//如果頭結點的下一個節點不為空,往再下一個節點找
SNode mn = m.next;
//這個地方的條件是是兩個可以比對的節點(生産和消費)
if (m.tryMatch(s)) {
//比對成功,兩個節點都彈出
// pop both s and m
casHead(s, mn);
return (E) ((mode == REQUEST) ? m.item : s.item);
// lost match
} else {
//嘗試比對失敗,說明m已經先一步被其它線程比對了,清除m節點
// help unlink
s.casNext(m, mn);
}
}
}
// help a fulfiller
}
tryMatch如何進行比對,這邊看一下
這邊如果比對成功,成功的前提原來的頭結點(目前節點的下一個節點)的match為空(沒有被比對),并且cas操作是從null到目前節點成功,可能失敗,就是在這個過程中被其他節點比對走了,如果成功要喚醒原來的頭結點,因為之前可能被park起來等待,現在match不為空了,可以起來工作了,比對失敗傳回結果
boolean tryMatch(SNode s) {
//如果m沒有比對這,将s作為他的比對這
if (match == null && UNSAFE.compareAndSwapObject(this, matchOffset, null, s)) {
Thread w = waiter;
// waiters need at most one unpark
//喚醒m中的waiter
if (w != null) {
waiter = null;
LockSupport.unpark(w);
}
// 傳回true
return true;
}
//可能有其他線程比對了m,傳回是否比對結果
return match == s;
}
4.目前節點是輔助模式(可能從上一步來)
因為之前的第二個判斷會把目前節點設值為交換中狀态。這邊的操作相當于第二個判斷之後會進來
操作和上面的操作很類似,如果head的下一個節點為null,也要回到第一個判斷在進行操作等待。
否則嘗試比對,成功彈出兩個節點,失敗繼續往下,不過這邊成功沒有傳回的打算,依然會再進入自旋的下一步操作
這邊可以了解這一步操作是輔助操作,清除比對成功的節點,或者當節點所屬線程消失後将其移除棧。
//mode在上面隻可能是生産或者消費,這邊的話說明都是FULFILLING的情況
// m is h's match
SNode m = h.next;
// waiter is gone
if (m == null) {
//頭結點的下一個節點為null
// pop fulfilling node
casHead(h, null);
} else {
//否則嘗試比對.
SNode mn = m.next;
// help match
//比對成功,兩個節點都彈出
if (m.tryMatch(h)) {
// pop both h and m
casHead(h, mn);
}
// lost match
else {
// help unlink
//嘗試比對失敗,說明m已經先一步被其它線程比對了,清除m節點
h.casNext(m, mn);
}
}
5.圖解
2. 擷取一個元素:take
public E take() throws InterruptedException {
//直接調用transfer方法,不計數
E e = transferer.transfer(null, false, 0);
if (e != null){
return e;
}
//如果元素為空,讓線程中斷抛出中斷異常結束.
Thread.interrupted();
throw new InterruptedException();
}
可以看到take這邊是直接傳入null,表明自己是消費者,并且沒有設定逾時時間,一直等待到對應的生産者給定資料傳回
3.加入隊列:put
put這邊也不設定時間,相當于是等待消費者消費掉目前操作生産的節點.
public void put(E e) throws InterruptedException {
if (e == null) {
throw new NullPointerException();
}
//把目前節點添加進去執行transfer操作
if (transferer.transfer(e, false, 0) == null) {
//傳回為空就中斷被排除異常
Thread.interrupted();
throw new InterruptedException();
}
}
3.總結
可以看到雖然說這個隊列内部的容量是0,但是他還是會維護一個隊列,個人感覺這種隊列适合用于短期的任務,生産和消費相近,不會造成大量任務堆積。等後面看完線程池再回來補充吧