背景
通過
lock
與
unlock
研究ReentrantLock相關實作的原理 :
- 通過
變量标志目前資源占用state
-
标志目前占用CPU的線程ExclusiveThread
-
來申請和釋放CPU資源LockSupport.park/unpark
-
來完成CAS操作Unsafe.compareAndSwapObject
-
來完成原子操作Unsafe.putObject
重要變量
-
變量state
- 為0 : 代表該節點為初創節點 , 什麼都沒幹
- 大于0 : 即代表CANCELLED , 也就是所有大于0的節點都是被取消的節點
-
小于0 : SIGNAL = -1 : 代表該節點等待unpark
CONDITION = -2 : 代表該節點等待condition的觸發
PROPAGATE = -3 : 主要用于共享鎖 , 代表該節點需要無條件傳遞上一個節點
lock流程簡述
- 通過CAS設定
(0-->1) , 如果設定成功 , 則獨占該鎖state
- 設定失敗 , 通過
建立本線程的Node , 插入隊列中addWaiter
- 在添加完節點後 , 如果目前線程未搶占成功 , 則會周遊删除Cancel節點
- 最後會找到可以執行的Node , 開始通過
來讓出目前線程的CPUpark
- 公平鎖會在申請鎖的時候判斷隊列後繼節點是否存在 , 如果目前節點不是該後繼節點的話 , 就不會申請鎖.
unlock流程簡述
- 嘗試釋放相應的資源
- 釋放成功的話 , 則會把獨占線程讓出
- 會周遊Node隊列 , 從頭節點開始尋找可以unpark的線程
非公平鎖的加鎖與釋放
lock的實作
- 當調用到
時ReentrantLock.lock()
// 以非公平鎖為例
static final class NonfairSync extends Sync {
// 擷取鎖
final void lock() {
// 通過CAS操作設定state的狀态
if (compareAndSetState(0, 1))
// 如果設定成功的話 , 則代表目前獨占鎖的線程為目前線程
setExclusiveOwnerThread(Thread.currentThread());
else
// 嘗試擷取鎖
acquire(1);
}
protected final boolean tryAcquire(int acquires) {
// 重寫子類的Acquire , 調用nonfairTryAcquire
return nonfairTryAcquire(acquires);
}
}
複制
- 在
中 , 會擷取鎖 , 或者插入等待隊列中acquire
public final void acquire(int arg) {
// tryAcquire嘗試擷取鎖 , 如果擷取失敗的話 ,
// 則通過acquireQueued将目前線程添加到隊列中
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
// 如果成功的話 , 則中斷目前線程 , 僅僅隻是中斷
selfInterrupt();
}
複制
- 在Sync重寫的
中 , 會嘗試擷取鎖tryAcquire
- 如果目前鎖沒有被占用(即state=0)時 , 則獨占該鎖
- 如果目前獨占線程是本線程的話 , 則将State遞增
- 否則 , 則傳回false , 将本線程插入隊列中
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
// 擷取目前State狀态
int c = getState();
if (c == 0) {
// 如果目前State為0的話 , 則獨占該鎖 ,并且傳回true,不需要插入隊列
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
} else if (current == getExclusiveOwnerThread()) {
// 如果目前線程就是獨占的線程的話 , 則将State加一 , 并且傳回true
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
複制
- 通過
建立Node , 由于是通過addWaiter(Node.EXCLUSIVE)
, 是以也是保證線程間的可見性的.Unsafe.putObject
private Node addWaiter(Node mode) {
// 通過構造函數 , 将Node.EXCLUSIVE作為nextWaiter放入該節點
Node node = new Node(mode);
for (;;) {
// 找到尾節點 , head/tail都是volatile的
Node oldTail = tail;
// 尾節點不為空 , 雙線連結清單增加尾節點操作
if (oldTail != null) {
// 将tail節點放到node.prev中
U.putObject(node, Node.PREV, oldTail);
// 通過cas操作 , 設定tail
if (compareAndSetTail(oldTail, node)) {
// 如果設定成功的話 , 則将tail.next設定成該節點
oldTail.next = node;
// 傳回node
return node;
}
} else {
// 如果沒有頭尾節點 , 則建立head/tail節點
initializeSyncQueue();
}
}
}
複制
- 在添加完節點後 , 開始在隊列中找尋要喚醒的線程節點.
- 如果
傳回true , 則會Park目前線程shouldParkAfterFailedAcquire
- 如果
傳回false , 就會一直自旋周遊前驅節點 , 直到前驅接節點為空 , 則将目前節點設定為head節點 , 可以擷取目前鎖shouldParkAfterFailedAcquire
final boolean acquireQueued(final Node node, int arg) {
// 其實node就是尾節點
try {
boolean interrupted = false;
for (;;) {
// 從尾節點往前周遊 , 擷取其前驅
final Node p = node.predecessor();
// 如果周遊到了head節點 , 并且擷取成功了, 則傳回false
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
return interrupted;
}
// shouldParkAfterFailedAcquire:判斷在acquire後,是否應該park讓出CPU
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} catch (Throwable t) {
cancelAcquire(node);
throw t;
}
}
複制
-
主要根據前驅節點狀态判斷目前節點是否需要Park讓出CPUshouldParkAfterFailedAcquire
- 如果前驅節點的ws為
的話 , 則傳回true , park目前線程Node.SIGNAL
- 如果前驅節點的ws為canceld的話 , 則從隊列中去除掉canceld節點
- 如果前驅節點的ws為0或者
的話 , 則将前驅節點的ws設定成SIGNALPROPAGATE
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
// 如果前驅節點的waitStatus為SIGNAL,就代表前驅節點可以被喚醒
// 傳回true , park目前線程
return true;
if (ws > 0) {
// 如果ws>0, 則代表前驅節點cancelled了 , 則過濾掉canceld節點
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
// 删除掉cancedld節點
pred.next = node;
} else {
// 如果ws為0或者PROPAGATE的話 , 則将前驅節點的waitStatus設定成SIGNAL , 但是先不park
pred.compareAndSetWaitStatus(ws, Node.SIGNAL);
}
return false;
}
複制
- 如果前驅節點要喚醒的話 , 則會Park目前線程
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
複制
unlock的實作
當某個線程在使用完鎖後 , 使用
unlock
來釋放鎖資源.
- 調用
也就是釋放unlock
的資源1
public void unlock() {
sync.release(1);
}
複制
- 在AQS中的
會先嘗試釋放鎖 , 如果成功 ,則release
前驅線程unpark
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
複制
- 在
中的Sync
中 , 會釋放相關資源 , 并且傳回鎖是否釋放成功tryRelease
protected final boolean tryRelease(int releases) {
// 将目前申請資源數 - 要釋放的數量
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
// 如果目前線程不是獨占線程的話 , 則抛出異常
throw new IllegalMonitorStateException();
// 是否釋放鎖
boolean free = false;
if (c == 0) {
// 如果目前沒有資源占用的話 , 則認為可以釋放鎖
free = true;
// 将獨占線程設為空
setExclusiveOwnerThread(null);
}
// 設定目前鎖所對應的資源數量
setState(c);
// 傳回目前鎖是否會被釋放
return free;
複制
- 當鎖成功釋放後 , 會判斷前驅節點的
是否不為0 , 由于waitStatus
階段會清除所有的lock
節點 , 是以此處判斷不為0 , 則代表前驅節點可以擷取資源占有鎖. 于是 , 調用Cancel
讓前驅節點擷取節點unparkSuccessor
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
// 如果waitStatus小于0 ,
if (ws < 0)
node.compareAndSetWaitStatus(ws, 0);
// 開始尋找可執行的前驅節點
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node p = tail; p != node && p != null; p = p.prev)
if (p.waitStatus <= 0)
s = p;
}
if (s != null)
// unpark對應的線程
LockSupport.unpark(s.thread);
}
複制
公平鎖的加鎖與釋放
對于公平鎖(
FairSync
)而言在加鎖的過程中會有所不同 , 僅僅隻是在申請鎖的時候 , 加入了隊列的判斷 , 如果頭節點有後繼節點的話 , 則讓後繼節點擷取CPU
- 在申請鎖的時候 , 會判斷隊列是否有後繼節點 , 如果有後繼節點的話 , 則讓後繼節點優先獲得CPU
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
// 判斷隊列中是否有後繼節點
if (!hasQueuedPredecessors() &&
// 如果沒有前驅的話 , 則會通過cas來設定state
compareAndSetState(0, acquires)) {
// 設定目前線程獨占該鎖
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
複制
- 判斷head/tail節點是否相同 , 如果不同的話 , 代表head節點仍有後繼節點
public final boolean hasQueuedPredecessors() {
Node t = tail; // Read fields in reverse initialization order
Node h = head;
Node s;
// 傳回頭尾節點不相等 , 并且頭節點的next不為空 , 頭節點的線程不是目前線程
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}
複制