ReentrantLock:
先簡單講下ReentrantLock裡面的成員變量。
(1)int state:用于分辨目前鎖是否已經被鎖上
1)state=0: 未上鎖
2)state>=1:已上鎖,并且state>=1時記錄的時重入鎖的次數
(2)Node head:引用始終指向獲得了鎖的節點,它不會被取消。acquire操作成功就表示獲得了鎖,acquire過程中如果中斷,那麼acquire就失敗了,這時候head就會指向下一個節點。
(3)Node tail:尾節點,用于線程快速如隊列
1. 構造器
先闡述公平鎖和不公平鎖的定義
公平鎖(Fair):加鎖前檢查是否有排隊等待的線程,優先排隊等待的線程,先來先得
非公平鎖(Nonfair):加鎖時不考慮排隊等待問題,直接嘗試擷取鎖,擷取不到自動到隊尾等待
(1)預設是構造一個不公平的鎖。
public ReentrantLock(){
sync = new NonfairSync();
}
(2)為true時,構造公平鎖;為false時,構造不公平鎖。
public ReentrantLock(boolean fair){
sync = fair ? new FairSync() : new NonfairSync();
}
2. lock方法
lock方法針對 fair 和 nonfair 是有不同的實作的,在下面的代碼的實作在于 sync 是屬于哪個鎖
public void lock() {
sync.lock();
}
(1)對于NonfairSync.lock() 的實作如下
1) 上鎖接口
final void lock() {
//compareAndSetState,使用cas線程安全的把state的值更改為1, 當state=0時,代表沒有上鎖
if (compareAndSetState(, ))
//設定鎖的專屬對象是目前線程
setExclusiveOwnerThread(Thread.currentThread());
else
//如果cas操作失敗時,調用acquire方法嘗試上鎖
acquire();
}
2) 申請鎖
public final void acquire(int arg) {
// tryAcquire 嘗試快速上鎖 =>3)
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) //快速上鎖失敗後調用acquireQueued方法把目前線程加入隊列,并傳回acquireQueued中是否産生過攔截
//acquireQueued中産生攔截,則調用線程中斷方法
selfInterrupt();
}
非公平鎖
1)嘗試快速上鎖
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
//state=0時代表未上鎖
if (c == ) {
//可上鎖時,使用cas操作,線程安全的設定目前鎖狀态
if (compareAndSetState(, acquires)) {
//設定鎖所屬線程 為目前線程
setExclusiveOwnerThread(current);
return true;
}
}
//目前線程就是擁有鎖的線程,是以為重入鎖
else if (current == getExclusiveOwnerThread()) {
//重入鎖記錄 上鎖次數
int nextc = c + acquires;
//上鎖次數溢出,重入次數太多,在改變狀态之前抛出異常以確定鎖的狀态是正确的
if (nextc < ) // overflow
throw new Error("Maximum lock count exceeded");
//修改鎖狀态,不需要cas,因為屬于同一個線程
setState(nextc);
return true;
}
return false;
}
2) addWaiter: 在目前等待隊列添加成員
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
//如果尾部節點存在
if (pred != null) {
node.prev = pred;
// cas線程安全 設定隊列尾部等待線程為目前線程
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
//如果尾部節點不存在,則通過死循環插入隊列
enq(node);
return node;
}
3) 插入目前線程到 鎖等待隊列上
private Node enq(final Node node) {
//直到成功才結束
for (;;) {
Node t = tail;
// 尾部節點為空,case線程安全的設定頭節點為目前節點,同時設定尾節點。不然,繼續在尾部添加node
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
4) 在所等待隊列中配置設定鎖
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
// 是否産生中斷
boolean interrupted = false;
for (;;) {
// 擷取候鎖隊列的前一個節點
final Node p = node.predecessor();
// 如果node節點的前一個節點p == head,并且tryAcquire為目前線程拿到鎖,則配置設定鎖成功
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
// 請求鎖失敗時,使用shouldParkAfterFailedAcquire判斷是否要中斷目前線程,需要中斷目前線程則調用parkAndCheckInterrupt産生一次中斷
/**線程的thread.interrupt()方法是中斷線程,将會設定該線程的中斷狀态位,即設定為true,中斷的結果線程是死亡、還是等待新的任務或是繼續運作至下一步,就取決于這個程式本身。線程會不時地檢測這個中斷标示位,以判斷線程是否應該被中斷(中斷标示值是否為true)。它并不像stop方法那樣會中斷一個正在運作的線程。*/
if (shouldParkAfterFailedAcquire(p, node)
&& parkAndCheckInterrupt())
interrupted = true;
}
} finally {
// 配置設定失敗,要取消目前線程的鎖請求
if (failed)
cancelAcquire(node);
}
}
5) 請求鎖失敗時,是否中斷目前線程,這裡首先要了解Node的waitStatus的定義
class:AbstractQueuedSynchronizer.Node
/** waitStatus value to indicate thread has cancelled:目前線程已登出 */
static final int CANCELLED = ;
/** waitStatus value to indicate successor's thread needs unparking:目前線程的繼承者(後繼線程)需要喚醒*/
static final int SIGNAL = -;
/** waitStatus value to indicate thread is waiting on condition:目前線程在等待Condition喚醒*/
static final int CONDITION = -;
/**
* waitStatus value to indicate the next acquireShared should:暫時不看
* unconditionally propagate
*/
static final int PROPAGATE = -;
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
//前一個線程的狀态
int ws = pred.waitStatus;
//目前線程需要喚醒
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
//前一個線程已經登出,是以在前一個線程以前往前尋找沒有登出的線程,并且把找到的線程的下個節點設定為目前線程
if (ws > ) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > );
pred.next = node;
} else {
/* 前一個線程等待狀态為-2或-3,cas線程安全的設定前一個線程的狀态為SIGNAL,即目前線程需要喚醒
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
6) 中斷并且檢查線程有沒有中斷
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
7) 登出線程對鎖的請求
/**
* Cancels an ongoing attempt to acquire.
*
* @param node the node
*/
private void cancelAcquire(Node node) {
// Ignore if node doesn't exist
if (node == null)
return;
node.thread = null;
// 向前回溯,跳過取消的前繼節點,直到找到一個沒有取消的節點(注意:這裡跳過的節點都是無效節點,其實可以從隊列中移除),并使目前節點的前繼節點指向它,此時找到的那個節點的後繼節點沒有改動
// Skip cancelled predecessors
Node pred = node.prev;
while (pred.waitStatus > )
node.prev = pred = pred.prev;
// predNext is the apparent node to unsplice. CASes below will
// fail if not, in which case, we lost race vs another cancel
// or signal, so no further action is necessary.
Node predNext = pred.next;
// Can use unconditional write instead of CAS here.
// After this atomic step, other Nodes can skip past us.
// Before, we are free of interference from other threads.
//設定目前節點為取消狀态
node.waitStatus = Node.CANCELLED;
//目前節點就是尾節點,則直接清除尾節點,設定前一個節點的後繼節點為null,取消完成
// If we are the tail, remove ourselves.
if (node == tail && compareAndSetTail(node, pred)) {
compareAndSetNext(pred, predNext, null);
} else {
// 如果後繼節點需要喚醒,先從設定前繼節點的指向入手。如果前繼節點不是頭節點時,因為前繼節點之後到目前節點(node)直接的節點都是被跳過了(節點已取消),是以如果pred的waitStatus如果是SIGNAL狀态,意味着node的下個節點會喚醒,是以把pred的下個節點設定為node的下個節點,同時也完成了node節點的取消操作。
// If successor needs signal, try to set pred's next-link
// so it will get one. Otherwise wake it up to propagate.
int ws;
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread != null) {
Node next = node.next;
if (next != null && next.waitStatus <= )
compareAndSetNext(pred, predNext, next);
} else {
//上面嘗試沒完成後,直接喚醒後繼節點
unparkSuccessor(node);
}
node.next = node; // help GC
}
}
8). next指向非null的下一個節點,在同步隊列中等待的節點,入隊操作時設定了前一個節點的next值,這樣可以在釋放鎖時,通知下一個節點來擷取鎖
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < )
compareAndSetWaitStatus(node, ws, );
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;
//找到後面第一個沒有被取消的節點
if (s == null || s.waitStatus > ) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= )
s = t;
}
if (s != null)
//喚醒後繼線程
LockSupport.unpark(s.thread);
}
FairSync:公平鎖
(1) 上鎖
final void lock() {
//和非公平鎖(NonfairSync)不一樣的是,沒有快速上鎖(搶鎖)的機制(compareAndSetState)
//acquire 和 NonfairSync 一樣,都是通過 AbstractQueuedSynchronizer 的acquire 去實作上鎖
//但是對于tryAcquire的實作機制又有不同
acquire();
}
(2)嘗試擷取鎖
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == ) {
//和NonfairSync不同的是,多了一個 hasQueuedPredecessors 的判斷目前隊列是否有等待更久的線程
if (!hasQueuedPredecessors() &&
// cas 更改鎖狀态,傳回true 則成功擷取鎖
compareAndSetState(, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < )
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
(3)判斷目前隊列是否有 正在工作的節點 或 等待更久的線程,有就傳回true
public final boolean hasQueuedPredecessors() {
// The correctness of this depends on head being initialized
// before tail and on head.next being accurate if the current
// thread is first in queue.
Node t = tail; // Read fields in reverse initialization order
Node h = head;
Node s;
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}