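To make full use of machine performance, people invented multithreading. That brought thread-safety problems with it, so people then invented synchronization locks.
Everyone knows this, but do you really understand synchronization locks, or do you only know how to call lock and unlock?
Today, let's take a closer look at how synchronization locks work and how they are implemented.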
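I. The responsibilities of a synchronization lock
A synchronization lock arguably has a single responsibility: restricting access to a resource (thread safety follows from that).
It generally provides at least two operations: 1. locking a resource; 2. unlocking a resource. In addition, it usually offers wait/notify.
Typical use cases: several threads operate on one transaction at the same time and correctness must be guaranteed; a resource may be accessed by only one thread at a time; a resource allows at most k concurrent accesses; accesses must happen in a defined order.
Ways to implement a synchronization lock: relying on operating-system scheduling; implementing it in the application; CAS spinning.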
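To make the CAS-spinning option concrete, here is a minimal sketch of a spin lock built on AtomicBoolean.compareAndSet. The names SpinLockDemo and SpinLock are made up for illustration; this is not how the JDK locks discussed later are implemented, and a waiting thread here keeps using CPU instead of being descheduled.

```java
import java.util.concurrent.atomic.AtomicBoolean;

public class SpinLockDemo {
    /** Hypothetical minimal spin lock: neither reentrant nor fair. */
    static final class SpinLock {
        private final AtomicBoolean locked = new AtomicBoolean(false);

        void lock() {
            // Retry the CAS until this thread is the one that flips false -> true.
            while (!locked.compareAndSet(false, true)) {
                Thread.yield(); // stay runnable, but let other threads make progress
            }
        }

        void unlock() {
            locked.set(false);
        }
    }

    /** Increments a shared counter from several threads under the spin lock. */
    static int run(int threads, int perThread) {
        SpinLock lock = new SpinLock();
        int[] counter = {0}; // guarded by lock
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++) {
                    lock.lock();
                    try {
                        counter[0]++;
                    } finally {
                        lock.unlock();
                    }
                }
            });
            workers[i].start();
        }
        for (Thread t : workers) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return counter[0];
    }

    public static void main(String[] args) {
        System.out.println(run(4, 10_000));
    }
}
```

Because every increment happens between lock() and unlock(), no update is lost; the price is that contending threads spin rather than sleep.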
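A few questions about synchronization locks:
Why can a lock guarantee thread safety?
Does waiting for a lock consume CPU?
Why does performance drop so sharply once locks are used?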
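II. Implementing a synchronization lock, part one: lock/unlock
For application code, most of the time it comes down to lock/unlock, which is also the core of any lock.
AQS (AbstractQueuedSynchronizer) is the foundation of many lock implementations in Java: it hides much of the tedious low-level blocking logic and exposes an easy-to-use interface to the layers above.
We will use AQS as a springboard and first look at the locking process. To avoid getting lost in the business logic of a specific lock, we start with the simplest one, CountDownLatch.
// First, the basic data structure of CountDownLatch. It could hardly be simpler: it extends AQS and overrides a few required methods.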
// java.util.concurrent.CountDownLatch.Sync
/**
* Synchronization control For CountDownLatch.
* Uses AQS state to represent count.
*/
private static final class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 4982264981922014374L;
Sync(int count) {
setState(count);
}
int getCount() {
return getState();
}
protected int tryAcquireShared(int acquires) {
// Acquisition succeeds in exactly one case: when state == 0
return (getState() == 0) ? 1 : -1;
}
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
// The initial count is fixed at construction time; each release decrements it by one
int nextc = c-1;
if (compareAndSetState(c, nextc))
// The release succeeds in exactly one case: when state becomes 0 after this decrement
return nextc == 0;
}
}
}
private final Sync sync;
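The same pattern (extend AQS, override tryAcquireShared and tryReleaseShared) is enough to build a custom synchronizer. The following is a minimal sketch of a one-shot gate, modeled on the usage pattern described in the AQS documentation; the class name OneShotGate and its open()/await() methods are hypothetical and only serve as illustration.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

public class OneShotGate {
    private static final class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected int tryAcquireShared(int ignored) {
            // state == 1 means "open": acquisition succeeds for every caller
            return getState() == 1 ? 1 : -1;
        }

        @Override
        protected boolean tryReleaseShared(int ignored) {
            setState(1); // open the gate for good
            return true; // true tells AQS to wake the queued threads
        }
    }

    private final Sync sync = new Sync();

    /** Opens the gate; all current and future waiters pass. */
    public void open() {
        sync.releaseShared(1);
    }

    /** Waits up to the given time; returns true if the gate was open in time. */
    public boolean await(long timeout, TimeUnit unit) {
        try {
            return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        OneShotGate gate = new OneShotGate();
        System.out.println(gate.await(50, TimeUnit.MILLISECONDS)); // gate still closed
        gate.open();
        System.out.println(gate.await(50, TimeUnit.MILLISECONDS)); // gate open
    }
}
```

All queueing, parking and waking is inherited from AQS; the subclass only decides what "acquired" and "released" mean in terms of the state field.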
Key point 1: the locking process, i.e. the call to await().
public void await() throws InterruptedException {
// Delegate to AQS, which implements the skeleton of the locking logic
sync.acquireSharedInterruptibly(1);
}
// java.util.concurrent.locks.AbstractQueuedSynchronizer#acquireSharedInterruptibly
/**
* Acquires in shared mode, aborting if interrupted. Implemented
* by first checking interrupt status, then invoking at least once
* {@link #tryAcquireShared}, returning on success. Otherwise the
* thread is queued, possibly repeatedly blocking and unblocking,
* invoking {@link #tryAcquireShared} until success or the thread
* is interrupted.
* @param arg the acquire argument.
* This value is conveyed to {@link #tryAcquireShared} but is
* otherwise uninterpreted and can represent anything
* you like.
* @throws InterruptedException if the current thread is interrupted
*/
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
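// First try to acquire; if that succeeds there is no need to block
// As the logic above shows, the acquire attempt itself is very simple, so it costs little
// If the attempt fails, the thread queues up and retries later, which is where the intricate part lives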
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
/**
* Acquires in shared interruptible mode.
* @param arg the acquire argument
*/
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
// First append the current thread to the tail of the wait queue; this is done thread-safely, as addWaiter shows further down
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
// Get the predecessor; if it is the head node, the current thread may try to acquire again
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
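// First check whether the thread should block, then park it; parking is provided by LockSupport
// Once parked, the thread does not wake up on its own; it waits to be unparked by a release (an interrupt also ends the park)
// So this is the point where a thread finally waits for the lock
// The operating system does not schedule this thread again until it is unparked, after which the loop retries the acquire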
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
// How is the current thread appended to the tail in a thread-safe way? CAS guarantees it
/**
* Creates and enqueues node for current thread and given mode.
*
* @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
* @return the new node
*/
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
/**
* Inserts node into queue, initializing if necessary. See picture above.
* @param node the node to insert
* @return node's predecessor
*/
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
// Check whether the thread needs to block
/**
* Checks and updates status for a node that failed to acquire.
* Returns true if thread should block. This is the main signal
* control in all acquire loops. Requires that pred == node.prev.
*
* @param pred node's predecessor holding status
* @param node the node
* @return {@code true} if thread should block
*/
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
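// The node should park only when its predecessor is already in SIGNAL state; otherwise the branches below fix up the predecessor's status and the caller's next loop iteration checks again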
return true;
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
// Blocking via park
/**
* Convenience method to park and then check if interrupted
*
* @return {@code true} if interrupted
*/
private final boolean parkAndCheckInterrupt() {
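// Park the current thread, passing this AQS instance as the blocker; the blocker is recorded for monitoring and diagnostic tools, so every thread waiting on this synchronizer reports the same blocker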
LockSupport.park(this);
return Thread.interrupted();
}
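As shown above, the locking process is fairly simple and clear: the thread joins a queue and is then descheduled by the operating system. (How does the operating system deschedule a thread? Explore that yourself if you are interested.)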
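The blocking primitive used here, LockSupport.park/unpark, can also be tried in isolation. The sketch below is a hypothetical demo (the class name ParkDemo and its flags are made up): one thread parks until another thread sets a flag and unparks it. While parked, the thread is not runnable and does not spin.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.LockSupport;

public class ParkDemo {
    /** Returns true if the parked thread resumed after being unparked. */
    static boolean run() {
        AtomicBoolean released = new AtomicBoolean(false);
        AtomicBoolean resumed = new AtomicBoolean(false);
        Thread waiter = new Thread(() -> {
            // park() may return spuriously or on interrupt, so re-check the flag in a loop.
            while (!released.get()) {
                LockSupport.park(released); // the argument is only a "blocker" label for diagnostics
            }
            resumed.set(true);
        });
        waiter.start();
        try {
            Thread.sleep(50);           // give the waiter time to park (not needed for correctness)
            released.set(true);         // publish the condition first ...
            LockSupport.unpark(waiter); // ... then wake the waiter
            waiter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return resumed.get();
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

If unpark happens before the waiter has parked, the permit is remembered and the next park returns immediately, so the demo does not depend on the sleep.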
Key point 2: the unlocking process, i.e. the call to countDown().
public void countDown() {
// Likewise delegate to AQS, which implements the skeleton of the release logic
sync.releaseShared(1);
}
// java.util.concurrent.locks.AbstractQueuedSynchronizer#releaseShared
/**
* Releases in shared mode. Implemented by unblocking one or more
* threads if {@link #tryReleaseShared} returns true.
*
* @param arg the release argument. This value is conveyed to
* {@link #tryReleaseShared} but is otherwise uninterpreted
* and can represent anything you like.
* @return the value returned from {@link #tryReleaseShared}
*/
public final boolean releaseShared(int arg) {
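// Call the subclass's release logic; if it succeeds, perform the low-level release (waking queued threads and so on)
// In CountDownLatch, tryReleaseShared returns true only when the count reaches 0, so the low-level release runs only once
// This is one reason CountDownLatch appears to let multiple threads start running at the same time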
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
/**
* Release action for shared mode -- signals successor and ensures
* propagation. (Note: For exclusive mode, release just amounts
* to calling unparkSuccessor of head if it needs signal.)
*/
private void doReleaseShared() {
/*
* Ensure that a release propagates, even if there are other
* in-progress acquires/releases. This proceeds in the usual
* way of trying to unparkSuccessor of head if it needs
* signal. But if it does not, status is set to PROPAGATE to
* ensure that upon release, propagation continues.
* Additionally, we must loop in case a new node is added
* while we are doing this. Also, unlike other uses of
* unparkSuccessor, we need to know if CAS to reset status
* fails, if so rechecking.
*/
for (;;) {
Node h = head;
// Release only when the queue is not empty
if (h != null && h != tail) {
int ws = h.waitStatus;
// From the locking logic above: a node whose successor is parked has waitStatus Node.SIGNAL
if (ws == Node.SIGNAL) {
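// Only go on to wake the successor once the status change succeeds
// Winning the CAS first is what makes the wake-up thread-safe; if it fails, loop and re-check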
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
// Wake the successor of the head node, if necessary
unparkSuccessor(h);
}
// What exactly is being propagated here?
// Why is it that only one thread is woken, yet the others get moving too?
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
/**
* Wakes up node's successor, if one exists.
*
* @param node the node
*/
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
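// Wake the next node
// If the next node has cancelled its wait, wake the nearest non-cancelled node after it instead (found by scanning backwards from the tail)
// Only a single thread is woken here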
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
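Key point 3: how does the wake-up propagate?
In the previous part we saw that when the user calls countDown(), it only asks the operating system to wake the thread of the node after head (or the nearest non-cancelled node). So how do all the waiting threads end up acquiring the lock and running?
The answer lies in a detail of the acquire path that we have not looked at closely yet. The uninterruptible variant doAcquireShared is shown here; it performs the same setHeadAndPropagate step as doAcquireSharedInterruptibly above: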
// java.util.concurrent.locks.AbstractQueuedSynchronizer#doAcquireShared
/**
* Acquires in shared uninterruptible mode.
* @param arg the acquire argument
*/
private void doAcquireShared(int arg) {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head) {
// Once countDown() has brought the count to zero, the thread right after head is woken and continues here
int r = tryAcquireShared(arg);
if (r >= 0) {
// After acquiring, make node the new head and propagate the wake-up; this necessarily wakes other threads, see below
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
/**
* Sets head of queue, and checks if successor may be waiting
* in shared mode, if so propagating if either propagate > 0 or
* PROPAGATE status was set.
*
* @param node the node
* @param propagate the return value from a tryAcquireShared
*/
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
setHead(node);
/*
* Try to signal next queued node if:
* Propagation was indicated by caller,
* or was recorded (as h.waitStatus either before
* or after setHead) by a previous operation
* (note: this uses sign-check of waitStatus because
* PROPAGATE status may transition to SIGNAL.)
* and
* The next node is waiting in shared mode,
* or we don't know, because it appears null
*
* The conservatism in both of these checks may cause
* unnecessary wake-ups, but only when there are multiple
* racing acquires/releases, so most need signals now or soon
* anyway.
*/
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
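// Wake the next thread if necessary
// countDown() does not go through this path; this is propagation performed internally by the thread that has just been woken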
Node s = node.next;
if (s == null || s.isShared())
// The release logic is the same as above; in short, it triggers yet another wake-up
doReleaseShared();
}
}
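Now we understand how a single release lets every waiting thread through. It also settles our guess that all threads start running concurrently at the same instant: they do not. The waiting threads are woken one after another through wake-up propagation, so in absolute time there is always an order. Let's not casually say they run "at the same time" any more.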
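From the caller's side, the effect of this propagation can be observed with a small experiment: several threads wait on one CountDownLatch, and a single countDown() lets all of them through. This is a hypothetical demo (LatchPropagationDemo and its variables are made-up names); it only counts how many waiters passed and makes no claim about the order in which they woke up.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class LatchPropagationDemo {
    /** Starts the given number of waiters and releases them with one countDown(). */
    static int run(int waiters) {
        CountDownLatch gate = new CountDownLatch(1);
        CountDownLatch done = new CountDownLatch(waiters);
        AtomicInteger passed = new AtomicInteger();
        for (int i = 0; i < waiters; i++) {
            new Thread(() -> {
                try {
                    gate.await();             // queues in AQS while the count is 1
                    passed.incrementAndGet(); // reached only after the gate opened
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    done.countDown();
                }
            }).start();
        }
        try {
            Thread.sleep(50);  // let the waiters queue up (not needed for correctness)
            gate.countDown();  // the one and only release
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return passed.get();
    }

    public static void main(String[] args) {
        System.out.println(run(5));
    }
}
```

A waiter that calls await() only after the count is already zero never queues at all: tryAcquireShared succeeds on the fast path.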
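III. Switching locks: wait/notify
As seen above, a single lock/unlock round is quite simple: the operating system does the heavy lifting and there is not much implementation code.
Slightly more demanding scenarios, however, need conditional operations. For example: a thread holds a lock and runs some code, finds at run time that a condition is not met, and must wait until the condition holds rather than simply finish. This is the so-called wait operation.
At first glance wait/notify looks a lot like lock/unlock, but it is not. The main difference is that lock/unlock guards a whole section of code, while wait/notify targets a specific condition: acquiring the lock does not mean the condition holds, but once the condition holds, it can only be acted on safely while holding the lock.
So is wait/notify just as simple to implement? Java's most basic class, Object, already provides wait/notify, for example.
Since we want to see how it works, let's again use the implementation in the concurrency package as our basis; Java is our strong suit, after all.
This time we start with ArrayBlockingQueue#put/take to see how this scenario is used.
The contract of ArrayBlockingQueue's put/take is: put blocks while the queue is full and continues only once a slot is available; take likewise blocks while the queue is empty and continues only once there is data. A plain lock alone does not handle this well, because it is a condition check. put/take look like this: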
// java.util.concurrent.ArrayBlockingQueue#put
/**
* Inserts the specified element at the tail of this queue, waiting
* for space to become available if the queue is full.
*
* @throws InterruptedException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
*/
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
// Keep waiting while the queue is full
while (count == items.length)
notFull.await();
enqueue(e);
} finally {
lock.unlock();
}
}
// java.util.concurrent.ArrayBlockingQueue#take
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
// Keep waiting while the queue is empty
while (count == 0)
notEmpty.await();
return dequeue();
} finally {
lock.unlock();
}
}
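It looks quite simple and matches how people think. But the flow is controlled by two variables, notFull and notEmpty. How are these two related?
Before that, we need to complete the example above: once notFull.await() or notEmpty.await() has blocked, when does it get to run again? In the code above, the enqueue and dequeue operations each send a notification when they finish.
// Matches put: after an enqueue the queue is obviously not empty, so signal notEmpty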
/**
* Inserts element at current put position, advances, and signals.
* Call only when holding lock.
*/
private void enqueue(E x) {
// assert lock.getHoldCount() == 1;
// assert items[putIndex] == null;
final Object[] items = this.items;
items[putIndex] = x;
if (++putIndex == items.length)
putIndex = 0;
count++;
// An element has been added, so the queue is no longer empty
notEmpty.signal();
}
// Matches take: after a dequeue the queue cannot be full; there is at least one free slot.
/**
* Extracts element at current take position, advances, and signals.
* Call only when holding lock.
*/
private E dequeue() {
// assert lock.getHoldCount() == 1;
// assert items[takeIndex] != null;
final Object[] items = this.items;
@SuppressWarnings("unchecked")
E x = (E) items[takeIndex];
items[takeIndex] = null;
if (++takeIndex == items.length)
takeIndex = 0;
count--;
if (itrs != null)
itrs.elementDequeued();
// An element has been removed, so the queue is no longer full; producers may continue
notFull.signal();
return x;
}
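Easy to follow, isn't it? But our goal is not to see how ArrayBlockingQueue is implemented; we want to work out how wait/notify is implemented, because it is more than a plain lock.
// How the three objects relate: notEmpty and notFull are both Conditions created from the same ReentrantLock, so they are bound to that lock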
/** Main lock guarding all access */
final ReentrantLock lock;
/** Condition for waiting takes */
private final Condition notEmpty;
/** Condition for waiting puts */
private final Condition notFull;
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
// What is lock.newCondition()? It returns a ConditionObject, which is implemented in AQS
// java.util.concurrent.locks.ReentrantLock#newCondition
public Condition newCondition() {
return sync.newCondition();
}
// java.util.concurrent.locks.ReentrantLock.Sync#newCondition
final ConditionObject newCondition() {
// Defined in AQS
return new ConditionObject();
}
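The "one lock, two conditions" wiring can be reproduced in a small stand-alone class. The following is a simplified sketch in the spirit of ArrayBlockingQueue, not the JDK code itself; TinyBoundedBuffer and its demo() helper are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class TinyBoundedBuffer<E> {
    private final Object[] items;
    private int putIndex, takeIndex, count;
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition notEmpty = lock.newCondition(); // takers wait here
    private final Condition notFull = lock.newCondition();  // putters wait here

    public TinyBoundedBuffer(int capacity) {
        items = new Object[capacity];
    }

    public void put(E e) throws InterruptedException {
        lock.lockInterruptibly();
        try {
            while (count == items.length) // re-check after every wake-up
                notFull.await();
            items[putIndex] = e;
            putIndex = (putIndex + 1) % items.length;
            count++;
            notEmpty.signal();
        } finally {
            lock.unlock();
        }
    }

    @SuppressWarnings("unchecked")
    public E take() throws InterruptedException {
        lock.lockInterruptibly();
        try {
            while (count == 0)
                notEmpty.await();
            E e = (E) items[takeIndex];
            items[takeIndex] = null;
            takeIndex = (takeIndex + 1) % items.length;
            count--;
            notFull.signal();
            return e;
        } finally {
            lock.unlock();
        }
    }

    /** One producer puts 1..5 into a buffer of capacity 2; the caller takes them. */
    static List<Integer> demo() {
        TinyBoundedBuffer<Integer> buf = new TinyBoundedBuffer<>(2);
        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= 5; i++) buf.put(i);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        List<Integer> out = new ArrayList<>();
        try {
            for (int i = 0; i < 5; i++) out.add(buf.take());
            producer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

With a single producer and a single consumer the elements come out in insertion order, even though the producer has to block whenever the two slots are full.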
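Next, let's examine the Condition object with a few questions in mind:
1. How are its wait/notify operations implemented?
2. How is it tied to the lock and to the other conditions?
3. Why must wait/notify run only after the outer lock has been acquired?
4. What does it have in common with Object's wait/notify, and how does it differ?
Once we can answer these questions, we understand most of its principle and implementation.
Key point 1: how is wait/notify implemented?
As seen above, it is done by calling await()/signal(). Let's see what these actually do.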
// java.util.concurrent.locks.AbstractQueuedSynchronizer.ConditionObject#await()
/**
* Implements interruptible condition wait.
* <ol>
* <li> If current thread is interrupted, throw InterruptedException.
* <li> Save lock state returned by {@link #getState}.
* <li> Invoke {@link #release} with saved state as argument,
* throwing IllegalMonitorStateException if it fails.
* <li> Block until signalled or interrupted.
* <li> Reacquire by invoking specialized version of
* {@link #acquire} with saved state as argument.
* <li> If interrupted while blocked in step 4, throw InterruptedException.
* </ol>
*/
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
// Add the current thread to the condition (wait) queue, which is maintained via firstWaiter/lastWaiter
Node node = addConditionWaiter();
// Release the lock held by the current thread; details below
int savedState = fullyRelease(node);
// From here on this thread is no longer protected by the lock: it has been released and other threads will compete for it
int interruptMode = 0;
// Loop: as long as this node is not on the sync queue, keep parking the thread
// So the check for being on the sync queue is important
while (!isOnSyncQueue(node)) {
// Not on the sync queue yet: park
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
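// Once signalled, the thread must compete for the lock again; details below
// After winning the lock it returns to the point where it called await and continues with the business logic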
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
// Below: cleanup of cancelled waiters and interrupt handling; skipped here
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
/**
* Invokes release with current state value; returns saved state.
* Cancels node and throws exception on failure.
* @param node the condition node for this wait
* @return previous sync state
*/
final int fullyRelease(Node node) {
boolean failed = true;
try {
int savedState = getState();
// Releasing is expected to succeed; if it fails, the current thread did not hold the lock, so an exception is thrown
if (release(savedState)) {
failed = false;
return savedState;
} else {
throw new IllegalMonitorStateException();
}
} finally {
if (failed)
node.waitStatus = Node.CANCELLED;
}
}
/**
* Releases in exclusive mode. Implemented by unblocking one or
* more threads if {@link #tryRelease} returns true.
* This method can be used to implement method {@link Lock#unlock}.
*
* @param arg the release argument. This value is conveyed to
* {@link #tryRelease} but is otherwise uninterpreted and
* can represent anything you like.
* @return the value returned from {@link #tryRelease}
*/
public final boolean release(int arg) {
// tryRelease is implemented by the concrete synchronizer subclass
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
// How do we decide whether the current node is already on the sync queue?
/**
* Returns true if a node, always one that was initially placed on
* a condition queue, is now waiting to reacquire on sync queue.
* @param node the node
* @return true if is reacquiring
*/
final boolean isOnSyncQueue(Node node) {
// If the node is still in CONDITION state, or has no predecessor, it has not been moved to the sync queue yet
if (node.waitStatus == Node.CONDITION || node.prev == null)
return false;
// If the node already has a successor, it must already have been moved to the sync queue
if (node.next != null) // If has successor, it must be on queue
return true;
/*
* node.prev can be non-null, but not yet on queue because
* the CAS to place it on queue can fail. So we have to
* traverse from tail to make sure it actually made it. It
* will always be near the tail in calls to this method, and
* unless the CAS failed (which is unlikely), it will be
* there, so we hardly ever traverse much.
*/
// As a last resort, search the sync queue from the tail; if the node is found, it is on the sync queue
return findNodeFromTail(node);
}
/**
* Returns true if node is on sync queue by searching backwards from tail.
* Called only when needed by isOnSyncQueue.
* @return true if present
*/
private boolean findNodeFromTail(Node node) {
Node t = tail;
for (;;) {
if (t == node)
return true;
if (t == null)
return false;
t = t.prev;
}
}
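// Once signalled, the thread must compete for the lock again to preserve the outer lock semantics, since it released the lock voluntarily earlier
// This acquire is exactly the same as in lock/unlock, so there is not much to add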
// java.util.concurrent.locks.AbstractQueuedSynchronizer#acquireQueued
/**
* Acquires in exclusive uninterruptible mode for thread already in
* queue. Used by condition wait methods as well as acquire.
*
* @param node the node
* @param arg the acquire argument
* @return {@code true} if interrupted while waiting
*/
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC; on the sync queue, every wake-up retries the acquire, and on failure the thread parks again until the next wake-up, until it succeeds
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
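To summarize the logic of wait:
1. Precondition: the thread already holds the outer lock;
2. Add the current thread to the condition (wait) queue;
3. Release the lock it holds;
4. Repeatedly check and park until the node has been moved to the sync queue;
5. Once signalled, compete for the outer lock again; return on success, otherwise keep blocking (it is the same outer lock, which is why the condition and the lock must be bound to each other);
6. The thread holds the lock before wait and holds it again after wait returns; seen from the outside, the lock state has not changed;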
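Steps 3 and 6 can be checked from the outside with a small experiment. In this hypothetical sketch (AwaitReleasesLockDemo and its variables are made-up names) a waiter thread blocks in await while still inside its lock()/unlock() region; the main thread nevertheless manages to acquire the same lock, which is only possible because await released it. After the signal, the waiter holds the lock again when await returns.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class AwaitReleasesLockDemo {
    /**
     * result[0]: main acquired the lock while the waiter was parked in await.
     * result[1]: the waiter held the lock again when await returned.
     */
    static boolean[] run() {
        ReentrantLock lock = new ReentrantLock();
        Condition ready = lock.newCondition();
        boolean[] flag = {false};          // the awaited condition, guarded by lock
        boolean[] result = new boolean[2];

        Thread waiter = new Thread(() -> {
            lock.lock();
            try {
                while (!flag[0])
                    ready.awaitUninterruptibly(); // releases the lock while waiting
                result[1] = lock.isHeldByCurrentThread();
            } finally {
                lock.unlock();
            }
        });
        waiter.start();

        boolean signalled = false;
        while (!signalled) {
            lock.lock(); // succeeds even though the waiter is "inside" its locked region
            try {
                if (lock.hasWaiters(ready)) { // the waiter is parked on the condition queue
                    result[0] = true;
                    flag[0] = true;
                    ready.signal();
                    signalled = true;
                }
            } finally {
                lock.unlock();
            }
            if (!signalled) Thread.yield(); // the waiter has not reached await yet
        }
        try {
            waiter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return result;
    }

    public static void main(String[] args) {
        boolean[] r = run();
        System.out.println(r[0] + " " + r[1]);
    }
}
```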
Key point 2: with wait sorted out, let's look at how signal(), the wake-up notification, is implemented:
// java.util.concurrent.locks.AbstractQueuedSynchronizer.ConditionObject#signal
/**
* Moves the longest-waiting thread, if one exists, from the
* wait queue for this condition to the wait queue for the
* owning lock.
*
* @throws IllegalMonitorStateException if {@link #isHeldExclusively}
* returns {@code false}
*/
public final void signal() {
// Only the thread that holds the lock may call signal; otherwise, what would guarantee thread safety?
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
// Notify firstWaiter
if (first != null)
doSignal(first);
}
/**
* Removes and transfers nodes until hit non-cancelled one or
* null. Split out from signal in part to encourage compilers
* to inline the case of no waiters.
* @param first (non-null) the first node on condition queue
*/
private void doSignal(Node first) {
// Transfer at most one node
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
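// Move a node from the condition (wait) queue to the sync queue, so that it can take part in the next round of lock competition
// Returns true only if the move really succeeded
// Note: the calling thread is the one holding the lock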
// java.util.concurrent.locks.AbstractQueuedSynchronizer#transferForSignal
/**
* Transfers a node from a condition queue onto sync queue.
* Returns true if successful.
* @param node the node
* @return true if successfully transferred (else the node was
* cancelled before signal)
*/
final boolean transferForSignal(Node node) {
/*
* If cannot change waitStatus, the node has been cancelled.
*/
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
/*
* Splice onto queue and try to set waitStatus of predecessor to
* indicate that thread is (probably) waiting. If cancelled or
* attempt to set waitStatus fails, wake up to resync (in which
* case the waitStatus can be transiently and harmlessly wrong).
*/
// The sync queue is maintained via the head/tail pointers
Node p = enq(node);
int ws = p.waitStatus;
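// Note: in the normal case this does not wake the waiting thread; it only moves the node between queues.
// The current thread has not finished its lock-protected region yet; when it unlocks, the waiting thread is woken as usual
// Otherwise another thread could jump in before the current thread's work is done, and then what would happen to the rest of that work?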
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}
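To summarize, notify works as follows:
1. Precondition: the calling thread holds the outer lock;
2. Move the first node of the condition (wait) queue to the sync queue;
3. If that node has been cancelled, move on to the following node, until the queue is exhausted; at most one node is transferred;
4. In the normal case no thread is woken at this point;
So the key to implementing wait/notify is maintaining two queues, the condition (wait) queue and the sync queue, and both operations must run under the protection of the outer lock.
At this point we can also answer the question: why must wait/notify run while holding the lock?
Because wait waits for a condition to become true, there is necessarily contention that needs protection; wait itself must release the lock so that other threads can make the condition true, and it must restore the lock state afterwards. After notify there may be follow-up work that also has to be protected. notify is only one part of what the lock does.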
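The lock requirement is enforced at run time, which is easy to verify. The sketch below (the class ConditionNeedsLockDemo and its method names are hypothetical) calls signal() and await() on a ReentrantLock condition without holding the lock and checks that IllegalMonitorStateException is thrown, then repeats signal() with the lock held.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class ConditionNeedsLockDemo {
    /** signal() without the lock: isHeldExclusively() is false, so it throws. */
    static boolean signalWithoutLockThrows() {
        ReentrantLock lock = new ReentrantLock();
        Condition cond = lock.newCondition();
        try {
            cond.signal();
            return false;
        } catch (IllegalMonitorStateException expected) {
            return true;
        }
    }

    /** await() without the lock: there is nothing to release, so it throws. */
    static boolean awaitWithoutLockThrows() {
        ReentrantLock lock = new ReentrantLock();
        Condition cond = lock.newCondition();
        try {
            cond.await(10, TimeUnit.MILLISECONDS);
            return false;
        } catch (IllegalMonitorStateException expected) {
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    /** signal() with the lock held is fine, even when nobody is waiting. */
    static boolean signalWithLockSucceeds() {
        ReentrantLock lock = new ReentrantLock();
        Condition cond = lock.newCondition();
        lock.lock();
        try {
            cond.signal();
            return true;
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) {
        System.out.println(signalWithoutLockThrows());
        System.out.println(awaitWithoutLockThrows());
        System.out.println(signalWithLockSucceeds());
    }
}
```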
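IV. Notifying all threads: notifyAll
Sometimes, once a condition holds, all waiting threads may proceed, and notifyAll is used. How is "notify everyone" achieved? Are they all notified at once, or...?
Here is the implementation in AQS: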
// java.util.concurrent.locks.AbstractQueuedSynchronizer.ConditionObject#signalAll
public final void signalAll() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignalAll(first);
}
/**
* Removes and transfers all nodes.
* @param first (non-null) the first node on condition queue
*/
private void doSignalAll(Node first) {
lastWaiter = firstWaiter = null;
do {
Node next = first.nextWaiter;
first.nextWaiter = null;
transferForSignal(first);
first = next;
} while (first != null);
}
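As we can see, it walks through all the nodes and moves them one by one from the condition (wait) queue to the sync queue (that is the notification). Nobody can do several things at once, after all!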
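Seen from application code, signalAll looks like this. The sketch is a hypothetical demo (SignalAllDemo and its variables are made-up names): several threads wait for one flag, and a single signalAll releases them. Each woken thread still has to reacquire the lock before await returns, so they leave the wait one at a time.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class SignalAllDemo {
    /** Starts the given number of waiters and releases them with one signalAll(). */
    static int run(int waiters) {
        ReentrantLock lock = new ReentrantLock();
        Condition opened = lock.newCondition();
        boolean[] isOpen = {false}; // the awaited condition, guarded by lock
        AtomicInteger passed = new AtomicInteger();

        Thread[] threads = new Thread[waiters];
        for (int i = 0; i < waiters; i++) {
            threads[i] = new Thread(() -> {
                lock.lock();
                try {
                    while (!isOpen[0])
                        opened.awaitUninterruptibly();
                    passed.incrementAndGet(); // runs with the lock held again
                } finally {
                    lock.unlock();
                }
            });
            threads[i].start();
        }

        lock.lock();
        try {
            isOpen[0] = true;
            opened.signalAll(); // moves every current waiter to the sync queue
        } finally {
            lock.unlock();      // only now can the first waiter reacquire the lock
        }

        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return passed.get();
    }

    public static void main(String[] args) {
        System.out.println(run(4));
    }
}
```

Because the waiters test the flag in a loop, the demo also works if a thread reaches the lock only after signalAll has already been called: it sees the flag set and never waits.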
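This article has analyzed the principle and implementation of synchronization locks from the Java point of view, but the ideas are not limited to Java. The underlying reasoning carries over; it is just that a heavyweight like the operating system can do the job more directly, for example by simply not scheduling a thread onto the CPU at all.
Don't fear today's hardship; trust that tomorrow will be even harder!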