一、什麼是AQS
AQS,在java.util.concurrent.locks包中,AbstractQueuedSynchronizer這個類是并發包中的核心,了解其他類之前,需要先弄清楚AQS。
AQS就是一個同步器,要做的事情就相當于一個鎖,是以就會有兩個動作:一個是擷取,一個是釋放。擷取釋放的時候該有一個東西來記住他是被用還是沒被用,這個東西就是一個狀态。如果鎖被擷取了,也就是被用了,還有很多其他的要來擷取鎖,總不能給全部拒絕了,這時候就需要他們排隊,這裡就需要一個隊列。
AQS的核心思想是:通過一個volatile修飾的int屬性
state
代表同步狀态,例如0是無鎖狀态,1是上鎖狀态。多線程競争資源時,通過CAS的方式來修改state,例如從0修改為1,修改成功的線程即為資源競争成功的線程,将其設為
exclusiveOwnerThread
,也稱【工作線程】,資源競争失敗的線程會被放入一個
FIFO
的隊列中并挂起休眠,當
exclusiveOwnerThread
線程釋放資源後,會從隊列中喚醒線程繼續工作,循環往複。
二、AQS架構
1、基礎架構圖
![](https://img.laitimes.com/img/__Qf2AjLwojIjJCLyojI0JCLiAzNfRHLGZkRGZkRfJ3bs92YsYTMfVmepNHL90TUlZjVzI2bwNDZzQ2MMBjVtJWd0ckW65UbM5WOHJWa5kHT20ESjBjUIF2X0hXZ0xCMx81dvRWYoNHLrdEZwZ1Rh5WNXp1bwNjW1ZUba9VZwlHdssmch1mclRXY39CXldWYtlWPzNXZj9mcw1ycz9WL49zZuBnL3EzN4ETNwYTM0EDOwEjMwIzLc52YucWbp5GZzNmLn9Gbi1yZtl2Lc9CX6MHc0RHaiojIsJye.png)
架構還是比較簡單的,除了實作Serializable接口外,就隻繼承了
AbstractOwnableSynchronizer
父類。
AbstractOwnableSynchronizer
父類中維護了一個
exclusiveOwnerThread
屬性,是用來記錄目前同步器資源的獨占線程的,沒有其他東西。
2、AQS的CLH鎖隊列
CLH其實就是一個FIFO的隊列,隻不過稍微做了點改進,AQS有一個内部類
Node
,AQS會将競争鎖失敗的線程封裝成一個Node節點,然後由這些NODE組成了一個連結清單隊列
/**
+------+ prev +-----+ +-----+
head | | <---- | | <---- | | tail
+------+ +-----+ +-----+
/**
static final class Node {
//作為共享模式
static final Node SHARED = new Node();
//作為獨占模式
static final Node EXCLUSIVE = null;
//等待狀态:表示節點中線程是已被取消的
static final int CANCELLED = 1;
//等待狀态:表示目前節點的後繼節點的線程需要被喚醒
static final int SIGNAL = -1;
//等待狀态:表示線程正在等待條件
static final int CONDITION = -2;
//等待狀态:表示下一個共享模式的節點應該無條件的傳播下去
static final int PROPAGATE = -3;
//等待狀态,初始化為0,剩下的狀态就是上面列出的
volatile int waitStatus;
//目前節點的前驅節點
volatile Node prev;
//後繼節點
volatile Node next;
//目前節點的線程
volatile Thread thread;
//
Node nextWaiter;
//是否是共享節點
final boolean isShared() {
return nextWaiter == SHARED;
}
//目前節點的前驅節點
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
Node() { // Used to establish initial head or SHARED marker
}
Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}
Node(Thread thread, int waitStatus) { // Used by Condition
this.waitStatus = waitStatus;
this.thread = thread;
}
}
waitStatus是以0為臨界值的,大于0代表節點無效,例如AQS在喚醒隊列中的節點時,waitStatus大于0的節點會被跳過
AQS内部還維護了int類型的state變量,代表同步器的狀态。例如,在ReentrantLock中,state就代表鎖的重入次數,每lock一次,state就+1,每unlock一次,state就-1,當state等于0時,代表沒有上鎖。
AQS内部還維護了head和tail屬性,用來指向FIFO隊列中的頭尾節點,被head指向的節點,總是工作線程。線程在擷取到鎖後,是不會出隊的。隻有當head釋放鎖,并将其後繼節點喚醒并設為head後,才會出隊。
3、共享鎖和互斥鎖
QS的CLH隊列鎖中,每個節點代表着一個需要擷取鎖的線程,該node中有兩個常量SHARED共享模式,EXCLUSIVE獨占模式。
/** Marker to indicate a node is waiting in shared mode */
static final Node SHARED = new Node();
/** Marker to indicate a node is waiting in exclusive mode */
static final Node EXCLUSIVE = null;
共享模式允許多個線程可以擷取同一個鎖,獨占模式則一個鎖隻能被一個線程持有,其他線程必須要等待。
三、源碼
1、AQS源碼
//阻塞隊列的隊列頭
private transient volatile Node head;
//隊列尾
private transient volatile Node tail;
//同步狀态,這就是上面提到的需要原子操作的狀态
private volatile int state;
//傳回目前同步器的狀态
protected final int getState() {
return state;
}
//設定同步器的狀态
protected final void setState(int newState) {
state = newState;
}
//原子的設定目前同步器的狀态
protected final boolean compareAndSetState(int expect, int update) {
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
//
static final long spinForTimeoutThreshold = 1000L;
2、獨占模式的擷取
acquire,獨占,忽略中斷
//獨占模式的擷取方法,會忽略中斷
//tryAcquire方法會被至少調用一次,由子類實作
//如果tryAcquire不能成功,目前線程就會進入隊列排隊
public final void acquire(int arg) {
//首先調用tryAcquire嘗試擷取
//擷取不成功,就使用acquireQueued使線程進入等待隊列
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
tryAcquire方法:
//由子類來實作
//嘗試在獨占模式下擷取,會查詢該對象的狀态是否允許在獨占模式下擷取
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
使用指定的模式建立一個節點,添加到AQS連結清單隊列中:
private Node addWaiter(Node mode) {
//目前線程,指定的mode,共享或者獨占
Node node = new Node(Thread.currentThread(), mode);
//先嘗試使用直接添加進隊列
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
//使用添加節點的方法
enq(node);
return node;
}
向隊列中插入節點:
//會插入節點到對列中
private Node enq(final Node node) {
for (;;) {
//尾節點
Node t = tail;
//需要執行個體化一個隊列
if (t == null) { // Must initialize
//使用cas建立頭節點
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
tryAcquire沒有擷取到,就會先使用addWaiter添加進隊列,然後使用acquireQueued從隊列擷取,如果這時候擷取成功,則替換目前節點為隊列頭,然後傳回:
//獨占模式處理正在排隊等待的線程。
//自旋,直至擷取成功才傳回
final boolean acquireQueued(final Node node, int arg) {
//目前擷取是否失敗
boolean failed = true;
try {
//擷取是否被中斷
boolean interrupted = false;
for (;;) {
//擷取目前節點的前驅節點
final Node p = node.predecessor();
//head節點要麼是剛才初始化的節點
//要麼就是成功擷取鎖的節點
//如果目前節點的前驅節點是head,目前節點就應該去嘗試擷取鎖了
//目前節點的前驅節點是頭節點,就嘗試擷取
if (p == head && tryAcquire(arg)) {
//擷取成功的話,就把目前節點設定為頭節點
setHead(node);
//之前的head節點的next引用設為null
p.next = null; // help GC
failed = false;
return interrupted;
}
//檢視目前節點是否應該被park
//如果應該,就park目前線程
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
//失敗了,取消目前線程
if (failed)
cancelAcquire(node);
}
}
設定頭節點,隻能被擷取方法調用:
private void setHead(Node node) {
head = node;
node.thread = null;
node.prev = null;
}
shouldParkAfterFailedAcquire方法,檢視是否應該被park:
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
//前驅節點中儲存的等待狀态
int ws = pred.waitStatus;
//等待狀态是signal,也就是目前節點在等着被喚醒
//此時目前節點應該park
if (ws == Node.SIGNAL)
return true;
//等待狀态大于0表示前驅節點已經取消
//會向前找到一個非取消狀态的節點
if (ws > 0) {
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
//将前驅節點的waitStatus設定為signal,表示目前需要被park
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
看下parkAndCheckInterrupt方法:
//挂起目前線程,并傳回目前中斷狀态
private final boolean parkAndCheckInterrupt() {
//挂起目前線程
LockSupport.park(this);
return Thread.interrupted();
}
cancelAcquire取消目前節點:
private void cancelAcquire(Node node) {
//節點不存在
if (node == null)
return;
//節點的線程引用設為null
node.thread = null;
//前驅節點
Node pred = node.prev;
//大于0表示前驅節點被取消
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
//前驅節點的下一個是需要移除的節點
Node predNext = pred.next;
//設定節點狀态為取消
node.waitStatus = Node.CANCELLED;
//如果是尾節點,直接取消,将前一個節點設定為尾節點
if (node == tail && compareAndSetTail(node, pred)) {
compareAndSetNext(pred, predNext, null);
} else {//不是尾節點,說明有後繼節點,将前驅節點的next紙箱後繼節點
int ws;
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread != null) {
Node next = node.next;
if (next != null && next.waitStatus <= 0)
compareAndSetNext(pred, predNext, next);
} else {
unparkSuccessor(node);
}
node.next = node; // help GC
}
}
acquireInterruptibly 獨占,可中斷
跟獨占忽略中斷類似,不再解釋。
tryAcquireNanos,獨占,可逾時,可中斷
跟上面類似,但是在doAcquireNanos中會擷取目前時間,并擷取LockSupport.parkNanos之後的時間在做逾時時間的重新計算,到了逾時時間,就傳回false。
3、獨占模式的釋放
release,獨占,忽略中斷
public final boolean release(int arg) {
//嘗試釋放,修改狀态
if (tryRelease(arg)) {
//成功釋放
//head代表初始化的節點,或者是目前占有鎖的節點
//需要unpark後繼節點
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
unparkSuccessor:
private void unparkSuccessor(Node node) {
//頭節點中儲存的waitStatus
int ws = node.waitStatus;
//重置頭節點狀态為0
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
//後繼節點
Node s = node.next;
//後繼節點為null或者已經取消
if (s == null || s.waitStatus > 0) {
s = null;
//從最後往前找有效的節點
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
//unpark
if (s != null)
LockSupport.unpark(s.thread);
}
共享模式的擷取
acquireShared,共享,忽略中斷
acquireSharedInterruptibly,共享,可中斷
tryAcquireSharedNanos,共享,可設定逾時,可中斷
共享模式的釋放
releaseShared
共享模式的和獨占模式基本差不多,和獨占式的acquireQueued方法差別就是在擷取成功的節點後會繼續unpark後繼節點,将共享狀态向後傳播。
LockSupport
用來建立鎖和其他同步類的基本線程阻塞原語。每個使用LockSupport的線程都會與一個許可關聯,如果該許可可用并且可在程序中使用,則調用park()将會立即傳回,否則可能阻塞。如果許可不可用,可調用unpark使其可用。
許可不可重入,隻能調用一次park()方法,否則會一直阻塞。
park()和unpark()作用分别是阻塞線程和解除阻塞線程,且park和unpark不會遇到suspend和resume可能引發的死鎖問題。
park,如果許可可用,使用該許可,并且該調用立即傳回;否則為線程排程禁用目前線程,并在發生以下三種情況之一之前,使其處于休眠狀态:
* 其他某個線程将目前線程作為目标調用unpark
* 其他某個線程中斷目前線程
* 該調用不合邏輯的傳回
unpark,如果給定的線程尚不可用,則使其可用。如果線程在park上受阻塞,則它将解除其阻塞狀态。否則,保證下一次調用park不受阻塞。如果給定線程尚未啟動,則無法保證此操作有任何效果。
四、注意點
1.工作線程什麼時候出隊?
當
FIFO
隊列中的一個節點競争到資源時,它并不是就馬上出隊了,而是将
head
指向自己。節點釋放鎖後依然不會主動出隊,而是等待下一個節點競争鎖成功後修改
head
的指向,将前任head踢出去。
2.AQS喚醒隊列的規則是什麼?
當
head
指向的節點成功釋放資源後,首先會判斷目前節點的
waitStatus
是否等于0,如果等于0就不會去喚醒後繼節點了,這也就是為什麼新的節點入隊休眠的前提是必須将前驅節點的
waitStatus
改為
SIGNAL(-1)
的原因,如果不改,後繼節點将不會被喚醒,就會導緻死鎖。
AQS首先會喚醒目前節點的直接後繼節點
next
,如果next為null,有兩種情況:
- 确實沒有後繼節點了,之前next指向的節點可能由于逾時等原因退出競争了。
- 存在後繼節點,隻是由于多線程的原因,後繼節點還沒來得及将目前節點的
指向它。next
第一種情況好辦,後繼節點為null,不喚醒就是了。
第二種情況就需要從tail向head尋找了,找到了有效節點再喚醒。
如果存在直接後繼節點,但是節點的
waitStatus
大于0,AQS也是會選擇跳過它的。前面已經說過,
waitStatus
大于0的節點代表無效節點,如
CANCELLED(1)
是已經取消競争的節點。如果直接後繼節點是無效節點的話,AQS會從tail開始向head周遊,直到找到有效節點,再将其喚醒。
總結:存在直接後繼節點且節點有效,則優先喚醒後繼節點。否則,從tail向head周遊,直到找到有效節點再喚醒。
3.喚醒節點為什麼要從尾巴開始?
這是因為,新節點入隊時,需要執行三個步驟:
- 目前節點的prev指向前任tail
- CAS将tail指向目前節點
- 前任tail的next指向目前節點
這三個操作AQS并沒有做同步處理,如果在執行步驟2後CPU時間片到期了,此時的節點指向是這樣的:
前驅節點的next還沒有指派,如果從頭向尾找,就可能會存在漏喚醒的問題。
而prev的指派先于tail的CAS操作之前執行,是以從尾向頭找,就可以避免這個問題。