最近看了Java的并發包concurrent的ReentrantLock類源碼,做下筆記。
一、Lock接口
//在java的concurrent包中有個Lock接口,用來規範定義鎖的行為
public interface Lock{
/**
* 獲得鎖,如果鎖不可用,目前線程會一直休眠(即使目前線程被其他線程中斷的情況也會一直休眠)直到鎖可獲得。
*/
void lock();
/**
* 獲得鎖,如果鎖不可用,目前線程會一直休眠直到目前線程被其他線程中斷則會抛出InterruptedException異常。
*/
void lockInterruptibly() throws InterruptedException;
/**
* 鎖是空閑的則立即傳回true,如果鎖是不可用的則立即傳回false。
*/
boolean tryLock();
/**
* 如果鎖可用,獲得鎖,傳回true;
* 如果鎖不可用,目前線程會一直休眠直到目前線程被其他線程中斷則會抛出InterruptedException異常;
* 如果已經超過指定的時間,還沒獲得鎖,傳回false。
*/
boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
/*
* 釋放鎖
*/
void unlock();
/*
* 傳回一個綁定到目前Lock執行個體的Condition執行個體。
*/
Condition newCondition();
}
..
二、ReentrantLock源碼
//===============================================================
進入主題,ReentrantLock源碼
public class ReentrantLock implements Lock, java.io.Serializable {
private final Sync sync;
//同步器(很多功能繼承于抽象類AbstractQueuedSynchronizer)
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = -5179523762034025860L;
/**
* Performs {@link Lock#lock}. The main reason for subclassing
* is to allow fast path for nonfair version.
*/
abstract void lock();
/**
* Performs non-fair tryLock. tryAcquire is
* implemented in subclasses, but both need nonfair
* try for trylock method.
*/
//在非公平鎖方式下,嘗試獲得鎖
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
protected final boolean isHeldExclusively() {
// While we must in general read state before owner,
// we don't need to do so to check if current thread is owner
return getExclusiveOwnerThread() == Thread.currentThread();
}
final ConditionObject newCondition() {
return new ConditionObject();
}
// Methods relayed from outer class
final Thread getOwner() {
return getState() == 0 ? null : getExclusiveOwnerThread();
}
final int getHoldCount() {
return isHeldExclusively() ? getState() : 0;
}
final boolean isLocked() {
return getState() != 0;
}
/**
* Reconstitutes this lock instance from a stream.
* @param s the stream
*/
private void readObject(java.io.ObjectInputStream s)
throws java.io.IOException, ClassNotFoundException {
s.defaultReadObject();
setState(0); // reset to unlocked state
}
}
//非公平鎖的同步器
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;
/**
* Performs lock. Try immediate barge, backing up to normal
* acquire on failure.
*/
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires); //調用父類Sync.nonfairTryAcquire(int acquires)
}
}
//公平鎖的同步器
static final class FairSync extends Sync {
private static final long serialVersionUID = -3000897897090466540L;
final void lock() {
acquire(1);
}
/**
* Fair version of tryAcquire. Don't grant access unless
* recursive call or no waiters or is first.
*/
//嘗試獲得鎖,成功傳回true,失敗則傳回false
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();//調用了AbstractQueuedSynchronizer.getStatus()==>>state
if (c == 0) { //state為0,表示沒有線程獲得鎖
//!hasQueuedPredecessors():隊列裡沒有排在目前線程之前的線程,
//compareAndSetState(0, acquires):則将鎖的狀态設定為已被擷取
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {//目前線程就是擷取得鎖的線程
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
}
..
三、AQS
//==========================================
其中鎖背後真正的英雄是AQS(AbstractQueuedSynchronizer類,下面簡稱為AQS)的一些代碼,當然,還有一些方法沒寫上來。
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronzier{
static final class Node {
//表示線程已經取消
/** waitStatus value to indicate thread has cancelled */
static final int CANCELLED = 1;
//表示線程需要喚醒
/** waitStatus value to indicate successor's thread needs unparking */
static final int SIGNAL = -1;
/** waitStatus value to indicate thread is waiting on condition */
static final int CONDITION = -2;
/**
* waitStatus value to indicate the next acquireShared should
* unconditionally propagate
*/
static final int PROPAGATE = -3;
//waitStatus的值有:CANCELLED,SIGNAL,CONDITION,PROPAGATE
volatile int waitStatus;
//省略其他代碼
}
//同步器的狀态,volatile辨別則保證了屬性值的對多線程的可見性
private volatile int state;
//獲得arg個許可
public final void acquire(int arg) {
//Node.EXCLUSIVE:線程節點标記為獨占模式
if ( ! tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg) ) {
selfInterrupt();
}
}
//建立一個等待線程,插入隊列
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {//隊尾不為空,則表示隊尾已經被初始化
node.prev = pred;
if (compareAndSetTail(pred, node)) {//将目前線程節點銜接到隊尾
pred.next = node;
return node;
}
}
//代碼執行到這裡,表示隊尾為空,即隊尾還沒被初始化
enq(node);
return node;
}
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
//線程節點入隊
private Node enq(final Node node) {
//如果隊列還沒被初始化,則初始化隊頭為(new Node()),隊尾也指向隊頭
//然後将線程節點銜接到隊尾
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))//使用CAS原子操作設定隊頭
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {//使用CAS原子操作設定隊尾
t.next = node;
return t;
}
}
}
}
//隊列裡有優先獲得鎖的線程
public final boolean hasQueuedPredecessors() {
// The correctness of this depends on head being initialized
// before tail and on head.next being accurate if the current
// thread is first in queue.
Node t = tail; // Read fields in reverse initialization order
Node h = head;
Node s;
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
//隊頭head和隊尾tail的指向不同
//(因為隊列還沒初始化時,head和tail都為null,隊列剛初始化時,它們的指向也是相同的)
//隊尾沒有在排隊的線程或者排在隊尾的線程是其他線程
}
排隊獲得鎖(CLH鎖)
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();//排線上程節點前面的節點
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
//如果前趨節點p是正在阻塞等待鎖的狀态(shouldParkAfterFailedAcquire方法),則目前線程挂起(parkAndCheckInterrupt方法)。
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
/**
* Checks and updates status for a node that failed to acquire.
* Returns true if thread should block. This is the main signal
* control in all acquire loops. Requires that pred == node.prev
*
* @param pred node's predecessor holding status
* @param node the node
* @return {@code true} if thread should block
*/
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL) //SIGNAL:-1,前趨節點的狀态是需要喚醒的狀态
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
if (ws > 0) { //waitStatus > 0,即是前趨節點的鎖被取消的值(Node.CANCELLED,值是1)
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev; //修改目前節點的前趨節點,往前移一個節點。
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL); //前趨節點設定成需要喚醒的狀态。
}
return false;
}
/**
* Convenience method to park and then check if interrupted
*
* @return {@code true} if interrupted
*/
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
private static void selfInterrupt() {
Thread.currentThread().interrupt();
}
}
//==========================================================
一.公平鎖FairSync
使用公平鎖,建立ReentrantLock
ReentrantLock fairLock = new java.util.concurrent.locks.ReentrantLock(true);
fairLock.lock();
fairLock.unlock();
ReentrantLock.lock();==>>FairSync.lock()==>>AQS.acquire(1)實作:
1.!FairSync.tryAcquire(int acquires)//1為真,則表示沒有獲得鎖,注意,前面有個"!"(非邏輯)
并且
2.AQS.acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
//Node.EXCLUSIVE就獨占式鎖,因為隻有一個線程能獲得鎖,是以使用獨占式鎖;
//比如信号量Semaphore可以設定多個許可供給多個線程獲得,則Semaphore會使用共享式鎖Node.SHARED
(1和2)都為真 ==>>AQS.selfInterrupt()
//============================================================
FairSync.tryAcquire(int acquires)實作:
1.狀态為0,則表示鎖是空閑的
1.1 !AQS.hasQueuedPredecessors() //1.1為真,則表示等待隊列裡沒有優先等待鎖的線程節點
并且
1.2 AQS.compareAndSetState(0, 1) //使用CAS原子操作設定state值由0變為1
(1.1和1.2)都為真 ==>> 目前線程獲得鎖成功傳回true。
2.狀态不為0,則表示鎖已被某個線程獲得,
2.1判斷鎖的持有者是否目前線程
2.1.1是目前線程,則表示線程重複獲得鎖,将狀态值state增加acquires,獲得鎖成功傳回true。
2.1.2不是目前線程,嘗試獲得鎖失敗,傳回false。
3. 傳回false
//============================================================
總結:公平鎖
線程請求擷取鎖時,判斷狀态state的值
1.當狀态state為0時(即鎖還沒被線程占有),
1.1判斷隊列裡有沒前任優先線程節點,即,在目前線程之前有沒其他線程在等待鎖的釋放
1.1.1如果沒有優先線程,則目前線程可以獲得鎖
1.1.2如果有優先線程,則目前線程銜接在隊尾阻塞等待鎖空閑,等待其他線程喚醒來獲得鎖
2.當狀态state不為0時,說明鎖已經被某個線程占有,判斷占有線程是否目前線程
2.1是目前線程,則線程可重複獲得鎖,将狀态state值加1,表示線程又一次重複獲得鎖
2.2不是目前線程,則目前線程銜接在隊尾阻塞等待鎖空閑,等待其他線程喚醒來獲得鎖
//============================================================
總結:非公平鎖
線程請求擷取鎖時,首先比其他線程優先獲得鎖(搶占式地,直接使用CAS(調用:compareAndSetState(0, 1))設定狀态state值),
總之,如有剛好鎖空閑時,非公平鎖會優先獲得鎖,鎖已被占有時,再銜接到隊尾,排隊等待鎖。
不管是公平鎖,還是非公平鎖,沒獲得鎖進入阻塞排隊時,AQS都會使用CLH隊列(對應到上面的acquireQueued方法),
目前線程會判斷它的前趨節點,如果前趨節點的等待狀态waitStatus是SIGNAL,則說明前趨節點也是在等待鎖挂起中,那目前線程也會挂起等待鎖;
如果前趨節點是隊頭head,則說明已經輪到目前線程獲得鎖;
釋放鎖時,會從隊頭的後繼節點開始,喚醒挂起的後繼節點,則後繼獲得鎖。