ReentrantLock重入鎖源碼解析
ReentrantLock加鎖釋放鎖,都是通過其一個對象屬性Sync來實作的,Sync繼承了AbstractQueuedSynchronizer,也就是大名鼎鼎的aqs了。而Sync本身有兩個子類,一個是非公平實作,一個是公平實作,ReentrantLock中預設采用非公平鎖。本文隻探讨非公平實作邏輯
public ReentrantLock() {
sync = new NonfairSync();
}
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
lock方法
lock方法會調用sync的lock方法
final void lock() {
//cas嘗試修改state變量,從0變為1
//這裡其實就能展現所謂的非公平了,上來就直接嘗試擷取鎖,不會老老實實去同步隊列尾部等着
if (compareAndSetState(0, 1))
//修改成功,設定目前持有鎖的線程
setExclusiveOwnerThread(Thread.currentThread());
else
//失敗,這段代碼的實作在父類AQS中
acquire(1);
}
state是aqs中的一個屬性,加鎖時就是使其+1,釋放鎖就是-1
acquire方法
public final void acquire(int arg) {
//tryAcquire方法會走子類實作,最終執行nonfairTryAcquire
//如果tryAcquire傳回了false,那麼就會往下走
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
//擷取state
int c = getState();
//等于0說明沒有線程持有鎖
if (c == 0) {
//cas加鎖
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
//不等于0,鎖被持有了,看看是否是自己持有的,是的話,state+1
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
//擷取鎖失敗,傳回false
return false;
}
tryAcquire方法就是嘗試去擷取鎖了,lock加鎖失敗,到這裡可能鎖又被釋放了,或者是自己加的鎖,現在是重入,是以tryAcquire就是重新嘗試一下,如果還是失敗了,那麼就需要加入到同步隊列尾部了,也就是接下來的addWaiter方法。
這裡先說說這個同步隊列。這個同步隊列的節點是一個Node類,是AQS的一個靜态内部類,也是AQS的核心。裡面包裝了目前線程,以及waitStatus,如等待喚醒、取消等
static final class Node{
static final Node SHARED = new Node();
static final Node EXCLUSIVE = null;
//已取消狀态,要等阻塞隊列裡某個線程被喚醒搶鎖失敗抛異常才标記為這個
static final int CANCELED = 1;
//被加入阻塞隊列後,等待喚醒的狀态
static final SIGNAL = -1;
//等待狀态,調用Condition.await方法,會加入到等待隊列,狀态就是這個
static final CONDITION = -2;
//這是用以讀寫重入鎖的
static final int PROPAGATE = -3;
volatile int waitStatus;
volatile Node prev;
volatile Node next;
volatile Thread thread;
}
acquireQueued與addWaiter 方法
//先看addWaiter方法
private Node addWaiter(Node mode) {
//首先構造一個node,狀态是EXCLUSIVE,意思就是排它鎖了
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
//這裡如果為null,說明連結清單還不存在,不為null,加入到尾部,這裡使用cas操作進行搶占,因為可能有别的線程也在嘗試
//如果連結清單不存在,或者搶占失敗,走enq
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
//加入到連結清單尾部
enq(node);
return node;
}
private Node enq(final Node node) {
for (;;) { //自旋,不斷嘗試。
Node t = tail;
//如果連結清單不存在,那麼先建立一個頭結點,這個頭結點不存儲線程資訊
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
//否則的話,嘗試加入尾部
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
//這是一個中斷标志,如果線程在自旋嘗試擷取鎖的過程中被阻塞過,會傳回true,然後再acquire方法中再次中斷一下,目的是恢複中斷标志位
//因為在下面parkAndCheckInterrupt中會調用interrupted方法,這個方法會清除中斷标志
boolean interrupted = false;
for (;;) { //自旋
//拿到這個node的前一個結點
final Node p = node.predecessor();
//如果p是頭結點
//前面tryAcquire方法我們知道,這裡是去嘗試搶占鎖,或者重入鎖,如果失敗了,看下面
//如果成功了呢,把這個節點變成頭結點,原來的頭結點置為null,這裡會把這個node的thread=null,prev=null
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
//如果p的waitStatus不是-1,傳回的false,那麼自旋重新來,如果是-1了,就走parkAndCheckInterrupt,阻塞線程
//那麼假設現在我們的這個線程被阻塞在這個地方了,不會往下走了,我們看看當它的前一個線程釋放鎖會怎樣,unlock
//!!!看完了嗎!前一個節點unlock釋放鎖後,會喚醒後一個節點或是從後往前第一個可執行節點,這個線程被喚醒,那麼它再次自旋去嘗試搶占鎖
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
//報錯了,就會進入這個方法了
if (failed)
cancelAcquire(node);
}
}
可以看到,addWaiter方法實際上就構造了一個同步隊列(阻塞隊列),每一個tryAcquire嘗試擷取鎖失敗的線程,都加入到這個隊列尾部去。之後在acquireQueued方法中,自旋,看看自己是不是head的下一個節點,是的話嘗試擷取鎖,不是的話在shouldParkAfterFailedAcquire方法中會判斷前一個節點的waitStatus是否是signal,也就是-1,是的話就阻塞自己,同時這個方法内也會清除該節點前面的狀态為CANCELLED的節點
而finally中的cancelAcquire,主要就是清除被取消的節點
unlock方法
public void unlock() {
sync.release(1);
}
public final boolean release(int arg) {
//如果state現在為0了,就是傳回true了,看看怎麼走
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
//喚醒之前被阻塞的線程,注意傳入的節點是head節點
unparkSuccessor(h);
return true;
}
return false;
}
protected final boolean tryRelease(int releases) {
//釋放鎖就是-1嘛,如果c==0,說明現在沒人持有鎖了,就設定擁有線程為null,放回true。
//否則說明之前是重入的,需要繼續釋放,傳回false
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;
//如果下一個節點是null或者狀态是已取消
if (s == null || s.waitStatus > 0) {
s = null;
//從後往前找到一個可執行節點
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
//如果是一個可執行節點,喚醒它!
if (s != null)
LockSupport.unpark(s.thread);
}
總結
一段lock與unlock流程看下來,就是lock的時候嘗試擷取鎖,擷取鎖是通過cas修改state從0到1,修改成功的即擷取鎖成功并且辨別出目前擁有鎖的線程,同一個線程多次lock,state會+1,最後釋放鎖也需要多次釋放直到state變為0。
而擷取鎖失敗的線程,就會被封裝成一個node節點,加入到同步隊列的尾部,期間會通過自旋反複嘗試擷取鎖,最終目前一個節點狀态為待喚醒時就會阻塞自己,等待喚醒。
而釋放鎖就是cas讓state-1了,當state為0時就是鎖完全釋放,喚醒頭結點的下一個節點,或者是尾部往前第一個可執行節點。