1、前言
Java并發程式設計核心在于java.util.concurrent包, 而juc當中的大多數同步器實作都是圍繞着共同的基礎行為,比如等待隊列、條件隊列、獨占擷取、共享擷取等,而這個行為的抽象就是基于AbstractQueuedSynchronizer簡稱AQS,AQS定義了一套多線程通路共享資源的同步器架構,是一個依賴狀态(state)的同步器.
AQS具備的特性
- 阻塞等待隊列
- 共享/獨占
- 公平/非公平
- 可重入
- 允許中斷
juc包中的同步器的實作例如Latch,Barrier,BlockingQuene等,都是基于AQS架構實作
1、AQS内部維護屬性 volatile int state (32位)
- state表示資源的可用狀态
2、state三種通路方式
getState()、setState()、compareAndSetState()
3、AQS定義兩種資源共享方式
- Exclusive-獨占,隻有一個線程能執行,如ReentrantLock
- Share-共享,多個線程可以同時執行,如Semaphore/CountDownLatch
4、AQS定義兩種隊列
- 同步等待隊列
- 條件等待隊列
不同的自定義同步器争用共享資源的方式也不同。自定義同步器在實作時隻需要實作共享資源state的擷取與釋放方式即可,至于具體線程等待隊列的維護(如擷取資源失敗入隊/喚醒出隊等),AQS已經在頂層實作好了。自定義同步器實作時主要實作以下幾種方法:
- isHeldExclusively():該線程是否正在獨占資源。隻有用到condition才需要去實作它。
- tryAcquire(int):獨占方式。嘗試擷取資源,成功則傳回true,失敗則傳回false。
- tryRelease(int):獨占方式。嘗試釋放資源,成功則傳回true,失敗則傳回false。
- tryAcquireShared(int):共享方式。嘗試擷取資源。負數表示失敗;0表示成功,但沒有剩餘可用資源;正數表示成功,且有剩餘資源。
- tryReleaseShared(int):共享方式。嘗試釋放資源,如果釋放後允許喚醒後續等待結點傳回true,否則傳回false。
說一下AQS中定義的兩種隊列
同步等待隊列:隻有一個線程搶占到鎖運作,其他線程在依次排隊等待
AQS當中的同步等待隊列也稱CLH隊列,CLH隊列是Craig、Landin、Hagersten三人發明的一種基于雙向連結清單資料結構的隊列,是FIFO先入先出線程等待隊列,Java中的CLH隊列是原CLH隊列的一個變種,線程由原自旋機制改為阻塞機制。
(紅色:表示線程在排隊等待 )
條件等待隊列:Condition是一個多線程間協調通信的工具類,使得某個,或者某些線程一起等待某個條件(Condition),隻有當該條件具備時,這些等待線程才會被喚醒,進而重新争奪鎖
(紅色:表示線程在排隊等待 )
2、AQS源碼分析
打開AQS源碼,可以看到一個靜态内部類Node,不管是等待隊列還是同步隊列,都是基于Node實作的
1、Node内部定義的常量
- 表示節點的模式SHARED/EXCLUSIVE
- 表示節點在隊列中變換的狀态值
- 定義兩種隊列
//标記節點是共享模式
static final Node SHARED = new Node();
//标記節點是獨占模式
static final Node EXCLUSIVE = null;
//在同步隊列中等待的線程等待逾時或者被中斷,需要從同步隊列中取消等待
static final int CANCELLED = 1;
//後繼節點的線程處于等待狀态,而目前的節點如果釋放了同步狀态或者被取消,
//将會通知後繼節點,使後繼節點的線程得以運作。
static final int SIGNAL = -1;
//節點在等待隊列中,節點的線程等待在Condition上,
//當其他線程對Condition調用了signal()方法後,
//該節點會從等待隊列中轉移到同步隊列中,加入到同步狀态的擷取中
static final int CONDITION = -2;
// 表示下一次共享式同步狀态擷取将會被無條件地傳播下去
static final int PROPAGATE = -3;
//标記目前節點的信号量狀态 (1,0,-1,-2,-3)5種狀态
//使用CAS更改狀态,volatile保證線程可見性,高并發場景下,
//即被一個線程修改後,狀态會立馬讓其他線程可見
volatile int waitStatus;
// 前驅節點,目前節點加入到同步隊列中被設定
volatile Node prev;
// 後繼結點
volatile Node next;
//節點同步狀态的線程
volatile Thread thread;
// 節點同步狀态的線程
Node nextWaiter;
// 傳回下一節點是否是共享
final boolean isShared() {
return nextWaiter == SHARED;
}
// 傳回前驅節點
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
//空節點,用于标記共享模式
Node() { // Used to establish initial head or SHARED marker
}
//用于同步隊列CLH
Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}
//用于條件隊列
Node(Thread thread, int waitStatus) { // Used by Condition
this.waitStatus = waitStatus;
this.thread = thread;
}
看完這些變量,我麼可以大緻判斷每個節點(線程)在隊列中的狀态變換表示
2、同步等待隊列的一些核心方法
- tryAcquire() :嘗試擷取資源,成功就傳回,就是一個嘗試加鎖方法
- addWaiter() :将目前線程加到等待隊列的隊尾
- acquireQueued():讓線程阻塞在等待隊列裡面擷取資源,直到擷取到資源才傳回,如果在等待的過程被中斷,就傳回false,否則傳回true。
2.1、tryAcquire():就是一個嘗試加鎖方法
這個方法AQS隻定義了一個接口,都交給我們具體的實作類(ReentrantLock...)來實作。
2.2、addWaiter():将目前線程加到等待隊列隊尾
private Node addWaiter(Node mode) {
// 1. 将目前線程建構成Node類型
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
// 2. 1目前尾節點是否為null?
if (pred != null) {
// 2.2 将目前節點尾插入的方式
node.prev = pred;
// 2.3 通過CAS将節點插入同步隊列的尾部
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
// 如果存在競争 搶占失敗,就通過自旋
enq(node);
return node;
}
enq():搶占失敗,通過自旋加入到隊列尾部
private Node enq(final Node node) {
// 自旋
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
//隊列為空需要初始化,建立空的頭節點
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
//set尾部節點
if (compareAndSetTail(t, node)) {//目前節點置為尾部
t.next = node; //前驅節點的next指針指向目前節點
return t;
}
}
}
}
2.3、acquireQueued():讓線程阻塞在等待隊列裡面擷取資源,直到擷取到資源才傳回
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
//死循環
for (;;) {
//找到目前結點的前驅結點
final Node p = node.predecessor();
//如果前驅結點是頭結點,才tryAcquire,其他結點是沒有機會tryAcquire的。
if (p == head && tryAcquire(arg)) {
//擷取同步狀态成功,将目前結點設定為頭結點。
setHead(node);
// help GC
p.next = null;
failed = false;
return interrupted;
}
//如果前驅節點不是Head,通過shouldParkAfterFailedAcquire
// 判斷是否應該阻塞
// 前驅節點信号量為-1,目前線程可以安全被parkAndCheckInterrupt用來阻塞線程
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
// 終結掉正在擷取鎖的線程
cancelAcquire(node);
}
}
shouldParkAfterFailedAcquire():判斷目前節點能否被安全挂起park
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
// 若前驅結點的狀态是SIGNAL,意味着目前結點可以被安全地park
return true;
if (ws > 0) {
// 前驅節點狀态如果被取消狀态,将被移除出隊列
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
//将其設定為SIGNAL狀态,然後目前結點才可以可以被安全地park
// 目前驅節點waitStatus為 0 or PROPAGATE狀态時
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
parkAndCheckInterrupt():阻塞目前節點,傳回目前Thread的中斷狀态
// LockSupport.park
// 底層實作邏輯調用系統核心功能 pthread_mutex_lock 阻塞線程
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);//阻塞
return Thread.interrupted();
}
3、AQS主要實作源碼解析
ReentrantLock:同一時間隻有一個線程擷取到鎖
lock():
公平鎖:
- 調用tryAcquire()搶占鎖,成功設定目前線程為獨占模式(還有可重入變量)
- 搶占失敗,調用addWaiter(),acquireQueued()将目前節點加入到等待隊列等待擷取資源
final void lock() {
acquire(1);
}
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
// 得到同步資源的狀态
int c = getState();
// 沒有被占用
if (c == 0) {
// 如果目前節點沒有前驅節點
// 且CAS成功 設定目前線程獨占資源
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
// 判斷是目前線程
else if (current == getExclusiveOwnerThread()) {
// 可重入鎖 state加一
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
非公平鎖:
CAS操作搶占鎖,搶占成功,設定目前線程
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
unlock():
調用tryRelease()釋放鎖
protected final boolean tryRelease(int releases) {
// 可重入次數減一
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
// 設定搶占線程為null,
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
總結:公平和非公平鎖的差別就在于,公平鎖會根據自己的前驅節點的狀态,隻有前驅節點為空,才會去搶占鎖(也就是依次搶占),就是說根據加入到等待隊列中的順序搶占鎖,非公平鎖模式下,目前線程和等待隊列中的線程一起搶占鎖,各憑本事