擷取公平鎖(基于JDK1.7.0_40)
通過前面“Java多線程系列--“JUC鎖”02之 互斥鎖ReentrantLock”的“示例1”,我們知道,擷取鎖是通過lock()函數。下面,我們以lock()對擷取公平鎖的過程進行展開。
1. lock()
lock()在ReentrantLock.java的FairSync類中實作,它的源碼如下:
final void lock() {
acquire(1);
}
說明:“目前線程”實際上是通過acquire(1)擷取鎖的。
這裡說明一下“1”的含義,它是設定“鎖的狀态”的參數。對于“獨占鎖”而言,鎖處于可擷取狀态時,它的狀态值是0;鎖被線程初次擷取到了,它的狀态值就變成了1。
由于ReentrantLock(公平鎖/非公平鎖)是可重入鎖,是以“獨占鎖”可以被單個線程多次擷取,每擷取1次就将鎖的狀态+1。也就是說,初次擷取鎖時,通過acquire(1)将鎖的狀态值設為1;再次擷取鎖時,将鎖的狀态值設為2;依次類推...這就是為什麼擷取鎖時,傳入的參數是1的原因了。
可重入就是指鎖可以被單個線程多次擷取。
2. acquire()
acquire()在AQS中實作的,它的源碼如下:
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}<span style="font-family:'Courier New' !important;color:#000000;font-size: 12px !important; line-height: 1.5 !important;"></span>
(01) “目前線程”首先通過tryAcquire()嘗試擷取鎖。擷取成功的話,直接傳回;嘗試失敗的話,進入到等待隊列排序等待(前面還有可能有需要該鎖的線程在等待)。
(02) “目前線程”嘗試失敗的情況下,先通過addWaiter(Node.EXCLUSIVE)來将“目前線程”加入到"CLH隊列(非阻塞的FIFO隊列)"末尾。CLH隊列就是線程等待隊列。
(03) 再執行完addWaiter(Node.EXCLUSIVE)之後,會調用acquireQueued()來擷取鎖。由于此時ReentrantLock是公平鎖,它會根據公平性原則來擷取鎖。
(04) “目前線程”在執行acquireQueued()時,會進入到CLH隊列中休眠等待,直到擷取鎖了才傳回!如果“目前線程”在休眠等待過程中被中斷過,acquireQueued會傳回true(目前線程在休眠過程中被中斷過,為什麼要傳回true,求解釋?),此時"目前線程"會調用selfInterrupt()來自己給自己産生一個中斷。至于為什麼要自己給自己産生一個中斷,後面再介紹。
上面是對acquire()的概括性說明。下面,我們将該函數分為4部分來逐漸解析。
一. tryAcquire()
二. addWaiter()
三. acquireQueued()
四. selfInterrupt()
一. tryAcquire()
1. tryAcquire()
公平鎖的tryAcquire()在ReentrantLock.java的FairSync類中實作,源碼如下:
protected final boolean tryAcquire(int acquires) {
// 擷取“目前線程”
final Thread current = Thread.currentThread();
// 擷取“獨占鎖”的狀态
int c = getState();
// c=0意味着“鎖沒有被任何線程鎖擁有”,
if (c == 0) {
// 若“鎖沒有被任何線程鎖擁有”,
// 則判斷“目前線程”是不是CLH隊列中的第一個線程線程,
// 若是的話,則擷取該鎖,設定鎖的狀态,并切設定鎖的擁有者為“目前線程”。
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
// 如果“獨占鎖”的擁有者已經為“目前線程”,
// 則将更新鎖的狀态。
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
說明:根據代碼,我們可以分析出,tryAcquire()的作用就是嘗試去擷取鎖。注意,這裡隻是嘗試!
嘗試成功的話,傳回true;嘗試失敗的話,傳回false,後續再通過其它辦法來擷取該鎖。後面我們會說明,在嘗試失敗的情況下,是如何一步步擷取鎖的。
2. hasQueuedPredecessors()
hasQueuedPredecessors()在AQS中實作,源碼如下:
public final boolean hasQueuedPredecessors() {
Node t = tail;
Node h = head;
Node s;
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}<span style="font-family:'Courier New' !important;color:#000000;font-size: 12px !important; line-height: 1.5 !important;"></span>
說明: 通過代碼,能分析出,hasQueuedPredecessors() 是通過判斷"目前線程"是不是在CLH隊列的隊首,來傳回AQS中是不是有比“目前線程”等待更久的線程。下面對head、tail和Node進行說明。
3. Node的源碼
Node就是CLH隊列的節點。Node在AQS中實作,它的資料結構如下:
private transient volatile Node head; // CLH隊列的隊首
private transient volatile Node tail; // CLH隊列的隊尾
// CLH隊列的節點
static final class Node {
static final Node SHARED = new Node();
static final Node EXCLUSIVE = null;
// 線程已被取消,對應的waitStatus的值
static final int CANCELLED = 1;
// “目前線程的後繼線程需要被unpark(喚醒)”,對應的waitStatus的值。
// 一般發生情況是:目前線程的後繼線程處于阻塞狀态,而目前線程被release或cancel掉,是以需要喚醒目前線程的後繼線程。
static final int SIGNAL = -1;
// 線程(處在Condition休眠狀态)在等待Condition喚醒,對應的waitStatus的值
static final int CONDITION = -2;
// (共享鎖)其它線程擷取到“共享鎖”,對應的waitStatus的值
static final int PROPAGATE = -3;
// waitStatus為“CANCELLED, SIGNAL, CONDITION, PROPAGATE”時分别表示不同狀态,
// 若waitStatus=0,則意味着目前線程不屬于上面的任何一種狀态。
volatile int waitStatus;
// 前一節點
volatile Node prev;
// 後一節點
volatile Node next;
// 節點所對應的線程
volatile Thread thread;
// nextWaiter是“差別目前CLH隊列是 ‘獨占鎖’隊列 還是 ‘共享鎖’隊列 的标記”
// 若nextWaiter=SHARED,則CLH隊列是“獨占鎖”隊列;
// 若nextWaiter=EXCLUSIVE,(即nextWaiter=null),則CLH隊列是“共享鎖”隊列。
Node nextWaiter;
// “共享鎖”則傳回true,“獨占鎖”則傳回false。
final boolean isShared() {
return nextWaiter == SHARED;
}
// 傳回前一節點
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
Node() { // Used to establish initial head or SHARED marker
}
// 構造函數。thread是節點所對應的線程,mode是用來表示thread的鎖是“獨占鎖”還是“共享鎖”。
Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}
// 構造函數。thread是節點所對應的線程,waitStatus是線程的等待狀态。
Node(Thread thread, int waitStatus) { // Used by Condition
this.waitStatus = waitStatus;
this.thread = thread;
}
}
說明:
Node是CLH隊列的節點,代表“等待鎖的線程隊列”。
(01) 每個Node都會一個線程對應。
(02) 每個Node會通過prev和next分别指向上一個節點和下一個節點,這分别代表上一個等待線程和下一個等待線程。
(03) Node通過waitStatus儲存線程的等待狀态。
(04) Node通過nextWaiter來區分線程是“獨占鎖”線程還是“共享鎖”線程。如果是“獨占鎖”線程,則nextWaiter的值為EXCLUSIVE;如果是“共享鎖”線程,則nextWaiter的值是SHARED。
4. compareAndSetState()
compareAndSetState()在AQS中實作。它的源碼如下:
protected final boolean compareAndSetState(int expect, int update) {
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}<span style="font-family:'Courier New' !important;color:#000000;font-size: 12px !important; line-height: 1.5 !important;"></span>
說明: compareAndSwapInt() 是sun.misc.Unsafe類中的一個本地方法。對此,我們需要了解的是 compareAndSetState(expect, update) 是以原子的方式操作目前線程;若目前線程的狀态為expect,則設定它的狀态為update。
5. setExclusiveOwnerThread()
setExclusiveOwnerThread()在AbstractOwnableSynchronizer.java中實作,它的源碼如下:
// exclusiveOwnerThread是目前擁有“獨占鎖”的線程
private transient Thread exclusiveOwnerThread;
protected final void setExclusiveOwnerThread(Thread t) {
exclusiveOwnerThread = t;
}<span style="font-family:'Courier New' !important;color:#000000;font-size: 12px !important; line-height: 1.5 !important;"></span>
說明:setExclusiveOwnerThread()的作用就是,設定線程t為目前擁有“獨占鎖”的線程。
6. getState(), setState()
getState()和setState()都在AQS中實作,源碼如下:
// 鎖的狀态
private volatile int state;
// 設定鎖的狀态
protected final void setState(int newState) {
state = newState;
}
// 擷取鎖的狀态
protected final int getState() {
return state;
}<span style="font-family:'Courier New' !important;color:#000000;font-size: 12px !important; line-height: 1.5 !important;"></span>
說明:state表示鎖的狀态,對于“獨占鎖”而已,state=0表示鎖是可擷取狀态(即,鎖沒有被任何線程鎖持有)。由于java中的獨占鎖是可重入的,state的值可以>1。
小結:tryAcquire()的作用就是讓“目前線程”嘗試擷取鎖。擷取成功傳回true,失敗則傳回false。
二. addWaiter(Node.EXCLUSIVE)
addWaiter(Node.EXCLUSIVE)的作用是,建立“目前線程”的Node節點,且Node中記錄“目前線程”對應的鎖是“獨占鎖”類型,并且将該節點添加到CLH隊列的末尾。
1.addWaiter()
addWaiter()在AQS中實作,源碼如下:
private Node addWaiter(Node mode) {
// 建立一個Node節點,節點對應的線程是“目前線程”,“目前線程”的鎖的模型是mode。
Node node = new Node(Thread.currentThread(), mode);
Node pred = tail;
// 若CLH隊列不為空,則将“目前線程”添加到CLH隊列末尾
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
// 若CLH隊列為空,則調用enq()建立CLH隊列,然後再将“目前線程”添加到CLH隊列中。
enq(node);
return node;
}<span style="font-family:'Courier New' !important;color:#000000;font-size: 12px !important; line-height: 1.5 !important;"></span>
說明:對于“公平鎖”而言,addWaiter(Node.EXCLUSIVE)會首先建立一個Node節點,節點的類型是“獨占鎖”(Node.EXCLUSIVE)類型。然後,再将該節點添加到CLH隊列的末尾。
2. compareAndSetTail()
compareAndSetTail()在AQS中實作,源碼如下:
private final boolean compareAndSetTail(Node expect, Node update) {
return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
}<span style="font-family:'Courier New' !important;color:#000000;font-size: 12px !important; line-height: 1.5 !important;"></span>
說明:compareAndSetTail也屬于CAS函數,也是通過“本地方法”實作的。compareAndSetTail(expect, update)會以原子的方式進行操作,它的作用是判斷CLH隊列的隊尾是不是為expect,是的話,就将隊尾設為update。
3. enq()
enq()在AQS中實作,源碼如下:
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}<span style="font-family:'Courier New' !important;color:#000000;font-size: 12px !important; line-height: 1.5 !important;"></span>
說明: enq()的作用很簡單。如果CLH隊列為空,則建立一個CLH表頭;然後将node添加到CLH末尾。否則,直接将node添加到CLH末尾。
小結:addWaiter()的作用,就是将目前線程添加到CLH隊列中。這就意味着将目前線程添加到等待擷取“鎖”的等待線程隊列中了。
三. acquireQueued()
前面,我們已經将目前線程添加到CLH隊列中了。而acquireQueued()的作用就是逐漸的去執行CLH隊列的線程,如果目前線程擷取到了鎖,則傳回;否則,目前線程進行休眠,直到喚醒并重新擷取鎖了才傳回。下面,我們看看acquireQueued()的具體流程。
1. acquireQueued()
acquireQueued()在AQS中實作,源碼如下:
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
// interrupted表示在CLH隊列的排程中,
// “目前線程”在休眠時,有沒有被中斷過。
boolean interrupted = false;
for (;;) {
// 擷取上一個節點。
// node是“目前線程”對應的節點,這裡就意味着“擷取上一個等待鎖的線程”。
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}<span style="font-family:'Courier New' !important;color:#000000;font-size: 12px !important; line-height: 1.5 !important;"></span>
說明:acquireQueued()的目的是從隊列中擷取鎖。
2. shouldParkAfterFailedAcquire()
shouldParkAfterFailedAcquire()在AQS中實作,源碼如下:
// 傳回“目前線程是否應該阻塞”
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
// 前繼節點的狀态
int ws = pred.waitStatus;
// 如果前繼節點是SIGNAL狀态,則意味這目前線程需要被unpark喚醒。此時,傳回true。
if (ws == Node.SIGNAL)
return true;
// 如果前繼節點是“取消”狀态,則設定 “目前節點”的 “目前前繼節點” 為 “‘原前繼節點’的前繼節點”。
if (ws > 0) {
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
// 如果前繼節點為“0”或者“共享鎖”狀态,則設定前繼節點為SIGNAL狀态。
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}<span style="font-family:'Courier New' !important;color:#000000;font-size: 12px !important; line-height: 1.5 !important;"></span>
說明:
(01) 關于waitStatus請參考下表(中擴号内為waitStatus的值),更多關于waitStatus的内容,可以參考前面的Node類的介紹。
CANCELLED[1] -- 目前線程已被取消
SIGNAL[-1] -- “目前線程的後繼線程需要被unpark(喚醒)”。一般發生情況是:目前線程的後繼線程處于阻塞狀态,而目前線程被release或cancel掉,是以需要喚醒目前線程的後繼線程。
CONDITION[-2] -- 目前線程(處在Condition休眠狀态)在等待Condition喚醒
PROPAGATE[-3] -- (共享鎖)其它線程擷取到“共享鎖”
[0] -- 目前線程不屬于上面的任何一種狀态。
(02) shouldParkAfterFailedAcquire()通過以下規則,判斷“目前線程”是否需要被阻塞。
規則1:如果前繼節點狀态為SIGNAL,表明目前節點需要被unpark(喚醒),此時則傳回true。
規則2:如果前繼節點狀态為CANCELLED(ws>0),說明前繼節點已經被取消,則通過先前回溯找到一個有效(非CANCELLED狀态)的節點,并傳回false。
規則3:如果前繼節點狀态為非SIGNAL、非CANCELLED,則設定前繼的狀态為SIGNAL,并傳回false。
如果“規則1”發生,即“前繼節點是SIGNAL”狀态,則意味着“目前線程”需要被阻塞。接下來會調用parkAndCheckInterrupt()阻塞目前線程,直到目前先被喚醒才從parkAndCheckInterrupt()中傳回。
3. parkAndCheckInterrupt())
parkAndCheckInterrupt()在AQS中實作,源碼如下:
private final boolean parkAndCheckInterrupt() {
// 通過LockSupport的park()阻塞“目前線程”。
LockSupport.park(this);
// 傳回線程的中斷狀态。
return Thread.interrupted();
}<span style="font-family:'Courier New' !important;color:#000000;font-size: 12px !important; line-height: 1.5 !important;"></span>
說明:parkAndCheckInterrupt()的作用是阻塞目前線程,并且傳回“線程被喚醒之後”的中斷狀态。
它會先通過LockSupport.park()阻塞“目前線程”,然後通過Thread.interrupted()傳回線程的中斷狀态。
這裡介紹一下線程被阻塞之後如何喚醒。一般有2種情況:
第1種情況:unpark()喚醒。“前繼節點對應的線程”使用完鎖之後,通過unpark()方式喚醒目前線程。
第2種情況:中斷喚醒。其它線程通過interrupt()中斷目前線程。
補充:LockSupport()中的park(),unpark()的作用 和 Object中的wait(),notify()作用類似,是阻塞/喚醒。
它們的用法不同,park(),unpark()是輕量級的,而wait(),notify()是必須先通過Synchronized擷取同步鎖。
關于LockSupport,我們會在之後的章節再專門進行介紹!
4. 再次tryAcquire()
了解了shouldParkAfterFailedAcquire()和parkAndCheckInterrupt()函數之後。我們接着分析acquireQueued()的for循環部分。
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}<span style="font-family:'Courier New' !important;color:#000000;font-size: 12px !important; line-height: 1.5 !important;"></span>
說明:
(01) 通過node.predecessor()擷取前繼節點。predecessor()就是傳回node的前繼節點,若對此有疑惑可以檢視下面關于Node類的介紹。
(02) p == head && tryAcquire(arg)
首先,判斷“前繼節點”是不是CHL表頭。如果是的話,則通過tryAcquire()嘗試擷取鎖。
其實,這樣做的目的是為了“讓目前線程擷取鎖”,但是為什麼需要先判斷p==head呢?了解這個對了解“公平鎖”的機制很重要,因為這麼做的原因就是為了保證公平性!
(a) 前面,我們在shouldParkAfterFailedAcquire()我們判斷“目前線程”是否需要阻塞;
(b) 接着,“目前線程”阻塞的話,會調用parkAndCheckInterrupt()來阻塞線程。當線程被解除阻塞的時候,我們會傳回線程的中斷狀态。而線程被解決阻塞,可能是由于“線程被中斷”,也可能是由于“其它線程調用了該線程的unpark()函數”。
(c) 再回到p==head這裡。如果目前線程是因為其它線程調用了unpark()函數而被喚醒,那麼喚醒它的線程,應該是它的前繼節點所對應的線程(關于這一點,後面在“釋放鎖”的過程中會看到)。 OK,是前繼節點調用unpark()喚醒了目前線程!
此時,再來了解p==head就很簡單了:目前繼節點是CLH隊列的頭節點,并且它釋放鎖之後;就輪到目前節點擷取鎖了。然後,目前節點通過tryAcquire()擷取鎖;擷取成功的話,通過setHead(node)設定目前節點為頭節點,并傳回。
總之,如果“前繼節點調用unpark()喚醒了目前線程”并且“前繼節點是CLH表頭”,此時就是滿足p==head,也就是符合公平性原則的。否則,如果目前線程是因為“線程被中斷”而喚醒,那麼顯然就不是公平了。這就是為什麼說p==head就是保證公平性!
小結:acquireQueued()的作用就是“目前線程”會根據公平性原則進行阻塞等待,直到擷取鎖為止;并且傳回目前線程在等待過程中有沒有并中斷過。
四. selfInterrupt()
selfInterrupt()是AQS中實作,源碼如下:
private static void selfInterrupt() {
Thread.currentThread().interrupt();
}<span style="font-family:'Courier New' !important;color:#000000;font-size: 12px !important; line-height: 1.5 !important;"></span>
說明:selfInterrupt()的代碼很簡單,就是“目前線程”自己産生一個中斷。但是,為什麼需要這麼做呢?
這必須結合acquireQueued()進行分析。如果在acquireQueued()中,目前線程被中斷過,則執行selfInterrupt();否則不會執行。
在acquireQueued()中,即使是線程在阻塞狀态被中斷喚醒而擷取到cpu執行權利;但是,如果該線程的前面還有其它等待鎖的線程,根據公平性原則,該線程依然無法擷取到鎖。它會再次阻塞! 該線程再次阻塞,直到該線程被它的前面等待鎖的線程鎖喚醒;線程才會擷取鎖,然後“真正執行起來”!
也就是說,在該線程“成功擷取鎖并真正執行起來”之前,它的中斷會被忽略并且中斷标記會被清除! 因為在parkAndCheckInterrupt()中,我們線程的中斷狀态時調用了Thread.interrupted()。該函數不同于Thread的isInterrupted()函數,isInterrupted()僅僅傳回中斷狀态,而interrupted()在傳回目前中斷狀态之後,還會清除中斷狀态。 正因為之前的中斷狀态被清除了,是以這裡需要調用selfInterrupt()重新産生一個中斷!
小結:selfInterrupt()的作用就是目前線程自己産生一個中斷。
總結
再回過頭看看acquire()函數,它最終的目的是擷取鎖!
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}<span style="font-family:'Courier New' !important;color:#000000;font-size: 12px !important; line-height: 1.5 !important;"></span>
(01) 先是通過tryAcquire()嘗試擷取鎖。擷取成功的話,直接傳回;嘗試失敗的話,再通過acquireQueued()擷取鎖。
(02) 嘗試失敗的情況下,會先通過addWaiter()來将“目前線程”加入到"CLH隊列"末尾;然後調用acquireQueued(),在CLH隊列中排序等待擷取鎖,在此過程中,線程處于休眠狀态。直到擷取鎖了才傳回。 如果在休眠等待過程中被中斷過,則調用selfInterrupt()來自己産生一個中斷。