本來這篇想寫并發容器的,但是我看了下api文檔,沒啥深度,沒啥可寫的,照着api文檔的描述敲一遍代碼也差不多就懂了。是以也就決定這一片記一下這兩天學習的AQS原理。
大家不要一看到原理兩字,心裡就有點退堂鼓,以為很難的感覺。難道那不是人寫的代碼嗎,他能寫出來,你就寫不出來嗎,也就是一個人的邏輯,明白了他的邏輯不也就懂了代碼原理了嘛。
初入部落格,有許多描述不清楚、語句不通順的地方。請提出指正!!!
開始
synchronized鎖的實作,它是jvm關鍵字,直接轉換成位元組碼指令。 ReentrantLock是一個普通的代碼類,是java代碼通過邏輯來實作鎖的效果,也正因為是java代碼實作的,是以在靈活度上要比sync鎖強。 在講解是,我會以非公平鎖示例來描述他發生的過程,了解了非公平鎖,公平鎖也自然就了解了。我将以不同的情況來描述Rreentrantlock當時要做的操作。
1. 初始狀态下,第一個線程通路:
public static void main(String[] args) {
Lock rtLock =new ReentrantLock();
try {
rtLock.lock();
TimeUnit.SECONDS.sleep(Integer.MAX_VALUE);
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
rtLock.unlock();
}
}
斷點運作這段代碼,此時的狀态:

可以看到rtLock中有4個字段:state、head、tail、exclusiveOwnerThread,
state表示的是鎖的狀态:0表示沒有鎖,0<state,則表示有鎖或鎖重入。
head和tail:表示的隊列中的節點,Rtlock之是以能實作公平鎖,就是因為隊列的設計,下面的情節會說。
exclusiveOwnerThread:目前持有鎖的線程.
這段是擷取鎖的源碼,通過CAS算法去擷取鎖,我們這一小節是初始狀态的前提下,是以CAS傳回值肯定為true,然後設定目前持有鎖的線程為目前線程。
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
2. RtLock的重入鎖 繼第一步,我們在main方法中,稍加改動
看到了state變成了2,看源碼是怎麼操作的:
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
還是這段源碼,這次compareAndSetState的CAS傳回結果肯定為false,是以執行else中的方法,跟進去:
public final void acquire(int arg) {
//tryAcquire擷取鎖,若tryAcquire擷取鎖失敗,則進入
//acquireQueued方法
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
//先跟進去tryAcquire方法
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();//目前線程
int c = getState();//此時鎖的狀态
//鎖重入,則c肯定不為0,執行else if邏輯
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
/**
*我們在上面說明了,有個表示目前持有鎖的線程的字段,
getExclusiveOwnerThread為得到目前持有鎖的線程,
*判斷目前線程與此時持有鎖的線程比較是否相等,相等則表示為
同一個線程,則state加1,傳回true
*别忘了我們這一小節的主題:重入鎖,是以,此小節到此為止。
别着急,一步一步來。
*/
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
3. 此時處于上鎖狀态時,又新來一線程擷取鎖
waitStatus在AQS中有5個狀态:
/** waitStatus value to indicate successor's thread needs unparking */
翻譯:waitStatus value to 訓示後續線程需要斷開連接配接
static final int SIGNAL = -1;
/** waitStatus value to indicate thread has cancelled */
翻譯:waitStatus value to 訓示線程已取消
static final int CANCELLED = 1;
/** waitStatus value to indicate thread is waiting on condition */
翻譯:waitStatus value to 訓示線程正在等待條件
static final int CONDITION = -2;
/** waitStatus value to indicate the next acquireShared should
* unconditionally propagate*/
翻譯:waitStatus value to 訓示下一個acquireShared應無條件傳播
static final int PROPAGATE = -3;
為0時,則以上都不是。
- 我們看到head(隊列頭部)的thread為空、waitstatus為-1以及next指向了下一個節點,thread為空則說明head是AQS建立的一個空節點,隻不過,為啥要這麼做呢?—程式之是以建立一個空的head節點,就是為了能準确的确定目前節點是否為有效隊列中的第一個節點。
-
tail(隊列尾部)的waitstatus為0,上一個節點為head,下一個節點為空,thread為新來的線程。
記住這張圖的資訊,然後我們開始看源碼:
繼第2步,tryAcquire擷取鎖方法肯定傳回false,然後“!false”為true,則執行acquireQueued(addWaiter(Node.EXCLUSIVE), arg),
嵌套了一個方法addWaiter,我們進去:
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);//建立一個獨占鎖的node節點,Node.EXCLUSIVE表示獨占鎖
//隊列中的尾部節點,由于這個線程是新來的,隊列還未建立,是以tail現在肯定為空。
Node pred = tail;
//pred為空,則跳過
if (pred != null) {
node.prev = pred;//原來隊列中的尾部節點,現在變成新節點的上一個節點
if (compareAndSetTail(pred, node)) {//通過cas算法,加入隊列中
pred.next = node;//原來尾部節點的下一個節點為目前節點
return node;
}
}
enq(node);//加入隊列,進入enq
return node;
}
private Node enq(final Node node) {
for (;;) {//相當于while(true),最好的寫法是for(;;)
//第一次循環,tail為空,則執行if方法體,第2次循環不為空,
//則執行else方法體,
//這裡說的第一次循環tail為空,是基于這個新來的線程是第二
//個競争鎖的線程,
//如果第3個線程來了,第一個線程還未釋放鎖,那這個tail肯定
//不為空。是以不要混淆
Node t = tail;
if (t == null) {
//建立一個節點,通過cas算法放入head中。
if (compareAndSetHead(new Node()))
//将頭部又指派給尾部,說明該隊列是個循環清單,然
//後進行第2次循環
tail = head;
} else {
//這一步就是給tail重新指派,将新線程的資訊指派給tail,然
//後循環結束
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
結束,我們看到addWaiter方法就是做了隊列的建立,以及将新來的線程加入到建立的隊列中,一個特點就是head節點是空的,有效的線程從隊列中的第2個元素開始。然後,我們看acquireQueued方法:
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
//現在隊列中雖然有兩個節點,但是現在“真正有效”的節點
//隻有tail節點,則p為head節點
final Node p = node.predecessor();
//現在第一個線程還未釋放鎖,則p==head為true,
//tryAcquire(arg)為false,跳過第一個if,執行第二個if
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
//剛才看到了圖檔上head的waitStatus為-1,是以傳回true
if (ws == Node.SIGNAL)
return true;
if (ws > 0) {
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
看到源碼,我們得到的資訊是,去目前節點,上一個節點的waitStatus枚舉值,由枚舉值決定傳回值,然後,parkAndCheckInterrupt方法,是直接将目前線程中斷,那麼這個if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())幹的就是:由目前節點的上一個節點的狀态來決定目前節點是否需要被中斷。在這裡我想說明一下:我看到了很多部落格中說for(;;)是在自旋。确實是自旋,但是與jvm中對鎖進行優化時進行的自旋鎖有鎖不同,jvm中的自旋鎖,是為了防止阻塞導緻的性能下降而引入的,而這裡描述的自旋,僅僅隻是自旋,并沒有關于性能的優化設計,在這裡,如果需要阻塞,就會阻塞。我也因為當時了解錯誤,找了好久的資料。
對于CLH隊列,隻是記錄了線程的id,在需要喚醒的時候,根據這個id喚醒它,否則根據這個id找到這個線程後讓他中斷。
4. 看到了AQS的隊列操作,再來看看解鎖吧
解鎖很簡單,我不會在這裡再那麼多話,請自行分析代碼
public void unlock() {
sync.release(1);
}
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
//喚醒head節點下一個節點
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
這是解鎖,沒啥好說的,可以說一下的就是在解鎖之後,程式又去喚醒了head節點的下一個節點。節點在擷取鎖之後,将目前節點移出隊列,就是if (p == head && tryAcquire(arg))中的邏輯。
**在這裡,被喚醒的節點,從被中斷的位置開始執行,也就是再次開始執行循環for(;;)體,則if (p == head && tryAcquire(arg))有可能成功(因為是非公平鎖,是以有可能成功),通過自旋。**這裡的自旋與sync鎖的優化自旋不同,sync的自旋的設計是不想線程阻塞耗費性能,這裡的自旋隻是單純的循環擷取鎖而已。
總結:當鎖已經被占用的時候,非公平鎖,會首先去嘗試擷取鎖,如果擷取鎖失敗,則乖乖的去排隊。CLH隊列,遵循FIFO規則,也就是,隻要你入隊了,那麼你隻能被按照公平鎖的方式執行。而公平鎖與非公平鎖在代碼上的不同就是在擷取鎖的時候,先去檢視隊列中是否有node節點,如果有,那就乖乖排隊去,沒有就直接擷取鎖。
另外,aqs又可以實作條件隊列來操作對應的線程,詳見部落格
這隻是reentrantlock中的很小的一部分,但是基本的流程邏輯也差不多就這樣了,其他的方法多是對一些邏輯方法的強化等。
如果有時間真的可以好好讀一讀。
CLH隊列:是Craig,Landin and Hagersten三個人發明的,是以叫CLH隊列