当我们想合理使用 JUC (java.util.concurrent)包下的工具去做一些场景的需求时,如果了解其中的原理,对于我们写代码可能事半功倍。今天就分享下我对
ReentrantLock
和
Condition
的理解。
以前文章 两个线程交替执行输出,一个数字1-10,一个字符a-e ,打印出来12a34b56c78d910e
这里使用了
java.util.concurrent
包下的
ReentrantLock
和
Condition
。
上次分析了
ReentrantLock
的原理请点击 ReentrantLock lock unLock 原理分析 。
接着上次
ReentrantLock
的分析,本次开始分析
Condition
的原理
文章目录
- Condition `await signal` 原理分析
-
- 线程的等待与唤醒的体现
-
- LockAwaitDemo.await
- LockSignalDemo.signal
-
- await 被唤醒后的逻辑
- 疑问解答
Condition await signal
原理分析
await signal
线程的等待与唤醒的体现
我们知道在进行线程的等待和唤醒的时候,需要拿到锁,通过锁中维护的
Condition
来实现锁的等待与唤醒
// 获得一个 Condition
public Condition newCondition() {
return sync.newCondition();
}
final ConditionObject newCondition() {
return new ConditionObject();
}
// ConditionObject 中维护了一个等待队列
public class ConditionObject implements Condition, java.io.Serializable {
private static final long serialVersionUID = 1173984872572414699L;
/** First node of condition queue. */
private transient Node firstWaiter;
/** Last node of condition queue. */
private transient Node lastWaiter;
public ConditionObject() { }
}
LockAwaitDemo.await
该方法是将获得锁的线程添加到等待队列中
new Node('LockAwaitDemo Thread', Node.CONDITION)
,释放锁并使当前线程处于阻塞状态。
// 等待
public final void await() throws InterruptedException {
// 判断线程是否被中断,
// 因为该方法会清除标志位,
// 如果该线程未执行中断,则会返回false
// 如果中断过第一次执行该方法则会返回true,再次执行也是会返回false
if (Thread.interrupted())
throw new InterruptedException();
// 将当前页面添加到等待队列中,如果队列不存在则创建一个,
// 并返回该节点(包含当前线程和waitStatus=CONDITION)
Node node = addConditionWaiter();
// 释放当前节点的重入锁,并返回重入次数
int savedState = fullyRelease(node);
int interruptMode = 0;
// 判断该节点是否存在同步队列中,
// 如果不存在同步队列中,则 park 当前线程
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
// 当代码执行了 await()方法时,当前线程将阻塞在这里
// 到此为止主要做了几步:
// 1. 线程拿到锁 ,从同步队列中取出了
// 2. 线程释放锁,并将该线程封装到等待队列中
// 3. 线程阻塞在这里,等待从等待队列中再次取出该线程进行后续操作
// todo 下面的等执行完 signal 后在分析
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
// 添加当前线程到等待队列中
// 将当前线程添加到等待队列最后一个节点中
private Node addConditionWaiter() {
Node t = lastWaiter;
// If lastWaiter is cancelled, clean out.
// 如果 最后一个等待节点不为空且不为CONDITION 说明为取消状态,则删除该节点
if (t != null && t.waitStatus != Node.CONDITION) {
// 删除处于取消状态的节点,
// 并再次获得最后一个等待节点(该节点不再是取消状态了,因为已经删除了所有出于取消状态的节点)
unlinkCancelledWaiters();
t = lastWaiter;
}
// 等待队列的节点:当前线程,CONDITION(-2)
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}
// 删除处于取消状态的节点
// 从等待队列中的第一个节点开始进行遍历判断是否是取消状态,如果是取消状态则删除该节点,
// 将该节点的下一个节点的引用取消,并将该节点的下一个节点作为该等待队列的第一个节点
private void unlinkCancelledWaiters() {
Node t = firstWaiter;
// 用来做中间值 ,来删除已经取消的节点
Node trail = null;
while (t != null) {
Node next = t.nextWaiter;
// 如果 firstWaiter 节点的 waitStatus != CONDITION ,说明是取消状态的节点
if (t.waitStatus != Node.CONDITION) {
// 设置 firstWaiter 下一个节点的引用为空,
t.nextWaiter = null;
if (trail == null)
// 在等待队列中,如果第一个节点为取消状态则将第原本处于第二的节点设置为第一个节点
firstWaiter = next;
else
trail.nextWaiter = next;
if (next == null)
// 如果没有下一个节点了,则将该节点设置为最后一个节点
lastWaiter = trail;
}
else
trail = t;
t = next;
}
}
// 释放当前等待节点的所有锁,返回当前锁重入的次数
final int fullyRelease(Node node) {
boolean failed = true;
try {
// 锁标记,大于0 表示有锁,大于1表示冲入的次数
int savedState = getState();
// 释放重入锁
if (release(savedState)) {
failed = false;
return savedState;
} else {
throw new IllegalMonitorStateException();
}
} finally {
if (failed)
// 失败则设置为取消状态
node.waitStatus = Node.CANCELLED;
}
}
// 判断当前节点是否在同步队列中
final boolean isOnSyncQueue(Node node) {
// 如果该节点的状态是 -2 ,则不在同步队列中
// 如果该节点的prev为空,有两种可能,
// 一种是 在等待队列中,因为该队列中的节点没有prev属性,所以为空
// 另一个是head 节点, 即获得锁的节点,也不存在同步队列中
if (node.waitStatus == Node.CONDITION || node.prev == null)
return false;
// 因为同步队列中存在next ,而等待队列中存在 nextWaiter ,
// 所以 node.next 不为空说明就是同步队列
if (node.next != null) // If has successor, it must be on queue
return true;
/*
* node.prev can be non-null, but not yet on queue because
* the CAS to place it on queue can fail. So we have to
* traverse from tail to make sure it actually made it. It
* will always be near the tail in calls to this method, and
* unless the CAS failed (which is unlikely), it will be
* there, so we hardly ever traverse much.
*/
// 从tail 节点倒着查看该节点是否存在同步队列中
// node.prev可以是非null,但尚未在队列中,
// 因为将其置于队列中的CAS可能会失败。
// 所以我们必须从尾部穿过以确保它实际上成功。
return findNodeFromTail(node);
}
// 从tail倒着遍历,如果存在 节点等于 node ,
// 则说明在同步队列中存在该节点
// 为什么这样做呢?
// 因为在 enq 中是先设置prev 的关系,在通过CAS 替换tail ,
// 如果此时prev 不为空,但是 next 却没有,便可以通过这种方式找到
private boolean findNodeFromTail(Node node) {
Node t = tail;
for (;;) {
if (t == node)
return true;
if (t == null)
return false;
t = t.prev;
}
}
![](https://img.laitimes.com/img/_0nNw4CM6IyYiwiM6ICdiwiIyVGduV2YfNWawNCM38FdsYkRGZkRG9lcvx2bjxiNx8VZ6l2cs0DNXlVe5cVWxQmMMBjVtJWd0ckW65UbM5WOHJWa5kHT20ESjBjUIF2X0hXZ0xCMx81dvRWYoNHLrdEZwZ1Rh5WNXp1bwNjW1ZUba9VZwlHdssmch1mclRXY39CXldWYtlWPzNXZj9mcw1ycz9WL49zZuBnLwgDN5IDNxITMwMDOwAjMwIzLc52YucWbp5GZzNmLn9Gbi1yZtl2Lc9CX6MHc0RHaiojIsJye.png)
LockSignalDemo.signal
从同步队列中取出
firstWaiter
, 并将该节点状态由
CONDITION(-2)
CAS 替换成
SIGNAL(-1)
,并添加当前节点到同步队列中,进入到同步队列中的线程会再次去争抢锁。
// 唤醒线程
public final void signal() {
// 获得锁的线程不是当前线程,报错
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first);
}
private void doSignal(Node first) {
do {
// 从等待队列中获得第一个节点下一个节点,赋值给第一个节点,
// 即 从等待队列中取出第一个节点,
// 如果原本处于第二的节点为空,说明该等待对列中就只有一个等待节点,
// 节点取出直接取完了,则设置lastWaiter 为空
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
// 取消 first 节点的 nextWaiter 的指向,帮助GC
first.nextWaiter = null;
// 设置该first 节点的waitStatus 由-1 变为0 ,
// 如果替换失败,且 等待队列中还有其他节点的话,继续进行替换下面的节点
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
final boolean transferForSignal(Node node) {
/*
* If cannot change waitStatus, the node has been cancelled.
*/
// CAS 替换 node 节点 的 waitStatus 为 0
// 如果替换失败返回false ,只有一种可能就是节点被 CANCELLED 了
// 为什么要替换? 这是要将原本处于等待队列中的节点放到同步队列中
// 比如 线程A 获得锁之后,进行await ,是将该线程添加到等待队列中并阻塞
// signal 方法是将该等待队列再次放到 同步队列中,再次能够竞争锁
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
/*
* Splice onto queue and try to set waitStatus of predecessor to
* indicate that thread is (probably) waiting. If cancelled or
* attempt to set waitStatus fails, wake up to resync (in which
* case the waitStatus can be transiently and harmlessly wrong).
*/
// CAS 修改 node 节点的状态成功,则将该节点添加到同步队列中
// 并返回原本的tail 节点:即 node 添加到同步队列的上一个节点
Node p = enq(node);
int ws = p.waitStatus;
// 如果该节点的上一个同步节点 是取消状态
// 或者 修改上一个节点状态为 SIGNAL 失败则进行 unpark(唤醒该节点的线程)
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
// 唤醒节点上的线程.
LockSupport.unpark(node.thread);
return true;
}
await 被唤醒后的逻辑
上面主要做了两件事,
-
将拿到锁的线程封装成node(CONDITION) 节点添加到等待队列中, 释放锁并阻塞(park)当前线程LockAwaitDemo
-
将拿到锁的线程从等待队列中取出,将取出的等待节点添加到同步队列中并唤醒该节点的线程LockSignalDemo
到此为止我们知道了,
LockAwaitDemo
线程已经被阻塞了, 而
LockSignalDemo
执行完
doSignal()
方法后会在执行了
unLock()
后才会释放锁,
LockAwaitDemo
线程才会再次去争抢锁。那么抢到锁之后还会做什么呢? 再接着
await
未分析完的代码分析。
// 等待
public final void await() throws InterruptedException {
// .....
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
// 当代码执行了 await()方法时,当前线程将阻塞在这里
// 到此为止主要做了几步:
// 1. 线程拿到锁 ,从同步队列中取出了
// 2. 线程释放锁,并将该线程封装到等待队列中
// 3. 线程阻塞在这里,等待从等待队列中再次取出该线程进行后续操作
// 当 signal 执行完毕释放锁后会再次执行到这里。
// 再等待时检查是否中断
// 如果未中断过返回0 , 中断过返回1
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
// 让该节点再次去竞争锁,重入次数为原来的释放的次数
// 如果线程未被中断,interruptMode = 0
// 后面的逻辑主要是判断在等待过程中线程是否被中断了,如果中断了会判断是抛异常还是继续中断
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
// 删除处于取消状态的节点
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
private int checkInterruptWhileWaiting(Node node) {
// 未中断过则返回0 ,
return Thread.interrupted() ?
// 如果在等待过程中,被中断了,则需要转移这些中断的节点
(transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
0;
}
// 如果添加到同步队列中 则 返回true
// 添加不到同步队列中,返回false
final boolean transferAfterCancelledWait(Node node) {
// 将等待节点添加到同步队列中
if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
enq(node);
return true;
}
while (!isOnSyncQueue(node))
Thread.yield();
return false;
}
疑问解答
- 线程的等待与唤醒的体现: 当
线程执行LockAwaitDemo
时, 是怎么阻塞当前线程的? 当await()
线程阻塞后,LockAwaitDemo
是怎么通过阻塞进行获得锁的? 当LockSignalDemo
线程拿到锁之后,是怎么唤醒LockSignalDemo
LockAwaitDemo
阻塞线程的?
答:
- 当我们去进行线程的等待与唤醒的时候,需要拿到锁。拿到锁的线程便是head 节点(thread=null,waitStatus=-1,exclusiveOwnerThread=当前线程,state=重入次数),当进行
操作时,会释放重入锁,并将当前线程封装成node(thread=LockAwaitDemo.await()
,waitStatus=-2|CONDITION),添加到等待队列中并阻塞当前线程。LockAwaitDemo
-
线程 释放锁(重入锁)后,此时LockAwaitDemo
线程会存在同步队列中,等待线程释放后进行unpark head 节点的next 节点(LockSignalDemo
线程),在通过自旋再次竞争锁LockAwaitDemo
-
线程拿到锁后,将从等待队列中取出firstWaiter 节点的waitStatus修改为0, 并进行唤醒该节点的线程(unparkLockSignalDemo
线程)和添加到同步队列中。当LockAwaitDemo
线程执行完所有的代码 并LockSignalDemo
释放锁后,此时lock.unlock()
线程便可以继续去竞争锁并拿到锁LockAwaitDemo
- 当我们去进行线程的等待与唤醒的时候,需要拿到锁。拿到锁的线程便是head 节点(thread=null,waitStatus=-1,exclusiveOwnerThread=当前线程,state=重入次数),当进行