前言:AQS架構在J.U.C中的地位不言而喻,可以說沒有AQS就沒有J.U.C包,可見其重要性,是以有必要對其原理進行詳細深入的了解。
1.AQS是什麼
在深入AQS之前,首先我們要搞清楚什麼是AQS。AQS全稱是AbstractQueuedSynchronizer,我們直接檢視AQS源碼的注釋。

大緻意思就是說:AQS提供了實作阻塞鎖和相關同步器并依賴先進先出(FIFO)等待隊列的架構。
AQS依賴一個原子數值作為鎖的狀态,子類可以有多個狀态值,隻能通過原子方法區操作該值,進而保證同步。
通過第一段的注釋大緻總結下AQS是什麼:
①AQS是一個同步的基礎架構,基于一個先進先出的隊列。
②鎖機制依賴一個原子值的狀态。
③AQS的子類負責定義與操作這個狀态值,但必須通過AQS提供的原子操作。
④AQS剩餘的方法就是圍繞隊列,與線程阻塞喚醒等功能。
2.重要成員變量
AQS中有兩個重要的成員變量:Node和ConditionObject。
①Node的作用是存儲擷取鎖失敗的線程,并且維護一個CLH FIFO隊列,該隊列是會被多線程操作的,是以Node中大部分變量都是被volatile修飾,并且通過自旋和CAS進行原子性的操作。CLH的資料結構如下:
Node有一個模式的屬性:獨占模式和共享模式,獨占模式下資源是線程獨占的,共享模式下,資源是可以被多個線程占用的。
Node源碼如下:
1 static final class Node {
2 /** Marker to indicate a node is waiting in shared mode */
3 static final Node SHARED = new Node(); // 共享模式
4 /** Marker to indicate a node is waiting in exclusive mode */
5 static final Node EXCLUSIVE = null; // 獨占模式
6
7 /** waitStatus value to indicate thread has cancelled */
8 static final int CANCELLED = 1; // 表明線程已處于結束狀态(被取消)
9 /** waitStatus value to indicate successor's thread needs unparking */
10 static final int SIGNAL = -1; // 表明線程需要被喚醒
11 /** waitStatus value to indicate thread is waiting on condition */
12 static final int CONDITION = -2; // 表明線程正處于條件隊列上,等待某一條件
13 /**
14 * waitStatus value to indicate the next acquireShared should
15 * unconditionally propagate
16 */
17 static final int PROPAGATE = -3; // 共享模式下同步狀态會被傳播
18
19 /**
20 * Status field, taking on only the values:
21 * SIGNAL: The successor of this node is (or will soon be)
22 * blocked (via park), so the current node must
23 * unpark its successor when it releases or
24 * cancels. To avoid races, acquire methods must
25 * first indicate they need a signal,
26 * then retry the atomic acquire, and then,
27 * on failure, block.
28 * CANCELLED: This node is cancelled due to timeout or interrupt.
29 * Nodes never leave this state. In particular,
30 * a thread with cancelled node never again blocks.
31 * CONDITION: This node is currently on a condition queue.
32 * It will not be used as a sync queue node
33 * until transferred, at which time the status
34 * will be set to 0. (Use of this value here has
35 * nothing to do with the other uses of the
36 * field, but simplifies mechanics.)
37 * PROPAGATE: A releaseShared should be propagated to other
38 * nodes. This is set (for head node only) in
39 * doReleaseShared to ensure propagation
40 * continues, even if other operations have
41 * since intervened.
42 * 0: None of the above
43 *
44 * The values are arranged numerically to simplify use.
45 * Non-negative values mean that a node doesn't need to
46 * signal. So, most code doesn't need to check for particular
47 * values, just for sign.
48 *
49 * The field is initialized to 0 for normal sync nodes, and
50 * CONDITION for condition nodes. It is modified using CAS
51 * (or when possible, unconditional volatile writes).
52 */
53 volatile int waitStatus;
54
55 /**
56 * Link to predecessor node that current node/thread relies on
57 * for checking waitStatus. Assigned during enqueuing, and nulled
58 * out (for sake of GC) only upon dequeuing. Also, upon
59 * cancellation of a predecessor, we short-circuit while
60 * finding a non-cancelled one, which will always exist
61 * because the head node is never cancelled: A node becomes
62 * head only as a result of successful acquire. A
63 * cancelled thread never succeeds in acquiring, and a thread only
64 * cancels itself, not any other node.
65 */
66 volatile Node prev;
67
68 /**
69 * Link to the successor node that the current node/thread
70 * unparks upon release. Assigned during enqueuing, adjusted
71 * when bypassing cancelled predecessors, and nulled out (for
72 * sake of GC) when dequeued. The enq operation does not
73 * assign next field of a predecessor until after attachment,
74 * so seeing a null next field does not necessarily mean that
75 * node is at end of queue. However, if a next field appears
76 * to be null, we can scan prev's from the tail to
77 * double-check. The next field of cancelled nodes is set to
78 * point to the node itself instead of null, to make life
79 * easier for isOnSyncQueue.
80 */
81 volatile Node next;
82
83 /**
84 * The thread that enqueued this node. Initialized on
85 * construction and nulled out after use.
86 */
87 volatile Thread thread;
88
89 /**
90 * Link to next node waiting on condition, or the special
91 * value SHARED. Because condition queues are accessed only
92 * when holding in exclusive mode, we just need a simple
93 * linked queue to hold nodes while they are waiting on
94 * conditions. They are then transferred to the queue to
95 * re-acquire. And because conditions can only be exclusive,
96 * we save a field by using special value to indicate shared
97 * mode.
98 */
99 Node nextWaiter;
100
101 /**
102 * Returns true if node is waiting in shared mode.
103 */
104 final boolean isShared() {
105 return nextWaiter == SHARED;
106 }
107
108 /**
109 * Returns previous node, or throws NullPointerException if null.
110 * Use when predecessor cannot be null. The null check could
111 * be elided, but is present to help the VM.
112 *
113 * @return the predecessor of this node
114 */
115 final Node predecessor() throws NullPointerException {
116 Node p = prev;
117 if (p == null)
118 throw new NullPointerException();
119 else
120 return p;
121 }
122
123 Node() { // Used to establish initial head or SHARED marker
124 }
125 // 線程加入等待結點
126 Node(Thread thread, Node mode) { // Used by addWaiter
127 this.nextWaiter = mode;
128 this.thread = thread;
129 }
130 // 線程加入條件對列,會帶上線程的狀态值waitStatus
131 Node(Thread thread, int waitStatus) { // Used by Condition
132 this.waitStatus = waitStatus;
133 this.thread = thread;
134 }
135 }
②ConditionObject:條件隊列,這個類的作用從AQS的注釋上可知。
該類主要是為了讓子類實作獨占模式。AQS架構下獨占模式的擷取資源、釋放等操作到最後都是基于這個類實作的。隻有在獨占模式下才會去使用該類。
ConditionObject源碼如下(對主要代碼進行了注釋):
1 public class ConditionObject implements Condition, java.io.Serializable {
2 private static final long serialVersionUID = 1173984872572414699L;
3 /** First node of condition queue. */
4 private transient Node firstWaiter; // 存儲條件對列中第一個節點
5 /** Last node of condition queue. */
6 private transient Node lastWaiter; // 存儲條件對列中最後一個節點
7
8 /**
9 * Creates a new {@code ConditionObject} instance.
10 */
11 public ConditionObject() { }
12
13 // Internal methods
14
15 /**
16 * Adds a new waiter to wait queue. // 增加一個新的節點到等待隊列中
17 * @return its new wait node
18 */
19 private Node addConditionWaiter() {
20 Node t = lastWaiter;
21 // 如果最後一個節點的狀态已經結束,則直接清理掉
22 // If lastWaiter is cancelled, clean out.
23 if (t != null && t.waitStatus != Node.CONDITION) {
24 // 拆分已經處于結束狀态的節點 也就是清除掉這類節點
25 unlinkCancelledWaiters();
26 t = lastWaiter;
27 }
28 // 建立一個新的節點,帶上結點狀态,表明結點處于條件對列上
29 Node node = new Node(Thread.currentThread(), Node.CONDITION);
30 /**
31 條件隊列中加入節點都是從隊尾加入,并且從下面代碼可知,每次都會存儲最後一個節點的值。
32 當最後一個節點為空時,說明隊列中不存在節點,是以将node指派給第一個節點,否則将節點加入對列尾
33 */
34 if (t == null)
35 firstWaiter = node;
36 else
37 t.nextWaiter = node;
38 lastWaiter = node; // 存儲最後一個節點的值
39 return node;
40 }
41
42 /**
43 * 喚醒節點
44 * 移除和轉換節點直到節點狀态處于未結束或者為空 (節點移除相當于喚醒)
45 * Removes and transfers nodes until hit non-cancelled one or
46 * null. Split out from signal in part to encourage compilers
47 * to inline the case of no waiters.
48 * @param first (non-null) the first node on condition queue
49 */
50 private void doSignal(Node first) {
51 do {
52 // 當next節點為null時,則将lastWaiter指派為null
53 if ( (firstWaiter = first.nextWaiter) == null)
54 lastWaiter = null;
55 first.nextWaiter = null; // 切斷目前節點
56 } while (!transferForSignal(first) &&
57 (first = firstWaiter) != null);
58 }
59
60 /**
61 * 喚醒所有節點
62 * Removes and transfers all nodes.
63 * @param first (non-null) the first node on condition queue
64 */
65 private void doSignalAll(Node first) {
66 lastWaiter = firstWaiter = null;
67 do {
68 // 循環喚醒所有節點,代碼還是比較容易了解
69 // 将每個節點直接截斷即可
70 Node next = first.nextWaiter;
71 first.nextWaiter = null;
72 transferForSignal(first);
73 first = next;
74 } while (first != null);
75 }
76
77 /**
78 * Unlinks cancelled waiter nodes from condition queue.
79 * Called only while holding lock. This is called when
80 * cancellation occurred during condition wait, and upon
81 * insertion of a new waiter when lastWaiter is seen to have
82 * been cancelled. This method is needed to avoid garbage
83 * retention in the absence of signals. So even though it may
84 * require a full traversal, it comes into play only when
85 * timeouts or cancellations occur in the absence of
86 * signals. It traverses all nodes rather than stopping at a
87 * particular target to unlink all pointers to garbage nodes
88 * without requiring many re-traversals during cancellation
89 * storms.
90 */
91 private void unlinkCancelledWaiters() { // 删除處于結束狀态的節點
92 Node t = firstWaiter;
93 Node trail = null;
94 // 第一個節點為空,直接傳回
95 // 這裡會周遊所有節點
96 while (t != null) {
97 Node next = t.nextWaiter; // 記錄下一個節點的值
98 // 當節點狀态不為CONDITION
99 if (t.waitStatus != Node.CONDITION) {
100 // 首先将目前節點的下一個節點指派為空,切斷目前節點鍊路
101 t.nextWaiter = null;
102 // 如果追蹤節點為空的時候,則存儲第一個節點的值為next,因為目前節點狀态不為CONDITION需要清理
103 if (trail == null)
104 firstWaiter = next;
105 else // 在追蹤節點串聯下一個節點,主要是為了存儲最後一個節點的值
106 trail.nextWaiter = next;
107 if (next == null) // 當next為空時,則存儲trail為最後一個節點,将最後一個節點值存儲下來
108 lastWaiter = trail;
109 }
110 else // 當節點狀态為CONDITION時,将該節點指派給trail
111 trail = t;
112 t = next; // 将next指派給t,繼續周遊
113 }
114 }
115
116 // public methods
117
118 /**
119 * 喚醒等待時間最長的節點,使其擁有鎖
120 * Moves the longest-waiting thread, if one exists, from the
121 * wait queue for this condition to the wait queue for the
122 * owning lock.
123 *
124 * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
125 * returns {@code false}
126 */
127 public final void signal() {
128 // 如果線程不是獨占資源,則抛出異常,從這裡也說明ConditionObject隻能用在獨占模式中
129 if (!isHeldExclusively())
130 throw new IllegalMonitorStateException();
131 Node first = firstWaiter;
132 if (first != null)
133 doSignal(first);
134 }
135
136 /**
137 * 喚醒所有等待節點
138 * Moves all threads from the wait queue for this condition to
139 * the wait queue for the owning lock.
140 *
141 * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
142 * returns {@code false}
143 */
144 public final void signalAll() {
145 if (!isHeldExclusively())
146 throw new IllegalMonitorStateException();
147 Node first = firstWaiter;
148 if (first != null)
149 doSignalAll(first);
150 }
151
152 /**
153 * 節點不間斷等待
154 * Implements uninterruptible condition wait.
155 * <ol>
156 * <li> Save lock state returned by {@link #getState}.
157 * <li> Invoke {@link #release} with saved state as argument,
158 * throwing IllegalMonitorStateException if it fails.
159 * <li> Block until signalled.
160 * <li> Reacquire by invoking specialized version of
161 * {@link #acquire} with saved state as argument.
162 * </ol>
163 */
164 public final void awaitUninterruptibly() {
165 Node node = addConditionWaiter();
166 int savedState = fullyRelease(node);
167 boolean interrupted = false;
168 while (!isOnSyncQueue(node)) {
169 LockSupport.park(this);
170 if (Thread.interrupted())
171 interrupted = true;
172 }
173 if (acquireQueued(node, savedState) || interrupted)
174 selfInterrupt();
175 }
176
177 /*
178 * For interruptible waits, we need to track whether to throw
179 * InterruptedException, if interrupted while blocked on
180 * condition, versus reinterrupt current thread, if
181 * interrupted while blocked waiting to re-acquire.
182 */
183
184 /** Mode meaning to reinterrupt on exit from wait */
185 private static final int REINTERRUPT = 1;
186 /** Mode meaning to throw InterruptedException on exit from wait */
187 private static final int THROW_IE = -1;
188
189 /**
190 * Checks for interrupt, returning THROW_IE if interrupted
191 * before signalled, REINTERRUPT if after signalled, or
192 * 0 if not interrupted.
193 */
194 private int checkInterruptWhileWaiting(Node node) {
195 return Thread.interrupted() ?
196 (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
197 0;
198 }
199
200 /**
201 * Throws InterruptedException, reinterrupts current thread, or
202 * does nothing, depending on mode.
203 */
204 private void reportInterruptAfterWait(int interruptMode)
205 throws InterruptedException {
206 if (interruptMode == THROW_IE)
207 throw new InterruptedException();
208 else if (interruptMode == REINTERRUPT)
209 selfInterrupt();
210 }
211
212 /**
213 * Implements interruptible condition wait.
214 * <ol>
215 * <li> If current thread is interrupted, throw InterruptedException.
216 * <li> Save lock state returned by {@link #getState}.
217 * <li> Invoke {@link #release} with saved state as argument,
218 * throwing IllegalMonitorStateException if it fails.
219 * <li> Block until signalled or interrupted.
220 * <li> Reacquire by invoking specialized version of
221 * {@link #acquire} with saved state as argument.
222 * <li> If interrupted while blocked in step 4, throw InterruptedException.
223 * </ol>
224 */
225 public final void await() throws InterruptedException {
226 if (Thread.interrupted())
227 throw new InterruptedException();
228 Node node = addConditionWaiter();
229 int savedState = fullyRelease(node);
230 int interruptMode = 0;
231 while (!isOnSyncQueue(node)) {
232 LockSupport.park(this);
233 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
234 break;
235 }
236 if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
237 interruptMode = REINTERRUPT;
238 if (node.nextWaiter != null) // clean up if cancelled
239 unlinkCancelledWaiters();
240 if (interruptMode != 0)
241 reportInterruptAfterWait(interruptMode);
242 }
243
244 /**
245 * Implements timed condition wait.
246 * <ol>
247 * <li> If current thread is interrupted, throw InterruptedException.
248 * <li> Save lock state returned by {@link #getState}.
249 * <li> Invoke {@link #release} with saved state as argument,
250 * throwing IllegalMonitorStateException if it fails.
251 * <li> Block until signalled, interrupted, or timed out.
252 * <li> Reacquire by invoking specialized version of
253 * {@link #acquire} with saved state as argument.
254 * <li> If interrupted while blocked in step 4, throw InterruptedException.
255 * </ol>
256 */
257 public final long awaitNanos(long nanosTimeout)
258 throws InterruptedException {
259 if (Thread.interrupted())
260 throw new InterruptedException();
261 Node node = addConditionWaiter();
262 int savedState = fullyRelease(node);
263 final long deadline = System.nanoTime() + nanosTimeout;
264 int interruptMode = 0;
265 while (!isOnSyncQueue(node)) {
266 if (nanosTimeout <= 0L) {
267 transferAfterCancelledWait(node);
268 break;
269 }
270 if (nanosTimeout >= spinForTimeoutThreshold)
271 LockSupport.parkNanos(this, nanosTimeout);
272 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
273 break;
274 nanosTimeout = deadline - System.nanoTime();
275 }
276 if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
277 interruptMode = REINTERRUPT;
278 if (node.nextWaiter != null)
279 unlinkCancelledWaiters();
280 if (interruptMode != 0)
281 reportInterruptAfterWait(interruptMode);
282 return deadline - System.nanoTime();
283 }
284
285 /**
286 * Implements absolute timed condition wait.
287 * <ol>
288 * <li> If current thread is interrupted, throw InterruptedException.
289 * <li> Save lock state returned by {@link #getState}.
290 * <li> Invoke {@link #release} with saved state as argument,
291 * throwing IllegalMonitorStateException if it fails.
292 * <li> Block until signalled, interrupted, or timed out.
293 * <li> Reacquire by invoking specialized version of
294 * {@link #acquire} with saved state as argument.
295 * <li> If interrupted while blocked in step 4, throw InterruptedException.
296 * <li> If timed out while blocked in step 4, return false, else true.
297 * </ol>
298 */
299 public final boolean awaitUntil(Date deadline)
300 throws InterruptedException {
301 long abstime = deadline.getTime();
302 if (Thread.interrupted())
303 throw new InterruptedException();
304 Node node = addConditionWaiter();
305 int savedState = fullyRelease(node);
306 boolean timedout = false;
307 int interruptMode = 0;
308 while (!isOnSyncQueue(node)) {
309 if (System.currentTimeMillis() > abstime) {
310 timedout = transferAfterCancelledWait(node);
311 break;
312 }
313 LockSupport.parkUntil(this, abstime);
314 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
315 break;
316 }
317 if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
318 interruptMode = REINTERRUPT;
319 if (node.nextWaiter != null)
320 unlinkCancelledWaiters();
321 if (interruptMode != 0)
322 reportInterruptAfterWait(interruptMode);
323 return !timedout;
324 }
325
326 /**
327 * Implements timed condition wait.
328 * <ol>
329 * <li> If current thread is interrupted, throw InterruptedException.
330 * <li> Save lock state returned by {@link #getState}.
331 * <li> Invoke {@link #release} with saved state as argument,
332 * throwing IllegalMonitorStateException if it fails.
333 * <li> Block until signalled, interrupted, or timed out.
334 * <li> Reacquire by invoking specialized version of
335 * {@link #acquire} with saved state as argument.
336 * <li> If interrupted while blocked in step 4, throw InterruptedException.
337 * <li> If timed out while blocked in step 4, return false, else true.
338 * </ol>
339 */
340 public final boolean await(long time, TimeUnit unit)
341 throws InterruptedException {
342 long nanosTimeout = unit.toNanos(time);
343 if (Thread.interrupted())
344 throw new InterruptedException();
345 Node node = addConditionWaiter();
346 int savedState = fullyRelease(node);
347 final long deadline = System.nanoTime() + nanosTimeout;
348 boolean timedout = false;
349 int interruptMode = 0;
350 while (!isOnSyncQueue(node)) {
351 if (nanosTimeout <= 0L) {
352 timedout = transferAfterCancelledWait(node);
353 break;
354 }
355 if (nanosTimeout >= spinForTimeoutThreshold)
356 LockSupport.parkNanos(this, nanosTimeout);
357 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
358 break;
359 nanosTimeout = deadline - System.nanoTime();
360 }
361 if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
362 interruptMode = REINTERRUPT;
363 if (node.nextWaiter != null)
364 unlinkCancelledWaiters();
365 if (interruptMode != 0)
366 reportInterruptAfterWait(interruptMode);
367 return !timedout;
368 }
369
370 // support for instrumentation
371
372 /**
373 * Returns true if this condition was created by the given
374 * synchronization object.
375 *
376 * @return {@code true} if owned
377 */
378 final boolean isOwnedBy(AbstractQueuedSynchronizer sync) {
379 return sync == AbstractQueuedSynchronizer.this;
380 }
381
382 /**
383 * Queries whether any threads are waiting on this condition.
384 * Implements {@link AbstractQueuedSynchronizer#hasWaiters(ConditionObject)}.
385 *
386 * @return {@code true} if there are any waiting threads
387 * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
388 * returns {@code false}
389 */
390 protected final boolean hasWaiters() {
391 if (!isHeldExclusively())
392 throw new IllegalMonitorStateException();
393 for (Node w = firstWaiter; w != null; w = w.nextWaiter) {
394 if (w.waitStatus == Node.CONDITION)
395 return true;
396 }
397 return false;
398 }
399
400 /**
401 * Returns an estimate of the number of threads waiting on
402 * this condition.
403 * Implements {@link AbstractQueuedSynchronizer#getWaitQueueLength(ConditionObject)}.
404 *
405 * @return the estimated number of waiting threads
406 * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
407 * returns {@code false}
408 */
409 protected final int getWaitQueueLength() {
410 if (!isHeldExclusively())
411 throw new IllegalMonitorStateException();
412 int n = 0;
413 for (Node w = firstWaiter; w != null; w = w.nextWaiter) {
414 if (w.waitStatus == Node.CONDITION)
415 ++n;
416 }
417 return n;
418 }
419
420 /**
421 * Returns a collection containing those threads that may be
422 * waiting on this Condition.
423 * Implements {@link AbstractQueuedSynchronizer#getWaitingThreads(ConditionObject)}.
424 *
425 * @return the collection of threads
426 * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
427 * returns {@code false}
428 */
429 protected final Collection<Thread> getWaitingThreads() {
430 if (!isHeldExclusively())
431 throw new IllegalMonitorStateException();
432 ArrayList<Thread> list = new ArrayList<Thread>();
433 for (Node w = firstWaiter; w != null; w = w.nextWaiter) {
434 if (w.waitStatus == Node.CONDITION) {
435 Thread t = w.thread;
436 if (t != null)
437 list.add(t);
438 }
439 }
440 return list;
441 }
442 }
View Code
3.AQS成員函數
由于AQS分獨占模式和共享模式,是以這裡按獨占、共享模式的順序對AQS的成員函數進行分析。
①acquire(int arg)
獨占模式下擷取資源,如果擷取到資源,線程直接傳回,否則進入等待隊列,直到擷取到資源為止,整個過程忽略中斷。源碼如下:
1 /**
2 * Acquires in exclusive mode, ignoring interrupts. Implemented
3 * by invoking at least once {@link #tryAcquire},
4 * returning on success. Otherwise the thread is queued, possibly
5 * repeatedly blocking and unblocking, invoking {@link
6 * #tryAcquire} until success. This method can be used
7 * to implement method {@link Lock#lock}.
8 *
9 * @param arg the acquire argument. This value is conveyed to
10 * {@link #tryAcquire} but is otherwise uninterpreted and
11 * can represent anything you like.
12 */
13 public final void acquire(int arg) {
14 if (!tryAcquire(arg) &&
15 acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
16 selfInterrupt();
17 }
該函數執行流程:
A.如果tryAcquire()成功擷取資源,則直接傳回。
B.直接擷取資源失敗,則通過addWaiter()将線程加入隊列尾,并标記為獨占模式。
C.通過acquireQueued()讓線程在等待隊列中擷取資源,通過自旋方式,一直擷取到後才傳回。如果在等待過程中被中斷過,則傳回true,否則傳回false。
D.如果線程在等待擷取資源的過程中被中斷,隻有在擷取到資源後才會去響應,執行selfInterrupt進行自我中斷。
#1.tryAcquire(int)
該方法是在獨占模式下擷取資源,成功-ture,失敗-false。
1 protected boolean tryAcquire(int arg) {
2 throw new UnsupportedOperationException();
3 }
直接調用該方法會抛出異常,因為AQS隻是一個架構,隻是定義該接口,具體實作需在子類中實作。
#2.addWaiter(Node mode)
将目前線程加入等待隊列的隊尾,并傳回目前線程所在的節點。
1 private Node addWaiter(Node mode) {
2 // 建立節點,以獨占模式
3 Node node = new Node(Thread.currentThread(), mode);
4 // Try the fast path of enq; backup to full enq on failure
5 // 嘗試将節點快速放入隊尾
6 Node pred = tail;
7 if (pred != null) {
8 node.prev = pred;
9 // 主要通過CAS入隊尾
10 if (compareAndSetTail(pred, node)) {
11 pred.next = node;
12 return node;
13 }
14 }
15 // 如果快速入隊尾失敗,則通過enq方式入對尾
16 enq(node);
17 return node;
18 }
CAS操作後面讨論,這裡先看enq(final Node node)入隊尾操作。
1 private Node enq(final Node node) {
2 // 這裡是CAS的“自旋”操作,直到将節點成功加入隊尾
3 for (;;) {
4 Node t = tail;
5 // 因為每次入隊都是從隊尾加入,當隊尾為null,則表明隊列為null,則需初始化頭結點
6 // 并将尾節點也指向頭節點
7 if (t == null) { // Must initialize
8 if (compareAndSetHead(new Node()))
9 tail = head;
10 } else { // 通過CAS入隊尾,自旋操作
11 node.prev = t;
12 if (compareAndSetTail(t, node)) {
13 t.next = node;
14 return t;
15 }
16 }
17 }
18 }
線上程入隊尾後,就需要acquireQueued函數了,該函數的作用是讓線程拿到資源,當然還是通過自旋的方式來拿資源,也是就是一個排隊的過程。
1 final boolean acquireQueued(final Node node, int arg) {
2 boolean failed = true; // 标記是否成功拿到資源
3 try {
4 boolean interrupted = false; // 标記在等待過程中是否被中斷過
5 // 自旋操作
6 for (;;) {
7 final Node p = node.predecessor(); // 拿到目前節點的前向節點
8 // 如果前向節點為head,則表明目前節點排在第二位了,已經得到擷取資源的資格
9 if (p == head && tryAcquire(arg)) {
10 // 成功拿到資源後,将head節點指向目前節點
11 // 從這裡可以看出,head節點就是目前擷取到鎖的節點
12 setHead(node);
13 // 将原來head節點的next設定為null,友善GC回收以前的head節點,也就意味着之前拿到鎖的節點出隊列了
14 p.next = null; // help GC
15 failed = false;
16 return interrupted; // 傳回在排隊過程中線程是否被中斷過
17 }
18 // 到這裡,表明線程處于等待狀态,自旋直到被unpark
19 if (shouldParkAfterFailedAcquire(p, node) &&
20 parkAndCheckInterrupt())
21 interrupted = true;
22 }
23 } finally {
24 if (failed) // 擷取資源失敗,則将節點标記為結束狀态
25 cancelAcquire(node);
26 }
27 }
線上程排隊等待的過程中,有兩個關鍵函數shouldParkAfterFailedAcquire(Node pred, Node node)和parkAndCheckInterrupt()。
1 private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
2 int ws = pred.waitStatus; // 前驅節點的狀态
3 if (ws == Node.SIGNAL)
4 // 如果前驅節點正處于被喚醒的狀态,則正常排隊等待即可
5 /*
6 * This node has already set status asking a release
7 * to signal it, so it can safely park.
8 */
9 return true;
10 if (ws > 0) { // 前驅節點處于結束狀态
11 /*
12 * Predecessor was cancelled. Skip over predecessors and
13 * indicate retry.
14 */
15 /*
16 *繼續向下找,一直找到處于正常等待狀态的節點,将目前節點插入其後,其他
17 *無用節點形成一個鍊,會被GC
18 */
19 do {
20 node.prev = pred = pred.prev;
21 } while (pred.waitStatus > 0);
22 pred.next = node;
23 } else {
24 /*
25 * waitStatus must be 0 or PROPAGATE. Indicate that we
26 * need a signal, but don't park yet. Caller will need to
27 * retry to make sure it cannot acquire before parking.
28 */
29 // 前驅節點狀态正常,則把前驅節點的狀态設定為SIGNAL,這樣前驅節點拿到資源後,可通知下目前節點
30 compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
31 }
32 return false;
33 }
分析以上源碼可知:隻有目前驅節點的狀态為SIGNAL時,目前節點才能正常排隊等待,否則需找到一個合适的節點next位置來進行排隊等待。
1 private final boolean parkAndCheckInterrupt() {
2 // 使線程進入waitting狀态
3 LockSupport.park(this);
4 return Thread.interrupted(); // 傳回線程是否被中斷過
5 }
該函數作用:當節點正常進入排隊後,讓線程進入等待狀态。
至此acquireQueued()函數總結完成,該函數的具體執行流程:
#1.首先檢查節點是否可以立即擷取資源。
#2.如果不能立即擷取資源,則進行排隊,這裡需要找到正确的排隊點,直到unpark或interrupt喚醒自己。
#3.喚醒後,判斷自己是否有資格擷取資源,如果拿到資源,則将head指向目前節點,并傳回在等待過程是否被中斷過,如果沒拿到資源,則繼續流程2。
acquire小結
到這裡acquire(int)函數分析結束,這個函數非常重要,這裡再貼上源碼:
1 public final void acquire(int arg) {
2 if (!tryAcquire(arg) &&
3 acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
4 selfInterrupt();
5 }
#1.調用子類的tryAcquire直接擷取資源,如果成功則傳回。
#2.如果流程1失敗,則将線程加入等待隊列的隊尾(獨占模式)。
#3.在acquireQueued中排隊,通過自旋擷取資源,直到擷取資源才傳回。如果在排隊過程中線程被中斷過傳回true,否則傳回false。
#4.在排隊過程中被中斷是不響應的,隻有擷取到資源後,才進行自我中斷,補上中斷标記。
整個過程的流程圖如下:
②release(int)獨占模式釋放資源。
1 public final boolean release(int arg) {
2 // 嘗試釋放資源
3 if (tryRelease(arg)) {
4 Node h = head;
5 if (h != null && h.waitStatus != 0)
6 unparkSuccessor(h); // 喚醒隊列中下一個線程
7 return true;
8 }
9 return false;
10 }
釋放鎖的函數很簡單,通過tryRelease嘗試釋放資源,然後喚醒隊列中的其他線程。
tryRelease(int):
1 protected boolean tryRelease(int arg) {
2 throw new UnsupportedOperationException();
3 }
與tryAcquire函數一樣,該方法需要子類去實作,如果直接調用會抛異常。
unparkSuccessor(Node node):
喚醒等待隊列中的下一個線程,這裡喚醒的是等待隊列中最前邊那個未放棄的線程,注意看代碼注釋。
1 private void unparkSuccessor(Node node) {
2 /*
3 * If status is negative (i.e., possibly needing signal) try
4 * to clear in anticipation of signalling. It is OK if this
5 * fails or if status is changed by waiting thread.
6 */
7 int ws = node.waitStatus; // 擷取目前線程的狀态
8 if (ws < 0) // 如果目前線程狀态處于可用狀态,則直接将狀态值置0
9 compareAndSetWaitStatus(node, ws, 0);
10
11 /*
12 * Thread to unpark is held in successor, which is normally
13 * just the next node. But if cancelled or apparently null,
14 * traverse backwards from tail to find the actual
15 * non-cancelled successor.
16 */
17 Node s = node.next; // 下一個節點
18 if (s == null || s.waitStatus > 0) { // 如果節點為null或節點已處于結束狀态
19 s = null;
20 // 從隊列尾向前周遊,找到next可用的節點,狀态小于0就可用,這裡的節點是隊列中最前邊的可用節點
21 for (Node t = tail; t != null && t != node; t = t.prev)
22 if (t.waitStatus <= 0)
23 s = t;
24 }
25 if (s != null)
26 LockSupport.unpark(s.thread);// 喚醒next線程
27 }
獨占模式的主要函數分析完畢,接下來看共享模式。
②acquireShared(int)
共享模式下擷取資源,如果成功則直接傳回,否則進入等待隊列,通過自旋直到擷取資源為止。
1 public final void acquireShared(int arg) {
2 // 共享模式下擷取資源,如果擷取失敗,則進入等待隊列
3 // 同樣該函數需要子類去實作
4 if (tryAcquireShared(arg) < 0)
5 doAcquireShared(arg); // 進入等待隊列直到鎖擷取到為止
6 }
tryAcquireShared(int)函數傳回值,需要注意下:
負數:表示擷取失敗;
0:擷取成功,但沒有剩餘資源;
正數:擷取成功,且有剩餘資源;
#1.doAcquireShared(int)
将線程加入隊列尾,然後通過自旋擷取資源,直到得到資源才傳回。
1 private void doAcquireShared(int arg) {
2 final Node node = addWaiter(Node.SHARED); // 将線程加入隊尾,通過共享模式
3 boolean failed = true;// 是否成功
4 try {
5 boolean interrupted = false; // 在自旋過程中是否被中斷過
6 for (;;) {
7 final Node p = node.predecessor(); // 前驅節點
8 if (p == head) { // 這裡表明目前節點處于head的next位,此時node被喚醒,很可能是head用完來喚醒
9 int r = tryAcquireShared(arg); // 擷取資源
10 if (r >= 0) { // 成功
11 setHeadAndPropagate(node, r);// 将head指向自己,還有剩餘資源可用的話再喚醒之後的線程
12 p.next = null; // help GC 無用鍊,幫助GC
13 if (interrupted) // 如果等待過程中被中斷過,将中斷補上
14 selfInterrupt();
15 failed = false;
16 return;
17 }
18 }
19 // 線程未排在head之後,繼續排隊,進入waiting狀态,等着unpark
20 if (shouldParkAfterFailedAcquire(p, node) &&
21 parkAndCheckInterrupt())
22 interrupted = true; // 中斷标記
23 }
24 } finally {
25 if (failed)
26 cancelAcquire(node);
27 }
28 }
整個流程與獨占模式的acquireQueued很相似,隻是共享模式下,在喚醒自己後,如果還有剩餘資源,需要喚醒後續節點。
setHeadAndPropagate(node, int)
将head節點設定為目前節點,如果還有剩餘資源,則喚醒下一個線程。
1 private void setHeadAndPropagate(Node node, int propagate) {
2 Node h = head; // Record old head for check below
3 setHead(node); // 将隊列中的head執行目前節點
4 /*
5 * Try to signal next queued node if:
6 * Propagation was indicated by caller,
7 * or was recorded (as h.waitStatus either before
8 * or after setHead) by a previous operation
9 * (note: this uses sign-check of waitStatus because
10 * PROPAGATE status may transition to SIGNAL.)
11 * and
12 * The next node is waiting in shared mode,
13 * or we don't know, because it appears null
14 *
15 * The conservatism in both of these checks may cause
16 * unnecessary wake-ups, but only when there are multiple
17 * racing acquires/releases, so most need signals now or soon
18 * anyway.
19 */
20 // 如果還有剩餘資源,則喚醒後續線程
21 if (propagate > 0 || h == null || h.waitStatus < 0 ||
22 (h = head) == null || h.waitStatus < 0) {
23 Node s = node.next;
24 if (s == null || s.isShared())
25 doReleaseShared();
26 }
27 }
這裡除了将head設定成目前線程,如果有剩餘資源,需要喚醒後續節點。
doReleaseShared()
1 private void doReleaseShared() {
2 /*
3 * Ensure that a release propagates, even if there are other
4 * in-progress acquires/releases. This proceeds in the usual
5 * way of trying to unparkSuccessor of head if it needs
6 * signal. But if it does not, status is set to PROPAGATE to
7 * ensure that upon release, propagation continues.
8 * Additionally, we must loop in case a new node is added
9 * while we are doing this. Also, unlike other uses of
10 * unparkSuccessor, we need to know if CAS to reset status
11 * fails, if so rechecking.
12 */
13 // 自旋操作
14 for (;;) {
15 Node h = head;
16 if (h != null && h != tail) {
17 int ws = h.waitStatus;
18 if (ws == Node.SIGNAL) { // 如果head狀态為SIGNAL,則需喚醒後續節點
19 // CAS一下目前節點的狀态,判斷是否為SIGNAL,如果是則置為0,否則繼續循環
20 if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
21 continue; // loop to recheck cases
22 unparkSuccessor(h); // 喚醒後繼節點
23 }
24 // 如果head節點狀态為0,且CAS置為傳播狀态失敗,則繼續循環,因為if操作中會改變節點的狀态
25 else if (ws == 0 &&
26 !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
27 continue; // loop on failed CAS
28 }
29 if (h == head) // 如果head節點發生了改變,則繼續自旋操作,防止上述操作過程中添加了節點的情況 // loop if head changed
30 break;
31 }
32 }
該方法的作用主要是用于喚醒後續節點。
共享模式擷取鎖操作與獨占模式基本相同:先直接擷取資源,如果成功,直接傳回;如果失敗,則将線程加入等待隊列尾,直到擷取到資源才傳回,整個過程忽略中斷。不同點在于共享模式下自己拿到資源後,還需要喚醒後續節點。
#2.releaseShared(int)
同享模式下釋放資源
1 public final boolean releaseShared(int arg) {
2 if (tryReleaseShared(arg)) { // 嘗試釋放資源
3 doReleaseShared(); // 喚醒後續節點,前面已經分析
4 return true;
5 }
6 return false;
7 }
共享模式釋放資源與獨占模式類似,但是獨占模式下需要完全釋放資源後,才會傳回true,而共享模式沒有這種要求。
總結
這裡隻是對AQS的頂層架構進行了簡要的分析,具體需要深入其子類中去,AQS的子類按模式分類可聚合成以下幾類:
#1.獨占模式:
ReentrantLock:可重入鎖。state=0獨占鎖,或者同一線程可多次擷取鎖(擷取+1,釋放-1)。
Worker(java.util.concurrent.ThreadPoolExecutor類中的内部類)線程池類。shutdown關閉空閑工作線程,中斷worker工作線程是獨占的,互斥的。
#2.共享模式:
Semaphore:信号量。 控制同時有多少個線程可以進入代碼段。(互斥鎖的拓展)
CountDownLatch:倒計時器。 初始化一個值,多線程減少這個值,直到為0,倒計時完畢,執行後續代碼。
#3.獨占+共享模式:
ReentrantReadWriteLock:可重入讀寫鎖。獨占寫+共享讀,即并發讀,互斥寫。
後續對這些類進行詳細分析。
by Shawn Chen,2019.1.29日,下午。