目录
一、PriorityBlockingQueue
1、定义
2、构造方法
3、add / offer / put
4、poll / take / peek
5、remove / clear /drainTo
6、iterator / Itr
二、SynchronousQueue
1、定义
2、offer / put
3、poll / take / peek
4、remove / clear /drainTo
5、iterator
6、使用
7、TransferQueue 定义
8、TransferQueue transfer
9、TransferStack 定义
10、TransferStack transfer
PriorityBlockingQueue表示一个支持自动扩容的,线程安全的优先级队列,队列底层实现跟PriorityQueue是一致的,都是基于二叉堆。SynchronousQueue是一个特殊的队列,size始终为0,put或者take操作都会阻塞当前线程,直到与之对应的take或者put操作出现,其内部有两种实现,FIFO先进先出的公平模式和LIFO后进先出的非公平模式,可以通过构造参数fair来指定,默认为false,即非公平模式下,该模式相比公平模式能够支撑更大的吞吐量。
一、PriorityBlockingQueue
1、定义
PriorityBlockingQueue的类继承关系如下:
跟ArrayBlockingQueue实现的接口是一致的,相关方法的描述可以参考《Java8 ArrayBlockingQueue 源码解析》。
该类包含的属性如下:
//默认的初始容量
private static final int DEFAULT_INITIAL_CAPACITY = 11;
//数组的最大长度
private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
//保存元素的数组,索引为n的元素,其左右节点分别是2*n+1和2*(n+1)
private transient Object[] queue;
//元素的个数
private transient int size;
//用于排序的比较器
private transient Comparator<? super E> comparator;
//同步锁
private final ReentrantLock lock;
//队列为空时的等待的Condition
private final Condition notEmpty;
//数组初始化或者扩容时使用的锁
private transient volatile int allocationSpinLock;
//序列化使用的优先级队列
private PriorityQueue<E> q;
PriorityBlockingQueue优先级队列的实现也是基于二叉堆,二叉堆的介绍可以参考《java8 PriorityQueue接口实现源码解析》。
2、构造方法
public PriorityBlockingQueue() {
this(DEFAULT_INITIAL_CAPACITY, null);
}
public PriorityBlockingQueue(int initialCapacity) {
this(initialCapacity, null);
}
public PriorityBlockingQueue(int initialCapacity,
Comparator<? super E> comparator) {
if (initialCapacity < 1)
throw new IllegalArgumentException();
this.lock = new ReentrantLock();
this.notEmpty = lock.newCondition();
this.comparator = comparator;
this.queue = new Object[initialCapacity];
}
public PriorityBlockingQueue(Collection<? extends E> c) {
this.lock = new ReentrantLock();
this.notEmpty = lock.newCondition();
boolean heapify = true; // 如果为true,则需要重新排序
boolean screen = true; //如果为true,则需要遍历所有元素,检查是否有null
if (c instanceof SortedSet<?>) {
//如果是SortedSet的实例
SortedSet<? extends E> ss = (SortedSet<? extends E>) c;
this.comparator = (Comparator<? super E>) ss.comparator();
heapify = false;
}
else if (c instanceof PriorityBlockingQueue<?>) {
//如果是PriorityBlockingQueue的实例
PriorityBlockingQueue<? extends E> pq =
(PriorityBlockingQueue<? extends E>) c;
this.comparator = (Comparator<? super E>) pq.comparator();
screen = false;
if (pq.getClass() == PriorityBlockingQueue.class) // exact match
heapify = false;
}
Object[] a = c.toArray();
int n = a.length;
// 如果c.toArray 返回的数组不是Object[],则将其复制到Object[]中
if (a.getClass() != Object[].class)
a = Arrays.copyOf(a, n, Object[].class);
if (screen && (n == 1 || this.comparator != null)) {
//遍历数组元素,找到为null的节点
for (int i = 0; i < n; ++i)
if (a[i] == null)
throw new NullPointerException();
}
this.queue = a;
this.size = n;
if (heapify)
//排序
heapify();
}
private void heapify() {
Object[] array = queue;
int n = size;
//计算中间的索引位
int half = (n >>> 1) - 1;
Comparator<? super E> cmp = comparator;
//从中间的元素开始往前遍历,进行排序,排序的过程中会将其跟后一半的节点比较,并最终找到最小的一个节点
if (cmp == null) {
for (int i = half; i >= 0; i--)
siftDownComparable(i, (E) array[i], array, n);
}
else {
for (int i = half; i >= 0; i--)
siftDownUsingComparator(i, (E) array[i], array, n, cmp);
}
}
3、add / offer / put
这三个方法的实现都是基于offer方法,如果队列满了会自动扩容,插入时会不断的跟自己的父节点比大小,直到找到比自己小的父节点为止,即根节点是最小的。
public boolean add(E e) {
return offer(e);
}
public boolean offer(E e) {
if (e == null)
throw new NullPointerException();
final ReentrantLock lock = this.lock;
lock.lock();
int n, cap;
Object[] array;
//如果数组满了,则扩容
while ((n = size) >= (cap = (array = queue).length))
tryGrow(array, cap);
try {
Comparator<? super E> cmp = comparator;
if (cmp == null)
//比较器为空,使用队列元素的Comparable接口
siftUpComparable(n, e, array);
else
//比较器不为空,使用比较器
siftUpUsingComparator(n, e, array, cmp);
//元素个数加1
size = n + 1;
//唤醒因为队列是空的而阻塞的线程
notEmpty.signal();
} finally {
lock.unlock();
}
return true;
}
public boolean offer(E e, long timeout, TimeUnit unit) {
return offer(e); // never need to block
}
public void put(E e) {
offer(e); // never need to block
}
//k是新节点的索引
private static <T> void siftUpComparable(int k, T x, Object[] array) {
//如果没有实现Comparable接口,此处就会报错
Comparable<? super T> key = (Comparable<? super T>) x;
while (k > 0) {
//找到父节点的索引并获取父节点
int parent = (k - 1) >>> 1;
Object e = array[parent];
//如果大于父节点则终止遍历
if (key.compareTo((T) e) >= 0)
break;
//如果小于父节点,将父节点复制到k上,继续往上遍历,直到找到比该节点小的父节点或者找到了根节点
//即根节点是最小的,从根节点往下,值越来越大
array[k] = e;
k = parent;
}
array[k] = key;
}
//逻辑同上,不过使用Comparator来比较大小
private static <T> void siftUpUsingComparator(int k, T x, Object[] array,
Comparator<? super T> cmp) {
while (k > 0) {
int parent = (k - 1) >>> 1;
Object e = array[parent];
if (cmp.compare(x, (T) e) >= 0)
break;
array[k] = e;
k = parent;
}
array[k] = x;
}
private void tryGrow(Object[] array, int oldCap) {
//执行扩容前释放锁
lock.unlock(); // must release and then re-acquire main lock
Object[] newArray = null;
//获取扩容锁
if (allocationSpinLock == 0 &&
UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset,
0, 1)) {
try {
//计算新的容量,如果小于64则扩容一倍,大于64则扩容一半
int newCap = oldCap + ((oldCap < 64) ?
(oldCap + 2) : // grow faster if small
(oldCap >> 1));
if (newCap - MAX_ARRAY_SIZE > 0) { // possible overflow
//如果超过最大容量了
int minCap = oldCap + 1;
if (minCap < 0 || minCap > MAX_ARRAY_SIZE)
throw new OutOfMemoryError(); //加1超过最大容量,则抛出异常
//未超过,直接使用最大容量
newCap = MAX_ARRAY_SIZE;
}
//按照新的容量初始化一个数组
if (newCap > oldCap && queue == array)
newArray = new Object[newCap];
} finally {
//释放扩容锁,因为加锁成功只能有一个线程,即最多只有一个线程进入此逻辑
allocationSpinLock = 0;
}
}
//等于null,说明抢占扩容锁失败,其他某个线程正在执行扩容
if (newArray == null) // back off if another thread is allocating
Thread.yield();//放弃当前的CPU时间片
//重新获取锁,因为需要执行复制了,在此期间数组不能修改
lock.lock();
if (newArray != null && queue == array) {
//新数组初始化完成,旧数组还未变更
queue = newArray;
//将旧数组的内容拷贝到新数组中
System.arraycopy(array, 0, newArray, 0, oldCap);
}
}
4、poll / take / peek
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
//移除元素
return dequeue();
} finally {
lock.unlock();
}
}
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
E result;
try {
//如果队列为空则等待
while ( (result = dequeue()) == null)
notEmpty.await();
} finally {
lock.unlock();
}
return result;
}
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
E result;
try {
//队列为空则等待,超时返回null
while ( (result = dequeue()) == null && nanos > 0)
nanos = notEmpty.awaitNanos(nanos);
} finally {
lock.unlock();
}
return result;
}
public E peek() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
//返回根节点元素
return (size == 0) ? null : (E) queue[0];
} finally {
lock.unlock();
}
}
private E dequeue() {
int n = size - 1;
if (n < 0)
//队列为空,返回null
return null;
else {
Object[] array = queue;
//获取根节点
E result = (E) array[0];
//获取最后的一个节点,将其对应的数组元素置为null
E x = (E) array[n];
array[n] = null;
Comparator<? super E> cmp = comparator;
//将x放到合适的位置
if (cmp == null)
siftDownComparable(0, x, array, n);
else
siftDownUsingComparator(0, x, array, n, cmp);
size = n;
return result;
}
}
//k表示删除元素的节点,x表示当前队列中最大的元素,n表示当前队列的大小
private static <T> void siftDownComparable(int k, T x, Object[] array,
int n) {
if (n > 0) {
Comparable<? super T> key = (Comparable<? super T>)x;
int half = n >>> 1; //找到中间的一个节点
//如果k大于或者等于half说明k是二叉堆中最下面一层的节点,无需重新排序
while (k < half) {
//获取k的左子节点,子节点都比父节点大
int child = (k << 1) + 1; //先假定左子节点比右子节点小
Object c = array[child];
//获取k的右节点
int right = child + 1;
if (right < n &&
((Comparable<? super T>) c).compareTo((T) array[right]) > 0)
//如果左子节点比右子节点大,将c置为右子节点
c = array[child = right];
if (key.compareTo((T) c) <= 0) //key小于c节点则终止循环,将key保存到索引k处,此时k是该节点的父节点
break;
//如果key大于c节点,将c保存到k处,继续往下遍历,直到找到大于key的节点
array[k] = c;
k = child;
}
//将key保存到索引k处
array[k] = key;
}
}
//逻辑同上就是使用比较器判断大小
private static <T> void siftDownUsingComparator(int k, T x, Object[] array,
int n,
Comparator<? super T> cmp) {
if (n > 0) {
int half = n >>> 1;
while (k < half) {
int child = (k << 1) + 1;
Object c = array[child];
int right = child + 1;
if (right < n && cmp.compare((T) c, (T) array[right]) > 0)
c = array[child = right];
if (cmp.compare(x, (T) c) <= 0)
break;
array[k] = c;
k = child;
}
array[k] = x;
}
}
5、remove / clear /drainTo
public boolean remove(Object o) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
//找到目标元素的索引
int i = indexOf(o);
if (i == -1) //如果目标元素不存在
return false;
removeAt(i); //移除该元素
return true;
} finally {
lock.unlock();
}
}
public void clear() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
Object[] array = queue;
int n = size;
size = 0;
//遍历将所有数组元素置为null
for (int i = 0; i < n; i++)
array[i] = null;
} finally {
lock.unlock();
}
}
public int drainTo(Collection<? super E> c) {
return drainTo(c, Integer.MAX_VALUE);
}
public int drainTo(Collection<? super E> c, int maxElements) {
//校验参数合法性
if (c == null)
throw new NullPointerException();
if (c == this)
throw new IllegalArgumentException();
if (maxElements <= 0)
return 0;
final ReentrantLock lock = this.lock;
lock.lock();
try {
//取size和maxElements的最小值
int n = Math.min(size, maxElements);
for (int i = 0; i < n; i++) {
//将根节点加入到c中,然后通过dequeue将其移除
c.add((E) queue[0]); // In this order, in case add() throws.
dequeue();
}
return n;
} finally {
lock.unlock();
}
}
private int indexOf(Object o) {
if (o != null) {
Object[] array = queue;
int n = size;
//遍历数组,找到目标元素,返回该元素的索引
for (int i = 0; i < n; i++)
if (o.equals(array[i]))
return i;
}
return -1;
}
private void removeAt(int i) {
Object[] array = queue;
int n = size - 1;
if (n == i) //如果是移除最后一个元素
array[i] = null;
else {
//获取最后一个节点,将其索引位置为null
E moved = (E) array[n];
array[n] = null;
Comparator<? super E> cmp = comparator;
//将moved元素放到合适的位置
if (cmp == null)
siftDownComparable(i, moved, array, n);
else
siftDownUsingComparator(i, moved, array, n, cmp);
//i如果大于half,即位于底层节点时,会直接将moved元素保存到i处
if (array[i] == moved) {
//将其当做一个新插入的节点处理,移动该节点的位置
//因为这种情形下moved节点可能比他的父节点要小
if (cmp == null)
siftUpComparable(i, moved, array);
else
siftUpUsingComparator(i, moved, array, cmp);
}
}
size = n;
}
6、iterator / Itr
public Iterator<E> iterator() {
//遍历的是当前数组的一个快照版
return new Itr(toArray());
}
final class Itr implements Iterator<E> {
final Object[] array; // Array of all elements
int cursor; // index of next element to return
int lastRet; // index of last element, or -1 if no such
Itr(Object[] array) {
lastRet = -1;
this.array = array;
}
public boolean hasNext() {
return cursor < array.length;
}
public E next() {
if (cursor >= array.length)
throw new NoSuchElementException();
lastRet = cursor;
//返回cursor对应的元素,然后将其加1
return (E)array[cursor++];
}
public void remove() {
if (lastRet < 0)
throw new IllegalStateException();
removeEQ(array[lastRet]);
lastRet = -1;
}
}
二、SynchronousQueue
1、定义
SynchronousQueue的类继承关系如下:
跟ArrayBlockingQueue实现的接口是一致的,相关方法的描述可以参考《Java8 ArrayBlockingQueue 源码解析》,两者不完全一致。
该类包含的属性如下:
//CPU的个数
static final int NCPUS = Runtime.getRuntime().availableProcessors();
//设置了等待时间时的自旋等待的次数
static final int maxTimedSpins = (NCPUS < 2) ? 0 : 32;
//没有设置等待时间时的自旋等待的次数
static final int maxUntimedSpins = maxTimedSpins * 16;
//剩余时间低于该值就会自旋,没有次数的限制
static final long spinForTimeoutThreshold = 1000L;
//下面三个都是序列化时使用的,注意序列化的时候不会保存队列中的节点,只是保存队列的类型
//同步锁
private ReentrantLock qlock;
//用来记录队列的类型
private WaitQueue waitingProducers;
private WaitQueue waitingConsumers;
//Transferer接口实现
private transient volatile Transferer<E> transferer;
其中WaitQueue是一个没有方法的内部类,包含两个子类LifoWaitQueue和FifoWaitQueue,其定义如下:
这两个内部类用于标识SynchronousQueue所使用的模式,如果是公平模式,则waitingProducers属性是FifoWaitQueue的实例,否则是LifoWaitQueue的实例,可以参考writeObject方法的实现,如下:
Transferer是一个静态的内部抽象类,包含两个实现子类TransferQueue和TransferStack,分别表示FIFO模式和LIFO模式,这两个类也是内部类,并且是SynchronousQueue的核心实现,通过构造方法的入参决定使用哪种实现,如下:
Transferer的定义如下:
其中e就是执行操作的元素,如果是put操作则是put的对象,如果是take操作则是null;timed表示是否有等待期限,如果为true但是nanos小于等于0则不需要等待,否则等待指定的时间;如果timed为false,则表示无期限等待,即没有等待时间限制;该方法执行成功返回非null值,执行失败返回null。SynchronousQueue实现put或者take方法时都是基于transfer方法,下面先看下SynchronousQueue的方法实现,最后再看两个子类的具体实现。
2、offer / put
public boolean offer(E e) {
if (e == null) throw new NullPointerException();
//不会等待,如果此前没有线程执行take方法则返回false
//跟offer接口本来的语义不一致
return transferer.transfer(e, true, 0) != null;
}
public boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException {
if (e == null) throw new NullPointerException();
//操作成功返回true
if (transferer.transfer(e, true, unit.toNanos(timeout)) != null)
return true;
//操作失败且没有线程中断
if (!Thread.interrupted())
return false;
//如果是因为线程中断返回,则抛出异常
throw new InterruptedException();
}
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
if (transferer.transfer(e, false, 0) == null) {
//transfer会一直等待直到成功为止,返回null说明线程被中断了,抛出异常
Thread.interrupted();
throw new InterruptedException();
}
}
3、poll / take / peek
public E poll() {
//如果此前有一个put操作,则返回put的元素,否则返回null
return transferer.transfer(null, true, 0);
}
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
E e = transferer.transfer(null, true, unit.toNanos(timeout));
//如果操作成功且没有线程中断,则返回e
if (e != null || !Thread.interrupted())
return e;
throw new InterruptedException();
}
public E take() throws InterruptedException {
//会无期限等待
E e = transferer.transfer(null, false, 0);
if (e != null)
return e;
//如果返回null,说明是被线程中断的,重新将线程标记为中断
Thread.interrupted();
//抛出异常
throw new InterruptedException();
}
public E peek() {
//因为队列是空的,这里直接返回null
return null;
}
4、remove / clear /drainTo
public boolean remove(Object o) {
return false;
}
public void clear() {
}
public int drainTo(Collection<? super E> c) {
if (c == null)
throw new NullPointerException();
if (c == this)
throw new IllegalArgumentException();
int n = 0;
//poll方法会把在此之前put而阻塞的线程唤醒,返回put的元素
for (E e; (e = poll()) != null;) {
c.add(e);
++n;
}
return n;
}
public int drainTo(Collection<? super E> c, int maxElements) {
if (c == null)
throw new NullPointerException();
if (c == this)
throw new IllegalArgumentException();
int n = 0;
for (E e; n < maxElements && (e = poll()) != null;) {
c.add(e);
++n;
}
return n;
}
5、iterator
public Iterator<E> iterator() {
//空遍历器,即未实际实现的遍历器,说明SynchronousQueue不支持遍历的
return Collections.emptyIterator();
}
6、使用
@Test
public void test() throws Exception {
BlockingQueue<String> test=new SynchronousQueue<>(true);
Thread a=new Thread(new Runnable() {
@Override
public void run() {
try {
boolean result=false;
long start=System.currentTimeMillis();
//put或者带等待时间的offer方法都会阻塞当前线程,直到另一个线程执行take操作
test.put("s");
// result= test.offer("s",3, TimeUnit.SECONDS);
//因为之前没有poll操作,此时直接返回false
// result=test.offer("s");
//put操作会阻塞2s
System.out.println("put succ,time="+(System.currentTimeMillis()-start)+",result->"+result);
} catch (Exception e) {
e.printStackTrace();
}
}
});
a.start();
Thread b=new Thread(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(2000);
// System.out.println(test.take());
//poll方法会返回在此之前的一次put类方法put的元素
System.out.println(test.poll());
// System.out.println(test.poll(1,TimeUnit.SECONDS));
} catch (Exception e) {
e.printStackTrace();
}
}
});
b.start();
a.join();
System.out.println("main end");
}
@Test
public void test2() throws Exception {
BlockingQueue<String> test=new SynchronousQueue<>(true);
Thread a=new Thread(new Runnable() {
@Override
public void run() {
try {
boolean result=false;
Thread.sleep(2000);
//put或者带等待时间的offer方法都会阻塞当前线程,直到另一个线程执行take操作
// test.put("s");
// result= test.offer("s",3, TimeUnit.SECONDS);
result=test.offer("s");
//put操作会阻塞2s
System.out.println("put succ,result->"+result);
} catch (Exception e) {
e.printStackTrace();
}
}
});
a.start();
Thread b=new Thread(new Runnable() {
@Override
public void run() {
try {
long start=System.currentTimeMillis();
//take或者带参数的poll方法会阻塞当前线程,直到另一个线程执行put操作
System.out.println(test.take());
// System.out.println(test.poll(1,TimeUnit.SECONDS));
//因为之前没有执行put操作,此时直接返回null
// System.out.println(test.poll());
System.out.println("time->"+(System.currentTimeMillis()-start));
} catch (Exception e) {
e.printStackTrace();
}
}
});
b.start();
a.join();
System.out.println("main end");
}
7、TransferQueue 定义
TransferQueue包含的属性如下:
/** 链表头 */
transient volatile QNode head;
/** 链表尾 */
transient volatile QNode tail;
//缓存的待清理的节点
transient volatile QNode cleanMe;
head和tail是通过QNode的next属性构成的一个单向链表,如果head等于tail说明当前链表是空的。QNode是TransferQueue的一个内部类,其实现如下:
static final class QNode {
//链表中下一个节点
volatile QNode next; // next node in queue
//关联的元素
volatile Object item; // CAS'ed to or from null
//调用transfer方法的线程
volatile Thread waiter; // to control park/unpark
//是否执行put方法,为true表示put方法,false表示take方法
final boolean isData;
QNode(Object item, boolean isData) {
this.item = item;
this.isData = isData;
}
//修改next属性
boolean casNext(QNode cmp, QNode val) {
return next == cmp &&
UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
}
//修改item属性
boolean casItem(Object cmp, Object val) {
return item == cmp &&
UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
}
//将item属性指向自己
void tryCancel(Object cmp) {
UNSAFE.compareAndSwapObject(this, itemOffset, cmp, this);
}
//节点是否被取消
boolean isCancelled() {
return item == this;
}
//节点是否下线
boolean isOffList() {
return next == this;
}
//获取item和next属性的偏移量
private static final sun.misc.Unsafe UNSAFE;
private static final long itemOffset;
private static final long nextOffset;
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> k = QNode.class;
itemOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("item"));
nextOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("next"));
} catch (Exception e) {
throw new Error(e);
}
}
}
8、TransferQueue transfer
TransferQueue的实现是先进先出的,如果链表是空的或者最后一个节点的操作类型跟当前操作类型一致,则初始化一个新的节点,插入到链表尾,然后让当前线程休眠,等待被唤醒或者等待超时或者被中断,如果是因为等待超时或者被中断唤醒则将该节点标记为已取消且从链表中移除,如果是正常唤醒则将当前节点标记为已下线,如果当前操作为put则返回目标元素e,否则返回被唤醒时返回的一个元素x,x是对应的put操作的节点的item,保证返回值非空;如果链表是非空的且最后一个节点的操作类型跟当前操作类型不一致,则将head节点的下一个节点对应的线程唤醒,将下一个节点作为新的head节点,如果下一个节点是put操作则返回该节点的item,否则返回当前操作的目标元素e,保证返回值非空;
TransferQueue() {
//head节点的isData为false,next为null
QNode h = new QNode(null, false); // initialize to dummy node.
head = h;
tail = h;
}
E transfer(E e, boolean timed, long nanos) {
QNode s = null; // constructed/reused as needed
//e不等于null表示执行put方法
boolean isData = (e != null);
for (;;) {
QNode t = tail;
QNode h = head;
if (t == null || h == null) //TransferQueue未初始化,不可能进入此逻辑
continue; // spin
if (h == t || t.isData == isData) { //如果链表没有等待的节点或者最后一个等待的节点执行的操作与当前操作一致
//获取下一个节点
QNode tn = t.next;
if (t != tail) //tail节点发生变更,通过for循环重新读取
continue;
if (tn != null) { //如果下一个节点不为空
advanceTail(t, tn); //将next节点作为新的tail节点
continue;
}
//进入此逻辑,tail节点的next节点为空
if (timed && nanos <= 0) //如果需要等待但是时间不是正数则不等待,此时直接返回null
return null;
if (s == null)
s = new QNode(e, isData); //给当前操作初始化一个对应的节点
if (!t.casNext(null, s)) //将s作为tail的next属性,如果修改失败,说明新插入了一个节点,则for循环重试
continue;
//上述casNext成功,s已经作为tail的next属性了
advanceTail(t, s); //将s作为新的tail节点
//执行等待
Object x = awaitFulfill(s, e, timed, nanos);
//线程被唤醒了
if (x == s) { //如果是等待超时被取消
clean(t, s); //从链表中移除节点s
return null;
}
if (!s.isOffList()) { //如果s没有从链表中移除
advanceHead(t, s); //如果head等于t,即没有其他等待节点时,将s作为head
if (x != null) // and forget fields
s.item = s; //将item指向自己,表示这个节点已下线
s.waiter = null;
}
//x不为空返回x,否则返回e
return (x != null) ? (E)x : e;
} else {
//如果链表有其他等待节点,且最后一个等待节点跟当前节点的操作不同
//比如初始化结束,链表中只有初始化的head节点,isData为false,此时执行put操作,因为isData不同就会进入此分支
QNode m = h.next; // node to fulfill
//t不等于tail 或者h不等于head 说明tail或者head节点变了
//m等于null说明head节点的操作结束了
if (t != tail || m == null || h != head)
continue; // inconsistent read
//获取关联的对象
Object x = m.item;
if (isData == (x != null) || // 如果m节点的操作跟当前操作类型一致
x == m || // m被取消了
!m.casItem(x, e)) { // 将x修改为e失败,e可能是null或者某个节点,如果修改成功则进入下面的逻辑
advanceHead(h, m); // 将head修改为m
//不唤醒m对应的线程,而是通过for循环处理下一个节点
continue;
}
//将m置为head节点
advanceHead(h, m); // successfully fulfilled
//唤醒m关联的线程
LockSupport.unpark(m.waiter);
//如果x为null,e肯定不为null,即如果操作成功了此方法肯定不返回null
return (x != null) ? (E)x : e;
}
}
}
void advanceTail(QNode t, QNode nt) {
if (tail == t) //如果tail节点没有变,则将其修改成nt
UNSAFE.compareAndSwapObject(this, tailOffset, t, nt);
}
//s是当前操作对应的节点,e是关联的元素,后面两个参数表示是否等待
Object awaitFulfill(QNode s, E e, boolean timed, long nanos) {
/* 计算等待的终止时间*/
final long deadline = timed ? System.nanoTime() + nanos : 0L;
Thread w = Thread.currentThread();
//如果s节点是head的下一个节点,则自旋等待,timed为true则自旋的次数为maxTimedSpins,否则为maxUntimedSpins
//如果不是head的下一个节点,不需要自旋等待
int spins = ((head.next == s) ?
(timed ? maxTimedSpins : maxUntimedSpins) : 0);
for (;;) {
//如果线程被中断了,将s标记为已取消
if (w.isInterrupted())
s.tryCancel(e);
Object x = s.item;
if (x != e) //x不等于e说明s的状态发生改变了,如果被取消了x指向s自己,如果是正常被唤醒,x变成唤醒该线程的e
return x;
if (timed) {
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
//等待超时,将其取消,下一次for循环x就不等于e了,返回x,即QNode节点本身
s.tryCancel(e);
continue;
}
}
//自旋,spins减1,进入下一次for循环
if (spins > 0)
--spins;
else if (s.waiter == null)
s.waiter = w; //记录当前线程
else if (!timed)
//无期限休眠
LockSupport.park(this);
else if (nanos > spinForTimeoutThreshold)
//休眠指定时间
LockSupport.parkNanos(this, nanos);
}
}
//pred是s的上一个节点
void clean(QNode pred, QNode s) {
s.waiter = null; // forget thread
while (pred.next == s) { // Return early if already unlinked
QNode h = head;
QNode hn = h.next; // Absorb cancelled first node as head
if (hn != null && hn.isCancelled()) {
//将hn作为head节点,原来的head节点移除
advanceHead(h, hn);
continue;
}
QNode t = tail; // Ensure consistent read for tail
if (t == h) //没有其他节点了,则返回
return;
QNode tn = t.next;
if (t != tail) //tail节点发生修改,重新循环读取
continue;
if (tn != null) { //将tn作为新的tail节点
advanceTail(t, tn);
continue;
}
//tail的next节点为null
if (s != t) { // If not tail, try to unsplice
QNode sn = s.next;
//如果sn等于s说明该节点已经从链表中移除了
//如果sn不等于则将pred的next属性修改为sn,即将s从链表中移除,如果修改成功也是返回
if (sn == s || pred.casNext(s, sn))
return;
}
//
QNode dp = cleanMe;
if (dp != null) { //将上一个取消的节点从链表中移除
QNode d = dp.next;
QNode dn;
if (d == null || // d is gone or
d == dp || // d is off list or
!d.isCancelled() || // d not cancelled or
(d != t && // d not tail and
(dn = d.next) != null && // has successor
dn != d && // that is on list
dp.casNext(d, dn)) ) //将dn作为dp的下一个节点,即移除d
casCleanMe(dp, null); //将cleanMe置为null
if (dp == pred)
return; // s is already saved node
} else if (casCleanMe(null, pred)) //将cleanMe修改成pred
return; // Postpone cleaning s
}
}
void advanceHead(QNode h, QNode nh) {
if (h == head &&
UNSAFE.compareAndSwapObject(this, headOffset, h, nh)) //将nh修改为head
h.next = h; //将head节点从链表中移除了
}
//修改cleanMe为val
boolean casCleanMe(QNode cmp, QNode val) {
return cleanMe == cmp &&
UNSAFE.compareAndSwapObject(this, cleanMeOffset, cmp, val);
}
9、TransferStack 定义
TransferStack包含的实例属性如下:
/** 表示take操作 */
static final int REQUEST = 0;
/** 表示put操作 */
static final int DATA = 1;
/** 表示当前节点与上一个节点的操作类型不同 */
static final int FULFILLING = 2;
/** 单向链表的链表头 */
volatile SNode head;
静态属性如下:
SNode是一个内部类,其定义如下:
static final class SNode {
//下一个节点
volatile SNode next; // next node in stack
//与之配对的节点,put操作的节点和take操作的节点配对
volatile SNode match; // the node matched to this
//执行操作的线程
volatile Thread waiter; // to control park/unpark
Object item; // data; or null for REQUESTs
//当前节点的模式
int mode;
SNode(Object item) {
this.item = item;
}
//修改next属性
boolean casNext(SNode cmp, SNode val) {
return cmp == next &&
UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
}
boolean tryMatch(SNode s) {
//这里的match并没有像命名一样做了匹配,如果match为null,将其赋值
if (match == null &&
UNSAFE.compareAndSwapObject(this, matchOffset, null, s)) {
//将match属性原子的修改成s
Thread w = waiter;
if (w != null) { // waiters need at most one unpark
//waiter不为空,将其对应的线程唤醒
waiter = null;
LockSupport.unpark(w);
}
return true;
}
return match == s;
}
//将match属性修改为当前节点
void tryCancel() {
UNSAFE.compareAndSwapObject(this, matchOffset, null, this);
}
boolean isCancelled() {
return match == this;
}
//获取属性偏移量
private static final sun.misc.Unsafe UNSAFE;
private static final long matchOffset;
private static final long nextOffset;
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> k = SNode.class;
matchOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("match"));
nextOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("next"));
} catch (Exception e) {
throw new Error(e);
}
}
}
10、TransferStack transfer
TransferStack用于实现LIFO模式,跟TransferQueue稍有不同,最大的区别是TransferStack下每次新添加节点时都是插入到链表前面而TransferQueue是插入到链表后面。另一个区别是,当前节点的操作类型与最近一个插入的节点的操作类型不一致时,TransferStack会新创建一个节点,将该节点与后面的节点做匹配,直到匹配成功为止即找到了不同操作类型的节点或者没有可匹配的节点为止,匹配成功后会把匹配成功的节点对应的线程唤醒,然后将该节点与新节点一起从链表中移除,再下一个节点作为新的head节点,即TransferStack会优先匹配最近才插入的节点,从而实现LIFO模式;而TransferQueue下不会创建新的节点,而是直接唤醒head节点的下一个节点对应的线程,head节点是最早插入的,从而实现FIFO模式。
@SuppressWarnings("unchecked")
E transfer(E e, boolean timed, long nanos) {
SNode s = null; // constructed/reused as needed
int mode = (e == null) ? REQUEST : DATA;
for (;;) {
SNode h = head;
if (h == null || h.mode == mode) { //如果链表是空的或者head的操作和当前操作一致
if (timed && nanos <= 0) { //如果不需要等待
if (h != null && h.isCancelled())
casHead(h, h.next); //如果head节点被取消了,则将next节点作为新的head节点
else
return null;
//需要等待
} else if (casHead(h, s = snode(s, e, h, mode))) { //创建一个新节点,并插入到链表头,原来的head节点作为新节点的next节点
//执行等待
SNode m = awaitFulfill(s, timed, nanos);
//线程被唤醒了
if (m == s) { // 该节点因为中断或者等待超时被取消了
clean(s); //将该节点从链表中移除,同时移除所有已经被取消的节点
return null;
}
//其他情形就是返回跟s配对的一个节点
if ((h = head) != null && h.next == s) //head跟s节点配对,将这两个从链表中移除,tryMatch成功后会执行相同的逻辑
casHead(h, s.next); //将next节点作为新的head节点
//如果当前操作是take,m的操作就是put则返回m的item,否则返回s的item,总而言之就是返回一个非null值
return (E) ((mode == REQUEST) ? m.item : s.item);
}
//链表不是空的且head节点的操作跟当前节点不一致
} else if (!isFulfilling(h.mode)) { //如果模式不是FULFILLING,
if (h.isCancelled()) //如果节点被取消
casHead(h, h.next); //将next节点作为新的节点
else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) { //创建一个新节点,插入到链表头,模式是FULFILLING,head作为新节点的next节点
//不断for循环直到match成功或者等待的节点都没了
for (;;) {
//m是s的下一个节点,与s的操作模式不同
SNode m = s.next; //m.tryMatch(s)失败时,会将s的next属性修改为mn,下一次for循环m就可能为null
if (m == null) {
//说明此时没有匹配的节点了,将head节点置为null
casHead(s, null); // pop fulfill node
//s置为null,下一次外层的for循环会重新创建一个节点
s = null;
break; //重新开始外层的for循环
}
SNode mn = m.next;
if (m.tryMatch(s)) {//如果m节点的match属性为null,将其修改为s,并唤醒m对应的线程
casHead(s, mn); //s和m相当于是配对了,这两个都从链表中移除,mn作为新的head节点
//如果当前操作是take,m的操作就是put则返回m的item,否则返回s的item,总而言之就是返回一个非null值
return (E) ((mode == REQUEST) ? m.item : s.item);
} else
//tryMatch返回false,说明m节点已经被分配了,将mn作为s的next属性,下一次for循环跟mn配对
s.casNext(m, mn); //将s的next节点修改为mn
}
} else { // head节点是FULFILLING
SNode m = h.next; // m is h's match
if (m == null) // m为空,将head置为null
casHead(h, null);
else {
SNode mn = m.next;
if (m.tryMatch(h)) //看m节点和h是否匹配
casHead(h, mn);
else
h.casNext(m, mn); // help unlink
}
}
}
}
static SNode snode(SNode s, Object e, SNode next, int mode) {
//创建一个新节点
if (s == null) s = new SNode(e);
s.mode = mode;
s.next = next;
return s;
}
SNode awaitFulfill(SNode s, boolean timed, long nanos) {
final long deadline = timed ? System.nanoTime() + nanos : 0L;
Thread w = Thread.currentThread();
int spins = (shouldSpin(s) ?
(timed ? maxTimedSpins : maxUntimedSpins) : 0);
for (;;) {
if (w.isInterrupted())
s.tryCancel(); //线程被中断,将目标节点标记为已取消
SNode m = s.match;
if (m != null) //将目标节点标记为已取消,就会将match赋值成this,m不为null了
//s节点调用tryMatch方法成功,m也会不为null
return m;
if (timed) {
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
s.tryCancel(); //如果等待超时,将目标节点取消
continue;
}
}
if (spins > 0)
spins = shouldSpin(s) ? (spins-1) : 0; //执行自旋
else if (s.waiter == null)
s.waiter = w; //保存当前操作的线程
else if (!timed)
LockSupport.park(this); //无期限等待
else if (nanos > spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanos); //等待指定的时间
}
}
//判断是否需要自旋等待
boolean shouldSpin(SNode s) {
SNode h = head;
return (h == s || h == null || isFulfilling(h.mode));
}
static boolean isFulfilling(int m) { return (m & FULFILLING) != 0; }
void clean(SNode s) {
//相关属性置为null
s.item = null; // forget item
s.waiter = null; // forget thread
//获取下一个节点,注意此时s就是head节点
SNode past = s.next;
if (past != null && past.isCancelled())
past = past.next;
//从head往后遍历找到第一个不是取消的节点,将前面已取消的从链表中移除
SNode p;
while ((p = head) != null && p != past && p.isCancelled())
casHead(p, p.next); //将head节点重置了,s节点就从链表中移除了
//遍历p后面的节点,将所有的被取消的节点都移除
while (p != null && p != past) {
SNode n = p.next;
if (n != null && n.isCancelled())
p.casNext(n, n.next);
else
p = n;
}
}