ReentrantLock lock = new ReentrantLock(true);
lock.lock();
調用lock()方法實際調用sync内部類的lock方法,Sync類的的lock方法為抽象方法,實際調用其子類的lock方法,由于建立的是公平鎖,是以,最終調用FairSync的lock方法
public void lock() {//ReentrantLock#lock
sync.lock();//調用公平鎖的lock
}
FairSync的lock方法中調用了acquire方法,嘗試擷取鎖,acquire()是AQS(AbstractQueuedSynchronizer)中的實作
Sync繼承自AQS,FairSync繼承自Sync
final void lock() {//FairSync#lock
acquire(1);
}
AQS的acquire方法,先嘗試去擷取一次鎖,如果擷取失敗,将該線程封裝為node節點,添加到AQS阻塞隊列的隊尾,并以自旋的方式去嘗試擷取鎖,如果在擷取鎖的過程中出現異常,自我中斷(selfInterrupt())
public final void acquire(int arg) {//arg=1
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
//acquireQueued的傳回值實際是該線程在阻塞中是否有被中斷,如果被中斷了,傳回true,此時再進行中斷
selfInterrupt();
}
先看selfInterrupt(),調用了目前線程的interrupt()方法進行中斷
private static void selfInterrupt() {
Thread.currentThread().interrupt();
}
再回過頭看acquire方法中的tryAcquire方法,該方法用來先去嘗試能否擷取鎖,該方法是需要AQS的子類去重寫的,AQS中的此方法,隻抛出UnsupportedOperationException異常
FairSync#tryAcquire,先判斷資源狀态,如果未被持有,則再判斷是否有線程在等待資源,如果是,擷取鎖失敗,如果沒有線程等待,嘗試持有;如果資源以及被線程持有,判斷持有者是不是自己,如果是自己,重入數+1,否則擷取失敗
protected final boolean tryAcquire(int acquires) {//acquires=1
final Thread current = Thread.currentThread();//擷取目前線程
int c = getState();//擷取資源狀态
if (c == 0) {//資源沒有被鎖定
if (!hasQueuedPredecessors() &&//判斷在此線程前是否有等待時間更長的線程,因為是公平鎖,是以如果有線程在配對等待擷取鎖,則該線程不能立即獲得鎖
compareAndSetState(0, acquires)) {//CAS操作,如果沒有等待的線程,嘗試去擷取鎖
setExclusiveOwnerThread(current);//沒有等待線程,目前線程擷取了鎖,那麼将資源持有者設定為此線程
return true;
}
}
else if (current == getExclusiveOwnerThread()) { //資源被鎖定,判斷目前線程是否是鎖持有者,如果是,重入數+1
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;//資源狀态不為0,其持有者也不是該線程,則擷取失敗
}
下面是判斷線程前是否有其他線程等待資源的方法:hasQueuedPredecessors()
/**
* Queries whether any threads have been waiting to acquire longer
* than the current thread.
*/
public final boolean hasQueuedPredecessors() {
// The correctness of this depends on head being initialized
// before tail and on head.next being accurate if the current
// thread is first in queue.
Node t = tail; //tail阻塞隊列的隊尾節點
Node h = head;//head:阻塞隊列的隊首節點
Node s;
// h != t 頭節點和隊尾節點不是同一個(隻有當阻塞隊列中沒有節點時,tail和head都指向null)
//(s = h.next) == null 判斷頭節點的下一個節點是否存在
//s.thread != Thread.currentThread() 如果頭結點的下一個節點操作,判斷這個節點裡的線程是否為目前線程
//AQS中的阻塞隊列是以Node為節點的雙連結清單結構
//其頭節點是一個僞節點,頭節點的下一個節點即隊首節點是存放第一個阻塞線程的節點
//是以源碼中用頭節點的next節點中的線程與目前線程做比較
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}
到此,tryAcquire()的邏輯已全部分析,接下載下傳再看如果擷取鎖失敗後的處理
擷取鎖失敗,先将線程包裝成node節點,添加到阻塞線程的隊尾
/**
* Creates and enqueues node for current thread and given mode.
*/
private Node addWaiter(Node mode) {//AbstractQueuedSynchronizer#addWaiter
Node node = new Node(Thread.currentThread(), mode);//将線程包裝成Node對象
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;//擷取隊尾節點
if (pred != null) {//如果CLH隊列不為空,将新節點加入到隊尾
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
//隻有當阻塞隊列為空時,才會調用此方法來添加節點
enq(node);//如果CLH隊列為空,自旋的方式添加節點(head節點和該線程節點)
return node;
}
/**
* Inserts node into queue, initializing if necessary. See picture above.
*/
private Node enq(final Node node) {//AbstractQueuedSynchronizer#enq
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
//第一次周遊
//判斷尾節點為空,說明整個隊列是空的,添加頭節點,将隊頭和隊尾都指向此節點
if (compareAndSetHead(new Node()))
tail = head;
} else {
//第二次周遊
//新增節點的pre節點指向目前隊列的隊尾節點
//将自己設定為隊尾節點,将隊尾節點的next節點指向自己
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
将線程添加到隊尾後,接下來線程以自旋的方式嘗試擷取鎖
/**
* Acquires in exclusive uninterruptible mode for thread already in
* queue. Used by condition wait methods as well as acquire.
*/
final boolean acquireQueued(final Node node, int arg) {//AbstractQueuedSynchronizer#acquireQueued
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();//擷取目前線程節點的上一個節點
if (p == head && tryAcquire(arg)) {
//判斷他的上一個節點是否是頭節點,如果是,就再次嘗試擷取鎖
//如果擷取鎖成功了,把自己置為頭節點
//上面有提到,頭節點是僞節點,隊列中真正的第一個阻塞線程實際在第二個節點
//是以,判斷如果自己的上一個節點是頭節點,說明自己就是第一個被阻塞的線程,就可以去擷取鎖了
setHead(node);//prev,thread的引用都指向null
p.next = null; // help GC
failed = false;
return interrupted;
}
//不是頭節點,或者擷取鎖失敗了,進入一下邏輯
//1.確定這個節點前的節點的狀态是SIGNAL,這樣自己才能安心的被阻塞,因為會被前一個節點叫醒
//2.調用LockSupport.park對線程阻塞
//3.線上程被喚醒時,傳回這個線程的中斷狀态
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
//如果線程在阻塞期間被中斷後,不響應中斷,隻是設定一個是否中斷的變量為true
//最後在成功擷取鎖後傳回這個中斷狀态給其調用者
interrupted = true;
}
} finally {
//最後在傳回的時候判斷下擷取鎖的這個自旋過程中是否有發生異常,如果有,則将節點從隊列中移除
if (failed)
cancelAcquire(node);
}
}
做阻塞前的保障工作
/**
* Checks and updates status for a node that failed to acquire.
* Returns true if thread should block. This is the main signal
* control in all acquire loops. Requires that pred == node.prev.
*/
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
//判斷上個節點的狀态,隻有在狀态是SIGNAL的時候,才能確定上一個線程用完鎖後會通知自己來擷取鎖
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
if (ws > 0) {
//ws > 0,即CANCEL狀态,将自己前面所有是此狀态的線程節點全部移除隊列
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
//如果是其他的狀态,将節點的狀态改為SIGNAL
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
阻塞線程
/**
* Convenience method to park and then check if interrupted
*
* @return {@code true} if interrupted
*/
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);//此方法調用後,線程會在這裡阻塞,直到上一個線程喚醒它
return Thread.interrupted();
}
再回頭看acquireQueued()的方法中的cancelAcquire()方法,此方法我的了解是在acquireQueued()出現異常時才會調用,因為如果正常執行,當輪到此線程擷取鎖,并成功的擷取鎖後,會将failed置為false,這樣的話是不會觸發cancelAcquire()方法的,而且此方法的出口,隻有成功擷取鎖後的一個出口
/**
* Cancels an ongoing attempt to acquire.
*
* @param node the node
*/
private void cancelAcquire(Node node) {
// Ignore if node doesn't exist
if (node == null)
return;
node.thread = null;//釋放線程引用
Node pred = node.prev;
while (pred.waitStatus > 0)//讓節點前所有CANCEL狀态的節點退出隊列
node.prev = pred = pred.prev;
Node predNext = pred.next;
node.waitStatus = Node.CANCELLED;
//如果該節點在隊尾,直接将隊尾置為他前面第一個不是CANCEL的節點
if (node == tail && compareAndSetTail(node, pred)) {
compareAndSetNext(pred, predNext, null);
} else {
int ws;
//pred != head 他前面第一個不是CANCEL的節點不是頭節點
//(ws = pred.waitStatus) == Node.SIGNA 他前面第一個不是CANCEL的節點的狀态是SIGNAL
//ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL) 他前面第一個不是CANCEL的節點的狀态不是SIGNAL則置為SIGNAL
//pred.thread != null 他前面第一個不是CANCEL的節點中有持有線程
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread != null) {
//如果pred不是頭節點,且保證他的狀态是SIGNAL,且他持有線程,将pred.next的與node節點的next節點連接配接
Node next = node.next;
if (next != null && next.waitStatus <= 0)
compareAndSetNext(pred, predNext, next);
} else {
unparkSuccessor(node);
}
node.next = node; // help GC
}
}
喚醒隊列中的第一個線程
/**
* Wakes up node's successor, if one exists.
*
* @param node the node
*/
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
//把節點狀态改為0--初始狀态
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
//檢查節點下一個節點,如果為null,或者狀态是CANCEL,從隊尾開始,找到第一個有效的節點
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);//喚醒隊列的第一個線程
}
到此,公平鎖的lock()方法全部結束
總述
調用公平鎖的lock方法,實際調用ReentrantLock中FairSync的lock()方法
他會先去調用tryLock()方法看能否先擷取鎖,如果擷取不到,将線程添加到CLH隊列裡,以自旋的方式再去擷取鎖,擷取成功後判斷自己是否被中斷過,然後再次中斷自己
在自旋的過程中為了保證自己能被成功喚醒,他回去判斷自己前面節點的狀态,如果是SIGNAL,那麼自己會被成功喚醒,如果是CANCEL狀态,需要将節點從隊列中移除,如果是其他的狀态,将狀态置為SIGNAL
如果在擷取鎖的過程中出現了異常,将其從隊列中移除,如果它的頭節點,需要将它的下一個節點線程喚醒