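Let's look at the painfully intricate implementation of LinkedTransferQueue.
In my view, the complexity of this class shows up in three areas:
1. The complexity of its design ideas
2. The complexity of its implementation logic
3. The code comments, which are honestly hard to follow. I ran them through Youdao Translate several times and still came away confused.
Still, it has to be read eventually. You do what your ability allows, so here is my best effort at recording an analysis of the LinkedTransferQueue class.
First, the class overview: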
/**
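* Note: LinkedTransferQueue is an unbounded TransferQueue based on linked nodes. Elements are ordered FIFO with respect to any given producer: the head is the element that has been on the queue the longest, and the tail is the element that has been on it the shortest time.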
* An unbounded {@link TransferQueue} based on linked nodes.
* This queue orders elements FIFO (first-in-first-out) with respect
* to any given producer. The <em>head</em> of the queue is that
* element that has been on the queue the longest time for some
* producer. The <em>tail</em> of the queue is that element that has
* been on the queue the shortest time for some producer.
*
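* Note: unlike most collections, size() on LinkedTransferQueue is not a constant-time operation.
* Because these queues are asynchronous, counting the elements requires a traversal, so the result may be inaccurate if the collection is modified during the traversal.
* In addition, the bulk operations addAll()/removeAll()/retainAll()/containsAll()/equals()/toArray() are not guaranteed to execute atomically.
* For example, an iterator running concurrently with addAll() might see only some of the added elements.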
* <p>Beware that, unlike in most collections, the {@code size} method
* is <em>NOT</em> a constant-time operation. Because of the
* asynchronous nature of these queues, determining the current number
* of elements requires a traversal of the elements, and so may report
* inaccurate results if this collection is modified during traversal.
* Additionally, the bulk operations {@code addAll},
* {@code removeAll}, {@code retainAll}, {@code containsAll},
* {@code equals}, and {@code toArray} are <em>not</em> guaranteed
* to be performed atomically. For example, an iterator operating
* concurrently with an {@code addAll} operation might view only some
* of the added elements.
*
* <p>This class and its iterator implement all of the
* <em>optional</em> methods of the {@link Collection} and {@link
* Iterator} interfaces.
*
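* Note on memory consistency: as with other concurrent collections, actions in a thread before placing an element into a LinkedTransferQueue happen-before actions in another thread that follow the access or removal of that element.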
* <p>Memory consistency effects: As with other concurrent
* collections, actions in a thread prior to placing an object into a
* {@code LinkedTransferQueue}
* <a href="package-summary.html#MemoryVisibility"><i>happen-before</i></a>
* actions subsequent to the access or removal of that element from
* the {@code LinkedTransferQueue} in another thread.
*
* <p>This class is a member of the
* <a href="{@docRoot}/../technotes/guides/collections/index.html">
* Java Collections Framework</a>.
*
* @since 1.7
* @author Doug Lea
* @param <E> the type of elements held in this collection
*/
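From this we can see:
1. LinkedTransferQueue is an unbounded, FIFO, linked-node TransferQueue. The head is the element that has been on the queue the longest, and the tail is the element that has been on it the shortest time.
2. Unlike most collections, size() on LinkedTransferQueue is not a constant-time method, because it has to traverse the internal nodes. If the number of elements changes during that traversal, the returned value may be inaccurate.
3. The bulk methods addAll()/removeAll()/retainAll()/containsAll()/equals()/toArray() are not guaranteed to execute atomically, so a concurrent iterator may see only some of the elements involved.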
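To make the first two points concrete, here is a minimal sketch of my own (not taken from the JDK source). The class name FifoAndSizeDemo and its helper methods are hypothetical; only LinkedTransferQueue and its offer/add/poll/size methods are real API. It runs single-threaded, so size() is exact here; under concurrent modification it would only be an estimate.

```java
import java.util.concurrent.LinkedTransferQueue;

class FifoAndSizeDemo {
    // Enqueue three elements and drain them, returning the order in which they come out.
    static String drainOrder() {
        LinkedTransferQueue<String> q = new LinkedTransferQueue<>();
        q.offer("a");
        q.offer("b");
        q.offer("c");
        StringBuilder sb = new StringBuilder();
        String s;
        while ((s = q.poll()) != null) {   // poll() returns null once the queue is empty
            sb.append(s);
        }
        return sb.toString();
    }

    // size() walks the linked nodes; with no concurrent modification the count is exact.
    static int quiescentSize() {
        LinkedTransferQueue<Integer> q = new LinkedTransferQueue<>();
        for (int i = 0; i < 5; i++) {
            q.add(i);
        }
        return q.size();
    }

    public static void main(String[] args) {
        System.out.println(drainOrder());
        System.out.println(quiescentSize());
    }
}
```

Since size() is O(n), isEmpty() is usually the better choice when the question is only whether anything is queued.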
The main fields of the class are:
/** Whether the current machine has more than one processor (this is about CPUs, not about threads) */
/** True if on multiprocessor */
private static final boolean MP =
Runtime.getRuntime().availableProcessors() > 1;
/**
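* Note: the number of times to spin (with randomly interspersed calls to Thread.yield) on a multiprocessor before blocking, when a node appears to be the first waiter in the queue.
* Must be a power of two. The value was derived empirically and works well across a variety of processors, CPU counts, and operating systems.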
* The number of times to spin (with randomly interspersed calls
* to Thread.yield) on multiprocessor before blocking when a node
* is apparently the first waiter in the queue. See above for
* explanation. Must be a power of two. The value is empirically
* derived -- it works pretty well across a variety of processors,
* numbers of CPUs, and OSes.
*/
private static final int FRONT_SPINS = 1 << 7;
/**
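* Note: the number of times to spin before blocking when a node is preceded by another node that appears to be spinning. It also serves as the increment added to FRONT_SPINS on phase changes, and as the base average frequency for yielding during spins. Must be a power of two.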
* The number of times to spin before blocking when a node is
* preceded by another node that is apparently spinning. Also
* serves as an increment to FRONT_SPINS on phase changes, and as
* base average frequency for yielding during spins. Must be a
* power of two.
*/
private static final int CHAINED_SPINS = FRONT_SPINS >>> 1;
/**
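* Note: the maximum number of estimated removal failures (sweepVotes) to tolerate before sweeping through the queue. The value must be at least two, to avoid useless sweeps when removing trailing nodes.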
* The maximum number of estimated removal failures (sweepVotes)
* to tolerate before sweeping through the queue unlinking
* cancelled nodes that were not unlinked upon initial
* removal. See above for explanation. The value must be at least
* two to avoid useless sweeps when removing trailing nodes.
*/
static final int SWEEP_THRESHOLD = 32;
/** Note: the queue head; stays null until the first element is enqueued */
/** head of the queue; null until first enqueue */
transient volatile Node head;
/** Note: the queue tail; stays null until the first element is appended */
/** tail of the queue; null until first append */
private transient volatile Node tail;
/** Note: count of apparently failed attempts to unlink removed nodes */
/** The number of apparent failures to unsplice removed nodes */
private transient volatile int sweepVotes;
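As a quick worked check of the two spin constants, the sketch below simply re-evaluates the same shift expressions and verifies the "must be a power of two" requirement. SpinConstantsDemo and isPowerOfTwo are hypothetical names of mine; the two constant expressions are copied from the fields quoted above.

```java
class SpinConstantsDemo {
    // Same expressions as the JDK fields quoted above, copied here for illustration.
    static final int FRONT_SPINS = 1 << 7;               // 2^7 = 128
    static final int CHAINED_SPINS = FRONT_SPINS >>> 1;  // 128 / 2 = 64

    // A positive int is a power of two exactly when it has a single bit set.
    static boolean isPowerOfTwo(int n) {
        return n > 0 && (n & (n - 1)) == 0;
    }

    public static void main(String[] args) {
        System.out.println("FRONT_SPINS = " + FRONT_SPINS
                + ", power of two: " + isPowerOfTwo(FRONT_SPINS));
        System.out.println("CHAINED_SPINS = " + CHAINED_SPINS
                + ", power of two: " + isPowerOfTwo(CHAINED_SPINS));
    }
}
```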
The Node data structure is shown below. It is implemented with the Unsafe class, which I will introduce briefly later:
static final class Node {
final boolean isData; // false if this is a request node
volatile Object item; // initially non-null if isData; CASed to match
volatile Node next;
volatile Thread waiter; // null until waiting
// CAS methods for fields
final boolean casNext(Node cmp, Node val) {
return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
}
final boolean casItem(Object cmp, Object val) {
// assert cmp == null || cmp.getClass() != Node.class;
return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
}
/**
* Constructs a new node. Uses relaxed write because item can
* only be seen after publication via casNext.
*/
Node(Object item, boolean isData) {
UNSAFE.putObject(this, itemOffset, item); // relaxed write
this.isData = isData;
}
/**
* Links node to itself to avoid garbage retention. Called
* only after CASing head field, so uses relaxed write.
*/
final void forgetNext() {
UNSAFE.putObject(this, nextOffset, this);
}
/**
* Sets item to self and waiter to null, to avoid garbage
* retention after matching or cancelling. Uses relaxed writes
* because order is already constrained in the only calling
* contexts: item is forgotten only after volatile/atomic
* mechanics that extract items. Similarly, clearing waiter
* follows either CAS or return from park (if ever parked;
* else we don't care).
*/
final void forgetContents() {
UNSAFE.putObject(this, itemOffset, this);
UNSAFE.putObject(this, waiterOffset, null);
}
/**
* Returns true if this node has been matched, including the
* case of artificial matches due to cancellation.
*/
final boolean isMatched() {
Object x = item;
return (x == this) || ((x == null) == isData);
}
/**
* Returns true if this is an unmatched request node.
*/
final boolean isUnmatchedRequest() {
return !isData && item == null;
}
/**
* Returns true if a node with the given mode cannot be
* appended to this node because this node is unmatched and
* has opposite data mode.
*/
final boolean cannotPrecede(boolean haveData) {
boolean d = isData;
Object x;
return d != haveData && (x = item) != this && (x != null) == d;
}
/**
* Tries to artificially match a data node -- used by remove.
*/
final boolean tryMatchData() {
// assert isData;
Object x = item;
if (x != null && x != this && casItem(x, null)) {
LockSupport.unpark(waiter);
return true;
}
return false;
}
private static final long serialVersionUID = -3375979862319811754L;
// Unsafe mechanics
private static final sun.misc.Unsafe UNSAFE;
private static final long itemOffset;
private static final long nextOffset;
private static final long waiterOffset;
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> k = Node.class;
itemOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("item"));
nextOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("next"));
waiterOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("waiter"));
} catch (Exception e) {
throw new Error(e);
}
}
}
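The match-state rule in isMatched() is easy to misread, so here is a simplified model of it. This is my own sketch, assuming an AtomicReference in place of the Unsafe-based CAS; SketchNode is a hypothetical class and not the JDK's Node. It shows the three ways a node ends up "matched": a data node whose item was CASed to null, a request node whose item was CASed to a value, and a cancelled node whose item points at the node itself.

```java
import java.util.concurrent.atomic.AtomicReference;

class SketchNode {
    final boolean isData;                 // false for a request (consumer) node
    final AtomicReference<Object> item;   // stands in for the volatile item field + Unsafe CAS

    SketchNode(Object item, boolean isData) {
        this.isData = isData;
        this.item = new AtomicReference<>(item);
    }

    boolean casItem(Object cmp, Object val) {
        return item.compareAndSet(cmp, val);   // reference comparison, like the real CAS
    }

    // Same predicate as Node.isMatched(): self-linked item means cancelled;
    // otherwise a data node is matched once its item is null, a request node once it is non-null.
    boolean isMatched() {
        Object x = item.get();
        return x == this || ((x == null) == isData);
    }

    public static void main(String[] args) {
        SketchNode data = new SketchNode("x", true);
        System.out.println(data.isMatched());        // still waiting for a consumer
        data.casItem("x", null);                     // a consumer takes the value
        System.out.println(data.isMatched());

        SketchNode request = new SketchNode(null, false);
        request.casItem(null, "y");                  // a producer fulfils the request
        System.out.println(request.isMatched());

        SketchNode cancelled = new SketchNode(null, false);
        cancelled.casItem(null, cancelled);          // cancellation: item set to the node itself
        System.out.println(cancelled.isMatched());
    }
}
```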
Next, the method implementations. Let's start with one of the most important private methods in the class, xfer():
/**
* Note: the comment alone shows how important this method is: it implements all of the queuing methods.
* Implements all queuing methods. See above for explanation.
*
* @param e the item or null for take
* @param haveData true if this is a put, else a take
* @param how NOW, ASYNC, SYNC, or TIMED
* @param nanos timeout in nanosecs, used only if mode is TIMED
* @return an item if matched, else e
* @throws NullPointerException if haveData mode but e is null
*/
private E xfer(E e, boolean haveData, int how, long nanos) {
    if (haveData && (e == null))
        throw new NullPointerException();
    Node s = null; // the node to append, if needed
    retry:
    for (;;) { // restart on append race
        for (Node h = head, p = h; p != null;) { // find & match first node
            boolean isData = p.isData;
            Object item = p.item;
            if (item != p && (item != null) == isData) { // unmatched
                if (isData == haveData) // can't match
                    break;
                if (p.casItem(item, e)) { // match
                    for (Node q = p; q != h;) {
                        Node n = q.next; // update by 2 unless singleton
                        if (head == h && casHead(h, n == null ? q : n)) {
                            h.forgetNext();
                            break;
                        } // advance and retry
                        if ((h = head) == null ||
                            (q = h.next) == null || !q.isMatched())
                            break; // unless slack < 2
                    }
                    LockSupport.unpark(p.waiter);
                    return LinkedTransferQueue.<E>cast(item);
                }
            }
            Node n = p.next;
            p = (p != n) ? n : (h = head); // Use head if p offlist
        }
        if (how != NOW) { // No matches available
            if (s == null)
                s = new Node(e, haveData);
            Node pred = tryAppend(s, haveData);
            if (pred == null)
                continue retry; // lost race vs opposite mode
            if (how != ASYNC)
                return awaitMatch(s, pred, e, (how == TIMED), nanos);
        }
        return e; // not waiting
    }
}
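Nearly every queue operation calls this private method internally; one method serves all the needs.
Its parameter list includes how, which selects the mode of the call. how takes one of four values: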
/*
* Possible values for "how" argument in xfer method.
*/
private static final int NOW = 0; // for untimed poll, tryTransfer
private static final int ASYNC = 1; // for offer, put, add
private static final int SYNC = 2; // for transfer, take
private static final int TIMED = 3; // for timed poll, tryTransfer
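The difference between NOW and ASYNC is observable through the public API. The sketch below is my own illustration (HowModesDemo and its methods are hypothetical names); it relies only on the documented behaviour of tryTransfer(E), offer(E), and poll().

```java
import java.util.concurrent.LinkedTransferQueue;

class HowModesDemo {
    // NOW, producer side: tryTransfer(e) with no waiting consumer fails and enqueues nothing.
    static boolean tryTransferLeavesQueueEmpty() {
        LinkedTransferQueue<String> q = new LinkedTransferQueue<>();
        boolean transferred = q.tryTransfer("x");
        return !transferred && q.isEmpty();
    }

    // ASYNC: offer(e) appends a data node and returns immediately without waiting for a consumer.
    static int sizeAfterOffer() {
        LinkedTransferQueue<String> q = new LinkedTransferQueue<>();
        q.offer("y");
        return q.size();
    }

    // NOW, consumer side: poll() on an empty queue returns null instead of waiting.
    static String pollOnEmpty() {
        return new LinkedTransferQueue<String>().poll();
    }

    public static void main(String[] args) {
        System.out.println(tryTransferLeavesQueueEmpty());
        System.out.println(sizeAfterOffer());
        System.out.println(pollOnEmpty());
    }
}
```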
The main methods:
/**
* Note: inserts the element at the tail of the queue. Because the queue is unbounded, this method never blocks.
* Inserts the specified element at the tail of this queue.
* As the queue is unbounded, this method will never block.
*
* @throws NullPointerException if the specified element is null
*/
public void put(E e) {
xfer(e, true, ASYNC, 0);
}
/**
* Note: inserts the element at the tail of the queue. Because the queue is unbounded, this method never blocks and never returns false.
* Inserts the specified element at the tail of this queue.
* As the queue is unbounded, this method will never block or
* return {@code false}.
*
* @return {@code true} (as specified by
* {@link java.util.concurrent.BlockingQueue#offer(Object,long,TimeUnit)
* BlockingQueue.offer})
* @throws NullPointerException if the specified element is null
*/
public boolean offer(E e, long timeout, TimeUnit unit) {
xfer(e, true, ASYNC, 0);
return true;
}
/**
* Note: inserts the element at the tail of the queue. Because the queue is unbounded, this method never returns false.
* Inserts the specified element at the tail of this queue.
* As the queue is unbounded, this method will never return {@code false}.
*
* @return {@code true} (as specified by {@link Queue#offer})
* @throws NullPointerException if the specified element is null
*/
public boolean offer(E e) {
xfer(e, true, ASYNC, 0);
return true;
}
/**
* Note: inserts the element at the tail of the queue. Because the queue is unbounded, this method never returns false and never throws IllegalStateException.
* Inserts the specified element at the tail of this queue.
* As the queue is unbounded, this method will never throw
* {@link IllegalStateException} or return {@code false}.
*
* @return {@code true} (as specified by {@link Collection#add})
* @throws NullPointerException if the specified element is null
*/
public boolean add(E e) {
xfer(e, true, ASYNC, 0);
return true;
}
/**
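* Note: hands the element to an already-waiting consumer immediately if there is one; otherwise returns false without enqueuing the element.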
* Transfers the element to a waiting consumer immediately, if possible.
*
* <p>More precisely, transfers the specified element immediately
* if there exists a consumer already waiting to receive it (in
* {@link #take} or timed {@link #poll(long,TimeUnit) poll}),
* otherwise returning {@code false} without enqueuing the element.
*
* @throws NullPointerException if the specified element is null
*/
public boolean tryTransfer(E e) {
return xfer(e, true, NOW, 0) == null;
}
/**
* Note: transfers the element to a consumer, waiting if necessary until one receives it.
* Transfers the element to a consumer, waiting if necessary to do so.
*
* <p>More precisely, transfers the specified element immediately
* if there exists a consumer already waiting to receive it (in
* {@link #take} or timed {@link #poll(long,TimeUnit) poll}),
* else inserts the specified element at the tail of this queue
* and waits until the element is received by a consumer.
*
* @throws NullPointerException if the specified element is null
*/
public void transfer(E e) throws InterruptedException {
if (xfer(e, true, SYNC, 0) != null) {
Thread.interrupted(); // failure possible only due to interrupt
throw new InterruptedException();
}
}
/**
* Note: transfers the element to a consumer, waiting if necessary, but only until the timeout elapses.
* Transfers the element to a consumer if it is possible to do so
* before the timeout elapses.
*
* <p>More precisely, transfers the specified element immediately
* if there exists a consumer already waiting to receive it (in
* {@link #take} or timed {@link #poll(long,TimeUnit) poll}),
* else inserts the specified element at the tail of this queue
* and waits until the element is received by a consumer,
* returning {@code false} if the specified wait time elapses
* before the element can be transferred.
*
* @throws NullPointerException if the specified element is null
*/
public boolean tryTransfer(E e, long timeout, TimeUnit unit)
throws InterruptedException {
if (xfer(e, true, TIMED, unit.toNanos(timeout)) == null)
return true;
if (!Thread.interrupted())
return false;
throw new InterruptedException();
}
public E take() throws InterruptedException {
E e = xfer(null, false, SYNC, 0);
if (e != null)
return e;
Thread.interrupted();
throw new InterruptedException();
}
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
E e = xfer(null, false, TIMED, unit.toNanos(timeout));
if (e != null || !Thread.interrupted())
return e;
throw new InterruptedException();
}
public E poll() {
return xfer(null, false, NOW, 0);
}
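To see the hand-off behaviour of these methods from the outside, here is a small sketch of my own (HandoffDemo and its method names are hypothetical). It starts a consumer in take(), waits until hasWaitingConsumer() reports the request, and then calls tryTransfer(E), which should succeed because a consumer is waiting. The second method shows a timed tryTransfer giving up when nobody is there to receive the element.

```java
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

class HandoffDemo {
    // Hands one value directly to a consumer that is already waiting in take().
    static String handOff(String value) {
        LinkedTransferQueue<String> q = new LinkedTransferQueue<>();
        AtomicReference<String> received = new AtomicReference<>();
        Thread consumer = new Thread(() -> {
            try {
                received.set(q.take());            // SYNC request: waits for data
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();
        try {
            while (!q.hasWaitingConsumer()) {      // wait until the request is visible in the queue
                Thread.sleep(1);
            }
            boolean handed = q.tryTransfer(value); // NOW: succeeds only if a consumer is waiting
            consumer.join();
            return handed ? received.get() : null;
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    // With no consumer, a timed tryTransfer times out and leaves no element behind.
    static boolean timesOutAndStaysEmpty() {
        LinkedTransferQueue<String> q = new LinkedTransferQueue<>();
        try {
            boolean transferred = q.tryTransfer("x", 20, TimeUnit.MILLISECONDS);
            return !transferred && q.isEmpty();
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(handOff("hello"));
        System.out.println(timesOutAndStaysEmpty());
    }
}
```

The polling loop on hasWaitingConsumer() is only there to make the demo deterministic; real code would normally just call transfer() and let it block.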
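As you can see, essentially every method delegates to xfer() for its logic.
In addition:
The class also contains a rather long explanatory comment, shown below: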
/*
* *** Overview of Dual Queues with Slack ***
*
* Dual Queues, introduced by Scherer and Scott
* (http://www.cs.rice.edu/~wns1/papers/2004-DISC-DDS.pdf) are
* (linked) queues in which nodes may represent either data or
* requests. When a thread tries to enqueue a data node, but
* encounters a request node, it instead "matches" and removes it;
* and vice versa for enqueuing requests. Blocking Dual Queues
* arrange that threads enqueuing unmatched requests block until
* other threads provide the match. Dual Synchronous Queues (see
* Scherer, Lea, & Scott
* http://www.cs.rochester.edu/u/scott/papers/2009_Scherer_CACM_SSQ.pdf)
* additionally arrange that threads enqueuing unmatched data also
* block. Dual Transfer Queues support all of these modes, as
* dictated by callers.
*
* A FIFO dual queue may be implemented using a variation of the
* Michael & Scott (M&S) lock-free queue algorithm
* (http://www.cs.rochester.edu/u/scott/papers/1996_PODC_queues.pdf).
* It maintains two pointer fields, "head", pointing to a
* (matched) node that in turn points to the first actual
* (unmatched) queue node (or null if empty); and "tail" that
* points to the last node on the queue (or again null if
* empty). For example, here is a possible queue with four data
* elements:
*
*  head                tail
*    |                   |
*    v                   v
*    M -> U -> U -> U -> U
*
* The M&S queue algorithm is known to be prone to scalability and
* overhead limitations when maintaining (via CAS) these head and
* tail pointers. This has led to the development of
* contention-reducing variants such as elimination arrays (see
* Moir et al http://portal.acm.org/citation.cfm?id=1074013) and
* optimistic back pointers (see Ladan-Mozes & Shavit
* http://people.csail.mit.edu/edya/publications/OptimisticFIFOQueue-journal.pdf).
* However, the nature of dual queues enables a simpler tactic for
* improving M&S-style implementations when dual-ness is needed.
*
* In a dual queue, each node must atomically maintain its match
* status. While there are other possible variants, we implement
* this here as: for a data-mode node, matching entails CASing an
* "item" field from a non-null data value to null upon match, and
* vice-versa for request nodes, CASing from null to a data
* value. (Note that the linearization properties of this style of
* queue are easy to verify -- elements are made available by
* linking, and unavailable by matching.) Compared to plain M&S
* queues, this property of dual queues requires one additional
* successful atomic operation per enq/deq pair. But it also
* enables lower cost variants of queue maintenance mechanics. (A
* variation of this idea applies even for non-dual queues that
* support deletion of interior elements, such as
* j.u.c.ConcurrentLinkedQueue.)
*
* Once a node is matched, its match status can never again
* change. We may thus arrange that the linked list of them
* contain a prefix of zero or more matched nodes, followed by a
* suffix of zero or more unmatched nodes. (Note that we allow
* both the prefix and suffix to be zero length, which in turn
* means that we do not use a dummy header.) If we were not
* concerned with either time or space efficiency, we could
* correctly perform enqueue and dequeue operations by traversing
* from a pointer to the initial node; CASing the item of the
* first unmatched node on match and CASing the next field of the
* trailing node on appends. (Plus some special-casing when
* initially empty). While this would be a terrible idea in
* itself, it does have the benefit of not requiring ANY atomic
* updates on head/tail fields.
*
* We introduce here an approach that lies between the extremes of
* never versus always updating queue (head and tail) pointers.
* This offers a tradeoff between sometimes requiring extra
* traversal steps to locate the first and/or last unmatched
* nodes, versus the reduced overhead and contention of fewer
* updates to queue pointers. For example, a possible snapshot of
* a queue is:
*
*  head           tail
*    |              |
*    v              v
*    M -> M -> U -> U -> U -> U
*
* The best value for this "slack" (the targeted maximum distance
* between the value of "head" and the first unmatched node, and
* similarly for "tail") is an empirical matter. We have found
* that using very small constants in the range of 1-3 work best
* over a range of platforms. Larger values introduce increasing
* costs of cache misses and risks of long traversal chains, while
* smaller values increase CAS contention and overhead.
*
* Dual queues with slack differ from plain M&S dual queues by
* virtue of only sometimes updating head or tail pointers when
* matching, appending, or even traversing nodes; in order to
* maintain a targeted slack. The idea of "sometimes" may be
* operationalized in several ways. The simplest is to use a
* per-operation counter incremented on each traversal step, and
* to try (via CAS) to update the associated queue pointer
* whenever the count exceeds a threshold. Another, that requires
* more overhead, is to use random number generators to update
* with a given probability per traversal step.
*
* In any strategy along these lines, because CASes updating
* fields may fail, the actual slack may exceed targeted
* slack. However, they may be retried at any time to maintain
* targets. Even when using very small slack values, this
* approach works well for dual queues because it allows all
* operations up to the point of matching or appending an item
* (hence potentially allowing progress by another thread) to be
* read-only, thus not introducing any further contention. As
* described below, we implement this by performing slack
* maintenance retries only after these points.
*
* As an accompaniment to such techniques, traversal overhead can
* be further reduced without increasing contention of head
* pointer updates: Threads may sometimes shortcut the "next" link
* path from the current "head" node to be closer to the currently
* known first unmatched node, and similarly for tail. Again, this
* may be triggered with using thresholds or randomization.
*
* These ideas must be further extended to avoid unbounded amounts
* of costly-to-reclaim garbage caused by the sequential "next"
* links of nodes starting at old forgotten head nodes: As first
* described in detail by Boehm
* (http://portal.acm.org/citation.cfm?doid=503272.503282) if a GC
* delays noticing that any arbitrarily old node has become
* garbage, all newer dead nodes will also be unreclaimed.
* (Similar issues arise in non-GC environments.) To cope with
* this in our implementation, upon CASing to advance the head
* pointer, we set the "next" link of the previous head to point
* only to itself; thus limiting the length of connected dead lists.
* (We also take similar care to wipe out possibly garbage
* retaining values held in other Node fields.) However, doing so
* adds some further complexity to traversal: If any "next"
* pointer links to itself, it indicates that the current thread
* has lagged behind a head-update, and so the traversal must
* continue from the "head". Traversals trying to find the
* current tail starting from "tail" may also encounter
* self-links, in which case they also continue at "head".
*
* It is tempting in slack-based scheme to not even use CAS for
* updates (similarly to Ladan-Mozes & Shavit). However, this
* cannot be done for head updates under the above link-forgetting
* mechanics because an update may leave head at a detached node.
* And while direct writes are possible for tail updates, they
* increase the risk of long retraversals, and hence long garbage
* chains, which can be much more costly than is worthwhile
* considering that the cost difference of performing a CAS vs
* write is smaller when they are not triggered on each operation
* (especially considering that writes and CASes equally require
* additional GC bookkeeping ("write barriers") that are sometimes
* more costly than the writes themselves because of contention).
*
* *** Overview of implementation ***
*
* We use a threshold-based approach to updates, with a slack
* threshold of two -- that is, we update head/tail when the
* current pointer appears to be two or more steps away from the
* first/last node. The slack value is hard-wired: a path greater
* than one is naturally implemented by checking equality of
* traversal pointers except when the list has only one element,
* in which case we keep slack threshold at one. Avoiding tracking
* explicit counts across method calls slightly simplifies an
* already-messy implementation. Using randomization would
* probably work better if there were a low-quality dirt-cheap
* per-thread one available, but even ThreadLocalRandom is too
* heavy for these purposes.
*
* With such a small slack threshold value, it is not worthwhile
* to augment this with path short-circuiting (i.e., unsplicing
* interior nodes) except in the case of cancellation/removal (see
* below).
*
* We allow both the head and tail fields to be null before any
* nodes are enqueued; initializing upon first append. This
* simplifies some other logic, as well as providing more
* efficient explicit control paths instead of letting JVMs insert
* implicit NullPointerExceptions when they are null. While not
* currently fully implemented, we also leave open the possibility
* of re-nulling these fields when empty (which is complicated to
* arrange, for little benefit.)
*
* All enqueue/dequeue operations are handled by the single method
* "xfer" with parameters indicating whether to act as some form
* of offer, put, poll, take, or transfer (each possibly with
* timeout). The relative complexity of using one monolithic
* method outweighs the code bulk and maintenance problems of
* using separate methods for each case.
*
* Operation consists of up to three phases. The first is
* implemented within method xfer, the second in tryAppend, and
* the third in method awaitMatch.
*
* 1. Try to match an existing node
*
* Starting at head, skip already-matched nodes until finding
* an unmatched node of opposite mode, if one exists, in which
* case matching it and returning, also if necessary updating
* head to one past the matched node (or the node itself if the
* list has no other unmatched nodes). If the CAS misses, then
* a loop retries advancing head by two steps until either
* success or the slack is at most two. By requiring that each
* attempt advances head by two (if applicable), we ensure that
* the slack does not grow without bound. Traversals also check
* if the initial head is now off-list, in which case they
* start at the new head.
*
* If no candidates are found and the call was untimed
* poll/offer, (argument "how" is NOW) return.
*
* 2. Try to append a new node (method tryAppend)
*
* Starting at current tail pointer, find the actual last node
* and try to append a new node (or if head was null, establish
* the first node). Nodes can be appended only if their
* predecessors are either already matched or are of the same
* mode. If we detect otherwise, then a new node with opposite
* mode must have been appended during traversal, so we must
* restart at phase 1. The traversal and update steps are
* otherwise similar to phase 1: Retrying upon CAS misses and
* checking for staleness. In particular, if a self-link is
* encountered, then we can safely jump to a node on the list
* by continuing the traversal at current head.
*
* On successful append, if the call was ASYNC, return.
*
* 3. Await match or cancellation (method awaitMatch)
*
* Wait for another thread to match node; instead cancelling if
* the current thread was interrupted or the wait timed out. On
* multiprocessors, we use front-of-queue spinning: If a node
* appears to be the first unmatched node in the queue, it
* spins a bit before blocking. In either case, before blocking
* it tries to unsplice any nodes between the current "head"
* and the first unmatched node.
*
* Front-of-queue spinning vastly improves performance of
* heavily contended queues. And so long as it is relatively
* brief and "quiet", spinning does not much impact performance
* of less-contended queues. During spins threads check their
* interrupt status and generate a thread-local random number
* to decide to occasionally perform a Thread.yield. While
* yield has underdefined specs, we assume that it might help,
* and will not hurt, in limiting impact of spinning on busy
* systems. We also use smaller (1/2) spins for nodes that are
* not known to be front but whose predecessors have not
* blocked -- these "chained" spins avoid artifacts of
* front-of-queue rules which otherwise lead to alternating
* nodes spinning vs blocking. Further, front threads that
* represent phase changes (from data to request node or vice
* versa) compared to their predecessors receive additional
* chained spins, reflecting longer paths typically required to
* unblock threads during phase changes.
*
*
* ** Unlinking removed interior nodes **
*
* In addition to minimizing garbage retention via self-linking
* described above, we also unlink removed interior nodes. These
* may arise due to timed out or interrupted waits, or calls to
* remove(x) or Iterator.remove. Normally, given a node that was
* at one time known to be the predecessor of some node s that is
* to be removed, we can unsplice s by CASing the next field of
* its predecessor if it still points to s (otherwise s must
* already have been removed or is now offlist). But there are two
* situations in which we cannot guarantee to make node s
* unreachable in this way: (1) If s is the trailing node of list
* (i.e., with null next), then it is pinned as the target node
* for appends, so can only be removed later after other nodes are
* appended. (2) We cannot necessarily unlink s given a
* predecessor node that is matched (including the case of being
* cancelled): the predecessor may already be unspliced, in which
* case some previous reachable node may still point to s.
* (For further explanation see Herlihy & Shavit "The Art of
* Multiprocessor Programming" chapter 9). Although, in both
* cases, we can rule out the need for further action if either s
* or its predecessor are (or can be made to be) at, or fall off
* from, the head of list.
*
* Without taking these into account, it would be possible for an
* unbounded number of supposedly removed nodes to remain
* reachable. Situations leading to such buildup are uncommon but
* can occur in practice; for example when a series of short timed
* calls to poll repeatedly time out but never otherwise fall off
* the list because of an untimed call to take at the front of the
* queue.
*
* When these cases arise, rather than always retraversing the
* entire list to find an actual predecessor to unlink (which
* won't help for case (1) anyway), we record a conservative
* estimate of possible unsplice failures (in "sweepVotes").
* We trigger a full sweep when the estimate exceeds a threshold
* ("SWEEP_THRESHOLD") indicating the maximum number of estimated
* removal failures to tolerate before sweeping through, unlinking
* cancelled nodes that were not unlinked upon initial removal.
* We perform sweeps by the thread hitting threshold (rather than
* background threads or by spreading work to other threads)
* because in the main contexts in which removal occurs, the
* caller is already timed-out, cancelled, or performing a
* potentially O(n) operation (e.g. remove(x)), none of which are
* time-critical enough to warrant the overhead that alternatives
* would impose on other threads.
*
* Because the sweepVotes estimate is conservative, and because
* nodes become unlinked "naturally" as they fall off the head of
* the queue, and because we allow votes to accumulate even while
* sweeps are in progress, there are typically significantly fewer
* such nodes than estimated. Choice of a threshold value
* balances the likelihood of wasted effort and contention, versus
* providing a worst-case bound on retention of interior nodes in
* quiescent queues. The value defined below was chosen
* empirically to balance these under various timeout scenarios.
*
* Note that we cannot self-link unlinked interior nodes during
* sweeps. However, the associated garbage chains terminate when
* some successor ultimately falls off the head of the list and is
* self-linked.
*/
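I tried several times to translate and understand this comment, and came away with just one feeling:
utter bewilderment. I was completely lost, nearly to the point of tears.
Below is my rough rendering of it, which can be compared against the original for reference. How much insight you draw from it is up to you: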
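Dual queues, introduced by Scherer and Scott, are linked queues whose nodes may represent either data or requests. When a thread tries to enqueue a data node but encounters a request node, it instead "matches" and removes that node; the same holds in reverse for enqueuing requests. Blocking dual queues make threads that enqueue unmatched requests block until other threads supply the match. Dual synchronous queues (see Scherer, Lea, & Scott http://www.cs.rochester.edu/u/scott/papers/2009_Scherer_CACM_SSQ.pdf) additionally make threads that enqueue unmatched data block as well. Dual transfer queues support all of these modes, as dictated by callers.
A FIFO dual queue can be implemented with a variation of the Michael & Scott (M&S) lock-free queue algorithm (http://www.cs.rochester.edu/u/scott/papers/1996_PODC_queues.pdf). It maintains two pointer fields: "head", which points to a (matched) node that in turn points to the first actual (unmatched) queue node (or null if empty), and "tail", which points to the last node on the queue (again null if empty). For example, here is a possible queue with four data elements: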
  head                tail
    |                   |
    v                   v
    M -> U -> U -> U -> U
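The M&S queue algorithm is known to be prone to scalability and overhead limitations when maintaining these head and tail pointers via CAS. This has led to contention-reducing variants such as elimination arrays (see Moir et al. http://portal.acm.org/citation.cfm?id=1074013) and optimistic back pointers (see Ladan-Mozes & Shavit http://people.csail.mit.edu/edya/publications/OptimisticFIFOQueue-journal.pdf). However, the nature of dual queues enables a simpler tactic for improving M&S-style implementations when dual-ness is needed.
In a dual queue, each node must atomically maintain its match status. There are other possible variants, but here it is implemented as follows: for a data-mode node, matching means CASing the "item" field from a non-null data value to null, and for a request node, CASing it from null to a data value. (Note that the linearization properties of this style of queue are easy to verify: elements are made available by linking and unavailable by matching.) Compared with plain M&S queues, this property of dual queues requires one additional successful atomic operation per enq/deq pair. But it also enables lower-cost variants of the queue maintenance mechanics. (A variation of this idea applies even to non-dual queues that support deletion of interior elements, such as j.u.c.ConcurrentLinkedQueue.)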
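The "atomically maintain its match status" part is the heart of the design, so here is a small sketch of the idea under simplified assumptions: a single AtomicReference stands in for a data node's item field, and several threads race to match it by CASing the value to null. MatchRaceDemo is a hypothetical class of mine, not JDK code. However the threads interleave, exactly one CAS can succeed, which is why a node's match status can never change twice.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

class MatchRaceDemo {
    // Lets `threads` consumers race to match one data item; returns how many of them won.
    static int winners(int threads) {
        AtomicReference<Object> item = new AtomicReference<>("payload"); // unmatched data node
        AtomicInteger wins = new AtomicInteger();
        Thread[] ts = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            ts[i] = new Thread(() -> {
                Object seen = item.get();
                // Matching a data node: CAS item from the non-null value to null.
                if (seen != null && item.compareAndSet(seen, null)) {
                    wins.incrementAndGet();
                }
            });
            ts[i].start();
        }
        for (Thread t : ts) {
            try {
                t.join();
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
            }
        }
        return wins.get();
    }

    public static void main(String[] args) {
        System.out.println(winners(8));
    }
}
```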
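Once a node is matched, its match status can never change again. The linked list can therefore be arranged as a prefix of zero or more matched nodes followed by a suffix of zero or more unmatched nodes. (Both the prefix and the suffix may be zero length, which in turn means that no dummy header is used.) If neither time nor space efficiency mattered, enqueue and dequeue could be performed correctly by traversing from a pointer to the initial node, CASing the item of the first unmatched node on a match and CASing the next field of the trailing node on an append (plus some special-casing when the list is initially empty). Although this would be a terrible idea in itself, it has the benefit of not requiring any atomic updates on the head/tail fields.
The approach introduced here lies between the two extremes of never updating and always updating the queue (head and tail) pointers. It trades off sometimes needing extra traversal steps to locate the first and/or last unmatched node against the reduced overhead and contention of fewer updates to the queue pointers. For example, a possible snapshot of a queue is: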
  head           tail
    |              |
    v              v
    M -> M -> U -> U -> U -> U
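The best value for this "slack" (the targeted maximum distance between the value of "head" and the first unmatched node, and similarly for "tail") is an empirical matter. Very small constants in the range 1-3 were found to work best across a range of platforms. Larger values add cache-miss costs and the risk of long traversal chains, while smaller values increase CAS contention and overhead.
As an accompaniment to such techniques, traversal overhead can be reduced further without increasing contention on head pointer updates: threads may sometimes shortcut the "next" link path from the current "head" node so that it is closer to the currently known first unmatched node, and similarly for tail. Again, this may be triggered using thresholds or randomization.
These ideas must be extended further to avoid unbounded amounts of costly-to-reclaim garbage caused by the sequential "next" links of nodes starting at old, forgotten head nodes. As first described in detail by Boehm (http://portal.acm.org/citation.cfm?doid=503272.503282), if a GC delays noticing that some arbitrarily old node has become garbage, all newer dead nodes will also go unreclaimed. (Similar issues arise in non-GC environments.) To cope with this, upon CASing to advance the head pointer the implementation sets the "next" link of the previous head to point only to itself, which limits the length of connected dead lists. (Similar care is taken to wipe out possibly garbage-retaining values held in other Node fields.) However, this adds some complexity to traversal: if any "next" pointer links to itself, the current thread has lagged behind a head update, so the traversal must continue from "head". Traversals that start from "tail" to find the current tail may also encounter self-links, in which case they also continue at "head".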
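The self-link rule can be shown with a tiny single-threaded model. This is a sketch under my own simplifying assumptions (plain fields, no CAS, no volatile); SelfLinkDemo, N, advanceHead, and demo are hypothetical names. The succ helper follows the same rule the paragraph describes: a node whose next points to itself has fallen off the list, so the traversal restarts at the current head.

```java
class SelfLinkDemo {
    static final class N {
        final String name;
        N next;
        N(String name, N next) { this.name = name; this.next = next; }
    }

    N head;

    // Advance head, then self-link the old head so it cannot keep a chain of dead nodes alive.
    void advanceHead(N newHead) {
        N old = head;
        head = newHead;
        old.next = old;            // plays the role of forgetNext()
    }

    // Successor of p, or the current head if p is self-linked (off-list).
    N succ(N p) {
        N n = p.next;
        return (p == n) ? head : n;
    }

    // A traversal that was sitting on the old head continues from the new head.
    static String demo() {
        N c = new N("c", null);
        N b = new N("b", c);
        N a = new N("a", b);
        SelfLinkDemo list = new SelfLinkDemo();
        list.head = a;
        N stale = a;               // a slow traversal still holds the old head
        list.advanceHead(b);       // head moves on; a now points to itself
        return list.succ(stale).name;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

In xfer() the same check appears inline as `p = (p != n) ? n : (h = head);`.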
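In a slack-based scheme it is tempting not to use CAS for updates at all (similarly to Ladan-Mozes & Shavit). However, this cannot be done for head updates under the link-forgetting mechanics above, because an update may leave head at a detached node. Direct writes are possible for tail updates, but they increase the risk of long retraversals and hence long garbage chains. That can cost far more than it is worth, given that the cost difference between a CAS and a plain write is smaller when they are not triggered on every operation (especially since writes and CASes equally require additional GC bookkeeping, "write barriers", which under contention is sometimes more costly than the writes themselves).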
*** Overview of implementation ***
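Updates use a threshold-based approach with a slack threshold of two; that is, head/tail is updated when the current pointer appears to be two or more steps away from the first/last node. The slack value is hard-wired: a path greater than one is detected naturally by checking equality of traversal pointers, except when the list has only one element, in which case the slack threshold is kept at one. Not tracking explicit counts across method calls slightly simplifies an already messy implementation. Randomization would probably work better if a low-quality, dirt-cheap per-thread generator were available, but even ThreadLocalRandom is too heavy for this purpose.
All enqueue/dequeue operations are handled by the single method "xfer", whose parameters indicate whether to act as some form of offer, put, poll, take, or transfer (each possibly with a timeout). Accepting the relative complexity of one monolithic method is considered better than the code bulk and maintenance problems of separate methods for each case.
An operation consists of up to three phases. The first is implemented in method xfer, the second in tryAppend, and the third in method awaitMatch.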
1. Try to match an existing node
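Starting at head, skip already-matched nodes until an unmatched node of the opposite mode is found, if one exists; match it and return, updating head if necessary to one past the matched node (or to the node itself if the list has no other unmatched nodes). If that CAS misses, a loop retries advancing head by two steps until it either succeeds or the slack is at most two. Requiring each attempt to advance head by two (where applicable) ensures that the slack does not grow without bound. Traversals also check whether the initial head is now off-list, in which case they start at the new head.
If no candidate is found and the call was an untimed poll/offer (argument "how" is NOW), return.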
2. Try to append a new node (method tryAppend)
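Starting at the current tail pointer, find the actual last node and try to append a new node (or, if head was null, establish the first node). A node can be appended only if its predecessor is either already matched or of the same mode. If this is not the case, a new node of the opposite mode must have been appended during the traversal, so the operation restarts at phase 1. The traversal and update steps are otherwise similar to phase 1: retry on CAS misses and check for staleness. In particular, if a self-link is encountered, it is safe to jump back onto the list by continuing the traversal at the current head.
On a successful append, if the call was ASYNC, return.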
3. Await match or cancellation (method awaitMatch)
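Wait for another thread to match the node, cancelling instead if the current thread is interrupted or the wait times out. On multiprocessors, front-of-queue spinning is used: if a node appears to be the first unmatched node in the queue, it spins a bit before blocking. In either case, before blocking it tries to unsplice any nodes between the current "head" and the first unmatched node.
Front-of-queue spinning vastly improves the performance of heavily contended queues, and as long as it is relatively brief and "quiet", it has little impact on less-contended queues. While spinning, threads check their interrupt status and generate a thread-local random number to decide whether to occasionally perform a Thread.yield. Although yield has underdefined specs, the assumption is that it might help, and will not hurt, in limiting the impact of spinning on busy systems. Smaller (1/2) spins are also used for nodes that are not known to be at the front but whose predecessors have not blocked. These "chained" spins avoid artifacts of the front-of-queue rules that would otherwise lead to alternating nodes spinning versus blocking. Further, front threads that represent a phase change relative to their predecessors (from data to request node or vice versa) receive additional chained spins, reflecting the longer paths typically required to unblock threads during phase changes.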
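The spin policy described above boils down to a three-way decision. The sketch below models it with plain boolean parameters instead of Node objects. It is my own reconstruction, based on my reading of the JDK's private spinsFor helper (which is not quoted in this post), so treat the exact shape as an assumption; SpinPolicySketch and its parameter names are hypothetical. The constants are the FRONT_SPINS and CHAINED_SPINS values shown earlier.

```java
class SpinPolicySketch {
    static final int FRONT_SPINS = 1 << 7;                // 128
    static final int CHAINED_SPINS = FRONT_SPINS >>> 1;   // 64

    /**
     * How many times a newly appended node should spin before blocking.
     * Simplified model: the predecessor's state is passed in as booleans.
     */
    static int spinsFor(boolean multiprocessor, boolean hasPred, boolean predIsData,
                        boolean predMatched, boolean predHasWaiter, boolean haveData) {
        if (multiprocessor && hasPred) {
            if (predIsData != haveData)       // phase change: front spins plus chained spins
                return FRONT_SPINS + CHAINED_SPINS;
            if (predMatched)                  // predecessor matched: this node is probably at the front
                return FRONT_SPINS;
            if (!predHasWaiter)               // predecessor has not parked, so it is apparently spinning
                return CHAINED_SPINS;
        }
        return 0;                             // uniprocessor, or predecessor already blocked
    }

    public static void main(String[] args) {
        System.out.println(spinsFor(true, true, true, false, false, false)); // phase change
        System.out.println(spinsFor(true, true, true, true, false, true));   // front of queue
        System.out.println(spinsFor(true, true, true, false, false, true));  // chained
        System.out.println(spinsFor(true, true, true, false, true, true));   // predecessor blocked
    }
}
```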
** Unlinking removed interior nodes **
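In addition to minimizing garbage retention through the self-linking described above, removed interior nodes are also unlinked. Such nodes may arise from timed-out or interrupted waits, or from calls to remove(x) or Iterator.remove. Normally, given a node that was at one time known to be the predecessor of some node s that is to be removed, s can be unspliced by CASing the next field of its predecessor if it still points to s (otherwise s must already have been removed or is now off-list). But there are two situations in which s cannot be guaranteed to become unreachable this way: (1) if s is the trailing node of the list (that is, its next is null), it is pinned as the target node for appends, so it can be removed only later, after other nodes are appended; (2) s cannot necessarily be unlinked given a predecessor node that is matched (including the case of being cancelled), because the predecessor may already be unspliced, in which case some earlier reachable node may still point to s. (For further explanation see Herlihy & Shavit, "The Art of Multiprocessor Programming", chapter 9.) In both cases, though, no further action is needed if either s or its predecessor is (or can be made to be) at, or has fallen off from, the head of the list.
When these cases arise, rather than always retraversing the entire list to find an actual predecessor to unlink (which would not help in case (1) anyway), a conservative estimate of possible unsplice failures is recorded in "sweepVotes". A full sweep is triggered when the estimate exceeds a threshold ("SWEEP_THRESHOLD"), which is the maximum number of estimated removal failures to tolerate before sweeping through and unlinking cancelled nodes that were not unlinked on initial removal. The sweep is performed by the thread that hits the threshold (rather than by background threads or by spreading the work across other threads) because in the main contexts where removal occurs, the caller has already timed out, been cancelled, or is performing a potentially O(n) operation (such as remove(x)), none of which is time-critical enough to justify the overhead that the alternatives would impose on other threads.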
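The vote-counting scheme can be sketched in a few lines. This is a simplified model of my own, using AtomicInteger rather than the Unsafe-based CAS on the sweepVotes field; SweepVotesSketch, recordUnspliceFailure, and the sweeps counter are hypothetical names, and the real sweep over the list is replaced by a counter increment. The threshold of 32 matches SWEEP_THRESHOLD above.

```java
import java.util.concurrent.atomic.AtomicInteger;

class SweepVotesSketch {
    static final int SWEEP_THRESHOLD = 32;

    final AtomicInteger sweepVotes = new AtomicInteger();  // estimated unsplice failures so far
    final AtomicInteger sweeps = new AtomicInteger();      // demo only: how many sweeps were triggered

    // Called when a removed node could not be unlinked right away.
    void recordUnspliceFailure() {
        for (;;) {
            int v = sweepVotes.get();
            if (v < SWEEP_THRESHOLD) {
                if (sweepVotes.compareAndSet(v, v + 1))
                    break;                     // vote recorded, no sweep yet
            } else if (sweepVotes.compareAndSet(v, 0)) {
                sweep();                       // the thread that hits the threshold does the sweep
                break;
            }
            // CAS lost a race with another thread: re-read and retry
        }
    }

    // Stand-in for the real traversal that unlinks matched interior nodes.
    private void sweep() {
        sweeps.incrementAndGet();
    }

    public static void main(String[] args) {
        SweepVotesSketch s = new SweepVotesSketch();
        for (int i = 0; i < 33; i++) {
            s.recordUnspliceFailure();
        }
        System.out.println("votes=" + s.sweepVotes.get() + " sweeps=" + s.sweeps.get());
    }
}
```

In this model the first 32 failures only accumulate votes; the 33rd finds the counter at the threshold, resets it to zero, and runs the sweep.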
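Because the sweepVotes estimate is conservative, because nodes become unlinked "naturally" as they fall off the head of the queue, and because votes are allowed to accumulate even while sweeps are in progress, there are typically significantly fewer such nodes than estimated. The choice of threshold balances the likelihood of wasted effort and contention against providing a worst-case bound on the retention of interior nodes in quiescent queues. The value defined below was chosen empirically to balance these under various timeout scenarios.
Note that unlinked interior nodes cannot be self-linked during sweeps. However, the associated garbage chains terminate when some successor ultimately falls off the head of the list and is self-linked.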
For an introduction to the Unsafe class used here, see: