天天看點

java并發程式設計核心:AQS原理及API從ReentrantLock的實作看AQS的原理及應用

從ReentrantLock的實作看AQS的原理及應用

前言

Java中的大部分同步類(Lock、Semaphore、ReentrantLock等)都是基于AbstractQueuedSynchronizer(簡稱為AQS)實作的。AQS是一種提供了原子式管理同步狀态、阻塞和喚醒線程功能以及隊列模型的簡單架構。本文會從應用層逐漸深入到原理層,并通過ReentrantLock的基本特性和ReentrantLock與AQS的關聯,來深入解讀AQS相關獨占鎖的知識點,同時采取問答的模式來幫助大家了解AQS。由于篇幅原因,本篇文章主要闡述AQS中獨占鎖的邏輯和Sync Queue,不講述包含共享鎖和Condition Queue的部分(本篇文章核心為AQS原理剖析,隻是簡單介紹了ReentrantLock,感興趣同學可以閱讀一下ReentrantLock的源碼)。

下面列出本篇文章的大綱和思路,以便于大家更好地了解:

java并發程式設計核心:AQS原理及API從ReentrantLock的實作看AQS的原理及應用

1 ReentrantLock

1.1 ReentrantLock特性概覽

ReentrantLock意思為可重入鎖,指的是一個線程能夠對一個臨界資源重複加鎖。為了幫助大家更好地了解ReentrantLock的特性,我們先将ReentrantLock跟常用的Synchronized進行比較,其特性如下(藍色部分為本篇文章主要剖析的點):

java并發程式設計核心:AQS原理及API從ReentrantLock的實作看AQS的原理及應用

下面通過僞代碼,進行更加直覺的比較:

// **************************Synchronized的使用方式**************************
// 1.用于代碼塊
synchronized (this) {}
// 2.用于對象
synchronized (object) {}
// 3.用于方法
public synchronized void test () {}
// 4.可重入
for (int i = 0; i < 100; i++) {
	synchronized (this) {}
}
// **************************ReentrantLock的使用方式**************************
public void test () throw Exception {
	// 1.初始化選擇公平鎖、非公平鎖
	ReentrantLock lock = new ReentrantLock(true);
	// 2.可用于代碼塊
	lock.lock();
	try {
		try {
			// 3.支援多種加鎖方式,比較靈活; 具有可重入特性
			if(lock.tryLock(100, TimeUnit.MILLISECONDS)){ }
		} finally {
			// 4.手動釋放鎖
			lock.unlock()
		}
	} finally {
		lock.unlock();
	}
}
           

1.2 ReentrantLock與AQS的關聯

通過上文我們已經了解,ReentrantLock支援公平鎖和非公平鎖(關于公平鎖和非公平鎖的原理分析,可參考《不可不說的Java“鎖”事》),并且ReentrantLock的底層就是由AQS來實作的。那麼ReentrantLock是如何通過公平鎖和非公平鎖與AQS關聯起來呢? 我們着重從這兩者的加鎖過程來了解一下它們與AQS之間的關系(加鎖過程中與AQS的關聯比較明顯,解鎖流程後續會介紹)。

非公平鎖源碼中的加鎖流程如下:

// java.util.concurrent.locks.ReentrantLock#NonfairSync

// 非公平鎖
static final class NonfairSync extends Sync {
	...
	final void lock() {
		if (compareAndSetState(0, 1))
			setExclusiveOwnerThread(Thread.currentThread());
		else
			acquire(1);
		}
  ...
}
           

這塊代碼的含義為:

  • 若通過CAS設定變量State(同步狀态)成功,也就是擷取鎖成功,則将目前線程設定為獨占線程。
  • 若通過CAS設定變量State(同步狀态)失敗,也就是擷取鎖失敗,則進入Acquire方法進行後續處理。

第一步很好了解,但第二步擷取鎖失敗後,後續的處理政策是怎麼樣的呢?這塊可能會有以下思考:

  • 某個線程擷取鎖失敗的後續流程是什麼呢?有以下兩種可能:

(1) 将目前線程獲鎖結果設定為失敗,擷取鎖流程結束。這種設計會極大降低系統的并發度,并不滿足我們實際的需求。是以就需要下面這種流程,也就是AQS架構的處理流程。

(2) 存在某種排隊等候機制,線程繼續等待,仍然保留擷取鎖的可能,擷取鎖流程仍在繼續。

  • 對于問題1的第二種情況,既然說到了排隊等候機制,那麼就一定會有某種隊列形成,這樣的隊列是什麼資料結構呢?
  • 處于排隊等候機制中的線程,什麼時候可以有機會擷取鎖呢?
  • 如果處于排隊等候機制中的線程一直無法擷取鎖,還是需要一直等待嗎,還是有别的政策來解決這一問題?

帶着非公平鎖的這些問題,再看下公平鎖源碼中獲鎖的方式:

// java.util.concurrent.locks.ReentrantLock#FairSync

static final class FairSync extends Sync {
  ...  
	final void lock() {
		acquire(1);
	}
  ...
}
           

看到這塊代碼,我們可能會存在這種疑問:Lock函數通過Acquire方法進行加鎖,但是具體是如何加鎖的呢?

結合公平鎖和非公平鎖的加鎖流程,雖然流程上有一定的不同,但是都調用了Acquire方法,而Acquire方法是FairSync和UnfairSync的父類AQS中的核心方法。

對于上邊提到的問題,其實在ReentrantLock類源碼中都無法解答,而這些問題的答案,都是位于Acquire方法所在的類AbstractQueuedSynchronizer中,也就是本文的核心——AQS。下面我們會對AQS以及ReentrantLock和AQS的關聯做詳細介紹(相關問題答案會在2.3.5小節中解答)。

2 AQS

首先,我們通過下面的架構圖來整體了解一下AQS架構:

java并發程式設計核心:AQS原理及API從ReentrantLock的實作看AQS的原理及應用
  • 上圖中有顔色的為Method,無顔色的為Attribution。
  • 總的來說,AQS架構共分為五層,自上而下由淺入深,從AQS對外暴露的API到底層基礎資料。
  • 當有自定義同步器接入時,隻需重寫第一層所需要的部分方法即可,不需要關注底層具體的實作流程。當自定義同步器進行加鎖或者解鎖操作時,先經過第一層的API進入AQS内部方法,然後經過第二層進行鎖的擷取,接着對于擷取鎖失敗的流程,進入第三層和第四層的等待隊列處理,而這些處理方式均依賴于第五層的基礎資料提供層。

下面我們會從整體到細節,從流程到方法逐一剖析AQS架構,主要分析過程如下:

java并發程式設計核心:AQS原理及API從ReentrantLock的實作看AQS的原理及應用

2.1 原理概覽

AQS核心思想是,如果被請求的共享資源空閑,那麼就将目前請求資源的線程設定為有效的工作線程,将共享資源設定為鎖定狀态;如果共享資源被占用,就需要一定的阻塞等待喚醒機制來保證鎖配置設定。這個機制主要用的是CLH隊列的變體實作的,将暫時擷取不到鎖的線程加入到隊列中。

CLH:Craig、Landin and Hagersten隊列,是單向連結清單,AQS中的隊列是CLH變體的虛拟雙向隊列(FIFO),AQS是通過将每條請求共享資源的線程封裝成一個節點來實作鎖的配置設定。

主要原理圖如下:

java并發程式設計核心:AQS原理及API從ReentrantLock的實作看AQS的原理及應用

AQS使用一個Volatile的int類型的成員變量來表示同步狀态,通過内置的FIFO隊列來完成資源擷取的排隊工作,通過CAS完成對State值的修改。

2.1.1 AQS資料結構

先來看下AQS中最基本的資料結構——Node,Node即為上面CLH變體隊列中的節點。

java并發程式設計核心:AQS原理及API從ReentrantLock的實作看AQS的原理及應用

解釋一下幾個方法和屬性值的含義:

方法和屬性值 含義
waitStatus 目前節點在隊列中的狀态
thread 表示處于該節點的線程
prev 前驅指針
predecessor 傳回前驅節點,沒有的話抛出npe
nextWaiter 指向下一個處于CONDITION狀态的節點(由于本篇文章不講述Condition Queue隊列,這個指針不多介紹)
next 後繼指針

線程兩種鎖的模式:

模式 含義
SHARED 表示線程以共享的模式等待鎖
EXCLUSIVE 表示線程正在以獨占的方式等待鎖

waitStatus有下面幾個枚舉值:

枚舉 含義
當一個Node被初始化的時候的預設值
CANCELLED 為1,表示線程擷取鎖的請求已經取消了
CONDITION 為-2,表示節點在等待隊列中,節點線程等待喚醒
PROPAGATE 為-3,目前線程處在SHARED情況下,該字段才會使用
SIGNAL 為-1,表示線程已經準備好了,就等資源釋放了

2.1.2 同步狀态State

在了解資料結構後,接下來了解一下AQS的同步狀态——State。AQS中維護了一個名為state的字段,意為同步狀态,是由Volatile修飾的,用于展示目前臨界資源的獲鎖情況。

// java.util.concurrent.locks.AbstractQueuedSynchronizer

private volatile int state;
           

下面提供了幾個通路這個字段的方法:

方法名 描述
protected final int getState() 擷取State的值
protected final void setState(int newState) 設定State的值
protected final boolean compareAndSetState(int expect, int update) 使用CAS方式更新State

這幾個方法都是Final修飾的,說明子類中無法重寫它們。我們可以通過修改State字段表示的同步狀态來實作多線程的獨占模式和共享模式(加鎖過程)。

java并發程式設計核心:AQS原理及API從ReentrantLock的實作看AQS的原理及應用
java并發程式設計核心:AQS原理及API從ReentrantLock的實作看AQS的原理及應用

對于我們自定義的同步工具,需要自定義擷取同步狀态和釋放狀态的方式,也就是AQS架構圖中的第一層:API層。

2.2 AQS重要方法與ReentrantLock的關聯

從架構圖中可以得知,AQS提供了大量用于自定義同步器實作的Protected方法。自定義同步器實作的相關方法也隻是為了通過修改State字段來實作多線程的獨占模式或者共享模式。自定義同步器需要實作以下方法(ReentrantLock需要實作的方法如下,并不是全部):

方法名 描述
protected boolean isHeldExclusively() 該線程是否正在獨占資源。隻有用到Condition才需要去實作它。
protected boolean tryAcquire(int arg) 獨占方式。arg為擷取鎖的次數,嘗試擷取資源,成功則傳回True,失敗則傳回False。
protected boolean tryRelease(int arg) 獨占方式。arg為釋放鎖的次數,嘗試釋放資源,成功則傳回True,失敗則傳回False。
protected int tryAcquireShared(int arg) 共享方式。arg為擷取鎖的次數,嘗試擷取資源。負數表示失敗;0表示成功,但沒有剩餘可用資源;正數表示成功,且有剩餘資源。
protected boolean tryReleaseShared(int arg) 共享方式。arg為釋放鎖的次數,嘗試釋放資源,如果釋放後允許喚醒後續等待結點傳回True,否則傳回False。

一般來說,自定義同步器要麼是獨占方式,要麼是共享方式,它們也隻需實作tryAcquire-tryRelease、tryAcquireShared-tryReleaseShared中的一種即可。AQS也支援自定義同步器同時實作獨占和共享兩種方式,如ReentrantReadWriteLock。ReentrantLock是獨占鎖,是以實作了tryAcquire-tryRelease。

以非公平鎖為例,這裡主要闡述一下非公平鎖與AQS之間方法的關聯之處,具體每一處核心方法的作用會在文章後面詳細進行闡述。

java并發程式設計核心:AQS原理及API從ReentrantLock的實作看AQS的原理及應用

為了幫助大家了解ReentrantLock和AQS之間方法的互動過程,以非公平鎖為例,我們将加鎖和解鎖的互動流程單獨拎出來強調一下,以便于對後續内容的了解。

java并發程式設計核心:AQS原理及API從ReentrantLock的實作看AQS的原理及應用

加鎖:

  • 通過ReentrantLock的加鎖方法Lock進行加鎖操作。
  • 會調用到内部類Sync的Lock方法,由于Sync#lock是抽象方法,根據ReentrantLock初始化選擇的公平鎖和非公平鎖,執行相關内部類的Lock方法,本質上都會執行AQS的Acquire方法。
  • AQS的Acquire方法會執行tryAcquire方法,但是由于tryAcquire需要自定義同步器實作,是以執行了ReentrantLock中的tryAcquire方法,由于ReentrantLock是通過公平鎖和非公平鎖内部類實作的tryAcquire方法,是以會根據鎖類型不同,執行不同的tryAcquire。
  • tryAcquire是擷取鎖邏輯,擷取失敗後,會執行架構AQS的後續邏輯,跟ReentrantLock自定義同步器無關。

解鎖:

  • 通過ReentrantLock的解鎖方法Unlock進行解鎖。
  • Unlock會調用内部類Sync的Release方法,該方法繼承于AQS。
  • Release中會調用tryRelease方法,tryRelease需要自定義同步器實作,tryRelease隻在ReentrantLock中的Sync實作,是以可以看出,釋放鎖的過程,并不區分是否為公平鎖。
  • 釋放成功後,所有處理由AQS架構完成,與自定義同步器無關。

通過上面的描述,大概可以總結出ReentrantLock加鎖解鎖時API層核心方法的映射關系。

java并發程式設計核心:AQS原理及API從ReentrantLock的實作看AQS的原理及應用

2.3 通過ReentrantLock了解AQS

ReentrantLock中公平鎖和非公平鎖在底層是相同的,這裡以非公平鎖為例進行分析。

在非公平鎖中,有一段這樣的代碼:

// java.util.concurrent.locks.ReentrantLock

static final class NonfairSync extends Sync {
	...
	final void lock() {
		if (compareAndSetState(0, 1))
			setExclusiveOwnerThread(Thread.currentThread());
		else
			acquire(1);
	}
  ...
}
           

看一下這個Acquire是怎麼寫的:

// java.util.concurrent.locks.AbstractQueuedSynchronizer

public final void acquire(int arg) {
	if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
		selfInterrupt();
}
           

再看一下tryAcquire方法:

// java.util.concurrent.locks.AbstractQueuedSynchronizer

protected boolean tryAcquire(int arg) {
	throw new UnsupportedOperationException();
}
           

可以看出,這裡隻是AQS的簡單實作,具體擷取鎖的實作方法是由各自的公平鎖和非公平鎖單獨實作的(以ReentrantLock為例)。如果該方法傳回了True,則說明目前線程擷取鎖成功,就不用往後執行了;如果擷取失敗,就需要加入到等待隊列中。下面會詳細解釋線程是何時以及怎樣被加入進等待隊列中的。

2.3.1 線程加入等待隊列

2.3.1.1 加入隊列的時機

當執行Acquire(1)時,會通過tryAcquire擷取鎖。在這種情況下,如果擷取鎖失敗,就會調用addWaiter加入到等待隊列中去。

2.3.1.2 如何加入隊列

擷取鎖失敗後,會執行addWaiter(Node.EXCLUSIVE)加入等待隊列,具體實作方法如下:

// java.util.concurrent.locks.AbstractQueuedSynchronizer

private Node addWaiter(Node mode) {
	Node node = new Node(Thread.currentThread(), mode);
	// Try the fast path of enq; backup to full enq on failure
	Node pred = tail;
	if (pred != null) {
		node.prev = pred;
		if (compareAndSetTail(pred, node)) {
			pred.next = node;
			return node;
		}
	}
	enq(node);
	return node;
}
private final boolean compareAndSetTail(Node expect, Node update) {
	return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
}
           

主要的流程如下:

  • 通過目前的線程和鎖模式建立一個節點。
  • Pred指針指向尾節點Tail。
  • 将New中Node的Prev指針指向Pred。
  • 通過compareAndSetTail方法,完成尾節點的設定。這個方法主要是對tailOffset和Expect進行比較,如果tailOffset的Node和Expect的Node位址是相同的,那麼設定Tail的值為Update的值。
// java.util.concurrent.locks.AbstractQueuedSynchronizer

static {
	try {
		stateOffset = unsafe.objectFieldOffset(AbstractQueuedSynchronizer.class.getDeclaredField("state"));
		headOffset = unsafe.objectFieldOffset(AbstractQueuedSynchronizer.class.getDeclaredField("head"));
		tailOffset = unsafe.objectFieldOffset(AbstractQueuedSynchronizer.class.getDeclaredField("tail"));
		waitStatusOffset = unsafe.objectFieldOffset(Node.class.getDeclaredField("waitStatus"));
		nextOffset = unsafe.objectFieldOffset(Node.class.getDeclaredField("next"));
	} catch (Exception ex) { 
    throw new Error(ex); 
  }
}
           

從AQS的靜态代碼塊可以看出,都是擷取一個對象的屬性相對于該對象在記憶體當中的偏移量,這樣我們就可以根據這個偏移量在對象記憶體當中找到這個屬性。tailOffset指的是tail對應的偏移量,是以這個時候會将new出來的Node置為目前隊列的尾節點。同時,由于是雙向連結清單,也需要将前一個節點指向尾節點。

  • 如果Pred指針是Null(說明等待隊列中沒有元素),或者目前Pred指針和Tail指向的位置不同(說明被别的線程已經修改),就需要看一下Enq的方法。
// java.util.concurrent.locks.AbstractQueuedSynchronizer

private Node enq(final Node node) {
	for (;;) {
		Node t = tail;
		if (t == null) { // Must initialize
			if (compareAndSetHead(new Node()))
				tail = head;
		} else {
			node.prev = t;
			if (compareAndSetTail(t, node)) {
				t.next = node;
				return t;
			}
		}
	}
}
           

如果沒有被初始化,需要進行初始化一個頭結點出來。但請注意,初始化的頭結點并不是目前線程節點,而是調用了無參構造函數的節點。如果經曆了初始化或者并發導緻隊列中有元素,則與之前的方法相同。其實,addWaiter就是一個在雙端連結清單添加尾節點的操作,需要注意的是,雙端連結清單的頭結點是一個無參構造函數的頭結點。

總結一下,線程擷取鎖的時候,過程大體如下:

  1. 當沒有線程擷取到鎖時,線程1擷取鎖成功。
  2. 線程2申請鎖,但是鎖被線程1占有。
java并發程式設計核心:AQS原理及API從ReentrantLock的實作看AQS的原理及應用
  1. 如果再有線程要擷取鎖,依次在隊列中往後排隊即可。

回到上邊的代碼,hasQueuedPredecessors是公平鎖加鎖時判斷等待隊列中是否存在有效節點的方法。如果傳回False,說明目前線程可以争取共享資源;如果傳回True,說明隊列中存在有效節點,目前線程必須加入到等待隊列中。

// java.util.concurrent.locks.ReentrantLock

public final boolean hasQueuedPredecessors() {
	// The correctness of this depends on head being initialized
	// before tail and on head.next being accurate if the current
	// thread is first in queue.
	Node t = tail; // Read fields in reverse initialization order
	Node h = head;
	Node s;
	return h != t && ((s = h.next) == null || s.thread != Thread.currentThread());
}
           

看到這裡,我們了解一下h != t && ((s = h.next) == null || s.thread != Thread.currentThread());為什麼要判斷的頭結點的下一個節點?第一個節點儲存的資料是什麼?

雙向連結清單中,第一個節點為虛節點,其實并不存儲任何資訊,隻是占位。真正的第一個有資料的節點,是在第二個節點開始的。當h != t時: 如果(s = h.next) == null,等待隊列正在有線程進行初始化,但隻是進行到了Tail指向Head,沒有将Head指向Tail,此時隊列中有元素,需要傳回True(這塊具體見下邊代碼分析)。 如果(s = h.next) != null,說明此時隊列中至少有一個有效節點。如果此時s.thread == Thread.currentThread(),說明等待隊列的第一個有效節點中的線程與目前線程相同,那麼目前線程是可以擷取資源的;如果s.thread != Thread.currentThread(),說明等待隊列的第一個有效節點線程與目前線程不同,目前線程必須加入進等待隊列。
// java.util.concurrent.locks.AbstractQueuedSynchronizer#enq

if (t == null) { // Must initialize
	if (compareAndSetHead(new Node()))
		tail = head;
} else {
	node.prev = t;
	if (compareAndSetTail(t, node)) {
		t.next = node;
		return t;
	}
}
           

節點入隊不是原子操作,是以會出現短暫的head != tail,此時Tail指向最後一個節點,而且Tail指向Head。如果Head沒有指向Tail(可見5、6、7行),這種情況下也需要将相關線程加入隊列中。是以這塊代碼是為了解決極端情況下的并發問題。

2.3.1.3 等待隊列中線程出隊列時機

回到最初的源碼:

// java.util.concurrent.locks.AbstractQueuedSynchronizer

public final void acquire(int arg) {
	if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
		selfInterrupt();
}
           

上文解釋了addWaiter方法,這個方法其實就是把對應的線程以Node的資料結構形式加入到雙端隊列裡,傳回的是一個包含該線程的Node。而這個Node會作為參數,進入到acquireQueued方法中。acquireQueued方法可以對排隊中的線程進行“獲鎖”操作。

總的來說,一個線程擷取鎖失敗了,被放入等待隊列,acquireQueued會把放入隊列中的線程不斷去擷取鎖,直到擷取成功或者不再需要擷取(中斷)。

下面我們從“何時出隊列?”和“如何出隊列?”兩個方向來分析一下acquireQueued源碼:

// java.util.concurrent.locks.AbstractQueuedSynchronizer

final boolean acquireQueued(final Node node, int arg) {
	// 标記是否成功拿到資源
	boolean failed = true;
	try {
		// 标記等待過程中是否中斷過
		boolean interrupted = false;
		// 開始自旋,要麼擷取鎖,要麼中斷
		for (;;) {
			// 擷取目前節點的前驅節點
			final Node p = node.predecessor();
			// 如果p是頭結點,說明目前節點在真實資料隊列的首部,就嘗試擷取鎖(别忘了頭結點是虛節點)
			if (p == head && tryAcquire(arg)) {
				// 擷取鎖成功,頭指針移動到目前node
				setHead(node);
				p.next = null; // help GC
				failed = false;
				return interrupted;
			}
			// 說明p為頭節點且目前沒有擷取到鎖(可能是非公平鎖被搶占了)或者是p不為頭結點,這個時候就要判斷目前node是否要被阻塞(被阻塞條件:前驅節點的waitStatus為-1),防止無限循環浪費資源。具體兩個方法下面細細分析
			if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
				interrupted = true;
		}
	} finally {
		if (failed)
			cancelAcquire(node);
	}
}
           

注:setHead方法是把目前節點置為虛節點,但并沒有修改waitStatus,因為它是一直需要用的資料。

// java.util.concurrent.locks.AbstractQueuedSynchronizer

private void setHead(Node node) {
	head = node;
	node.thread = null;
	node.prev = null;
}

// java.util.concurrent.locks.AbstractQueuedSynchronizer

// 靠前驅節點判斷目前線程是否應該被阻塞
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
	// 擷取頭結點的節點狀态
	int ws = pred.waitStatus;
	// 說明頭結點處于喚醒狀态
	if (ws == Node.SIGNAL)
		return true; 
	// 通過枚舉值我們知道waitStatus>0是取消狀态
	if (ws > 0) {
		do {
			// 循環向前查找取消節點,把取消節點從隊列中剔除
			node.prev = pred = pred.prev;
		} while (pred.waitStatus > 0);
		pred.next = node;
	} else {
		// 設定前任節點等待狀态為SIGNAL
		compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
	}
	return false;
}
           

parkAndCheckInterrupt主要用于挂起目前線程,阻塞調用棧,傳回目前線程的中斷狀态。

// java.util.concurrent.locks.AbstractQueuedSynchronizer

private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this);
    return Thread.interrupted();
}
           

上述方法的流程圖如下:

java并發程式設計核心:AQS原理及API從ReentrantLock的實作看AQS的原理及應用

從上圖可以看出,跳出目前循環的條件是當“前置節點是頭結點,且目前線程擷取鎖成功”。為了防止因死循環導緻CPU資源被浪費,我們會判斷前置節點的狀态來決定是否要将目前線程挂起,具體挂起流程用流程圖表示如下(shouldParkAfterFailedAcquire流程):

java并發程式設計核心:AQS原理及API從ReentrantLock的實作看AQS的原理及應用

從隊列中釋放節點的疑慮打消了,那麼又有新問題了:

  • shouldParkAfterFailedAcquire中取消節點是怎麼生成的呢?什麼時候會把一個節點的waitStatus設定為-1?
  • 是在什麼時間釋放節點通知到被挂起的線程呢?

2.3.2 CANCELLED狀态節點生成

acquireQueued方法中的Finally代碼:

// java.util.concurrent.locks.AbstractQueuedSynchronizer

final boolean acquireQueued(final Node node, int arg) {
	boolean failed = true;
	try {
    ...
		for (;;) {
			final Node p = node.predecessor();
			if (p == head && tryAcquire(arg)) {
				...
				failed = false;
        ...
			}
			...
	} finally {
		if (failed)
			cancelAcquire(node);
		}
}
           

通過cancelAcquire方法,将Node的狀态标記為CANCELLED。接下來,我們逐行來分析這個方法的原理:

// java.util.concurrent.locks.AbstractQueuedSynchronizer

private void cancelAcquire(Node node) {
  // 将無效節點過濾
	if (node == null)
		return;
  // 設定該節點不關聯任何線程,也就是虛節點
	node.thread = null;
	Node pred = node.prev;
  // 通過前驅節點,跳過取消狀态的node
	while (pred.waitStatus > 0)
		node.prev = pred = pred.prev;
  // 擷取過濾後的前驅節點的後繼節點
	Node predNext = pred.next;
  // 把目前node的狀态設定為CANCELLED
	node.waitStatus = Node.CANCELLED;
  // 如果目前節點是尾節點,将從後往前的第一個非取消狀态的節點設定為尾節點
  // 更新失敗的話,則進入else,如果更新成功,将tail的後繼節點設定為null
	if (node == tail && compareAndSetTail(node, pred)) {
		compareAndSetNext(pred, predNext, null);
	} else {
		int ws;
    // 如果目前節點不是head的後繼節點,1:判斷目前節點前驅節點的是否為SIGNAL,2:如果不是,則把前驅節點設定為SINGAL看是否成功
    // 如果1和2中有一個為true,再判斷目前節點的線程是否為null
    // 如果上述條件都滿足,把目前節點的前驅節點的後繼指針指向目前節點的後繼節點
		if (pred != head && ((ws = pred.waitStatus) == Node.SIGNAL || (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) && pred.thread != null) {
			Node next = node.next;
			if (next != null && next.waitStatus <= 0)
				compareAndSetNext(pred, predNext, next);
		} else {
      // 如果目前節點是head的後繼節點,或者上述條件不滿足,那就喚醒目前節點的後繼節點
			unparkSuccessor(node);
		}
		node.next = node; // help GC
	}
}
           

目前的流程:

  • 擷取目前節點的前驅節點,如果前驅節點的狀态是CANCELLED,那就一直往前周遊,找到第一個waitStatus <= 0的節點,将找到的Pred節點和目前Node關聯,将目前Node設定為CANCELLED。
  • 根據目前節點的位置,考慮以下三種情況:

(1) 目前節點是尾節點。

(2) 目前節點是Head的後繼節點。

(3) 目前節點不是Head的後繼節點,也不是尾節點。

根據上述第二條,我們來分析每一種情況的流程。

目前節點是尾節點。

java并發程式設計核心:AQS原理及API從ReentrantLock的實作看AQS的原理及應用

目前節點是Head的後繼節點。

java并發程式設計核心:AQS原理及API從ReentrantLock的實作看AQS的原理及應用

目前節點不是Head的後繼節點,也不是尾節點。

java并發程式設計核心:AQS原理及API從ReentrantLock的實作看AQS的原理及應用

通過上面的流程,我們對于CANCELLED節點狀态的産生和變化已經有了大緻的了解,但是為什麼所有的變化都是對Next指針進行了操作,而沒有對Prev指針進行操作呢?什麼情況下會對Prev指針進行操作?

執行cancelAcquire的時候,目前節點的前置節點可能已經從隊列中出去了(已經執行過Try代碼塊中的shouldParkAfterFailedAcquire方法了),如果此時修改Prev指針,有可能會導緻Prev指向另一個已經移除隊列的Node,是以這塊變化Prev指針不安全。 shouldParkAfterFailedAcquire方法中,會執行下面的代碼,其實就是在處理Prev指針。shouldParkAfterFailedAcquire是擷取鎖失敗的情況下才會執行,進入該方法後,說明共享資源已被擷取,目前節點之前的節點都不會出現變化,是以這個時候變更Prev指針比較安全。
do {
	node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
           

2.3.3 如何解鎖

我們已經剖析了加鎖過程中的基本流程,接下來再對解鎖的基本流程進行分析。由于ReentrantLock在解鎖的時候,并不區分公平鎖和非公平鎖,是以我們直接看解鎖的源碼:

// java.util.concurrent.locks.ReentrantLock

public void unlock() {
	sync.release(1);
}
           

可以看到,本質釋放鎖的地方,是通過架構來完成的。

// java.util.concurrent.locks.AbstractQueuedSynchronizer

public final boolean release(int arg) {
	if (tryRelease(arg)) {
		Node h = head;
		if (h != null && h.waitStatus != 0)
			unparkSuccessor(h);
		return true;
	}
	return false;
}
           

在ReentrantLock裡面的公平鎖和非公平鎖的父類Sync定義了可重入鎖的釋放鎖機制。

// java.util.concurrent.locks.ReentrantLock.Sync

// 方法傳回目前鎖是不是沒有被線程持有
protected final boolean tryRelease(int releases) {
	// 減少可重入次數
	int c = getState() - releases;
	// 目前線程不是持有鎖的線程,抛出異常
	if (Thread.currentThread() != getExclusiveOwnerThread())
		throw new IllegalMonitorStateException();
	boolean free = false;
	// 如果持有線程全部釋放,将目前獨占鎖所有線程設定為null,并更新state
	if (c == 0) {
		free = true;
		setExclusiveOwnerThread(null);
	}
	setState(c);
	return free;
}
           

我們來解釋下述源碼:

// java.util.concurrent.locks.AbstractQueuedSynchronizer

public final boolean release(int arg) {
	// 上邊自定義的tryRelease如果傳回true,說明該鎖沒有被任何線程持有
	if (tryRelease(arg)) {
		// 擷取頭結點
		Node h = head;
		// 頭結點不為空并且頭結點的waitStatus不是初始化節點情況,解除線程挂起狀态
		if (h != null && h.waitStatus != 0)
			unparkSuccessor(h);
		return true;
	}
	return false;
}
           

這裡的判斷條件為什麼是h != null && h.waitStatus != 0?

h == null Head還沒初始化。初始情況下,head == null,第一個節點入隊,Head會被初始化一個虛拟節點。是以說,這裡如果還沒來得及入隊,就會出現head == null 的情況。

h != null && waitStatus == 0 表明後繼節點對應的線程仍在運作中,不需要喚醒。

h != null && waitStatus < 0 表明後繼節點可能被阻塞了,需要喚醒。

再看一下unparkSuccessor方法:

// java.util.concurrent.locks.AbstractQueuedSynchronizer

private void unparkSuccessor(Node node) {
	// 擷取頭結點waitStatus
	int ws = node.waitStatus;
	if (ws < 0)
		compareAndSetWaitStatus(node, ws, 0);
	// 擷取目前節點的下一個節點
	Node s = node.next;
	// 如果下個節點是null或者下個節點被cancelled,就找到隊列最開始的非cancelled的節點
	if (s == null || s.waitStatus > 0) {
		s = null;
		// 就從尾部節點開始找,到隊首,找到隊列第一個waitStatus<0的節點。
		for (Node t = tail; t != null && t != node; t = t.prev)
			if (t.waitStatus <= 0)
				s = t;
	}
	// 如果目前節點的下個節點不為空,而且狀态<=0,就把目前節點unpark
	if (s != null)
		LockSupport.unpark(s.thread);
}
           

為什麼要從後往前找第一個非Cancelled的節點呢?原因如下。

之前的addWaiter方法:

// java.util.concurrent.locks.AbstractQueuedSynchronizer

private Node addWaiter(Node mode) {
	Node node = new Node(Thread.currentThread(), mode);
	// Try the fast path of enq; backup to full enq on failure
	Node pred = tail;
	if (pred != null) {
		node.prev = pred;
		if (compareAndSetTail(pred, node)) {
			pred.next = node;
			return node;
		}
	}
	enq(node);
	return node;
}
           

我們從這裡可以看到,節點入隊并不是原子操作,也就是說,node.prev = pred; compareAndSetTail(pred, node) 這兩個地方可以看作Tail入隊的原子操作,但是此時pred.next = node;還沒執行,如果這個時候執行了unparkSuccessor方法,就沒辦法從前往後找了,是以需要從後往前找。還有一點原因,在産生CANCELLED狀态節點的時候,先斷開的是Next指針,Prev指針并未斷開,是以也是必須要從後往前周遊才能夠周遊完全部的Node。

綜上所述,如果是從前往後找,由于極端情況下入隊的非原子操作和CANCELLED節點産生過程中斷開Next指針的操作,可能會導緻無法周遊所有的節點。是以,喚醒對應的線程後,對應的線程就會繼續往下執行。繼續執行acquireQueued方法以後,中斷如何處理?

2.3.4 中斷恢複後的執行流程

喚醒後,會執行return Thread.interrupted();,這個函數傳回的是目前執行線程的中斷狀态,并清除。

// java.util.concurrent.locks.AbstractQueuedSynchronizer

private final boolean parkAndCheckInterrupt() {
	LockSupport.park(this);
	return Thread.interrupted();
}
           

再回到acquireQueued代碼,當parkAndCheckInterrupt傳回True或者False的時候,interrupted的值不同,但都會執行下次循環。如果這個時候擷取鎖成功,就會把目前interrupted傳回。

// java.util.concurrent.locks.AbstractQueuedSynchronizer

final boolean acquireQueued(final Node node, int arg) {
	boolean failed = true;
	try {
		boolean interrupted = false;
		for (;;) {
			final Node p = node.predecessor();
			if (p == head && tryAcquire(arg)) {
				setHead(node);
				p.next = null; // help GC
				failed = false;
				return interrupted;
			}
			if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
				interrupted = true;
			}
	} finally {
		if (failed)
			cancelAcquire(node);
	}
}
           

如果acquireQueued為True,就會執行selfInterrupt方法。

// java.util.concurrent.locks.AbstractQueuedSynchronizer

static void selfInterrupt() {
	Thread.currentThread().interrupt();
}
           

該方法其實是為了中斷線程。但為什麼擷取了鎖以後還要中斷線程呢?這部分屬于Java提供的協作式中斷知識内容,感興趣同學可以查閱一下。這裡簡單介紹一下:

  1. 當中斷線程被喚醒時,并不知道被喚醒的原因,可能是目前線程在等待中被中斷,也可能是釋放了鎖以後被喚醒。是以我們通過Thread.interrupted()方法檢查中斷标記(該方法傳回了目前線程的中斷狀态,并将目前線程的中斷辨別設定為False),并記錄下來,如果發現該線程被中斷過,就再中斷一次。
  2. 線程在等待資源的過程中被喚醒,喚醒後還是會不斷地去嘗試擷取鎖,直到搶到鎖為止。也就是說,在整個流程中,并不響應中斷,隻是記錄中斷記錄。最後搶到鎖傳回了,那麼如果被中斷過的話,就需要補充一次中斷。

這裡的處理方式主要是運用線程池中基本運作單元Worder中的runWorker,通過Thread.interrupted()進行額外的判斷處理,感興趣的同學可以看下ThreadPoolExecutor源碼。

2.3.5 小結

我們在1.3小節中提出了一些問題,現在來回答一下。

Q:某個線程擷取鎖失敗的後續流程是什麼呢?

A:存在某種排隊等候機制,線程繼續等待,仍然保留擷取鎖的可能,擷取鎖流程仍在繼續。

Q:既然說到了排隊等候機制,那麼就一定會有某種隊列形成,這樣的隊列是什麼資料結構呢?

A:是CLH變體的FIFO雙端隊列。

Q:處于排隊等候機制中的線程,什麼時候可以有機會擷取鎖呢?

A:可以詳細看下2.3.1.3小節。

Q:如果處于排隊等候機制中的線程一直無法擷取鎖,需要一直等待麼?還是有别的政策來解決這一問題?

A:線程所在節點的狀态會變成取消狀态,取消狀态的節點會從隊列中釋放,具體可見2.3.2小節。

Q:Lock函數通過Acquire方法進行加鎖,但是具體是如何加鎖的呢?

A:AQS的Acquire會調用tryAcquire方法,tryAcquire由各個自定義同步器實作,通過tryAcquire完成加鎖過程。

3 AQS應用

3.1 ReentrantLock的可重入應用

ReentrantLock的可重入性是AQS很好的應用之一,在了解完上述知識點以後,我們很容易得知ReentrantLock實作可重入的方法。在ReentrantLock裡面,不管是公平鎖還是非公平鎖,都有一段邏輯。

公平鎖:

// java.util.concurrent.locks.ReentrantLock.FairSync#tryAcquire

if (c == 0) {
	if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) {
		setExclusiveOwnerThread(current);
		return true;
	}
}
else if (current == getExclusiveOwnerThread()) {
	int nextc = c + acquires;
	if (nextc < 0)
		throw new Error("Maximum lock count exceeded");
	setState(nextc);
	return true;
}
           

非公平鎖:

// java.util.concurrent.locks.ReentrantLock.Sync#nonfairTryAcquire

if (c == 0) {
	if (compareAndSetState(0, acquires)){
		setExclusiveOwnerThread(current);
		return true;
	}
}
else if (current == getExclusiveOwnerThread()) {
	int nextc = c + acquires;
	if (nextc < 0) // overflow
		throw new Error("Maximum lock count exceeded");
	setState(nextc);
	return true;
}
           

從上面這兩段都可以看到,有一個同步狀态State來控制整體可重入的情況。State是Volatile修飾的,用于保證一定的可見性和有序性。

// java.util.concurrent.locks.AbstractQueuedSynchronizer

private volatile int state;
           

接下來看State這個字段主要的過程:

  1. State初始化的時候為0,表示沒有任何線程持有鎖。
  2. 當有線程持有該鎖時,值就會在原來的基礎上+1,同一個線程多次獲得鎖是,就會多次+1,這裡就是可重入的概念。
  3. 解鎖也是對這個字段-1,一直到0,此線程對鎖釋放。

3.2 JUC中的應用場景

除了上邊ReentrantLock的可重入性的應用,AQS作為并發程式設計的架構,為很多其他同步工具提供了良好的解決方案。下面列出了JUC中的幾種同步工具,大體介紹一下AQS的應用場景:

同步工具 同步工具與AQS的關聯
ReentrantLock 使用AQS儲存鎖重複持有的次數。當一個線程擷取鎖時,ReentrantLock記錄目前獲得鎖的線程辨別,用于檢測是否重複擷取,以及錯誤線程試圖解鎖操作時異常情況的處理。
Semaphore 使用AQS同步狀态來儲存信号量的目前計數。tryRelease會增加計數,acquireShared會減少計數。
CountDownLatch 使用AQS同步狀态來表示計數。計數為0時,所有的Acquire操作(CountDownLatch的await方法)才可以通過。
ReentrantReadWriteLock 使用AQS同步狀态中的16位儲存寫鎖持有的次數,剩下的16位用于儲存讀鎖的持有次數。
ThreadPoolExecutor Worker利用AQS同步狀态實作對獨占線程變量的設定(tryAcquire和tryRelease)。

3.3 自定義同步工具

了解AQS基本原理以後,按照上面所說的AQS知識點,自己實作一個同步工具。

public class LeeLock  {

    private static class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected boolean tryAcquire (int arg) {
            return compareAndSetState(0, 1);
        }

        @Override
        protected boolean tryRelease (int arg) {
            setState(0);
            return true;
        }

        @Override
        protected boolean isHeldExclusively () {
            return getState() == 1;
        }
    }
    
    private Sync sync = new Sync();
    
    public void lock () {
        sync.acquire(1);
    }
    
    public void unlock () {
        sync.release(1);
    }
}
           

通過我們自己定義的Lock完成一定的同步功能。

public class LeeMain {

    static int count = 0;
    static LeeLock leeLock = new LeeLock();

    public static void main (String[] args) throws InterruptedException {

        Runnable runnable = new Runnable() {
            @Override
            public void run () {
                try {
                    leeLock.lock();
                    for (int i = 0; i < 10000; i++) {
                        count++;
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    leeLock.unlock();
                }

            }
        };
        Thread thread1 = new Thread(runnable);
        Thread thread2 = new Thread(runnable);
        thread1.start();
        thread2.start();
        thread1.join();
        thread2.join();
        System.out.println(count);
    }
}
           

上述代碼每次運作結果都會是20000。通過簡單的幾行代碼就能實作同步功能,這就是AQS的強大之處。

總結

我們日常開發中使用并發的場景太多,但是對并發内部的基本架構原理了解的人卻不多。由于篇幅原因,本文僅介紹了可重入鎖ReentrantLock的原理和AQS原理,希望能夠成為大家了解AQS和ReentrantLock等同步器的“敲門磚”。

參考資料

  • Lea D. The java. util. concurrent synchronizer framework[J]. Science of Computer Programming, 2005, 58(3): 293-309.
  • 《Java并發程式設計實戰》
  • 不可不說的Java“鎖”事

轉自:https://tech.meituan.com/2019/12/05/aqs-theory-and-apply.html