Doug Lea是JDK中concurrent工具包的作者,這位大神是誰可以自行google。
本文淺析ReentrantLock(可重入鎖)的原理
Lock接口
Lock接口定義了這幾個方法:
-
lock()
用來擷取鎖,如果鎖已經被其他線程占有,則進行等待,直到搶占到鎖;該方法在發送異常時不會自動釋放鎖,是以在使用時需要在finall塊中釋放鎖;
-
tryLock()和tryLock(long time, TimeUnit unit)
嘗試獲得鎖,如果鎖已經被其他線程占有,傳回false,成功擷取鎖傳回true;該方法不會等待,立即傳回;而帶有參數的tryLock在等待時長内拿到鎖傳回true,逾時或者沒拿到鎖傳回false;帶參數的方法還支援響應中斷;
-
lockInterruptibly()
支援中斷的lock();
-
unlock()
釋放鎖;
-
newCondition()
建立
,Condition以後會分析;Condition
ReentrantLock可重入鎖
ReentrantLock實作了Lock接口,ReentrantLock中有一個重要的成員變量,
同步器
sync繼承了
AbstractQueuedSynchronizer
簡稱
AQS
,我們先介紹
AQS
;
AQS用一個隊列(結構是一個FIFO隊列)來管理同步狀态,當線程擷取同步狀态失敗時,會将目前線程包裝成一個
Node
放入隊列,目前線程進入阻塞狀态;當同步狀态釋放時,會從隊列去出線程擷取同步狀态。
AQS裡定義了head、tail、state,他們都是volatile修飾的,head指向隊列的第一個元素,tail指向隊列的最後一個元素,state表示了同步狀态,這個狀态非常重要,在ReentrantLock中,state為0的時候代表鎖被釋放,state為1時代表鎖已經被占用;
看下面代碼:
private static final Unsafe unsafe = Unsafe.getUnsafe();
private static final long stateOffset;
private static final long headOffset;
private static final long tailOffset;
private static final long waitStatusOffset;
private static final long nextOffset;
static {
try {
stateOffset = unsafe.objectFieldOffset
(AbstractQueuedSynchronizer.class.getDeclaredField("state"));
headOffset = unsafe.objectFieldOffset
(AbstractQueuedSynchronizer.class.getDeclaredField("head"));
tailOffset = unsafe.objectFieldOffset
(AbstractQueuedSynchronizer.class.getDeclaredField("tail"));
waitStatusOffset = unsafe.objectFieldOffset
(Node.class.getDeclaredField("waitStatus"));
nextOffset = unsafe.objectFieldOffset
(Node.class.getDeclaredField("next"));
} catch (Exception ex) { throw new Error(ex); }
}
這一段靜态初始化代碼初始了state、head、tail等變量的在記憶體中的偏移量;
Unsafe
類是sun.misc下的類,不屬于java标準。
Unsafe
讓java可以像C語言一樣操作記憶體指針,其中就提供了
CAS
的一些原子操作和
park、unpark
對線程挂起與恢複的操作;關于
CAS
是concurrent工具包的基礎,以後會單獨介紹,其主要作用就是在硬體級别提供了
compareAndSwap
的功能,進而實作了比較和交換的原子性操作。
AQS還有一個内部類叫Node,它将線程封裝,利用prev和next可以将Node串連成雙向連結清單,這就是一開始說的FIFO的結構;
ReentrantLock提供了公平鎖和非公平鎖,我們這裡從非公平鎖分析AQS的應用;
Lock調用lock()方法時調用了AQS的lock()方法,我們來看這個非公平鎖
NonfairSync
的lock方法:
final void lock() {
//首先調用CAS搶占同步狀态state,如果成功則将目前線程設定為同步器的獨占線程,
//這也是非公平的展現,因為新來的線程沒有馬上加入隊列尾部,而是先嘗試搶占同步狀态。
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
//搶占同步狀态失敗,調用AQS的acquire
acquire(1);
}
瞄一眼acquire方法:
public final void acquire(int arg) {
//在這裡還是先試着搶占一下同步狀态
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
tryAcquire調用的是
NonfairSync
的實作,然後又調用了
Sync
的nonfairTryAcquire方法:
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
//和之前一樣,利用CAS搶占同步狀态,成功則設定目前線程為獨占線程并且傳回true
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
//如果目前線程已經是獨占線程,即目前線程已經獲得了同步狀态則将同步狀态state加1,
//這裡是可重入鎖的展現
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
//沒有搶占到同步狀态傳回false
return false;
}
再看addWaiter方法:
private Node addWaiter(Node mode) {
//建立一個Node,封裝了目前線程和模式,這裡傳入的是獨占模式Node.EXCLUSIVE
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
//如果tail不為空就不需要初始化node隊列了
if (pred != null) {
//将node作為隊列最後一個元素入列
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
//傳回建立的node
return node;
}
}
//如果tail為空則表示node隊列還沒有初始化,此時初始化隊列
enq(node);
return node;
}
瞄一眼enq方法:
private Node enq(final Node node) {
//無限loop直到CAS成功,其他地方也大量使用了無限loop
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
//隊列尾部為空,必須初始化,head初始化為一個空node,不包含線程,tail = head
if (compareAndSetHead(new Node()))
tail = head;
} else {
//隊列已經初始化,将目前node加在列尾
node.prev = t;
//将目前node設定為tail,CAS操作,enqueue安全
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
拿到建立的node後傳給acquireQueued方法:
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
//标記是否中斷狀态
boolean interrupted = false;
for (;;) {
//拿到目前node的前驅
final Node p = node.predecessor();
//如果前驅正好為head,即目前線程在列首,馬上tryAcquire搶占同步狀态
if (p == head && tryAcquire(arg)) {
//搶占成功後,将目前節點的thread、prev清空作為head
setHead(node);
p.next = null; // help GC 原來的head等待GC回收
failed = false;
return interrupted;
}
//沒有搶占成功後,判斷是否要park
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
瞄一眼shouldParkAfterFailedAcquire方法:
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
//如果前驅node的狀态為SIGNAL,說明目前node可以park
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
if (ws > 0) {
//如果前驅的狀态大于0說明前驅node的thread已經被取消
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
//從前驅node開始,将取消的node移出隊列
//目前節點之前的節點不會變化,是以這裡可以更新prev,而且不必用CAS來更新。
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
//前驅node狀态等于0或者為PROPAGATE(以後會介紹)
//将前驅node狀态設定為SIGNAL,傳回false,表示目前node暫不需要park,
//可以再嘗試一下搶占同步狀态
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
看一下parkAndCheckInterrupt方法:
private final boolean parkAndCheckInterrupt() {
//阻塞目前線程
LockSupport.park(this);
//傳回目前線程是否設定中斷标志,并清空中斷标志
return Thread.interrupted();
}
這裡解釋一下為什麼要儲存一下中斷标志:
中斷會喚醒被park的阻塞線程,但被park的阻塞線程不會響應中斷,是以這裡儲存一下中斷狀态并傳回,如果狀态為true說明發生過中斷,會補發一次中斷,即調用interrupt()方法
在acquireQueued中發生異常時執行cancelAcquire:
private void cancelAcquire(Node node) {
// Ignore if node doesn't exist
if (node == null)
return;
//清空node的線程
node.thread = null;
// Skip cancelled predecessors
//移除被取消的前繼node,這裡隻移動了node的prev,沒有改變next
Node pred = node.prev;
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
// predNext is the apparent node to unsplice. CASes below will
// fail if not, in which case, we lost race vs another cancel
// or signal, so no further action is necessary.
//擷取前繼node的後繼node
Node predNext = pred.next;
// Can use unconditional write instead of CAS here.
// After this atomic step, other Nodes can skip past us.
// Before, we are free of interference from other threads.
//設定目前node等待狀态為取消,其他線程檢測到取消狀态會移除它們
node.waitStatus = Node.CANCELLED;
// If we are the tail, remove ourselves.
if (node == tail && compareAndSetTail(node, pred)) {
//如果目前node為tail,将前驅node設定為tail(CAS)
//設定前驅node(即現在的tail)的後繼為null(CAS)
//此時,如果中間有取消的node,将沒有引用指向它,将被GC回收
compareAndSetNext(pred, predNext, null);
} else {
// If successor needs signal, try to set pred's next-link
// so it will get one. Otherwise wake it up to propagate.
int ws;
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread != null) {
//如果目前node既不是head也不是tail,設定前繼node的後繼為目前node後繼
Node next = node.next;
if (next != null && next.waitStatus <= 0)
compareAndSetNext(pred, predNext, next);
} else {
//喚醒目前node後繼
unparkSuccessor(node);
}
//目前node的next設定為自己
//注意現在目前node的後繼的prev還指向目前node,是以目前node還未被删除,prev是在移除取消節點時更新的
//這裡就是為什麼在前面要從後往前找可換新的node原因了,next會導緻死循環
node.next = node; // help GC
}
}
畫圖描述解析一下cancelAcquire:
首先看如何跳過取消的前驅
這時,前驅被取消的node并沒有被移出隊列,前驅的前驅的next還指向前驅;
如果目前node是tail的情況:
這時,沒有任何引用指向目前node;
如果目前node既不是tail也不是head:
這時,目前node的前驅的next指向目前node的後繼,目前node的next指向自己,pre都沒有更新;
如果目前node是head的後繼:
這時,隻是簡單的将目前node的next指向自己;
到這裡,當線程搶占同步狀态的時候,會進入FIFO隊列等待同步狀态被釋放。在unlock()方法中調用了同步器的release方法;看一下release方法:
public final boolean release(int arg) {
//判斷是否釋放同步狀态成功
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
//如果head不為null,且head的等待狀态不為0,
//喚醒後繼node的線程
unparkSuccessor(h);
return true;
}
return false;
}
再來看一下tryRelease方法(在Sync類中實作):
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
//目前thread不是獨占模式的那個線程,抛出異常
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
//如果同步狀态state為0,釋放成功,将獨占線程設定為null
free = true;
setExclusiveOwnerThread(null);
}
//更新同步狀态state
setState(c);
return free;
}
繼續看unparkSuccessor(喚醒後繼node的tread)方法:
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0)
//head的等待狀态為負數,設定head的等待狀态為0
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
//如果head的後繼node不存在或者後繼node等待狀态大于0(即取消)
//從尾部往目前node疊代找到等待狀态為負數的node,unpark
//因為會有取消的節點
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
總結
介紹完ReentrantLock後,我們大體了解了AQS的工作原理。AQS主要就是使用了同步狀态和隊列實作了鎖的功能。有了CAS這個基礎,AQS才能發揮作用,使得在enqueue、dequeque、節點取消和異常時能夠保證隊列在多線程下的完整性。