寫在最前,本人也隻是個大三的學生,如果你發現任何我寫的不對的,請在評論中指出。
預設版本 JDK8
CAS
鎖是分兩種的,一種是悲觀鎖,另一種是樂觀鎖。
悲觀鎖
就是常說的鎖,像 Lock、synchronized 這種。悲觀鎖總是認為要通路的共享資源是要發生沖突的,是以每次都會在通路的時候加上鎖,以此保證每次通路共享資源的時候隻有一個線程。
樂觀鎖
是純粹的樂天派,它是認為要通路的共享資源是不會發生沖突的,無需加鎖也無需等待,自然能夠避免死鎖。而一旦多個線程發生沖突,樂觀鎖通常是使用一種稱為CAS的技術來保證線程執行的安全性。
CAS概念了解
CAS全拼是比較并交換(Compare And Swap)。在CAS中,有這樣三個值:
- V:要更新的變量 var
- E:期望的值 expected
- N:新值 new
整體過程如下: 首先判斷 V 是否等于 E,如果等于,将 V 的值設定為 N;如果不等,說明其他線程更新了該值,則目前線程放棄更新,什麼都不做。是以這裡的 E 本質是一個舊值。
可以舉例來說明:
- 假設一個線程想要将共享變量 i=5 更新為6
- 情況1是我們首先去判斷i此時是等于5的,說明沒有被其他線程所更改,那麼就會把值設定為6,此次 CAS 成功,i的值等于6
- 情況2是發生i的值不是期望的5,而是其他什麼值,那麼就會判定此次的 CAS 失敗,然後什麼也不做,i 的值保持原狀。
你可能會疑惑,那假如你判斷 i 等于5之後,正要修改,然後被其他線程更新了值怎麼辦?
我剛開始也有這種疑惑,實際上 CAS 操作是原子性的,它畢竟是 native 方法,是 JVM 底層或 C++ 實作的,它是一種系統原語,是一條 CPU 原子指令,從 CPU 層面保證了它的原子性。
CAS實作技術——Unsafe
Unsafe 是 sun.misc 包下的,
它裡面一些 native 是關于 CAS 的。Unsafe 中對 CAS 的實作是 C++ 寫的,它的具體實作和作業系統、CPU 都有關系。
用到 CAS 技術的可以參照一下各種原子類,比如:
AtomicInteger、AtomicBoolean、AtomicLong等等......
這裡就用 AtomicInteger 類的 getAndAdd( int delta ) 來說明:
/**
* Atomically adds the given value to the current value.
*
* @param delta the value to add
* @return the previous value
*/
// 可以看到 AtomicInteger 類實際上也是調用 Unsafe 類裡的 getAndAddInt 方法來實作功能的
public final int getAndAdd(int delta) {
return unsafe.getAndAddInt(this, valueOffset, delta);
}
@HotSpotIntrinsicCandidate
public final int getAndAddInt(Object o, long offset, int delta) {
int v;
do {
// 擷取舊值的操作
v = getIntVolatile(o, offset);
} while (!weakCompareAndSetInt(o, offset, v, v + delta));
return v;
}
我們來一步步解析這段源碼。首先,對象o是this,也就是一個AtomicInteger對象。然後offset是一個常量VALUE ➡
private static final long VALUE = U.objectFieldOffset(AtomicInteger.class, "value")。
因為CAS是無鎖的,它允許操作失敗,是以一般都是搭配while循環來進行的,這裡采用的是do while循環,是保證循環體内的代碼至少執行一次的。它這裡聲明了一個變量v, 如果你執行失敗那就傳回原值v, 如果執行成功那麼就是 v + delta。
這裡的 weakCompareAndSetInt 方法我并不是太清晰,是以采用百度上的說法:
簡單來說,weakCompareAndSet操作僅保留了volatile自身變量的特性,而除去了happens-before規則帶來的記憶體語義。也就是說,weakCompareAndSet無法保證處理操作目标的volatile變量外的其他變量的執行順序( 編譯器和處理器為了優化程式性能而對指令序列進行重新排序 ),同時也無法保證這些變量的可見性。這在一定程度上可以提高性能。
CAS不得不注意的問題
- ABA的問題: CAS是無法注意到: **假設一個共享變量從A到B又從B到A的變化的,**此時可以采用的操作就是加上版本号或者時間戳(AtomicStampedReference類可以解決)。
- 循環時間長: 因為CAS基本多與自旋配合,那麼長時間不成功,會導緻CPU資源浪費。推薦JVM支援處理器提供的pause指令。
- 隻能保證一個共享變量是原子操作的(AtomicReference類可以解決)。
AQS
AQS是AbstractQueuedSynchronizer的簡稱,翻譯過來就是
抽象隊列同步器
,它也可以拆分來了解:
- 抽象: 提供了一種定義的方式,但是不包括具體的實作。
- 隊列:使用了基于隊列的方式去實作。
- 同步:實作了同步的功能。
AQS是一系列并發流程控制類的基石,比如:
CountDownLatch / CyclicBarrier / Semaphore / ReentrantLock ReentrantReadWriteLock 等等。。。
同步器設計思路
在具體分析之前,需要了解兩種同步的方式,
獨占模式和共享模式:
- 獨占模式: 指的是目前資源僅供一個線程通路(ReentrantLock )
- 共享模式: 指的是同時可以被多個線程所擷取, 具體的資源的個數可以通過參數指定(CountDownLatch 、CyclicBarrier )。
以下是AQS的設計方案:
- 設定一個擁有可見性的變量
,使用這個變量表示被擷取的資源的數量int state = 0
- 線程在擷取資源之前需要檢查
的狀态,如果為0,就設定為1,表示擷取資源成功,如果不為0,則表示目前資源已經被占用,此時線程要阻塞并等待其他線程釋放資源。state
- 為了能使得資源釋放後找到那些被阻塞的線程,需要把這些線程放到一個FIFO的隊列内。
- 當占用資源的線程釋放資源後,就可以從隊列裡取出一個正在阻塞的線程并喚醒執行。
AQS的資料成員:
在AQS中被定義的成員變量:
-
作用就是第一條。state
state是被volatile修飾的變量, 保證了自身的可見性。 并且對它的操作都是
原子性的,state的通路方式有三種:
- getState()
- setState()
- compareAndSetState()
其中 compareAndSetState() 是借用了Unsafe的compareAndSwapInt() 方法實作的。
-
和head
,定義了上面第四條中的FIFO隊列,head和tail分别指向隊列的頭部和尾部,并且這是一個雙向隊列。tail
- 定義
,隊列中的每個元素都是Node
節點(相當重要)Node
static final class Node{
//标記一個結點(對應的線程)在共享模式下等待
static final Node SHARED = new Node();
// 标記一個結點(對應的線程)在獨占模式下等待
static final Node EXCLUSIVE = null;
// waitStatus的值,表示該結點(對應的線程)已被取消
static final int CANCELLED = 1;
//waitStatus的值,表示後繼結點(對應的線程)需要被喚醒
static final int SIGNAL = -1;
//waitStatus的值,表示該結點(對應的線程)在等待某一條件
static final int CONDITION = -2;
/*waitStatus的值,表示有資源可用,新head結點需要繼續喚醒後繼結點(共享模式下,多線程并發釋放資源,而head喚醒其後繼結點後,需要把多出來的資源留給後面的結點;設定新的head結點時,會繼續喚醒其後繼結點)*/
static final int PROPAGATE = -3;
// 等待狀态,取值範圍,-3,-2,-1,0,1
volatile int waitStatus;
volatile Node prev; // 前驅結點
volatile Node next; // 後繼結點
volatile Thread thread; // 結點對應的線程
Node nextWaiter; // 等待隊列裡下一個等待條件的結點
//成員方法忽略,可以參考具體的源碼
}
AQS主要方法
AQS的設計時基于模闆方法模式的,它有一些方法必須要子類去實作的,主要有:
- isHeldExclusively():該線程是否正在獨占資源。隻有用到condition才需要去實作它。
- tryAcquire(int):獨占方式。嘗試擷取資源,成功則傳回true,失敗則傳回false。
- tryRelease(int):獨占方式。嘗試釋放資源,成功則傳回true,失敗則傳回false。
- tryAcquireShared(int):共享方式。嘗試擷取資源。負數表示失敗;0表示成功,但是沒有剩餘資源;正數表示成功,且有剩餘資源。
- tryReleaseShared(int):共享方式。嘗試釋放資源,如果釋放後允許喚醒後續等待節點傳回true,否則傳回false。
這些方法都是
protected
方法,但是它們并沒有在AQS具體實作,而是直接抛出異常(這裡不适用抽象方法的目的是:避免強迫子類中把所有的抽象方法都實作一遍,減少無用功,這樣子類隻需要關注自己關心的抽象方法即可)。
AQS擷取資源
擷取資源的入口是
acquire(int arg)方法。
arg是要擷取資源的個數,在獨占模式下始終為1,可以浏覽一下這個函數的代碼:
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
首先調用tryAcquire(arg)嘗試去擷取資源。前面提到了這個方法是在子類具體實作的。
如果擷取資源失敗,就通過addWaiter(Node.EXCLUSIVE)方法把這個線程插入到等待隊列中。其中傳入的參數代表要插入的Node是獨占式的。這個方法的具體實作:
// 為目前線程和給定模式建立和劃分節點
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// 找到尾節點
Node pred = tail;
// 如果尾節點不為空
if (pred != null) {
// 将目前節點的前置節點置為尾節點
node.prev = pred;
// 使用CAS嘗試 如果成功就傳回
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
// 如果等待隊列為空或者上述CAS失敗,再自旋CAS插入
enq(node);
return node;
}
// final 不變性
// 自旋CAS将節點插入隊列,必要時初始化
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // 初始化
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
上面的兩個函數好了解,就是在隊列的尾部插入新的Node節點,但是需要注意的是會出現多個線程同時争奪資源的情況,是以肯定會出現多個線程同時插入節點的操作,在這裡是通過CAS自旋的方式保證了操作的線程安全性。
現在可以去看看
acquireQueued
方法,該方法就是把處于等待隊列的結點從頭到尾一個一個擷取:
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
// 自旋
for (;;) {
final Node p = node.predecessor();
// 如果node的前驅結點p是head,表示node是第二個結點,就可以嘗試去擷取資源了
if (p == head && tryAcquire(arg)) {
// 拿到資源後,将head指向該結點。
// 是以head所指的結點,就是目前擷取到資源的那個結點或null。
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
// 如果自己可以休息了,就進入waiting狀态,直到被unpark()
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
AQS釋放資源
釋放資源相對于擷取資源來說,會簡單許多。在AQS中自會有一小段實作。源碼如下:
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
private void unparkSuccessor(Node node) {
// 如果狀态是負數,嘗試把它設定為0
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
// 得到頭結點的後繼結點head.next
Node s = node.next;
// 如果這個後繼結點為空或者狀态大于0
// 通過前面的定義我們知道,大于0隻有一種可能,就是這個結點已被取消
if (s == null || s.waitStatus > 0) {
s = null;
// 等待隊列中所有還有用的結點,都向前移動
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
// 如果後繼結點不為空,
if (s != null)
LockSupport.unpark(s.thread);
}