CAS
CAS (compareAndSwap),中文叫比較交換,一種無鎖原子算法。過程是這樣:它包含 3 個參數 CAS(V,E,N),V表示要更新變量的值,E表示預期值,N表示新值。僅當 V值等于E值時,才會将V的值設為N,如果V值和E值不同,則說明已經有其他線程做兩個更新,則目前線程則什麼都不做。最後,CAS 傳回目前V的真實值。CAS 操作時抱着樂觀的态度進行的,它總是認為自己可以成功完成操作。
java.util.concurrent.atomic中的AtomicXXX,都使用了這些底層的JVM支援為數字類型的引用類型提供一種高效的CAS操作,而在java.util.concurrent中的大多數類在實作時都直接或間接的使用了這些原子變量類,這些原子變量都調用了 sun.misc.Unsafe 類庫裡面的 CAS算法,用CPU指令來實作無鎖自增。
CAS的全稱為Compare And Swap,直譯就是比較交換。是一條CPU的原子指令,其作用是讓CPU先進行比較兩個值是否相等,然後原子地更新某個位置的值,其實作方式是基于硬體平台的彙編指令,在intel的CPU中,使用的是cmpxchg指令,就是說CAS是靠硬體實作的,進而在硬體層面提升效率。如圖:
當多個線程同時使用CAS 操作一個變量時,隻有一個會勝出,并成功更新,其餘均會失敗。失敗的線程不會挂起,僅是被告知失敗,并且允許再次嘗試,當然也允許實作的線程放棄操作。基于這樣的原理,CAS 操作即使沒有鎖,也可以發現其他線程對目前線程的幹擾。
與鎖相比,使用CAS會使程式看起來更加複雜一些,但由于其非阻塞的,它對死鎖問題天生免疫,并且,線程間的互相影響也非常小。更為重要的是,使用無鎖的方式完全沒有鎖競争帶來的系統開銷,也沒有線程間頻繁排程帶來的開銷,是以,他要比基于鎖的方式擁有更優越的性能。
簡單的說,CAS 需要你額外給出一個期望值,也就是你認為這個變量現在應該是什麼樣子的。如果變量不是你想象的那樣,哪說明它已經被别人修改過了。你就需要重新讀取,再次嘗試修改就好了。
CAS底層原理
這樣歸功于硬體指令集的發展,實際上,我們可以使用同步将這兩個操作變成原子的,但是這麼做就沒有意義了。是以我們隻能靠硬體來完成,硬體保證一個從語義上看起來需要多次操作的行為隻通過一條處理器指令就能完成。這類指令常用的有:
- 測試并設定(Tetst-and-Set)
- 擷取并增加(Fetch-and-Increment)
- 交換(Swap)
- 比較并交換(Compare-and-Swap)
- 加載連結/條件存儲(Load-Linked/Store-Conditional)
CPU 實作原子指令有2種方式:
-
通過總線鎖定來保證原子性
總線鎖定其實就是處理器使用了總線鎖,所謂總線鎖就是使用處理器提供的一個 LOCK# 信号,當一個處理器在總線上輸出此信号時,其他處理器的請求将被阻塞住,那麼該處理器可以獨占共享記憶體。但是該方法成本太大。是以有了下面的方式。
-
通過緩存鎖定來保證原子性
所謂 緩存鎖定 是指記憶體區域如果被緩存在處理器的緩存行中,并且在Lock 操作期間被鎖定,那麼當他執行鎖操作寫回到記憶體時,處理器不在總線上聲言 LOCK# 信号,而是修改内部的記憶體位址,并允許他的緩存一緻性機制來保證操作的原子性,因為緩存一緻性機制會阻止同時修改兩個以上處理器緩存的記憶體區域資料(這裡和 volatile 的可見性原理相同),當其他處理器回寫已被鎖定的緩存行的資料時,會使緩存行無效。
注意:有兩種情況下處理器不會使用緩存鎖定:
- 當操作的資料不能被緩存在處理器内部,或操作的資料跨多個緩存行時,則處理器會調用總線鎖定。
- 有些處理器不支援緩存鎖定,對于 Intel 486 和 Pentium 處理器,就是鎖定的記憶體區域在處理器的緩存行也會調用總線鎖定。
CAS舉例
public class CASDemo {
private static AtomicInteger i=new AtomicInteger();
private static volatile int m;
public static int increase() {
return i.incrementAndGet();
}
public static int increase01() {
return ++m;
}
}
AtomicInteger類部分源碼如下:Unsafe類中含有許多native方法,調用jvm的cpp檔案(C++編寫的),再調用asm檔案(彙編語言編寫的),就完成了CPU指令的調用。CAS實作了修改資料的原子性(AtomicInteger實作了對int類型的CAS),volatile實作了可見性豈不完美!!!
從cpp進入asm檔案:
CAS的缺點
循環時間太長
如果CAS一直不成功呢?這種情況絕對有可能發生,如果自旋CAS長時間地不成功,則會給CPU帶來非常大的開銷。在JUC中有些地方就限制了CAS自旋的次數,例如BlockingQueue的SynchronousQueue。
ABA問題
CAS需要檢查操作值有沒有發生改變,如果沒有發生改變則更新。但是存在這樣一種情況:如果一個值原來是A,變成了B,然後又變成了A,那麼在CAS檢查的時候會發現沒有改變,但是實質上它已經發生了改變,這就是所謂的ABA問題。對于ABA問題其解決方案是加上版本号,即在每個變量都加上一個版本号,每次改變時加1,即A —> B —> A,變成1A —> 2B —> 3A。
JUC包中也有解決了ABA問題的CAS類,比如AtomicStampedReference:
AQS
AQS是AbstractQueuedSynchronizer的簡稱。AQS中有一個CLH隊列,這個隊列維護着等待擷取資源state的線程,虛線表示自旋,以待擷取資源。
CLH隊列維護了一個volatile int state(代表共享資源入口,為0表示可擷取資源,這與對象頭中的monitor類似)和一個FIFO線程等待隊列(多線程争用資源被阻塞時會進入此隊列)。
通路和修改state的方法有三種:
- getState()
- setState()
- compareAndSetState()//利用CAS修改state
AQS定義了兩種資源共享方式:Exclusive(獨占,隻有一個線程能執行,如ReentrantLock)和Share(共享,多個線程可同時執行,如Semaphore/CountDownLatch)。
不同的自定義同步器争用共享資源的方式也不同。自定義同步器在實作時隻需要實作共享資源state的擷取與釋放方式即可,至于具體線程等待隊列的維護(如擷取資源失敗入隊/喚醒出隊等),AQS已經在頂層實作好了。自定義同步器實作時主要實作以下幾種方法:
- isHeldExclusively():該線程是否正在獨占資源。隻有用到condition才需要去實作它。
- tryAcquire(int):獨占方式。嘗試擷取資源,成功則傳回true,失敗則傳回false。
- tryRelease(int):獨占方式。嘗試釋放資源,成功則傳回true,失敗則傳回false。
- tryAcquireShared(int):共享方式。嘗試擷取資源。負數表示失敗;0表示成功,但沒有剩餘可用資源;正數表示成功,且有剩餘資源。
- tryReleaseShared(int):共享方式。嘗試釋放資源,如果釋放後允許喚醒後續等待結點傳回true,否則傳回false。
以ReentrantLock為例,state初始化為0,表示未鎖定狀态。A線程lock()時,會調用tryAcquire()獨占該鎖并将state+1。此後,其他線程再tryAcquire()時就會失敗,直到A線程unlock()到state=0(即釋放鎖)為止,其它線程才有機會擷取該鎖。當然,釋放鎖之前,A線程自己是可以重複擷取此鎖的(state會累加),這就是可重入的概念。但要注意,擷取多少次就要釋放多麼次,這樣才能保證state是能回到零态的。
再以CountDownLatch以例,任務分為N個子線程去執行,state也初始化為N(注意N要與線程個數一緻)。這N個子線程是并行執行的,每個子線程執行完後countDown()一次,state會CAS減1。等到所有子線程都執行完後(即state=0),會unpark()主調用線程,然後主調用線程就會從await()函數傳回,繼續後餘動作。
一般來說,自定義同步器要麼是獨占方法,要麼是共享方式,他們也隻需實作tryAcquire-tryRelease、tryAcquireShared-tryReleaseShared中的一種即可。但AQS也支援自定義同步器同時實作獨占和共享兩種方式,如ReentrantReadWriteLock。
源碼分析
acquire(int)
以獨占模式擷取資源,忽略中斷interrupt()。這個方法應該被lock()調用。
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
函數流程如下:
- tryAcquire()嘗試直接去擷取資源,如果成功則直接傳回;
- addWaiter()将該線程加入等待隊列的尾部,并标記為獨占模式;
- acquireQueued()使線程在等待隊列中擷取資源,一直擷取到資源後才傳回。如果在整個等待過程中被中斷過,則傳回true,否則傳回false。
- 如果線程在等待過程中被中斷過,它是不響應的。隻是擷取資源後才再進行自我中斷selfInterrupt(),将中斷補上。
tryAcquire(int)
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
-
這個方法為什麼會抛異常呢?
因為這個方法需要你重寫自己去實作對state的CAS操作。
-
那既然這樣為什麼聲明為abstract方法呢?
因為獨占模式下隻用實作tryAcquire-tryRelease,而共享模式下隻用實作tryAcquireShared-tryReleaseShared。如果都定義成abstract,那麼每個模式也要去實作另一模式下的接口,就更麻煩了。
addWaiter(Node)
此方法用于将目前線程加入到等待隊列的隊尾,并傳回目前線程所在的結點。
//mode隻能傳Node.EXCLUSIVE 和 exclusive, Node.SHARED
private Node addWaiter(Node mode) {
//以給定模式構造結點。mode有兩種:EXCLUSIVE(獨占)和SHARED(共享)
Node node = new Node(Thread.currentThread(), mode);
//嘗試快速方式直接放到隊尾。
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
//上一步失敗則通過enq入隊。
enq(node);
return node;
}
這裡我們說下Node。Node結點是對每一個通路同步代碼的線程的封裝,其包含了需要同步的線程本身以及線程的狀态,如是否被阻塞,是否等待喚醒,是否已經被取消等。變量waitStatus則表示目前被封裝成Node結點的等待狀态,共有4種取值CANCELLED、SIGNAL、CONDITION、PROPAGATE。
- CANCELLED:值為1,在同步隊列中等待的線程等待逾時或被中斷,需要從同步隊列中取消該Node的結點,其結點的waitStatus為CANCELLED,即結束狀态,進入該狀态後的結點将不會再變化。
- SIGNAL:值為-1,被辨別為該等待喚醒狀态的後繼結點,當其前繼結點的線程釋放了同步鎖或被取消,将會通知該後繼結點的線程執行。說白了,就是處于喚醒狀态,隻要前繼結點釋放鎖,就會通知辨別為SIGNAL狀态的後繼結點的線程執行。
- CONDITION:值為-2,與Condition相關,該辨別的結點處于等待隊列中,結點的線程等待在Condition上,當其他線程調用了Condition的signal()方法後,CONDITION狀态的結點将從等待隊列轉移到同步隊列中,等待擷取同步鎖。
- PROPAGATE:值為-3,與共享模式相關,在共享模式中,該狀态辨別結點的線程處于可運作狀态。
0狀态:值為0,代表初始化狀态。
AQS在判斷狀态時,通過用Node中的waitStatus>0表示取消狀态,而waitStatus<0表示有效狀态。
enq(Node)
此方法用于将node加入隊尾。
private Node enq(final Node node) {
//CAS"自旋",直到成功加入隊尾
for (;;) {
Node t = tail;
// 隊列為空,建立一個空的标志結點作為head結點,并将tail也指向它。
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {//正常流程,放入隊尾
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
acquireQueued(Node, int)
通過tryAcquire()和addWaiter(),該線程擷取資源失敗,已經被放入等待隊列尾部了,這時就應該進入等待狀态休息,直到其他線程徹底釋放資源後喚醒自己,自己再拿到資源,這就是acquireQueued要幹的事。
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;//标記是否成功拿到資源
try {
boolean interrupted = false;//标記等待過程中是否被中斷過
//又是一個“自旋”!
for (;;) {
final Node p = node.predecessor();//拿到前驅
//如果前驅是head,即該結點已成老二,那麼便有資格去嘗試擷取資源(可能是老大釋放完資源喚醒自己的,當然也可能被interrupt了)。
if (p == head && tryAcquire(arg)) {
setHead(node);//拿到資源後,将head指向該結點。是以head所指的标杆結點,就是目前擷取到資源的那個結點或null。
p.next = null; // setHead中node.prev已置為null,此處再将head.next置為null,就是為了友善GC回收以前的head結點。也就意味着之前拿完資源的結點出隊了!
failed = false;
return interrupted;//傳回等待過程中是否被中斷過
}
//如果自己可以休息了,就進入waiting狀态,直到被unpark()
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;//如果等待過程中被中斷過,哪怕隻有那麼一次,就将interrupted标記為true
}
} finally {
if (failed)
cancelAcquire(node);
}
}
shouldParkAfterFailedAcquire(Node, Node)
此方法主要用于檢查狀态,看看自己是否真的可以去休息了(進入線程的waiting狀态),整個流程中,如果前驅結點的狀态不是SIGNAL,那麼自己就不能安心去休息,需要去找個安心的休息點,同時可以再嘗試下看有沒有機會輪到自己拿号。
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;//拿到前驅的狀态
if (ws == Node.SIGNAL)
//如果已經告訴前驅拿完号後通知自己一下,那就可以安心休息了
return true;
if (ws > 0) {
/*
* 如果前驅放棄了,那就一直往前找,直到找到最近一個正常等待的狀态,并排在它的後邊。
* 注意:那些放棄的結點,由于被自己“加塞”到它們前邊,它們相當于形成一個無引用鍊,稍後就會被保安大叔趕走了(GC回收)!
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
//如果前驅正常,那就把前驅的狀态設定成SIGNAL,告訴它拿完号後通知自己一下。有可能失敗,人家說不定剛剛釋放完呢!
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
parkAndCheckInterrupt()
如果線程找好安全休息點後,那就可以安心去休息了。此方法就是讓線程去休息,真正進入等待狀态。
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);//調用park()使線程進入waiting狀态
return Thread.interrupted();//如果被喚醒,檢視自己是不是被中斷的。
}
park()會讓目前線程進入waiting狀态。在此狀态下,有兩種途徑可以喚醒該線程:1)被unpark();2)被interrupt()。需要注意的是,Thread.interrupted()會清除目前線程的中斷标記位。
小結
acquireQueued()的具體流程如下:
- 結點進入隊尾後,檢查狀态,找到安全休息點;
- 調用park()進入waiting狀态,等待unpark()或interrupt()喚醒自己;
- 被喚醒後,看自己是不是有資格能拿到号。如果拿到,head指向目前結點,并傳回從入隊到拿到号的整個過程中是否被中斷過;如果沒拿到,繼續流程1。
acquire()的具體流程如下:
- 調用自定義同步器的tryAcquire()嘗試直接去擷取資源,如果成功則直接傳回;
- 沒成功,則addWaiter()将該線程加入等待隊列的尾部,并标記為獨占模式;
- acquireQueued()使線程在等待隊列中休息,有機會時(輪到自己,會被unpark())會去嘗試擷取資源。擷取到資源後才傳回。如果在整個等待過程中被中斷過,則傳回true,否則傳回false。
- 如果線程在等待過程中被中斷過,它是不響應的。隻是擷取資源後才再進行自我中斷selfInterrupt(),将中斷補上。
release(int)
獨享模式下釋放資源,應被unlock()調用。如果最終釋放完會state==0,。
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
邏輯并不複雜。它調用tryRelease()來釋放資源。有一點需要注意的是,它是根據tryRelease()的傳回值來判斷該線程是否已經完成釋放掉資源了!是以自定義同步器在設計tryRelease()的時候要明确這一點!!
tryRelease(arg)
與tryAqcuire一樣,需要自己重寫。
protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}
unparkSuccessor(Node)
private void unparkSuccessor(Node node) {
//這裡,node一般為目前線程所在的結點。
int ws = node.waitStatus;
if (ws < 0)//置零目前線程所在的結點狀态,允許失敗。
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;//找到下一個需要喚醒的結點s
if (s == null || s.waitStatus > 0) {//如果為空或已取消
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)//從這裡可以看出,<=0的結點,都是還有效的結點。
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);//喚醒
}
這個函數并不複雜。一句話概括:用unpark()喚醒等待隊列中最前邊的那個未放棄線程,這裡我們也用s來表示吧。此時,再和acquireQueued()聯系起來,s被喚醒後,進入if (p == head && tryAcquire(arg))的判斷(即使p!=head也沒關系,它會再進入shouldParkAfterFailedAcquire()尋找一個安全點。這裡既然s已經是等待隊列中最前邊的那個未放棄線程了,那麼通過shouldParkAfterFailedAcquire()的調整,s也必然會跑到head的next結點,下一次自旋p==head就成立啦),然後s把自己設定成head标杆結點,表示自己已經擷取到資源了,acquire()也傳回了。
acquireShared(int)
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
acquireShared()的流程就是:
- tryAcquireShared()嘗試擷取資源,成功則直接傳回;
- 失敗則通過doAcquireShared()進入等待隊列,直到擷取到資源為止才傳回。
doAcquireShared(int)
private void doAcquireShared(int arg) {
final Node node = addWaiter(Node.SHARED);//加入隊列尾部
boolean failed = true;//是否成功标志
try {
boolean interrupted = false;//等待過程中是否被中斷過的标志
for (;;) {
final Node p = node.predecessor();//前驅
if (p == head) {//如果到head的下一個,因為head是拿到資源的線程,此時node被喚醒,很可能是head用完資源來喚醒自己的
int r = tryAcquireShared(arg);//嘗試擷取資源
if (r >= 0) {//成功
setHeadAndPropagate(node, r);//将head指向自己,還有剩餘資源可以再喚醒之後的線程
p.next = null; // help GC
if (interrupted)//如果等待過程中被打斷過,此時将中斷補上。
selfInterrupt();
failed = false;
return;
}
}
//判斷狀态,尋找安全點,進入waiting狀态,等着被unpark()或interrupt()
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
setHeadAndPropagate(Node, int)
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head;
setHead(node);//head指向自己
//如果還有剩餘量,繼續喚醒下一個鄰居線程
if (propagate > 0 || h == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared();
}
}
小結
acquireShared()的流程:
- tryAcquireShared()嘗試擷取資源,成功則直接傳回;
- 失敗則通過doAcquireShared()進入等待隊列park(),直到被unpark()/interrupt()并成功擷取到資源才傳回。整個等待過程也是忽略中斷的。
其實跟acquire()的流程大同小異,隻不過多了個自己拿到資源後,還會去喚醒後繼隊友的操作(這才是共享嘛)。
releaseShared()
釋放指定量的資源,如果成功釋放且允許喚醒等待線程,它會喚醒等待隊列裡的其他線程來擷取資源。
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {//嘗試釋放資源
doReleaseShared();//喚醒後繼結點
return true;
}
return false;
}
此方法的流程也比較簡單,一句話:釋放掉資源後,喚醒後繼。跟獨占模式下的release()相似,但有一點稍微需要注意:獨占模式下的tryRelease()在完全釋放掉資源(state=0)後,才會傳回true去喚醒其他線程,這主要是基于獨占下可重入的考量;而共享模式下的releaseShared()則沒有這種要求,共享模式實質就是控制一定量的線程并發執行,那麼擁有資源的線程在釋放掉部分資源時就可以喚醒後繼等待結點。例如,資源總量是13,A(5)和B(7)分别擷取到資源并發運作,C(4)來時隻剩1個資源就需要等待。A在運作過程中釋放掉2個資源量,然後tryReleaseShared(2)傳回true喚醒C,C一看隻有3個仍不夠繼續等待;随後B又釋放2個,tryReleaseShared(2)傳回true喚醒C,C一看有5個夠自己用了,然後C就可以跟A和B一起運作。而ReentrantReadWriteLock讀鎖的tryReleaseShared()隻有在完全釋放掉資源(state=0)才傳回true,是以自定義同步器可以根據需要決定tryReleaseShared()的傳回值。
doReleaseShared()
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue;
unparkSuccessor(h);//喚醒後繼
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue;
}
if (h == head)// head發生變化
break;
}
}
總結:
CAS可以保證一次的讀-改-寫操作是原子操作,在單處理器上該操作容易實作,但是在多處理器上實作就有點兒複雜了。
緩存加鎖:其實針對于上面那種情況我們隻需要保證在同一時刻對某個記憶體位址的操作是原子性的即可。緩存加鎖就是緩存在記憶體區域的資料如果在加鎖期間,當它執行鎖操作寫回記憶體時,處理器不在輸出LOCK#信号,而是修改内部的記憶體位址,利用緩存一緻性協定來保證原子性。緩存一緻性機制可以保證同一個記憶體區域的資料僅能被一個處理器修改,也就是說當CPU1修改緩存行中的i時使用緩存鎖定,那麼CPU2就不能同時緩存了i的緩存行。
CAS有ABA問題和自旋問題,但JUC包中有解決這些問題的實作類。
AQS中有一個維護擷取資源入口state的CHL隊列,擷取不到資源的線程會在隊列中自旋等待。它有兩種模式:獨占模式、共享模式。獨占模式是單個線程擷取資源,重寫tryAcquire和release方法即可,根據自己重寫可以實作可重入的功能。共享模式相當于有多個資源,線程可以擷取若幹個資源,隊列中的每一個線程如果擷取足夠多的資源就執行,需要重寫acquireShared和releaseShared方法。
JUC包中也提供了一種實作:ReentrantLock(可重入鎖)