目录
-
- AQS实现线程通信原理
- 相关类源码分析
在上一篇并发多线程之AQS源码分析(上)中介绍了关于独占锁相关的方法,本篇来介绍AQS实现线程间通信Condition及ConditionObject等相关的源码分析。当然在梳理之前我们先把原理搞懂可以起到事半功倍的效果。
AQS实现线程通信原理
关于条件队列
在AQS中线程间通信是通过引入条件队列来实现,条件队列在前面已经介绍,条件队列是一个单向的链表结构。如下:
AQS通过引入条件队列,在线程不满足运行条件时,将线程添加至条件队列中并阻塞,当其他线程通过唤醒方法唤醒条件队列中的线程后,线程由条件队列添加至同步队列中等待被唤醒。
两个队列的关系如下:
这里需要说明一下几点:
- 节点由条件队列中添加至同步队列时,会从条件队列中剔除。
- 线程每次被放入条件队列时,都是新建一个Node实例,所以可能会存在持有当前线程的多个Node分别位于条件队列和同步队列中,这点是需要注意的,源码中也有很多相关的判断。
线程间通信流程
以最常见的一个示例来说明。代码如下:
public class LockDemo {
private Lock lock = new ReentrantLock();
private Condition condition = lock.newCondition();
public void block() {
try {
lock.lock();
while(!check()) {
condition.await();
}
// doSomething
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
private void nonBlock() {
try {
lock.lock();
// doSomething
if (check()) {
condition.signalAll();
}
} catch (Exception e) {
e.printStackTrace();
} finally {
}
lock.unlock();
}
private boolean check() {
// 检查条件是否继续执行
return false;
}
}
那么针对上面示例代码,在AQS中执行的流程如下:
这里简要的说明一下流程:
- 首先线程需要抢占锁,当然对于没有抢占到的流程在上一篇中已经说明,在此不再赘述。
- 线程抢占到锁以后,执行代码块,当不满足条件时,那么会调用await方法。
- 在调用await方法后,需要循环判断当前线程是否在同步队列中(为什么要循环判断,我的理解是线程位于同步队列中随时可能被唤醒,及时当前线程被阻塞同时运行条件不满足,那么这种唤醒时无意思的,所以需要保证当前线程不再同步队列中),当前线程不在同步队列中则阻塞桑倩线程。
- 满足条件的线程继续运行,当满足唤醒线程的条件时则唤醒条件队列中的线程。唤醒的过程主要是将条件队列中的节点追加至同步队列中并且将节点的前置节点状态修改为SIGNAL状态,便于唤醒。
- 当同步队列中线程执行到步骤4追加的节点后,唤醒调用await阻塞的线程。
- 被唤醒的由于调用await阻塞的线程进行锁资源的抢占。当获取到锁资源后,执行后续的代码。当然如果条件不满足则会再次调用await方法进入条件队列从而从步骤3开始运行。
- 以上步骤反复执行知道执行完代码并释放锁,从而从同步队列中唤醒后继线程。
相关类源码分析
AbstractQueuedSynchronizer类中与条件队列相关的方法
- 判断节点是否在同步队列。
该方法主要是判断参数节点是否在同步队列中,当然提供了一些快捷判断,如状态为CONDITION的话那就在条件队列中,同时如果后置节点也能证明在同步队列中,为什么不用前置节点不为空来判断呢?因为同步队列中头节点的前置节点为空不能作为判断依据,当这些快捷方法不起作用后,则调用findNodeFromTail来查找参数节点。final boolean isOnSyncQueue(Node node) { if (node.waitStatus == Node.CONDITION || node.prev == null) return false; if (node.next != null) return true; return findNodeFromTail(node); }
- 遍历同步队列查找节点。
该方法是从同步队列的尾节点查找参数节点,存在返回true,不存在返回false。private boolean findNodeFromTail(Node node) { Node t = tail; for (;;) { if (t == node) return true; if (t == null) return false; t = t.prev; } }
- 完全释放资源。
将当前线程持有的所有资源释放。final int fullyRelease(Node node) { boolean failed = true; try { int savedState = getState(); if (release(savedState)) { failed = false; return savedState; } else { throw new IllegalMonitorStateException(); } } finally { if (failed) node.waitStatus = Node.CANCELLED; } }
- 将条件队列中节点添加至同步队列。
首先将参数节点的状态由CONDITION状态修改为0,当然没有成功则直接返回false。修改状态后将节点加入同步队列中,并且将同步队列中前置节点状态修改为SIGNAL便于后续的唤醒。final boolean transferForSignal(Node node) { if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) return false; Node p = enq(node); int ws = p.waitStatus; if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) LockSupport.unpark(node.thread); return true; }
- 取消等待节点传输。
该方法是在线程取消等待后将其添加同步队列中,如果取消操作是在被唤醒之前则返回true。后续的循环是保证当前线程被添加至同步队列中,这里就是一个竞态条件,当前线程可能被其他线程通过唤醒添加到同步队列中了,或者是在当前线在阻塞前被取消了,所以会将自己添加到同步队列,而这里就是处理了这两种情况下避免重复加入同步队列。final boolean transferAfterCancelledWait(Node node) { if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) { enq(node); return true; } while (!isOnSyncQueue(node)) Thread.yield(); return false; }
以上就是AbstractQueuedSynchronizer类中关于条件队列的相关操作。接下来来了解一下条件队列的实现类ConditionObject的属性和方法。
条件队列的实现ConditionObject
- 条件队列的头节点和尾节点。
条件队列中的头节点和尾节点,两者为空代表队列为空。private transient Node firstWaiter; private transient Node lastWaiter;
- 无参构造。
- 向条件队列中增加节点。
该方法首先是判断队列中是否可能存在取消的节点,存在则将取消的节点从同步队列中剔除。在剔除完成后将当前线程封装成Node添加到条件队列中。private Node addConditionWaiter() { Node t = lastWaiter; if (t != null && t.waitStatus != Node.CONDITION) { unlinkCancelledWaiters(); t = lastWaiter; } Node node = new Node(Thread.currentThread(), Node.CONDITION); if (t == null) firstWaiter = node; else t.nextWaiter = node; lastWaiter = node; return node; }
- 剔除取消队列。
从条件队列的头节点向后遍历将取消的节点剔除。private void unlinkCancelledWaiters() { Node t = firstWaiter; Node trail = null; while (t != null) { Node next = t.nextWaiter; if (t.waitStatus != Node.CONDITION) { t.nextWaiter = null; if (trail == null) firstWaiter = next; else trail.nextWaiter = next; if (next == null) lastWaiter = trail; } else trail = t; t = next; } }
- 从条件队列中唤醒线程。
private void doSignal(Node first) { do { if ( (firstWaiter = first.nextWaiter) == null) lastWaiter = null; first.nextWaiter = null; } while (!transferForSignal(first) && (first = firstWaiter) != null); }
该方法从给定参数节点开始向条件队列尾部遍历,将第一个waitStatus为CONDIITON的节点追加到同步队列中。
注意:该方法只能转移一个或0个节点。
- 从条件队列中唤醒所有的线程。
在该方法中,遍历条件队列,将所有节点转移至同步队列中,同时将firstWaiter和lastWaiter均置为空。private void doSignalAll(Node first) { lastWaiter = firstWaiter = null; do { Node next = first.nextWaiter; first.nextWaiter = null; transferForSignal(first); first = next; } while (first != null); }
doSignal和doSignalAll两个方法均没有判断节点waitStatus为CONDIITON,因为在transferForSignal方法中进行了判断,是通过compareAndSetWaitStatus(node, Node.CONDITION, 0)方法使用CAS判断,不为CONDITOON则更新失败,返回false。
- 唤醒等待线程。
public final void signal() { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); Node first = firstWaiter; if (first != null) doSignal(first); }
1>通过AbstractQueuedSychronizeder的isHeldExclusively方法判断独占线程是 否是当前线程。
2>将条件队列中的第一个节点调用doSignal方法唤醒。
- 唤醒所有等待线程。
public final void signalAll() { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); Node first = firstWaiter; if (first != null) doSignalAll(first); }
1>判断当前线程是否是独占线程。
2>判断条件队列是否为空。
3>条件不为空则调用doSignalAll方法将所有节点唤醒。
- 判断在等待时是否被中断。
private int checkInterruptWhileWaiting(Node node) { return Thread.interrupted() ? (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) : 0; }
1>判断当前线程的打断状态,没有打断则返回0。
2>打断,则通过调用transferAfterCancelledWait(Node node)方法判断当前节点是有当前线程入队,还是其他线程入队。
3>如果是当前线程入队,则返回THROW_IE。
4>如果是其他线程入队则返回REINTERRUPT。
- 根据标识是否再次中断。
private void reportInterruptAfterWait(int interruptMode) throws InterruptedException { if (interruptMode == THROW_IE) throw new InterruptedException(); else if (interruptMode == REINTERRUPT) selfInterrupt(); }
1>通过interruptMode来执行指定操作。
2>interruptMode为THROW_IE则抛出异常。
3>interruptMode为REINTERRUPT则代用selfInterrupt()修改中断状态。
- 等待(条件队列的核心)。
public final void await() throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); Node node = addConditionWaiter(); int savedState = fullyRelease(node); int interruptMode = 0; while (!isOnSyncQueue(node)) { LockSupport.park(this); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) // clean up if cancelled unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); }
1>通过 Thread.interrupted()判断,线程被打断过则抛出异常。
2>调用addConditionWaiter()方法将当前节点添加至条件队列中。
3>调用fullyRelease方法将当前资源全部释放。
4>循环判断当前节点是否在同步队列中,不在则阻塞当前线程。
5>在循环中,除正常唤醒外,interrupt也会唤醒当前线程,此时局部变量interruptMode会更改为checkInterruptWhileWaiting返回的值。
6>当前线程被唤醒后抢占资源过程中被打断,并且当前节点不是当前线程添加至同步队列的将interruptMode修改为REINTERRUPT。
7>从当前节点清除条件队列中状态不为CONDITION的节点。
8>调用reportInterruptAfterWait方法判断是抛出异常还是修改线程中断状态。
以上的方法主要就是AQS的条件队列实现的线程通信的相关源码,当然队列可中断的及指定时间等待与await大同小异,await理解后其他的也没有那么困难。接下来就是介绍基于AQS实现的可重入锁Lock及ReentrantLock,还有线程通信相关的Condition及ConditionObject是如何协作Lock进行的。这里需要主要的是ConditionObject是AbstractQueuedSynchronizer的一个内部类(非静态),所以ConditionObject的实例是与AQS的实例绑定的。同时一个AQS可以对应多个ConditionObject实例,每个ConditionObject对应一个条件队列,所有AQS可以创建多个不同条件的条件队列来存储由于不同条件而阻塞的线程,这一点是需要意识到的,当然同步队列对于一个AQS实例则只有一个。