關于java中的AQS鎖機制的了解和使用
沉澱再出發:關于java中的AQS了解
一、前言
在java中有很多鎖結構都繼承自AQS(AbstractQueuedSynchronizer)這個抽象類如果我們仔細了解可以發現AQS的作用是非常大的,但是AQS的底層其實也是使用了大量的CAS,是以我們可以看到CAS的重要性了,但是CAS也是有缺陷的,但是在大部分使用的情況下,我們往往忽略了這種缺陷。
二、AQS的認識
2.1、AQS的基本概念
AQS(AbstractQueuedSynchronizer)就是抽象的隊列式的同步器,AQS定義了一套多線程通路共享資源的同步器架構,許多同步類實作都依賴于它,AQS是一個Java提供的底層同步工具類,用一個int類型的變量表示同步狀态,并提供了一系列的CAS操作來管理這個同步狀态。AQS的主要作用是為Java中的并發同步元件提供統一的底層支援,如常用的ReentrantLock/Semaphore/CountDownLatch等等就是基于AQS實作的,用法是通過繼承AQS實作其模版方法,然後将子類作為同步元件的内部類。
同步隊列是AQS很重要的組成部分,它是一個雙端隊列,遵循FIFO原則,主要作用是用來存放在鎖上阻塞的線程,當一個線程嘗試擷取鎖時,如果已經被占用,那麼目前線程就會被構造成一個Node節點加入到同步隊列的尾部,隊列的頭節點是成功擷取鎖的節點,當頭節點線程釋放鎖時,會喚醒後面的節點并釋放目前頭節點的引用。
![](https://img.laitimes.com/img/_0nNw4CM6IyYiwiM6ICdiwiIn5GcukjNwETNzYjM50SNygTNxADN1EDOyATM4EDMy0yM4YzN1ETMvwFMxgTMwIzLcNDO2cTNxEzLcd2bsJ2Lc12bj5ycn9Gbi52YugTMwIzZtl2Lc9CX6MHc0RHaiojIsJye.png)
它維護了一個volatile int state(代表共享資源)和一個FIFO線程等待隊列(多線程争用資源被阻塞時會進入此隊列)。state的通路方式有三種:
1 getState()
2 setState()
3 compareAndSetState()
AQS定義兩種資源共享方式:Exclusive(獨占,隻有一個線程能執行,如ReentrantLock)和Share(共享,多個線程可同時執行,如Semaphore/CountDownLatch)。
不同的自定義同步器争用共享資源的方式也不同。自定義同步器在實作時隻需要實作共享資源state的擷取與釋放方式即可,至于具體線程等待隊列的維護(如擷取資源失敗入隊/喚醒出隊等),AQS已經在頂層實作好了。自定義同步器實作時主要實作以下幾種方法:
1 isHeldExclusively():該線程是否正在獨占資源。隻有用到condition才需要去實作它。
2 tryAcquire(int):獨占方式。嘗試擷取資源,成功則傳回true,失敗則傳回false。
3 tryRelease(int):獨占方式。嘗試釋放資源,成功則傳回true,失敗則傳回false。
4 tryAcquireShared(int):共享方式。嘗試擷取資源。負數表示失敗;0表示成功,但沒有剩餘可用資源;正數表示成功,且有剩餘資源。
5 tryReleaseShared(int):共享方式。嘗試釋放資源,如果釋放後允許喚醒後續等待結點傳回true,否則傳回false
ReentrantLock:state初始化為0,表示未鎖定狀态。A線程lock()時,會調用tryAcquire()獨占該鎖并将state+1。此後,其他線程再tryAcquire()時就會失敗,直到A線程unlock()到state=0(即釋放鎖)為止,其它線程才有機會擷取該鎖。當然,釋放鎖之前,A線程自己是可以重複擷取此鎖的(state會累加),這就是可重入的概念。但要注意,擷取多少次就要釋放多麼次,這樣才能保證state是能回到零态的。
CountDownLatch:任務分為N個子線程去執行,state也初始化為N(注意N要與線程個數一緻)。這N個子線程是并行執行的,每個子線程執行完後countDown()一次,state會CAS減1。等到所有子線程都執行完後(即state=0),會unpark()主調用線程,然後主調用線程就會從await()函數傳回,繼續後餘動作。
一般來說,自定義同步器要麼是獨占方法,要麼是共享方式,他們也隻需實作tryAcquire-tryRelease、tryAcquireShared-tryReleaseShared中的一種即可。但AQS也支援自定義同步器同時實作獨占和共享兩種方式,如ReentrantReadWriteLock。
2.1.1、acquire(int)
此方法是獨占模式下線程擷取共享資源的頂層入口。如果擷取到資源,線程直接傳回,否則進入等待隊列,直到擷取到資源為止,且整個過程忽略中斷的影響。這也正是lock()的語義,當然不僅僅隻限于lock()。擷取到資源後,線程就可以去執行其臨界區代碼了。下面是acquire()的源碼:
1 public final void acquire(int arg) {
2 if (!tryAcquire(arg) &&
3 acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
4 selfInterrupt();
5 }
tryAcquire()嘗試直接去擷取資源,如果成功則直接傳回;
addWaiter()将該線程加入等待隊列的尾部,并标記為獨占模式;
acquireQueued()使線程在等待隊列中擷取資源,一直擷取到資源後才傳回。如果在整個等待過程中被中斷過,則傳回true,否則傳回false。
如果線程在等待過程中被中斷過,它是不響應的。隻是擷取資源後才再進行自我中斷selfInterrupt(),将中斷補上。
tryAcquire(int)
此方法嘗試去擷取獨占資源。如果擷取成功,則直接傳回true,否則直接傳回false。如下是tryAcquire()的源碼:
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
AQS隻是一個架構,具體資源的擷取/釋放方式交由自定義同步器去實作,AQS這裡隻定義了一個接口,具體資源的擷取交由自定義同步器去實作(通過state的get/set/CAS)。至于能不能重入,能不能阻塞,那就看具體的自定義同步器怎麼去設計了,當然,自定義同步器在進行資源通路時要考慮線程安全的影響。這裡沒有定義成abstract是因為獨占模式下隻用實作tryAcquire-tryRelease,而共享模式下隻用實作tryAcquireShared-tryReleaseShared。如果都定義成abstract,那麼每個模式也要去實作另一模式下的接口,這樣設計可以盡量減少不必要的工作量。
addWaiter(Node)
此方法用于将目前線程加入到等待隊列的隊尾,并傳回目前線程所在的結點。
private Node addWaiter(Node mode) {
//以給定模式構造結點。mode有兩種:EXCLUSIVE(獨占)和SHARED(共享)
Node node = new Node(Thread.currentThread(), mode);
//嘗試快速方式直接放到隊尾。
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
//上一步失敗或者初次加入,則采用終極自旋方式保證一定加入隊尾
enq(node);
return node;
}
Node結點是對每一個通路同步代碼的線程的封裝,其包含了需要同步的線程本身以及線程的狀态,如是否被阻塞,是否等待喚醒,是否已經被取消等。變量waitStatus則表示目前被封裝成Node結點的等待狀态,共有4種取值CANCELLED、SIGNAL、CONDITION、PROPAGATE。
1 CANCELLED:值為1,在同步隊列中等待的線程等待逾時或被中斷,需要從同步隊列中取消該Node的結點,其結點的waitStatus為CANCELLED,即結束狀态,
進入該狀态後的結點将不會再變化。
2 SIGNAL:值為-1,被辨別為該等待喚醒狀态的後繼結點,當其前繼結點的線程釋放了同步鎖或被取消,将會通知該後繼結點的線程執行。說白了,就是處于喚醒狀态,
隻要前繼結點釋放鎖,就會通知辨別為SIGNAL狀态的後繼結點的線程執行。
3 CONDITION:值為-2,與Condition相關,該辨別的結點處于等待隊列中,結點的線程等待在Condition上,當其他線程調用了Condition的signal()方法後,
CONDITION狀态的結點将從等待隊列轉移到同步隊列中,等待擷取同步鎖。
4 PROPAGATE:值為-3,與共享模式相關,在共享模式中,該狀态辨別結點的線程處于可運作狀态。
5 0狀态:值為0,代表初始化狀态。
6 AQS在判斷狀态時,通過用waitStatus>0表示取消狀态,而waitStatus<0表示有效狀态。
enq(Node)
此方法用于将node加入隊尾,采用終極自旋方式保證一定加入隊尾。CAS自旋volatile變量,是一種很經典的用法。
private Node enq(final Node node) {
//CAS"自旋",直到成功加入隊尾
for (;;) {
Node t = tail;
if (t == null) { // 隊列為空,建立一個空的标志結點作為head結點,并将tail也指向它。
if (compareAndSetHead(new Node()))
tail = head;
} else {//正常流程,放入隊尾
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
acquireQueued(Node, int)
通過tryAcquire()和addWaiter(),該線程擷取資源失敗,已經被放入等待隊列尾部了。該線程下一部進入等待狀态休息,直到其他線程徹底釋放資源後喚醒自己,自己再拿到資源,然後就可以去幹自己想幹的事了。acquireQueued()就是幹這件事:在等待隊列中排隊拿号(中間沒其它事幹可以休息),直到拿到号後再傳回,這個函數非常關鍵。
1 final boolean acquireQueued(final Node node, int arg) {
2 boolean failed = true;//标記是否成功拿到資源
3 try {
4 boolean interrupted = false;//标記等待過程中是否被中斷過
5 //又是一個“自旋”!
6 for (;;) {
7 final Node p = node.predecessor();//拿到前驅
8 //如果前驅是head,即該結點已成老二,那麼便有資格去嘗試擷取資源(可能是老大釋放完資源喚醒自己的,當然也可能被interrupt了)。
9 if (p == head && tryAcquire(arg)) {
10 setHead(node);//拿到資源後,将head指向該結點。是以head所指的标杆結點,就是目前擷取到資源的那個結點或null。
11 p.next = null;
// setHead中node.prev已置為null,此處再将head.next置為null,就是為了友善GC回收以前的head結點。也就意味着之前拿完資源的結點出隊了!
12 failed = false;
13 return interrupted;//傳回等待過程中是否被中斷過
14 }
15
16 //如果自己可以休息了,就進入waiting狀态,直到被unpark()
17 if (shouldParkAfterFailedAcquire(p, node) &&
18 parkAndCheckInterrupt())
19 interrupted = true;//如果等待過程中被中斷過,哪怕隻有那麼一次,就将interrupted标記為true
20 }
21 } finally {
22 if (failed)
23 cancelAcquire(node);
24 }
25 }
到這裡了,我們先看看shouldParkAfterFailedAcquire()和parkAndCheckInterrupt()具體幹些什麼。
shouldParkAfterFailedAcquire(Node, Node)
此方法主要用于檢查狀态,看看自己是否真的可以去休息了,以免隊列前邊的線程都放棄了盲等。
1 private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
2 int ws = pred.waitStatus;//拿到前驅的狀态
3 if (ws == Node.SIGNAL)
4 //如果已經告訴前驅拿完号後通知自己一下,那就可以安心休息了
5 return true;
6 if (ws > 0) {
7 /*
8 * 如果前驅放棄了,那就一直往前找,直到找到最近一個正常等待的狀态,并排在它的後邊。
9 * 注意:那些放棄的結點,由于被自己“加塞”到它們前邊,它們相當于形成一個無引用鍊,
10 * 稍後就會被GC回收
11 */
12 do {
13 node.prev = pred = pred.prev;
14 } while (pred.waitStatus > 0);
15 pred.next = node;
16 } else {
17 //如果前驅正常,那就把前驅的狀态設定成SIGNAL,告訴它拿完号後通知自己一下。
18 //有可能失敗,前驅說不定剛剛釋放完。
19 compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
20 }
21 return false;
22 }
整個流程中,如果前驅結點的狀态不是SIGNAL,那麼自己就不能安心去休息,需要去找個安心的休息點,同時可以再嘗試下看有沒有機會輪到自己拿号。
1 parkAndCheckInterrupt()
2 如果線程找好安全休息點後,那就可以安心去休息了。此方法就是讓線程去休息,真正進入等待狀态。
3 private final boolean parkAndCheckInterrupt() {
4 LockSupport.park(this);//調用park()使線程進入waiting狀态
5 return Thread.interrupted();//如果被喚醒,檢視自己是不是被中斷的。
6 }
park()會讓目前線程進入waiting狀态。在此狀态下,有兩種途徑可以喚醒該線程:1)被unpark();2)被interrupt()。需要注意的是,Thread.interrupted()會清除目前線程的中斷标記位。
至此,我們看一下前面的總函數就知道了整個流程了:
2.1.2、release(int)
這裡我們來講一下acquire()的反操作release()。此方法是獨占模式下線程釋放共享資源的頂層入口。它會釋放指定量的資源,如果徹底釋放(即state=0),則喚醒等待隊列裡的其他線程。
1 public final boolean release(int arg) {
2 if (tryRelease(arg)) {
3 Node h = head;//找到頭結點
4 if (h != null && h.waitStatus != 0)
5 unparkSuccessor(h);//喚醒等待隊列裡的下一個線程
6 return true;
7 }
8 return false;
9 }
邏輯并不複雜。調用tryRelease()來釋放資源。有一點需要注意的是,它是根據tryRelease()的傳回值來判斷該線程是否已經完成釋放掉資源了,是以自定義同步器在設計tryRelease()的時候要明确這一點。
tryRelease(int)
1 protected boolean tryRelease(int arg) {
2 throw new UnsupportedOperationException();
3 }
跟tryAcquire()一樣,這個方法是需要獨占模式的自定義同步器去實作。正常來說,tryRelease()都會成功的,因為這是獨占模式,該線程來釋放資源,那麼它肯定已經拿到獨占資源了,直接減掉相應量的資源即可(state-=arg),也不需要考慮線程安全的問題。但要注意它的傳回值,release()是根據tryRelease()的傳回值來判斷該線程是否已經完成釋放掉資源了,是以自義定同步器在實作時,如果已經徹底釋放資源(state=0),要傳回true,否則傳回false。
unparkSuccessor(Node)
此方法用于喚醒等待隊列中下一個線程。
1 private void unparkSuccessor(Node node) {
2 //這裡,node一般為目前線程所在的結點。
3 int ws = node.waitStatus;
4 if (ws < 0)//置零,目前線程所在的結點狀态,允許失敗。
5 compareAndSetWaitStatus(node, ws, 0);
6 Node s = node.next;//找到下一個需要喚醒的結點s
7 if (s == null || s.waitStatus > 0) {//如果為空或已取消
8 s = null;
9 for (Node t = tail; t != null && t != node; t = t.prev)
10 if (t.waitStatus <= 0)//從這裡可以看出,<=0的結點,都是還有效的結點。
11 s = t;
12 }
13 if (s != null)
14 LockSupport.unpark(s.thread);//喚醒
15 }
用unpark()喚醒等待隊列中最前邊的那個未放棄線程,這裡我們也用s來表示吧。此時,再和acquireQueued()聯系起來,s被喚醒後,進入if (p == head && tryAcquire(arg))的判斷(即使p!=head也沒關系,它會再進入shouldParkAfterFailedAcquire()尋找一個安全點。這裡既然s已經是等待隊列中最前邊的那個未放棄線程了,那麼通過shouldParkAfterFailedAcquire()的調整,s也必然會跑到head的next結點,下一次自旋p==head就成立啦),然後s把自己設定成head标杆結點,表示自己已經擷取到資源了,acquire()也傳回了。
release()是獨占模式下線程釋放共享資源的頂層入口。它會釋放指定量的資源,如果徹底釋放了(即state=0),它會喚醒等待隊列裡的其他線程來擷取資源。
同樣的讓我們再來看看對于共享鎖的情況下,資源的擷取和釋放。
2.1.3、acquireShared(int)
此方法是共享模式下線程擷取共享資源的頂層入口。它會擷取指定量的資源,擷取成功則直接傳回,擷取失敗則進入等待隊列,直到擷取到資源為止,整個過程忽略中斷。
1 public final void acquireShared(int arg) {
2 if (tryAcquireShared(arg) < 0)
3 doAcquireShared(arg);
4 }
5 }
這裡tryAcquireShared()依然需要自定義同步器去實作。但是AQS已經把其傳回值的語義定義好了:負值代表擷取失敗;0代表擷取成功,但沒有剩餘資源;正數表示擷取成功,還有剩餘資源,其他線程還可以去擷取。是以這裡acquireShared()的流程就是:
tryAcquireShared()嘗試擷取資源,成功則直接傳回;失敗則通過doAcquireShared()進入等待隊列,直到擷取到資源為止才傳回。
doAcquireShared(int)
此方法用于将目前線程加入等待隊列尾部休息,直到其他線程釋放資源喚醒自己,自己成功拿到相應量的資源後才傳回。
1 private void doAcquireShared(int arg) {
2 final Node node = addWaiter(Node.SHARED);//加入隊列尾部
3 boolean failed = true;//是否成功标志
4 try {
5 boolean interrupted = false;//等待過程中是否被中斷過的标志
6 for (;;) {
7 final Node p = node.predecessor();//前驅
8 if (p == head) {//如果到head的下一個,因為head是拿到資源的線程,此時node被喚醒,很可能是head用完資源來喚醒自己的
9 int r = tryAcquireShared(arg);//嘗試擷取資源
10 if (r >= 0) {//成功
11 setHeadAndPropagate(node, r);//将head指向自己,還有剩餘資源可以再喚醒之後的線程
12 p.next = null; // help GC
13 if (interrupted)//如果等待過程中被打斷過,此時将中斷補上
14 selfInterrupt();
15 failed = false;
16 return;
17 }
18 }
19 //判斷狀态,尋找安全點,進入waiting狀态,等着被unpark()或interrupt()
20 if (shouldParkAfterFailedAcquire(p, node) &&
21 parkAndCheckInterrupt())
22 interrupted = true;
23 }
24 }
25 } finally {
26 if (failed)
27 cancelAcquire(node);
28 }
29 }
其實和acquireQueued()流程并沒有太大差別。隻不過這裡将補中斷的selfInterrupt()放到doAcquireShared()裡了,而獨占模式是放到acquireQueued()之外。
跟獨占模式比,還有一點需要注意的是,這裡隻有線程是head.next時(“老二”),才會去嘗試擷取資源,有剩餘的話還會喚醒之後的隊友。那麼問題就來了,假如老大用完後釋放了5個資源,而老二需要6個,老三需要1個,老四需要2個。老大先喚醒老二,老二一看資源不夠,他是把資源讓給老三呢,還是不讓?答案是否定的!老二會繼續park()等待其他線程釋放資源,也更不會去喚醒老三和老四了。獨占模式,同一時刻隻有一個線程去執行,這樣做未嘗不可;但共享模式下,多個線程是可以同時執行的,現在因為老二的資源需求量大,而把後面量小的老三和老四也都卡住了。當然,這并不是問題,隻是AQS保證嚴格按照入隊順序喚醒罷了(保證公平,但降低了并發)。
setHeadAndPropagate(Node, int)
此方法在setHead()的基礎上多了一步,就是自己蘇醒的同時,如果條件符合(比如還有剩餘資源),還會去喚醒後繼結點,畢竟是共享模式。
1 private void setHeadAndPropagate(Node node, int propagate) {
2 Node h = head;
3 setHead(node);//head指向自己
4 //如果還有剩餘量,繼續喚醒下一個鄰居線程
5 if (propagate > 0 || h == null || h.waitStatus < 0) {
6 Node s = node.next;
7 if (s == null || s.isShared())
8 doReleaseShared();
9 }
10 }
2.1.4、releaseShared()
我們來看acquireShared()的反操作releaseShared(),此方法是共享模式下線程釋放共享資源的頂層入口。它會釋放指定量的資源,如果成功釋放且允許喚醒等待線程,它會喚醒等待隊列裡的其他線程來擷取資源。
1 public final boolean releaseShared(int arg) {
2 if (tryReleaseShared(arg)) {//嘗試釋放資源
3 doReleaseShared();//喚醒後繼結點
4 return true;
5 }
6 return false;
7 }
此方法的流程也比較簡單,釋放掉資源後,喚醒後繼。跟獨占模式下的release()相似,但是獨占模式下的tryRelease()在完全釋放掉資源(state=0)後,才會傳回true去喚醒其他線程,這主要是基于獨占下可重入的考量;而共享模式下的releaseShared()則沒有這種要求,共享模式實質就是控制一定量的線程并發執行,那麼擁有資源的線程在釋放掉部分資源時就可以喚醒後繼等待結點。例如資源總量是13,A(5)和B(7)分别擷取到資源并發運作,C(4)來時隻剩1個資源就需要等待。A在運作過程中釋放掉2個資源量,然後tryReleaseShared(2)傳回true喚醒C,C一看隻有3個仍不夠繼續等待;随後B又釋放2個,tryReleaseShared(2)傳回true喚醒C,C一看有5個夠自己用了,然後C就可以跟A和B一起運作。而ReentrantReadWriteLock讀鎖的tryReleaseShared()隻有在完全釋放掉資源(state=0)才傳回true,是以自定義同步器可以根據需要決定tryReleaseShared()的傳回值。
doReleaseShared()
此方法用于喚醒後繼。
1 private void doReleaseShared() {
2 for (;;) {
3 Node h = head;
4 if (h != null && h != tail) {
5 int ws = h.waitStatus;
6 if (ws == Node.SIGNAL) {
7 if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
8 continue;
9 unparkSuccessor(h);//喚醒後繼
10 }
11 else if (ws == 0 &&
12 !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
13 continue;
14 }
15 if (h == head)// head發生變化
16 break;
17 }
18 }
至此我們詳解了獨占和共享兩種模式下擷取-釋放資源(acquire-release、acquireShared-releaseShared)的源碼,值得注意的是,acquire()和acquireShared()兩種方法下,線程在等待隊列中都是忽略中斷的。AQS也支援響應中斷的,acquireInterruptibly()/acquireSharedInterruptibly()即是,這裡相應的源碼跟acquire()和acquireSahred()差不多。
2.3、獨占鎖和共享鎖在實作上的差別
獨占鎖的同步狀态值為1,即同一時刻隻能有一個線程成功擷取同步狀态。共享鎖的同步狀态>1,取值由上層同步元件确定。
獨占鎖隊列中頭節點運作完成後釋放它的直接後繼節點。共享鎖隊列中頭節點運作完成後釋放它後面的所有節點。
共享鎖中會出現多個線程(即同步隊列中的節點)同時成功擷取同步狀态的情況。
2.4、簡單使用
既然明白基本的操作機理,我們就可以實作自己的鎖機制了,比如mutex這種不可重入的互斥鎖。
1 class Mutex implements Lock, java.io.Serializable {
2 // 自定義同步器
3 private static class Sync extends AbstractQueuedSynchronizer {
4 // 判斷是否鎖定狀态
5 protected boolean isHeldExclusively() {
6 return getState() == 1;
7 }
8
9 // 嘗試擷取資源,立即傳回。成功則傳回true,否則false。
10 public boolean tryAcquire(int acquires) {
11 assert acquires == 1; // 這裡限定隻能為1個量
12 if (compareAndSetState(0, 1)) {//state為0才設定為1,不可重入!
13 setExclusiveOwnerThread(Thread.currentThread());//設定為目前線程獨占資源
14 return true;
15 }
16 return false;
17 }
18
19 // 嘗試釋放資源,立即傳回。成功則為true,否則false。
20 protected boolean tryRelease(int releases) {
21 assert releases == 1; // 限定為1個量
22 if (getState() == 0)//既然來釋放,那肯定就是已占有狀态了。隻是為了保險,多層判斷!
23 throw new IllegalMonitorStateException();
24 setExclusiveOwnerThread(null);
25 setState(0);//釋放資源,放棄占有狀态
26 return true;
27 }
28 }
29
30 // 真正同步類的實作都依賴繼承于AQS的自定義同步器!
31 private final Sync sync = new Sync();
32
33 //lock<-->acquire。兩者語義一樣:擷取資源,即便等待,直到成功才傳回。
34 public void lock() {
35 sync.acquire(1);
36 }
37
38 //tryLock<-->tryAcquire。兩者語義一樣:嘗試擷取資源,要求立即傳回。成功則為true,失敗則為false。
39 public boolean tryLock() {
40 return sync.tryAcquire(1);
41 }
42
43 //unlock<-->release。兩者語義一樣:釋放資源。
44 public void unlock() {
45 sync.release(1);
46 }
47
48 //鎖是否占有狀态
49 public boolean isLocked() {
50 return sync.isHeldExclusively();
51 }
52 }
同步類在實作時一般都将自定義同步器(sync)定義為内部類,供自己使用;而同步類自己(Mutex)則實作某個接口,對外服務。當然,接口的實作要直接依賴sync,它們在語義上也存在某種對應關系,而sync隻用實作資源state的擷取-釋放方式tryAcquire-tryRelelase,至于線程的排隊、等待、喚醒等,上層的AQS都已經實作好了,我們不用關心。
除了Mutex,ReentrantLock/CountDownLatch/Semphore這些同步類的實作方式都差不多,不同的地方就在擷取-釋放資源的方式tryAcquire-tryRelelase。掌握了這點,AQS的核心便被攻破了。
2.5、重入鎖
重入鎖指的是目前線成功擷取鎖後,如果再次通路該臨界區,則不會對自己産生互斥行為。Java中對ReentrantLock和synchronized都是可重入鎖,synchronized由jvm實作可重入機制,ReentrantLock的可重入性基于AQS實作。同時,ReentrantLock還提供公平鎖和非公平鎖兩種模式。
ReentrantLock重入鎖
重入鎖的基本原理是判斷上次擷取鎖的線程是否為目前線程,如果是則可再次進入臨界區,如果不是,則阻塞。重入鎖的最主要邏輯就鎖判斷上次擷取鎖的線程是否為目前線程。由于ReentrantLock是基于AQS實作的,底層通過操作同步狀态來擷取鎖,下面看一下非公平鎖的實作邏輯:
1 final boolean nonfairTryAcquire(int acquires) {
2 //擷取目前線程
3 final Thread current = Thread.currentThread();
4 //通過AQS擷取同步狀态
5 int c = getState();
6 //同步狀态為0,說明臨界區處于無鎖狀态,
7 if (c == 0) {
8 //修改同步狀态,即加鎖
9 if (compareAndSetState(0, acquires)) {
10 //将目前線程設定為鎖的owner
11 setExclusiveOwnerThread(current);
12 return true;
13 }
14 }
15 //如果臨界區處于鎖定狀态,且上次擷取鎖的線程為目前線程
16 else if (current == getExclusiveOwnerThread()) {
17 //則遞增同步狀态
18 int nextc = c + acquires;
19 if (nextc < 0) // overflow
20 throw new Error("Maximum lock count exceeded");
21 setState(nextc);
22 return true;
23 }
24 return false;
25 }
非公平鎖
非公平鎖是指當鎖狀态為可用時,不管在目前鎖上是否有其他線程在等待,新近線程都有機會搶占鎖。上述代碼即為非公平鎖和核心實作,可以看到隻要同步狀态為0,任何調用lock的線程都有可能擷取到鎖,而不是按照鎖請求的FIFO原則來進行的。
公平鎖
公平鎖是指當多個線程嘗試擷取鎖時,成功擷取鎖的順序與請求擷取鎖的順序相同,下面看一個ReentrantLock的實作:
1 protected final boolean tryAcquire(int acquires) {
2 final Thread current = Thread.currentThread();
3 int c = getState();
4 if (c == 0) {
5 //此處為公平鎖的核心,即判斷同步隊列中目前節點是否有前驅節點
6 if (!hasQueuedPredecessors() &&
7 compareAndSetState(0, acquires)) {
8 setExclusiveOwnerThread(current);
9 return true;
10 }
11 }
12 else if (current == getExclusiveOwnerThread()) {
13 int nextc = c + acquires;
14 if (nextc < 0)
15 throw new Error("Maximum lock count exceeded");
16 setState(nextc);
17 return true;
18 }
19 return false;
20 }
從上面的代碼中可以看出,公平鎖與非公平鎖的差別僅在于是否判斷目前節點是否存在前驅節點!hasQueuedPredecessors() ,由AQS可知,如果目前線程擷取鎖失敗就會被加入到AQS同步隊列中,那麼,如果同步隊列中的節點存在前驅節點,也就表明存線上程比目前節點線程更早的擷取鎖,故隻有等待前面的線程釋放鎖後才能擷取鎖。
2.6、讀寫鎖
Java提供了一個基于AQS到讀寫鎖實作ReentrantReadWriteLock,該讀寫鎖到實作原理是:将同步變量state按照高16位和低16位進行拆分,高16位表示讀鎖,低16位表示寫鎖。
1 一次隻有一個線程可以占有寫模式的讀寫鎖, 但是可以有多個線程同時占有讀模式的讀寫鎖. 正是因為這個特性:
2 當讀寫鎖是寫加鎖狀态時, 在這個鎖被解鎖之前, 所有試圖對這個鎖加鎖的線程都會被阻塞.
3 當讀寫鎖在讀加鎖狀态時, 所有試圖以讀模式對它進行加鎖的線程都可以得到通路權, 但是如果線程希望以寫模式對此鎖進行加鎖, 它必須直到所有的線程釋放鎖.
4 通常, 當讀寫鎖處于讀模式鎖住狀态時, 如果有另外線程試圖以寫模式加鎖, 讀寫鎖通常會阻塞随後的讀模式鎖請求, 這樣可以避免讀模式鎖長期占用, 而等待的寫模式鎖請求長期阻塞.
5 讀寫鎖适合于對資料結構的讀次數比寫次數多得多的情況. 因為, 讀模式鎖定時可以共享, 以寫模式鎖住時意味着獨占, 是以讀寫鎖又叫共享-獨占鎖.
2.6.1、寫鎖的擷取與釋放
寫鎖是一個獨占鎖,是以我們看一下ReentrantReadWriteLock中tryAcquire(arg)的實作:
寫鎖的擷取處理流程如下:
1 擷取同步狀态,并從中分離出低16為的寫鎖狀态
2 如果同步狀态不為0,說明存在讀鎖或寫鎖
3 如果存在讀鎖(c !=0 && w == 0),則不能擷取寫鎖(保證寫對讀的可見性)
4 如果目前線程不是上次擷取寫鎖的線程,則不能擷取寫鎖(寫鎖為獨占鎖)
5 如果以上判斷均通過,則在低16為寫鎖同步狀态上利用CAS進行修改(增加寫鎖同步狀态,實作可重入)
6 将目前線程設定為寫鎖的擷取線程
1 protected final boolean tryAcquire(int acquires) {
2 Thread current = Thread.currentThread();
3 int c = getState();
4 int w = exclusiveCount(c);
5 if (c != 0) {
6 if (w == 0 || current != getExclusiveOwnerThread())
7 return false;
8 if (w + exclusiveCount(acquires) > MAX_COUNT)
9 throw new Error("Maximum lock count exceeded");
10 // Reentrant acquire
11 setState(c + acquires);
12 return true;
13 }
14 if (writerShouldBlock() ||
15 !compareAndSetState(c, c + acquires))
16 return false;
17 setExclusiveOwnerThread(current);
18 return true;
19 }
寫鎖的釋放過程與獨占鎖基本相同:
1 protected final boolean tryRelease(int releases) {
2 if (!isHeldExclusively())
3 throw new IllegalMonitorStateException();
4 int nextc = getState() - releases;
5 boolean free = exclusiveCount(nextc) == 0;
6 if (free)
7 setExclusiveOwnerThread(null);
8 setState(nextc);
9 return free;
10 }
在釋放的過程中,不斷減少讀鎖同步狀态,當同步狀态為0時,寫鎖完全釋放。
2.6.2、讀鎖的擷取與釋放
讀鎖是一個共享鎖,擷取讀鎖的步驟如下:
1 擷取目前同步狀态
2 計算高16為讀鎖狀态+1後的值
3 如果大于能夠擷取到的讀鎖的最大值,則抛出異常
4 如果存在寫鎖并且目前線程不是寫鎖的擷取者,則擷取讀鎖失敗
5 如果上述判斷都通過,則利用CAS重新設定讀鎖的同步狀态
讀鎖的擷取步驟與寫鎖類似,即不斷的釋放寫鎖狀态,直到為0時,表示沒有線程擷取讀鎖。
三、總結
通過對java中的AQS鎖機制的剖析,我們了解了獨占和共享兩種基本的持有鎖的方式,并且分析了Mutex、可重入鎖、公平鎖、非公平鎖的實作,讀寫鎖等特殊的鎖。通過對這些所的了解,我們更加深刻地了解了AQS的本質,以及其中線程的狀态和活動軌迹。
參考文獻:https://www.cnblogs.com/waterystone/p/4920797.html
https://blog.csdn.net/zhangdong2012/article/details/79983404