原文:https://blog.csdn.net/javazejian/article/details/72772461
深入了解(1)Java注解類型(@Annotation)
深入了解(2)Java枚舉類型(enum)
深入了解(3)Java類加載器(ClassLoader)
深入了解(4)Java類型資訊(Class對象)與反射機制
深入了解(5)Java記憶體模型(JMM)及volatile關鍵字
深入了解(6)Java并發AQS的共享鎖的實作(基于信号量Semaphore)
深入了解(7)Java無鎖CAS與Unsafe類及其并發包Atomic
深入了解(8)Java并發之synchronized實作原理
深入了解(9)Java基于并發AQS的(獨占鎖)重入鎖(ReetrantLock)及其Condition實作原理
深入了解(10)java并發之阻塞隊列LinkedBlockingQueue與ArrayBlockingQueue
文章目錄
- Lock接口
- 重入鎖ReetrantLock
- 并發基礎元件AQS與ReetrantLock
-
- AQS工作原理概要
- 基于ReetrantLock分析AQS獨占模式實作過程
-
- ReetrantLock中非公平鎖
- ReetrantLock中公平鎖
- 關于synchronized 與ReentrantLock
- 神奇的Condition
-
- 關于Condition接口
- Condition的使用案例-生産者消費者模式
- Condition的實作原理
Lock接口
前面我們詳談過解決多線程同步問題的關鍵字synchronized,synchronized屬于隐式鎖,即鎖的持有與釋放都是隐式的,我們無需幹預,而本篇我們要講解的是顯式鎖,即鎖的持有和釋放都必須由我們手動編寫。在Java 1.5中,官方在concurrent并發包中加入了Lock接口,該接口中提供了lock()方法和unLock()方法對顯式加鎖和顯式釋放鎖操作進行支援,簡單了解一下代碼編寫,如下:
Lock lock = new ReentrantLock();
lock.lock();
try{
//臨界區......
}finally{
lock.unlock();
}1234567
正如代碼所顯示(ReentrantLock是Lock的實作類,稍後分析),目前線程使用lock()方法與unlock()對臨界區進行包圍,其他線程由于無法持有鎖将無法進入臨界區直到目前線程釋放鎖,注意unlock()操作必須在finally代碼塊中,這樣可以確定即使臨界區執行抛出異常,線程最終也能正常釋放鎖,Lock接口還提供了鎖以下相關方法
public interface Lock {
//加鎖
void lock();
//解鎖
void unlock();
//可中斷擷取鎖,與lock()不同之處在于可響應中斷操作,即在獲
//取鎖的過程中可中斷,注意synchronized在擷取鎖時是不可中斷的
void lockInterruptibly() throws InterruptedException;
//嘗試非阻塞擷取鎖,調用該方法後立即傳回結果,如果能夠擷取則傳回true,否則傳回false
boolean tryLock();
//根據傳入的時間段擷取鎖,在指定時間内沒有擷取鎖則傳回false,如果在指定時間内目前線程未被中并斷擷取到鎖則傳回true
boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
//擷取等待通知元件,該元件與目前鎖綁定,目前線程隻有獲得了鎖
//才能調用該元件的wait()方法,而調用後,目前線程将釋放鎖。
Condition newCondition();1234567891011121314151617181920
可見Lock對象鎖還提供了synchronized所不具備的其他同步特性,如可中斷鎖的擷取(synchronized在等待擷取鎖時是不可中的),逾時中斷鎖的擷取,等待喚醒機制的多條件變量Condition等,這也使得Lock鎖在使用上具有更大的靈活性。下面進一步分析Lock的實作類重入鎖ReetrantLock。
重入鎖ReetrantLock
重入鎖ReetrantLock,JDK 1.5新增的類,實作了Lock接口,作用與synchronized關鍵字相當,但比synchronized更加靈活。ReetrantLock本身也是一種支援重進入的鎖,即該鎖可以支援一個線程對資源重複加鎖,同時也支援公平鎖與非公平鎖。所謂的公平與非公平指的是在請求先後順序上,先對鎖進行請求的就一定先擷取到鎖,那麼這就是公平鎖,反之,如果對于鎖的擷取并沒有時間上的先後順序,如後請求的線程可能先擷取到鎖,這就是非公平鎖,一般而言非,非公平鎖機制的效率往往會勝過公平鎖的機制,但在某些場景下,可能更注重時間先後順序,那麼公平鎖自然是很好的選擇。需要注意的是ReetrantLock支援對同一線程重加鎖,但是加鎖多少次,就必須解鎖多少次,這樣才可以成功釋放鎖。下面看看ReetrantLock的簡單使用案例:
import java.util.concurrent.locks.ReentrantLock;
public class ReenterLock implements Runnable{
public static ReentrantLock lock=new ReentrantLock();
public static int i=0;
@Override
public void run() {
for(int j=0;j<10000000;j++){
lock.lock();
//支援重入鎖
lock.lock();
try{
i++;
}finally{
//執行兩次解鎖
lock.unlock();
lock.unlock();
}
}
}
public static void main(String[] args) throws InterruptedException {
ReenterLock tl=new ReenterLock();
Thread t1=new Thread(tl);
Thread t2=new Thread(tl);
t1.start();t2.start();
t1.join();t2.join();
//輸出結果:20000000
System.out.println(i);
}
}123456789101112131415161718192021222324252627282930
代碼非常簡單,我們使用兩個線程同時操作臨界資源i,執行自增操作,使用ReenterLock進行加鎖,解決線程安全問題,這裡進行了兩次重複加鎖,由于ReenterLock支援重入,是以這樣是沒有問題的,需要注意的是在finally代碼塊中,需執行兩次解鎖操作才能真正成功地讓目前執行線程釋放鎖,從這裡看ReenterLock的用法還是非常簡單的,除了實作Lock接口的方法,ReenterLock其他方法說明如下
//查詢目前線程保持此鎖的次數。
int getHoldCount()
//傳回目前擁有此鎖的線程,如果此鎖不被任何線程擁有,則傳回 null。
protected Thread getOwner();
//傳回一個 collection,它包含可能正等待擷取此鎖的線程,其内部維持一個隊列,這點稍後會分析。
protected Collection<Thread> getQueuedThreads();
//傳回正等待擷取此鎖的線程估計數。
int getQueueLength();
// 傳回一個 collection,它包含可能正在等待與此鎖相關給定條件的那些線程。
protected Collection<Thread> getWaitingThreads(Condition condition);
//傳回等待與此鎖相關的給定條件的線程估計數。
int getWaitQueueLength(Condition condition);
// 查詢給定線程是否正在等待擷取此鎖。
boolean hasQueuedThread(Thread thread);
//查詢是否有些線程正在等待擷取此鎖。
boolean hasQueuedThreads();
//查詢是否有些線程正在等待與此鎖有關的給定條件。
boolean hasWaiters(Condition condition);
//如果此鎖的公平設定為 true,則傳回 true。
boolean isFair()
//查詢目前線程是否保持此鎖。
boolean isHeldByCurrentThread()
//查詢此鎖是否由任意線程保持。
boolean isLocked() 1234567891011121314151617181920212223242526272829303132333435
由于ReetrantLock鎖在使用上還是比較簡單的,也就暫且打住,下面着重分析一下ReetrantLock的内部實作原理,這才是本篇博文的重點。實際上ReetrantLock是基于AQS并發架構實作的,我們先深入了解AQS,然後一步步揭開ReetrantLock的内部實作原理。
并發基礎元件AQS與ReetrantLock
AQS工作原理概要
AbstractQueuedSynchronizer又稱為隊列同步器(後面簡稱AQS),它是用來建構鎖或其他同步元件的基礎架構,内部通過一個int類型的成員變量state來控制同步狀态,當state=0時,則說明沒有任何線程占有共享資源的鎖,當state=1時,則說明有線程目前正在使用共享變量,其他線程必須加入同步隊列進行等待,AQS内部通過内部類Node構成FIFO的同步隊列來完成線程擷取鎖的排隊工作,同時利用内部類ConditionObject建構等待隊列,當Condition調用wait()方法後,線程将會加入等待隊列中,而當Condition調用signal()方法後,線程将從等待隊列轉移動同步隊列中進行鎖競争。注意這裡涉及到兩種隊列,一種的同步隊列,當線程請求鎖而等待的後将加入同步隊列等待,而另一種則是等待隊列(可有多個),通過Condition調用await()方法釋放鎖後,将加入等待隊列。關于Condition的等待隊列我們後面再分析,這裡我們先來看看AQS中的同步隊列模型,如下
/**
* AQS抽象類
*/
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer{
//指向同步隊列隊頭
private transient volatile Node head;
//指向同步的隊尾
private transient volatile Node tail;
//同步狀态,0代表鎖未被占用,1代表鎖已被占用
private volatile int state;
//省略其他代碼......
}12345678910111213141516
head和tail分别是AQS中的變量,其中head指向同步隊列的頭部,注意head為空結點,不存儲資訊。而tail則是同步隊列的隊尾,同步隊列采用的是雙向連結清單的結構這樣可友善隊列進行結點增删操作。state變量則是代表同步狀态,執行當線程調用lock方法進行加鎖後,如果此時state的值為0,則說明目前線程可以擷取到鎖(在本篇文章中,鎖和同步狀态代表同一個意思),同時将state設定為1,表示擷取成功。如果state已為1,也就是目前鎖已被其他線程持有,那麼目前執行線程将被封裝為Node結點加入同步隊列等待。其中Node結點是對每一個通路同步代碼的線程的封裝,從圖中的Node的資料結構也可看出,其包含了需要同步的線程本身以及線程的狀态,如是否被阻塞,是否等待喚醒,是否已經被取消等。每個Node結點内部關聯其前繼結點prev和後繼結點next,這樣可以友善線程釋放鎖後快速喚醒下一個在等待的線程,Node是AQS的内部類,其資料結構如下:
static final class Node {
//共享模式
static final Node SHARED = new Node();
//獨占模式
static final Node EXCLUSIVE = null;
//辨別線程已處于結束狀态
static final int CANCELLED = 1;
//等待被喚醒狀态
static final int SIGNAL = -1;
//條件狀态,
static final int CONDITION = -2;
//在共享模式中使用表示獲得的同步狀态會被傳播
static final int PROPAGATE = -3;
//等待狀态,存在CANCELLED、SIGNAL、CONDITION、PROPAGATE 4種
volatile int waitStatus;
//同步隊列中前驅結點
volatile Node prev;
//同步隊列中後繼結點
volatile Node next;
//請求鎖的線程
volatile Thread thread;
//等待隊列中的後繼結點,這個與Condition有關,稍後會分析
Node nextWaiter;
//判斷是否為共享模式
final boolean isShared() {
return nextWaiter == SHARED;
}
//擷取前驅結點
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
//.....
}12345678910111213141516171819202122232425262728293031323334353637383940414243444546
其中SHARED和EXCLUSIVE常量分别代表共享模式和獨占模式,所謂共享模式是一個鎖允許多條線程同時操作,如信号量Semaphore采用的就是基于AQS的共享模式實作的,而獨占模式則是同一個時間段隻能有一個線程對共享資源進行操作,多餘的請求線程需要排隊等待,如ReentranLock。變量waitStatus則表示目前被封裝成Node結點的等待狀态,共有4種取值CANCELLED、SIGNAL、CONDITION、PROPAGATE。
- CANCELLED:值為1,在同步隊列中等待的線程等待逾時或被中斷,需要從同步隊列中取消該Node的結點,其結點的waitStatus為CANCELLED,即結束狀态,進入該狀态後的結點将不會再變化。
- SIGNAL:值為-1,被辨別為該等待喚醒狀态的後繼結點,當其前繼結點的線程釋放了同步鎖或被取消,将會通知該後繼結點的線程執行。說白了,就是處于喚醒狀态,隻要前繼結點釋放鎖,就會通知辨別為SIGNAL狀态的後繼結點的線程執行。
- CONDITION:值為-2,與Condition相關,該辨別的結點處于等待隊列中,結點的線程等待在Condition上,當其他線程調用了Condition的signal()方法後,CONDITION狀态的結點将從等待隊列轉移到同步隊列中,等待擷取同步鎖。
- PROPAGATE:值為-3,與共享模式相關,在共享模式中,該狀态辨別結點的線程處于可運作狀态。
- 0狀态:值為0,代表初始化狀态。
pre和next,分别指向目前Node結點的前驅結點和後繼結點,thread變量存儲的請求鎖的線程。nextWaiter,與Condition相關,代表等待隊列中的後繼結點,關于這點這裡暫不深入,後續會有更詳細的分析,嗯,到此我們對Node結點的資料結構也就比較清晰了。總之呢,AQS作為基礎元件,對于鎖的實作存在兩種不同的模式,即共享模式(如Semaphore)和獨占模式(如ReetrantLock),無論是共享模式還是獨占模式的實作類,其内部都是基于AQS實作的,也都維持着一個虛拟的同步隊列,當請求鎖的線程超過現有模式的限制時,會将線程包裝成Node結點并将線程目前必要的資訊存儲到node結點中,然後加入同步隊列等會擷取鎖,而這系列操作都有AQS協助我們完成,這也是作為基礎元件的原因,無論是Semaphore還是ReetrantLock,其内部絕大多數方法都是間接調用AQS完成的,下面是AQS整體類圖結構
這裡以ReentrantLock為例,簡單講解ReentrantLock與AQS的關系
- AbstractOwnableSynchronizer:抽象類,定義了存儲獨占目前鎖的線程和擷取的方法
- AbstractQueuedSynchronizer:抽象類,AQS架構核心類,其内部以虛拟隊列的方式管理線程的鎖擷取與鎖釋放,其中擷取鎖(tryAcquire方法)和釋放鎖(tryRelease方法)并沒有提供預設實作,需要子類重寫這兩個方法實作具體邏輯,目的是使開發人員可以自由定義擷取鎖以及釋放鎖的方式。
- Node:AbstractQueuedSynchronizer 的内部類,用于建構虛拟隊列(連結清單雙向連結清單),管理需要擷取鎖的線程。
- Sync:抽象類,是ReentrantLock的内部類,繼承自AbstractQueuedSynchronizer,實作了釋放鎖的操作(tryRelease()方法),并提供了lock抽象方法,由其子類實作。
- NonfairSync:是ReentrantLock的内部類,繼承自Sync,非公平鎖的實作類。
- FairSync:是ReentrantLock的内部類,繼承自Sync,公平鎖的實作類。
- ReentrantLock:實作了Lock接口的,其内部類有Sync、NonfairSync、FairSync,在建立時可以根據fair參數決定建立NonfairSync(預設非公平鎖)還是FairSync。
ReentrantLock内部存在3個實作類,分别是Sync、NonfairSync、FairSync,其中Sync繼承自AQS實作了解鎖
tryRelease()
方法,而NonfairSync(非公平鎖)、 FairSync(公平鎖)則繼承自Sync,實作了擷取鎖的
tryAcquire()
方法,ReentrantLock的所有方法調用都通過間接調用AQS和Sync類及其子類來完成的。從上述類圖可以看出AQS是一個抽象類,但請注意其源碼中并沒一個抽象的方法,這是因為AQS隻是作為一個基礎元件,并不希望直接作為直接操作類對外輸出,而更傾向于作為基礎元件,為真正的實作類提供基礎設施,如建構同步隊列,控制同步狀态等,事實上,從設計模式角度來看,AQS采用的模闆模式的方式建構的,其内部除了提供并發操作核心方法以及同步隊列操作外,還提供了一些模闆方法讓子類自己實作,如加鎖操作以及解鎖操作,為什麼這麼做?這是因為AQS作為基礎元件,封裝的是核心并發操作,但是實作上分為兩種模式,即共享模式與獨占模式,而這兩種模式的加鎖與解鎖實作方式是不一樣的,但AQS隻關注内部公共方法實作并不關心外部不同模式的實作,是以提供了模闆方法給子類使用,也就是說實作獨占鎖,如ReentrantLock需要自己實作tryAcquire()方法和tryRelease()方法,而實作共享模式的Semaphore,則需要實作tryAcquireShared()方法和tryReleaseShared()方法,這樣做的好處是顯而易見的,無論是共享模式還是獨占模式,其基礎的實作都是同一套元件(AQS),隻不過是加鎖解鎖的邏輯不同罷了,更重要的是如果我們需要自定義鎖的話,也變得非常簡單,隻需要選擇不同的模式實作不同的加鎖和解鎖的模闆方法即可,AQS提供給獨占模式和共享模式的模闆方法如下
//AQS中提供的主要模闆方法,由子類實作。
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer{
//獨占模式下擷取鎖的方法
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
//獨占模式下解鎖的方法
protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}
//共享模式下擷取鎖的方法
protected int tryAcquireShared(int arg) {
throw new UnsupportedOperationException();
}
//共享模式下解鎖的方法
protected boolean tryReleaseShared(int arg) {
throw new UnsupportedOperationException();
}
//判斷是否為持有獨占鎖
protected boolean isHeldExclusively() {
throw new UnsupportedOperationException();
}
}1234567891011121314151617181920212223242526272829
在了解AQS的原理概要後,下面我們就基于ReetrantLock進一步分析AQS的實作過程,這也是ReetrantLock的内部實作原理。
基于ReetrantLock分析AQS獨占模式實作過程
ReetrantLock中非公平鎖
AQS同步器的實作依賴于内部的同步隊列(FIFO的雙向連結清單對列)完成對同步狀态(state)的管理,目前線程擷取鎖(同步狀态)失敗時,AQS會将該線程以及相關等待資訊包裝成一個節點(Node)并将其加入同步隊列,同時會阻塞目前線程,當同步狀态釋放時,會将頭結點head中的線程喚醒,讓其嘗試擷取同步狀态。關于同步隊列和Node結點,前面我們已進行了較為詳細的分析,這裡重點分析一下擷取同步狀态和釋放同步狀态以及如何加入隊列的具體操作,這裡從ReetrantLock入手分析AQS的具體實作,我們先以非公平鎖為例進行分析。
//預設構造,建立非公平鎖NonfairSync
public ReentrantLock() {
sync = new NonfairSync();
}
//根據傳入參數建立鎖類型
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
//加鎖操作
public void lock() {
sync.lock();
}12345678910111213
前面說過sync是個抽象類,存在兩個不同的實作子類,這裡從非公平鎖入手,看看其實作
/**
* 非公平鎖實作
*/
static final class NonfairSync extends Sync {
//加鎖
final void lock() {
//執行CAS操作,擷取同步狀态
if (compareAndSetState(0, 1))
//成功則将獨占鎖線程設定為目前線程
setExclusiveOwnerThread(Thread.currentThread());
else
//否則再次請求同步狀态
acquire(1);
}
}123456789101112131415
這裡擷取鎖時,首先對同步狀态執行CAS操作,嘗試把state的狀态從0設定為1,如果傳回true則代表擷取同步狀态成功,也就是目前線程擷取鎖成,可操作臨界資源,如果傳回false,則表示已有線程持有該同步狀态(其值為1),擷取鎖失敗,注意這裡存在并發的情景,也就是可能同時存在多個線程設定state變量,是以是CAS操作保證了state變量操作的原子性。傳回false後,執行
acquire(1)
方法,該方法是AQS中的方法,它對中斷不敏感,即使線程擷取同步狀态失敗,進入同步隊列,後續對該線程執行中斷操作也不會從同步隊列中移出,方法如下
public final void acquire(int arg) {
//再次嘗試擷取同步狀态
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}123456
這裡傳入參數arg表示要擷取同步狀态後設定的值(即要設定state的值),因為要擷取鎖,而status為0時是釋放鎖,1則是擷取鎖,是以這裡一般傳遞參數為1,進入方法後首先會執行tryAcquire(arg)方法,在前面分析過該方法在AQS中并沒有具體實作,而是交由子類實作,是以該方法是由ReetrantLock類内部實作的
//NonfairSync類
static final class NonfairSync extends Sync {
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
//Sync類
abstract static class Sync extends AbstractQueuedSynchronizer {
//nonfairTryAcquire方法
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
//判斷同步狀态是否為0,并嘗試再次擷取同步狀态
if (c == 0) {
//執行CAS操作
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
//如果目前線程已擷取鎖,屬于重入鎖,再次擷取鎖後将status值加1
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
//設定目前同步狀态,目前隻有一個線程持有鎖,因為不會發生線程安全問題,可以直接執行 setState(nextc);
setState(nextc);
return true;
}
return false;
}
//省略其他代碼
}123456789101112131415161718192021222324252627282930313233343536
從代碼執行流程可以看出,這裡做了兩件事,一是嘗試再次擷取同步狀态,如果擷取成功則将目前線程設定為OwnerThread,否則失敗,二是判斷目前線程current是否為OwnerThread,如果是則屬于重入鎖,state自增1,并擷取鎖成功,傳回true,反之失敗,傳回false,也就是
tryAcquire(arg)
執行失敗,傳回false。需要注意的是
nonfairTryAcquire(int acquires)
内部使用的是CAS原子性操作設定state值,可以保證state的更改是線程安全的,是以隻要任意一個線程調用
nonfairTryAcquire(int acquires)
方法并設定成功即可擷取鎖,不管該線程是新到來的還是已在同步隊列的線程,畢竟這是非公平鎖,并不保證同步隊列中的線程一定比新到來線程請求(可能是head結點剛釋放同步狀态然後新到來的線程恰好擷取到同步狀态)先擷取到鎖,這點跟後面還會講到的公平鎖不同。ok~,接着看之前的方法
acquire(int arg)
public final void acquire(int arg) {
//再次嘗試擷取同步狀态
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}123456
如果tryAcquire(arg)傳回true,
acquireQueued
自然不會執行,這是最理想的,因為畢竟目前線程已擷取到鎖,如果tryAcquire(arg)傳回false,則會執行
addWaiter(Node.EXCLUSIVE)
進行入隊操作,由于ReentrantLock屬于獨占鎖,是以結點類型為
Node.EXCLUSIVE
,下面看看addWaiter方法具體實作
private Node addWaiter(Node mode) {
//将請求同步狀态失敗的線程封裝成結點
Node node = new Node(Thread.currentThread(), mode);
Node pred = tail;
//如果是第一個結點加入肯定為空,跳過。
//如果非第一個結點則直接執行CAS入隊操作,嘗試在尾部快速添加
if (pred != null) {
node.prev = pred;
//使用CAS執行尾部結點替換,嘗試在尾部快速添加
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
//如果第一次加入或者CAS操作沒有成功執行enq入隊操作
enq(node);
return node;
}12345678910111213141516171819
建立了一個
Node.EXCLUSIVE
類型Node結點用于封裝線程及其相關資訊,其中tail是AQS的成員變量,指向隊尾(這點前面的我們分析過AQS維持的是一個雙向的連結清單結構同步隊列),如果是第一個結點,則為tail肯定為空,那麼将執行
enq(node)
操作,如果非第一個結點即tail指向不為null,直接嘗試執行CAS操作加入隊尾,如果CAS操作失敗還是會執行
enq(node)
,繼續看
enq(node)
:
private Node enq(final Node node) {
//死循環
for (;;) {
Node t = tail;
//如果隊列為null,即沒有頭結點
if (t == null) { // Must initialize
//建立并使用CAS設定頭結點
if (compareAndSetHead(new Node()))
tail = head;
} else {//隊尾添加新結點
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}123456789101112131415161718
這個方法使用一個死循環進行CAS操作,可以解決多線程并發問題。這裡做了兩件事,一是如果還沒有初始同步隊列則建立新結點并使用compareAndSetHead設定頭結點,tail也指向head,二是隊列已存在,則将新結點node添加到隊尾。注意這兩個步驟都存在同一時間多個線程操作的可能,如果有一個線程修改head和tail成功,那麼其他線程将繼續循環,直到修改成功,這裡使用CAS原子操作進行頭結點設定和尾結點tail替換可以保證線程安全,從這裡也可以看出head結點本身不存在任何資料,它隻是作為一個牽頭結點,而tail永遠指向尾部結點(前提是隊列不為null)。
添加到同步隊列後,結點就會進入一個自旋過程,即每個結點都在觀察時機待條件滿足擷取同步狀态,然後從同步隊列退出并結束自旋,回到之前的
acquire()
方法,自旋過程是在
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
方法中執行的,代碼如下
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
//自旋,死循環
for (;;) {
//擷取前驅結點
final Node p = node.predecessor();
當且僅當p為頭結點才嘗試擷取同步狀态
if (p == head && tryAcquire(arg)) {
//将node設定為頭結點
setHead(node);
//清空原來頭結點的引用便于GC
p.next = null; // help GC
failed = false;
return interrupted;
}
//如果前驅結點不是head,判斷是否挂起線程
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
//最終都沒能擷取同步狀态,結束該線程的請求
cancelAcquire(node);
}
}
1234567891011121314151617181920212223242526272829
目前線程在自旋(死循環)中擷取同步狀态,當且僅目前驅結點為頭結點才嘗試擷取同步狀态,這符合FIFO的規則,即先進先出,其次head是目前擷取同步狀态的線程結點,隻有當head釋放同步狀态喚醒後繼結點,後繼結點才有可能擷取到同步狀态,是以後繼結點在其前繼結點為head時,才進行嘗試擷取同步狀态,其他時刻将被挂起。進入if語句後調用
setHead(node)
方法,将目前線程結點設定為head
//設定為頭結點
private void setHead(Node node) {
head = node;
//清空結點資料
node.thread = null;
node.prev = null;
}1234567
設定為node結點被設定為head後,其thread資訊和前驅結點将被清空,因為該線程已擷取到同步狀态(鎖),正在執行了,也就沒有必要存儲相關資訊了,head隻有儲存指向後繼結點的指針即可,便于head結點釋放同步狀态後喚醒後繼結點,執行結果如下圖
從圖可知更新head結點的指向,将後繼結點的線程喚醒并擷取同步狀态,調用
setHead(node)
将其替換為head結點,清除相關無用資料。當然如果前驅結點不是head,那麼執行如下
//如果前驅結點不是head,判斷是否挂起線程
if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())
interrupted = true;
}
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
//擷取目前結點的等待狀态
int ws = pred.waitStatus;
//如果為等待喚醒(SIGNAL)狀态則傳回true
if (ws == Node.SIGNAL)
return true;
//如果ws>0 則說明是結束狀态,
//周遊前驅結點直到找到沒有結束狀态的結點
if (ws > 0) {
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
//如果ws小于0又不是SIGNAL狀态,
//則将其設定為SIGNAL狀态,代表該結點的線程正在等待喚醒。
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
private final boolean parkAndCheckInterrupt() {
//将目前線程挂起
LockSupport.park(this);
//擷取線程中斷狀态,interrupted()是判斷目前中斷狀态,
//并非中斷線程,是以可能true也可能false,并傳回
return Thread.interrupted();
}12345678910111213141516171819202122232425262728293031323334
shouldParkAfterFailedAcquire()
方法的作用是判斷目前結點的前驅結點是否為
SIGNAL
狀态(即等待喚醒狀态),如果是則傳回true。如果結點的ws為
CANCELLED
狀态(
值為1>0
),即結束狀态,則說明該前驅結點已沒有用應該從同步隊列移除,執行while循環,直到尋找到非
CANCELLED
狀态的結點。倘若前驅結點的ws值不為
CANCELLED
,也不為
SIGNAL
(當從Condition的條件等待隊列轉移到同步隊列時,結點狀态為CONDITION是以需要轉換為SIGNAL),那麼将其轉換為
SIGNAL
狀态,等待被喚醒。
若
shouldParkAfterFailedAcquire()
方法傳回true,即前驅結點為
SIGNAL
狀态同時又不是head結點,那麼使用
parkAndCheckInterrupt()
方法挂起目前線程,稱為WAITING狀态,需要等待一個unpark()操作來喚醒它,到此ReetrantLock内部間接通過AQS的FIFO的同步隊列就完成了lock()操作,這裡我們總結成邏輯流程圖
關于擷取鎖的操作,這裡看看另外一種可中斷的擷取方式,即調用ReentrantLock類的
lockInterruptibly()
或者
tryLock()
方法,最終它們都間接調用到
doAcquireInterruptibly()
private void doAcquireInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
//直接抛異常,中斷線程的同步狀态請求
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}1234567891011121314151617181920212223
最大的不同是
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
//直接抛異常,中斷線程的同步狀态請求
throw new InterruptedException();1234
檢測到線程的中斷操作後,直接抛出異常,進而中斷線程的同步狀态請求,移除同步隊列,ok~,加鎖流程到此。下面接着看unlock()操作
//ReentrantLock類的unlock
public void unlock() {
sync.release(1);
}
//AQS類的release()方法
public final boolean release(int arg) {
//嘗試釋放鎖
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
//喚醒後繼結點的線程
unparkSuccessor(h);
return true;
}
return false;
}
//ReentrantLock類中的内部類Sync實作的tryRelease(int releases)
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
//判斷狀态是否為0,如果是則說明已釋放同步狀态
if (c == 0) {
free = true;
//設定Owner為null
setExclusiveOwnerThread(null);
}
//設定更新同步狀态
setState(c);
return free;
}123456789101112131415161718192021222324252627282930313233343536
釋放同步狀态的操作相對簡單些,
tryRelease(int releases)
方法是ReentrantLock類中内部類自己實作的,因為AQS對于釋放鎖并沒有提供具體實作,必須由子類自己實作。釋放同步狀态後會使用
unparkSuccessor(h)
喚醒後繼結點的線程,這裡看看
unparkSuccessor(h)
private void unparkSuccessor(Node node) {
//這裡,node一般為目前線程所在的結點。
int ws = node.waitStatus;
if (ws < 0)//置零目前線程所在的結點狀态,允許失敗。
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;//找到下一個需要喚醒的結點s
if (s == null || s.waitStatus > 0) {//如果為空或已取消
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)//從這裡可以看出,<=0的結點,都是還有效的結點。
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);//喚醒
}12345678910111213141516
從代碼執行操作來看,這裡主要作用是用unpark()喚醒同步隊列中最前邊未放棄線程(也就是狀态為CANCELLED的線程結點s)。此時,回憶前面分析進入自旋的函數
acquireQueued()
,s結點的線程被喚醒後,會進入
acquireQueued()
函數的
if (p == head && tryAcquire(arg))
的判斷,如果
p!=head
也不會有影響,因為它會執行
shouldParkAfterFailedAcquire()
,由于s通過
unparkSuccessor()
操作後已是同步隊列中最前邊未放棄的線程結點,那麼通過shouldParkAfterFailedAcquire()内部對結點狀态的調整,s也必然會成為head的next結點,是以再次自旋時
p==head
就成立了,然後s把自己設定成head結點,表示自己已經擷取到資源了,最終
acquire()
也傳回了,這就是獨占鎖釋放的過程。
ok~,關于獨占模式的加鎖和釋放鎖的過程到這就分析完,總之呢,在AQS同步器中維護着一個同步隊列,當線程擷取同步狀态失敗後,将會被封裝成Node結點,加入到同步隊列中并進行自旋操作,當目前線程結點的前驅結點為head時,将嘗試擷取同步狀态,擷取成功将自己設定為head結點。在釋放同步狀态時,則通過調用子類(ReetrantLock中的Sync内部類)的
tryRelease(int releases)
方法釋放同步狀态,釋放成功則喚醒後繼結點的線程。
ReetrantLock中公平鎖
了解完ReetrantLock中非公平鎖的實作後,我們再來看看公平鎖。與非公平鎖不同的是,在擷取鎖的時,公平鎖的擷取順序是完全遵循時間上的FIFO規則,也就是說先請求的線程一定會先擷取鎖,後來的線程肯定需要排隊,這點與前面我們分析非公平鎖的
nonfairTryAcquire(int acquires)
方法實作有鎖不同,下面是公平鎖中tryAcquire()方法的實作
//公平鎖FairSync類中的實作
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
//注意!!這裡先判斷同步隊列是否存在結點
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}123456789101112131415161718192021
該方法與
nonfairTryAcquire(int acquires)
方法唯一的不同是在使用CAS設定嘗試設定state值前,調用了
hasQueuedPredecessors()
判斷同步隊列是否存在結點,如果存在必須先執行完同步隊列中結點的線程,目前線程進入等待狀态。這就是非公平鎖與公平鎖最大的差別,即公平鎖線上程請求到來時先會判斷同步隊列是否存在結點,如果存在先執行同步隊列中的結點線程,目前線程将封裝成node加入同步隊列等待。而非公平鎖呢,當線程請求到來時,不管同步隊列是否存線上程結點,直接嘗試擷取同步狀态,擷取成功直接通路共享資源,但請注意在絕大多數情況下,非公平鎖才是我們理想的選擇,畢竟從效率上來說非公平鎖總是勝于公平鎖。
以上便是ReentrantLock的内部實作原理,這裡我們簡單進行小結,重入鎖ReentrantLock,是一個基于AQS并發架構的并發控制類,其内部實作了3個類,分别是Sync、NoFairSync以及FairSync類,其中Sync繼承自AQS,實作了釋放鎖的模闆方法tryRelease(int),而NoFairSync和FairSync都繼承自Sync,實作各種擷取鎖的方法tryAcquire(int)。ReentrantLock的所有方法實作幾乎都間接調用了這3個類,是以當我們在使用ReentrantLock時,大部分使用都是在間接調用AQS同步器中的方法,這就是ReentrantLock的内部實作原理,最後給出張類圖結構
關于synchronized 與ReentrantLock
在JDK 1.6之後,虛拟機對于synchronized關鍵字進行整體優化後,在性能上synchronized與ReentrantLock已沒有明顯差距,是以在使用選擇上,需要根據場景而定,大部分情況下我們依然建議是synchronized關鍵字,原因之一是使用友善語義清晰,二是性能上虛拟機已為我們自動優化。而ReentrantLock提供了多樣化的同步特性,如逾時擷取鎖、可以被中斷擷取鎖(synchronized的同步是不能中斷的)、等待喚醒機制的多個條件變量(Condition)等,是以當我們确實需要使用到這些功能是,可以選擇ReentrantLock
神奇的Condition
關于Condition接口
在并發程式設計中,每個Java對象都存在一組螢幕方法,如
wait()
、
notify()
以及
notifyAll()
方法,通過這些方法,我們可以實作線程間通信與協作(也稱為等待喚醒機制),如生産者-消費者模式,而且這些方法必須配合着synchronized關鍵字使用,關于這點,如果想有更深入的了解,可觀看部落客另外一篇博文【 深入了解Java并發之synchronized實作原理】,與synchronized的等待喚醒機制相比Condition具有更多的靈活性以及精确性,這是因為
notify()
在喚醒線程時是随機(同一個鎖),而Condition則可通過多個Condition執行個體對象建立更加精細的線程控制,也就帶來了更多靈活性了,我們可以簡單了解為以下兩點
- 通過Condition能夠精細的控制多線程的休眠與喚醒。
- 對于一個鎖,我們可以為多個線程間建立不同的Condition。
Condition是一個接口類,其主要方法如下:
public interface Condition {
/**
* 使目前線程進入等待狀态直到被通知(signal)或中斷
* 當其他線程調用singal()或singalAll()方法時,該線程将被喚醒
* 當其他線程調用interrupt()方法中斷目前線程
* await()相當于synchronized等待喚醒機制中的wait()方法
*/
void await() throws InterruptedException;
//目前線程進入等待狀态,直到被喚醒,該方法不響應中斷要求
void awaitUninterruptibly();
//調用該方法,目前線程進入等待狀态,直到被喚醒或被中斷或逾時
//其中nanosTimeout指的等待逾時時間,機關納秒
long awaitNanos(long nanosTimeout) throws InterruptedException;
//同awaitNanos,但可以指明時間機關
boolean await(long time, TimeUnit unit) throws InterruptedException;
//調用該方法目前線程進入等待狀态,直到被喚醒、中斷或到達某個時
//間期限(deadline),如果沒到指定時間就被喚醒,傳回true,其他情況傳回false
boolean awaitUntil(Date deadline) throws InterruptedException;
//喚醒一個等待在Condition上的線程,該線程從等待方法傳回前必須
//擷取與Condition相關聯的鎖,功能與notify()相同
void signal();
//喚醒所有等待在Condition上的線程,該線程從等待方法傳回前必須
//擷取與Condition相關聯的鎖,功能與notifyAll()相同
void signalAll();
}1234567891011121314151617181920212223242526272829303132
關于Condition的實作類是AQS的内部類ConditionObject,關于這點我們稍後分析,這裡先來看一個Condition的使用案例,即經典消費者生産者模式
Condition的使用案例-生産者消費者模式
這裡我們通過一個賣烤鴨的案例來示範多生産多消費者的案例,該場景中存在兩條生産線程t1和t2,用于生産烤鴨,也存在兩條消費線程t3,t4用于消費烤鴨,4條線程同時執行,需要保證隻有在生産線程産生烤鴨後,消費線程才能消費,否則隻能等待,直到生産線程産生烤鴨後喚醒消費線程,注意烤鴨不能重複消費。ResourceByCondition類中定義product()和consume()兩個方法,分别用于生産烤鴨和消費烤鴨,并且定義ReentrantLock鎖,用于控制product()和consume()的并發,由于必須在烤鴨生成完成後消費線程才能消費烤鴨,否則隻能等待,是以這裡定義兩組Condition對象,分别是producer_con和consumer_con,前者擁有控制生産線程,後者擁有控制消費線程,這裡我們使用一個标志flag來控制是否有烤鴨,當flag為true時,代表烤鴨生成完畢,生産線程必須進入等待狀态同時喚醒消費線程進行消費,消費線程消費完畢後将flag設定為false,代表烤鴨消費完成,進入等待狀态,同時喚醒生産線程生産烤鴨,具體代碼如下
package com.zejian.concurrencys;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
* Created by zejian on 2017/7/22.
* Blog : http://blog.csdn.net/javazejian [原文位址,請尊重原創]
*/
public class ResourceByCondition {
private String name;
private int count = 1;
private boolean flag = false;
//建立一個鎖對象。
Lock lock = new ReentrantLock();
//通過已有的鎖擷取兩組螢幕,一組監視生産者,一組監視消費者。
Condition producer_con = lock.newCondition();
Condition consumer_con = lock.newCondition();
/**
* 生産
* @param name
*/
public void product(String name)
{
lock.lock();
try
{
while(flag){
try{producer_con.await();}catch(InterruptedException e){}
}
this.name = name + count;
count++;
System.out.println(Thread.currentThread().getName()+"...生産者5.0..."+this.name);
flag = true;
consumer_con.signal();//直接喚醒消費線程
}
finally
{
lock.unlock();
}
}
/**
* 消費
*/
public void consume()
{
lock.lock();
try
{
while(!flag){
try{consumer_con.await();}catch(InterruptedException e){}
}
System.out.println(Thread.currentThread().getName()+"...消費者.5.0......."+this.name);//消費烤鴨1
flag = false;
producer_con.signal();//直接喚醒生産線程
}
finally
{
lock.unlock();
}
}
}
1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768
執行代碼
package com.zejian.concurrencys;
/**
* Created by zejian on 2017/7/22.
* Blog : http://blog.csdn.net/javazejian [原文位址,請尊重原創]
*/
public class Mutil_Producer_ConsumerByCondition {
public static void main(String[] args) {
ResourceByCondition r = new ResourceByCondition();
Mutil_Producer pro = new Mutil_Producer(r);
Mutil_Consumer con = new Mutil_Consumer(r);
//生産者線程
Thread t0 = new Thread(pro);
Thread t1 = new Thread(pro);
//消費者線程
Thread t2 = new Thread(con);
Thread t3 = new Thread(con);
//啟動線程
t0.start();
t1.start();
t2.start();
t3.start();
}
}
/**
* @decrition 生産者線程
*/
class Mutil_Producer implements Runnable {
private ResourceByCondition r;
Mutil_Producer(ResourceByCondition r) {
this.r = r;
}
public void run() {
while (true) {
r.product("北京烤鴨");
}
}
}
/**
* @decrition 消費者線程
*/
class Mutil_Consumer implements Runnable {
private ResourceByCondition r;
Mutil_Consumer(ResourceByCondition r) {
this.r = r;
}
public void run() {
while (true) {
r.consume();
}
}
}12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758
正如代碼所示,我們通過兩者Condition對象單獨控制消費線程與生産消費,這樣可以避免消費線程在喚醒線程時喚醒的還是消費線程,如果是通過synchronized的等待喚醒機制實作的話,就可能無法避免這種情況,畢竟同一個鎖,對于synchronized關鍵字來說隻能有一組等待喚醒隊列,而不能像Condition一樣,同一個鎖擁有多個等待隊列。synchronized的實作方案如下,
public class KaoYaResource {
private String name;
private int count = 1;//烤鴨的初始數量
private boolean flag = false;//判斷是否有需要線程等待的标志
/**
* 生産烤鴨
*/
public synchronized void product(String name){
while(flag){
//此時有烤鴨,等待
try {
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
this.name=name+count;//設定烤鴨的名稱
count++;
System.out.println(Thread.currentThread().getName()+"...生産者..."+this.name);
flag=true;//有烤鴨後改變标志
notifyAll();//通知消費線程可以消費了
}
/**
* 消費烤鴨
*/
public synchronized void consume(){
while(!flag){//如果沒有烤鴨就等待
try{this.wait();}catch(InterruptedException e){}
}
System.out.println(Thread.currentThread().getName()+"...消費者........"+this.name);//消費烤鴨1
flag = false;
notifyAll();//通知生産者生産烤鴨
}
}123456789101112131415161718192021222324252627282930313233343536
如上代碼,在調用notify()或者 notifyAll()方法時,由于等待隊列中同時存在生産者線程和消費者線程,是以我們并不能保證被喚醒的到底是消費者線程還是生産者線程,而Codition則可以避免這種情況。嗯,了解完Condition的使用方式後,下面我們将進一步探讨Condition背後的實作機制
Condition的實作原理
Condition的具體實作類是AQS的内部類ConditionObject,前面我們分析過AQS中存在兩種隊列,一種是同步隊列,一種是等待隊列,而等待隊列就相對于Condition而言的。注意在使用Condition前必須獲得鎖,同時在Condition的等待隊列上的結點與前面同步隊列的結點是同一個類即Node,其結點的waitStatus的值為CONDITION。在實作類ConditionObject中有兩個結點分别是firstWaiter和lastWaiter,firstWaiter代表等待隊列第一個等待結點,lastWaiter代表等待隊列最後一個等待結點,如下
public class ConditionObject implements Condition, java.io.Serializable {
//等待隊列第一個等待結點
private transient Node firstWaiter;
//等待隊列最後一個等待結點
private transient Node lastWaiter;
//省略其他代碼.......
}1234567
每個Condition都對應着一個等待隊列,也就是說如果一個鎖上建立了多個Condition對象,那麼也就存在多個等待隊列。等待隊列是一個FIFO的隊列,在隊列中每一個節點都包含了一個線程的引用,而該線程就是Condition對象上等待的線程。當一個線程調用了await()相關的方法,那麼該線程将會釋放鎖,并建構一個Node節點封裝目前線程的相關資訊加入到等待隊列中進行等待,直到被喚醒、中斷、逾時才從隊列中移出。Condition中的等待隊列模型如下
正如圖所示,Node節點的資料結構,在等待隊列中使用的變量與同步隊列是不同的,Condtion中等待隊列的結點隻有直接指向的後繼結點并沒有指明前驅結點,而且使用的變量是nextWaiter而不是next,這點我們在前面分析結點Node的資料結構時講過。firstWaiter指向等待隊列的頭結點,lastWaiter指向等待隊列的尾結點,等待隊列中結點的狀态隻有兩種即CANCELLED和CONDITION,前者表示線程已結束需要從等待隊列中移除,後者表示條件結點等待被喚醒。再次強調每個Codition對象對于一個等待隊列,也就是說AQS中隻能存在一個同步隊列,但可擁有多個等待隊列。下面從代碼層面看看被調用await()方法(其他await()實作原理類似)的線程是如何加入等待隊列的,而又是如何從等待隊列中被喚醒的
public final void await() throws InterruptedException {
//判斷線程是否被中斷
if (Thread.interrupted())
throw new InterruptedException();
//建立新結點加入等待隊列并傳回
Node node = addConditionWaiter();
//釋放目前線程鎖即釋放同步狀态
int savedState = fullyRelease(node);
int interruptMode = 0;
//判斷結點是否同步隊列(SyncQueue)中,即是否被喚醒
while (!isOnSyncQueue(node)) {
//挂起線程
LockSupport.park(this);
//判斷是否被中斷喚醒,如果是退出循環。
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
//被喚醒後執行自旋操作争取獲得鎖,同時判斷線程是否被中斷
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
// clean up if cancelled
if (node.nextWaiter != null)
//清理等待隊列中不為CONDITION狀态的結點
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}123456789101112131415161718192021222324252627
執行
addConditionWaiter()
添加到等待隊列。
private Node addConditionWaiter() {
Node t = lastWaiter;
// 判斷是否為結束狀态的結點并移除
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
t = lastWaiter;
}
//建立新結點狀态為CONDITION
Node node = new Node(Thread.currentThread(), Node.CONDITION);
//加入等待隊列
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}1234567891011121314151617
await()
方法主要做了3件事,一是調用
addConditionWaiter()
方法将目前線程封裝成node結點加入等待隊列,二是調用
fullyRelease(node)
方法釋放同步狀态并喚醒後繼結點的線程。三是調用
isOnSyncQueue(node)
方法判斷結點是否在同步隊列中,注意是個while循環,如果同步隊列中沒有該結點就直接挂起該線程,需要明白的是如果線程被喚醒後就調用
acquireQueued(node, savedState)
執行自旋操作争取鎖,即目前線程結點從等待隊列轉移到同步隊列并開始努力擷取鎖。
接着看看喚醒操作singal()方法
public final void signal() {
//判斷是否持有獨占鎖,如果不是抛出異常
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
//喚醒等待隊列第一個結點的線程
if (first != null)
doSignal(first);
}123456789
這裡
signal()
方法做了兩件事,一是判斷目前線程是否持有獨占鎖,沒有就抛出異常,從這點也可以看出隻有獨占模式先采用等待隊列,而共享模式下是沒有等待隊列的,也就沒法使用Condition。二是喚醒等待隊列的第一個結點,即執行
doSignal(first)
private void doSignal(Node first) {
do {
//移除條件等待隊列中的第一個結點,
//如果後繼結點為null,那麼說沒有其他結點将尾結點也設定為null
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
//如果被通知節點沒有進入到同步隊列并且條件等待隊列還有不為空的節點,則繼續循環通知後續結點
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
//transferForSignal方法
final boolean transferForSignal(Node node) {
//嘗試設定喚醒結點的waitStatus為0,即初始化狀态
//如果設定失敗,說明當期結點node的waitStatus已不為
//CONDITION狀态,那麼隻能是結束狀态了,是以傳回false
//傳回doSignal()方法中繼續喚醒其他結點的線程,注意這裡并
//不涉及并發問題,是以CAS操作失敗隻可能是預期值不為CONDITION,
//而不是多線程設定導緻預期值變化,畢竟操作該方法的線程是持有鎖的。
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
//加入同步隊列并傳回前驅結點p
Node p = enq(node);
int ws = p.waitStatus;
//判斷前驅結點是否為結束結點(CANCELLED=1)或者在設定
//前驅節點狀态為Node.SIGNAL狀态失敗時,喚醒被通知節點代表的線程
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
//喚醒node結點的線程
LockSupport.unpark(node.thread);
return true;
}123456789101112131415161718192021222324252627282930313233
注釋說得很明白了,這裡我們簡單整體說明一下,
doSignal(first)
方法中做了兩件事,從條件等待隊列移除被喚醒的節點,然後重新維護條件等待隊列的firstWaiter和lastWaiter的指向。二是将從等待隊列移除的結點加入同步隊列(在
transferForSignal()
方法中完成的),如果進入到同步隊列失敗并且條件等待隊列還有不為空的節點,則繼續循環喚醒後續其他結點的線程。到此整個
signal()
的喚醒過程就很清晰了,即
signal()
被調用後,先判斷目前線程是否持有獨占鎖,如果有,那麼喚醒目前Condition對象中等待隊列的第一個結點的線程,并從等待隊列中移除該結點,移動到同步隊列中,如果加入同步隊列失敗,那麼繼續循環喚醒等待隊列中的其他結點的線程,如果成功加入同步隊列,那麼如果其前驅結點是否已結束或者設定前驅節點狀态為Node.SIGNAL狀态失敗,則通過
LockSupport.unpark()
喚醒被通知節點代表的線程,到此
signal()
任務完成,注意被喚醒後的線程,将從前面的
await()
方法中的while循環中退出,因為此時該線程的結點已在同步隊列中,那麼
while (!isOnSyncQueue(node))
将不在符合循環條件,進而調用AQS的
acquireQueued()
方法加入擷取同步狀态的競争中,這就是等待喚醒機制的整個流程實作原理,流程如下圖所示(注意無論是同步隊列還是等待隊列使用的Node資料結構都是同一個,不過是使用的内部變量不同罷了)
ok~,本篇先到這,關于AQS中的另一種模式即共享模式,下篇再詳聊,歡迎繼續關注。