天天看點

ReentrantLock使用和原理(AQS簡述)

在開發過程中,synchronized是最簡單的同步控制方法,在通常情況下是夠用的,但synchronized時不響應中斷,而且有時候,我們需要靈活的來控制加解鎖。這時候可以使用ReentrantLock。

在以前的版本中,synchronized效率是遠遠低于ReentrantLock,後來經過優化,兩者性能差距不大了。但ReentrantLock有一些新特性,是synchronized所不具備的。

1、接口

//Lock.java
/* 加鎖 */
void lock();
//可響應中斷
void lockInterruptibly() throws InterruptedException;
//嘗試加鎖,失敗馬上傳回
boolean tryLock();
//嘗試加鎖,包含最大等待時間,最大等待時間範圍内未加鎖,傳回。
boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
//解鎖,注意不要忘,如果有異常,解鎖應放在finally中
void unlock();
//擷取鎖的condition對象,然後做await,signal來實作等待和通知
Condition newCondition();
           

另外ReentrantLock可以選擇是否使用公平鎖,公平鎖需要維持一個有序隊列,按照請求鎖的先後順序來獲得鎖,因而效率較低,但好處在于不會産生饑餓現象,即一個線程,隻要等待,必然能擷取到鎖。而非公平鎖則不是這樣,非公平鎖除了随機獲得鎖以外,還有一個隐藏屬性,即一個線程會傾向于再次獲得已經持有的鎖,這樣的鎖的配置設定方式比較高效。

再來說說Condition。

//Condition.java
//等待,可響應中斷
void await() throws InterruptedException;
//等待,不響應中斷
void awaitUninterruptibly();
//按照時間來等待,響應中斷
long awaitNanos(long nanosTimeout) throws InterruptedException;
boolean await(long time, TimeUnit unit) throws InterruptedException;
boolean awaitUntil(Date deadline) throws InterruptedException;
//通知
void signal();
void signalAll();
           

2、原理

ReentrantLock使用内部類Sync來實作加解鎖,Sync是AbstractQueuedSynchronizer(下面簡稱AQS)的子類。

AQS,人如其名,抽象隊列同步器,定義了一套多線程通路共享資源的同步器架構,是模闆模式的典型應用,不光用在ReentrantLock中,也用在Semaphor/CountDownLatch等上。

AQS核心是一個共享資源(volatile int state;)和一個等待隊列。共享資源相關的方法有3個。

//AbstractQueuedSynchronizer.java
protected final int getState() {
    return state;
}
protected final void setState(int newState) {
    state = newState;
}`
protected final boolean compareAndSetState(int expect, int update) {
    // See below for intrinsics setup to support this
    return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
           

隊列是一個FIFO的雙向隊列,隊列相關結構如下。

//注釋

     * <pre>
     *      +------+  prev +-----+       +-----+
     * head |      | <---- |     | <---- |     |  tail
     *      +------+       +-----+       +-----+
     * </pre>
//code
private transient volatile Node head;
private transient volatile Node tail;
           

AQS定義了2種資源共享的方式,EXCLUSIVE(獨占,即隻有一個線程能擷取到該資源,比如ReentrantLock),SHARED(共享,多個線程可以共享該資源,Semephore/CountDownLatch),當然也可以兼而有之,比如ReentrantReadWriteLock。

等待隊列的每一項為Node,核心結構如下

//AbstractQueueSynchronizer.java
//前一個Node和下一個Node
volatile Node prev;
volatile Node next;
//本Node持有的線程
volatile Thread thread;
//指定本Node的模式,辨別共享/獨占
Node nextWaiter;
//辨別本Node的各種狀态,比如被中斷,下個節點需要通知,等等等等
volatile int waitStatus;
           

下面通過具體過程來闡述加鎖的細節。

2-1、加鎖

通常使用的都是非公平鎖,我們以這個為例來說明

//ReentrantLock.java
private final Sync sync;
public ReentrantLock() {
        sync = new NonfairSync();//預設是非公平鎖
}
public void lock() {
        sync.lock();
}
//NonfairSync内部類,非公平鎖加鎖執行本方法
final void lock() {
    if (compareAndSetState(, ))
        setExclusiveOwnerThread(Thread.currentThread());
    else
        acquire();
}
           

其中,compareAndSetState(0, 1),使用CAS來設定State為1,成功傳回true,并執行

//AbstractOwnableSynchronizer.java
//設定目前執行的線程
protected final void setExclusiveOwnerThread(Thread thread) {
    exclusiveOwnerThread = thread;
}
           

此時表示加鎖成功。如果state當時已經為1,即有其他線程已拿到鎖,則compareAndSetState(0, 1)馬上傳回false表示失敗。并執行如下代碼。

//AbstractQueuedSychronizer.java
public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}
           

首先,繼續嘗試拿鎖。

protected final boolean tryAcquire(int acquires) {
  return nonfairTryAcquire(acquires);
}
final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == ) {
        //state為9表示可以加鎖,馬上CAS操作設定State為1
        if (compareAndSetState(, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    //如果本線程已經拿到鎖,state也不為0,此時,state+=1
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < ) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}
           

上一步加鎖成功,直接傳回,失敗則将本線程進行加隊列操作。

private Node addWaiter(Node mode) {
        //建立目前線程的Node,獨占模式
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        //嘗試使用快速方式增加節點到隊尾
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }
   private final boolean compareAndSetTail(Node expect, Node update) {
    return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
}
           

先使用快速方式增加節點,一次成功可以馬上傳回,失敗則執行enq方法。

private Node enq(final Node node) {
       //經典的CAS自旋volatile變量
        for (;;) {
            Node t = tail;
            //隊列為空,建立頭結點,即尾節點
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
            //CAS加入隊尾,成功傳回,失敗繼續自旋
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }
           

加隊列成功後,可以以等待狀态休息了,直到其他線程釋放資源後喚醒本線程,下面來看源碼

final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
        //記錄是否被中斷
            boolean interrupted = false;
            //自旋拿鎖,即拿state
            for (;;) {
                //隊列前一個節點
                final Node p = node.predecessor();
                //如果前一個節點是head節點,則可以嘗試進行拿鎖,即CAS設定state,快速傳回成功或失敗。
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                //除非上面作為第二個節點拿到鎖傳回了,其他情況執行下面代碼
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
           

如果本線程的Node作為隊列中第二個節點拿到鎖成功,否則會執行shouldParkAfterFailedAcquire,該函數其實是将Node往前插隊,前面的Node可能因為各種原因已經死去(中斷等等),直到找到确實在等待拿鎖的,然後通過park進入waiting狀态。

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;//前驅的狀态
        if (ws == Node.SIGNAL)
            //如果已經告訴前驅拿到鎖通知自己了,直接傳回,可以馬上休息
            return true;
        if (ws > ) {
            //其他情況,循環周遊隊列,如果前驅放棄了,就繼續往前找,直到找到正常的節點,并排在他後面。那些被放棄的節點,由于引用消失,後續會被GC掉。
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > );
            pred.next = node;
        } else {
            //前驅狀态正常,就設定前驅狀态為SIGNAL,表示,前驅拿到鎖後通知自己
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }
           

上述函數如果傳回成功,則執行park,直到被unpark或中斷喚醒。這裡特意注意下,如果park時收到中斷,并不會抛異常,而是通過Thread.interrupted()獲得中斷标記,并清除掉中斷标記。

private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }
           

如果是中斷,會在acquire中執行中斷,整個拿鎖流程如下圖所示

1、入隊-》2、是否二号&拿鎖-》3、找正常前前節點-》4、park等待-》2
2-》成功拿鎖/中斷
           

公平鎖的差別隻有下面的擷取鎖的方法有差別

protected final boolean tryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == ) {
        //state為0可以加鎖。hasQueuedPredecessors表示前面是否有Node,具體代碼見下
        if (!hasQueuedPredecessors() &&
            compareAndSetState(, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < )
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}
           

其中hasQueuedPredecessors表示是否前面有節點

public final boolean hasQueuedPredecessors() {
        // The correctness of this depends on head being initialized
        // before tail and on head.next being accurate if the current
        // thread is first in queue.
        Node t = tail; // Read fields in reverse initialization order
        Node h = head;
        Node s;
        return h != t &&
            ((s = h.next) == null || s.thread != Thread.currentThread());
    }
           

另外還有一些tryLock(隻使用樂觀加鎖嘗試一次,直接傳回)。 lockInteruptbly(基本流程相同,把最終的Thread.interupt換為throw InteruptException)

2-2、解鎖

unlock流程如下,執行unlock表示已經拿到lock,因而不需要考慮線程安全的問題,直接将state-1即可,唯一需要注意的是多次lock需要多次unlock,這裡要判斷是否存在未完全釋放資源的情況。

public void unlock() {
    sync.release();
}
public final boolean release(int arg) {
if (tryRelease(arg)) {
    Node h = head;//找到頭結點
    if (h != null && h.waitStatus != )
        unparkSuccessor(h);//通知下一個Node
    return true;
}
return false;
}
           

首先執行tryRelease,每次都把state-=1;

protected final boolean tryRelease(int releases) {
    int c = getState() - releases;
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    //
    if (c == ) {
        free = true;
        setExclusiveOwnerThread(null);
    }
    setState(c);
    return free;
}
           

再來看unparkSuccessor

private void unparkSuccessor(Node node) {
        //清零目前node(頭結點)的waitState,允許失敗
        int ws = node.waitStatus;
        if (ws < )
            compareAndSetWaitStatus(node, ws, );

        Node s = node.next;//下一個節點
        if (s == null || s.waitStatus > ) {//為空或取消
            s = null;
            //從隊列尾部往前找,waitStatus <0即為有效的節點
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= )
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);//對該節點進行unpark喚醒
    }
           

unlock的流程主要在于對下一個等待線程的通知上。

2-3 Condition.await

首先是lock.newCondition()

//ReentrantLock.java
final ConditionObject newCondition() {
   return new ConditionObject();
}
           

ConditionObject是AQS的内部類,主要結構如下

//雙向等待隊列的首節點
/** First node of condition queue. */
private transient Node firstWaiter;
//尾節點
/** Last node of condition queue. */
private transient Node lastWaiter;
           

關于await,基本猜想就是往上面的隊列中加,然後阻塞等,基本邏輯跟lock換湯不換藥。下面來驗證我們的猜想。下面來看await的代碼

//AbastractQueuedSynchronizer.java
public final void await() throws InterruptedException {
    if (Thread.interrupted())//先判斷中斷
        throw new InterruptedException();
    //加入等待隊列,就是上面ConditionObject中的那個。
    Node node = addConditionWaiter();
    //釋放鎖将lock等待隊列的下一個節點進行unpark通知
    int savedState = fullyRelease(node);
    int interruptMode = ;
    //循環判斷是否在AQS的等待隊列中,不在就進行park動作。之後不論是被unpark喚醒,還是中斷,均會跳出次循環。
    while (!isOnSyncQueue(node)) {
        //這裡的park是conditionObject的this,即隻有signal或中斷的會喚醒本線程
        LockSupport.park(this);
        if ((interruptMode = checkInterruptWhileWaiting(node)) != )
            break;
    }
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null) // clean up if cancelled
        unlinkCancelledWaiters();
    if (interruptMode != )
        reportInterruptAfterWait(interruptMode);
}
           

先看addConditionWaiter的代碼,代碼的意義在于加入等待隊列。

private Node addConditionWaiter() {
    Node t = lastWaiter;
    //如果尾節點取消了等待,清除隊列上無效的節點
    if (t != null && t.waitStatus != Node.CONDITION) {
        unlinkCancelledWaiters();
        t = lastWaiter;
    }
    //本節點添加到隊列
    Node node = new Node(Thread.currentThread(), Node.CONDITION);
    if (t == null)
        firstWaiter = node;
    else
        t.nextWaiter = node;
    lastWaiter = node;
    return node;
}
           

看unlinkCancelledWaiters,其實就是周遊conditionObject的整個隊列,将非等待狀态的節點摘除。

private void unlinkCancelledWaiters() {
    Node t = firstWaiter;//頭結點
    Node trail = null;
    while (t != null) {
        Node next = t.nextWaiter;
        //不是等待節點,就将該節點摘鍊
        if (t.waitStatus != Node.CONDITION) {
            t.nextWaiter = null;
            if (trail == null)
                firstWaiter = next;
            else
                trail.nextWaiter = next;
            //到尾部了
            if (next == null)
                lastWaiter = trail;
        }
        else
            trail = t;
        //往後周遊
        t = next;
    }
}
           

再看fullyRelease,其實就是unlock操作,并找出lock隊列(AQS的隊列)中下一個節點(下一個狀态不正确, 會從頭結點開始尋找)進行unpark通知

final int fullyRelease(Node node) {
    boolean failed = true;
    try {
        int savedState = getState();
        if (release(savedState)) {
            failed = false;
            return savedState;
        } else {
            throw new IllegalMonitorStateException();
        }
    } finally {
        if (failed)
            node.waitStatus = Node.CANCELLED;
    }
}
public final boolean release(int arg) {
if (tryRelease(arg)) {
    Node h = head;
    if (h != null && h.waitStatus != )
        unparkSuccessor(h);//參考unlock流程的unpark,這裡不展開
    return true;
}
return false;
}
           

其中tryRelease取決于何種子類,對于ReentrantLock來說,就是state-1。

protected final boolean tryRelease(int releases) {
    int c = getState() - releases;
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    if (c == ) {
        free = true;
        setExclusiveOwnerThread(null);
    }
    setState(c);
    return free;
}
           

再回過頭看下一個流程,判斷是否在等待lock的等待隊列中,即AQS的隊列

final boolean isOnSyncQueue(Node node) {
    //正常等待condition節點可以進行unpark
    if (node.waitStatus == Node.CONDITION || node.prev == null)
        return false;
    //其他時候需要下一個節點在AQS的等待隊列中,即等待拿鎖lock中,這裡判斷不在lock中,執行下一步動作
    if (node.next != null) // If has successor, it must be on queue
        return true;
    //從尾部往前找,找到本節點傳回true
    return findNodeFromTail(node);
}
private boolean findNodeFromTail(Node node) {
    Node t = tail;
    for (;;) {
           if (t == node)
               return true;
           if (t == null)
               return false;
           t = t.prev;
    }
}
           

回過頭來梳理這個流程,簡單來了解,就是入ConditionObject的隊列進行park等待,直到被喚醒或中斷,這兩種都會跳出while循環。下面來看後面的流程。

這裡剛從await的park中出來,要麼被signal喚醒,要麼被中斷喚醒。下面會重新拿鎖,并傳回。

if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
    interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
    unlinkCancelledWaiters();
if (interruptMode != )
    reportInterruptAfterWait(interruptMode);
           

await的核心其實就是釋放鎖,并通過park等待signal。後面被喚醒時,再拿鎖并傳回。

2-4 Condition.signal

signal的核心是将線程從await的等待狀态(park)中喚醒。

public final void signal() {
    //目前非本線程執行,抛異常
   if (!isHeldExclusively())
       throw new IllegalMonitorStateException();
   Node first = firstWaiter;
   if (first != null)
       doSignal(first);
}
protected final boolean isHeldExclusively() {
   return getExclusiveOwnerThread() == Thread.currentThread();
}
           

來看核心流程,通知Condition隊列的頭結點。

private void doSignal(Node first) {
    do {
        if ( (firstWaiter = first.nextWaiter) == null)
            lastWaiter = null;
        first.nextWaiter = null;
    } while (!transferForSignal(first) &&
             (first = firstWaiter) != null);
}
final boolean transferForSignal(Node node) {
    //目前Condition狀态置為0
    if (!compareAndSetWaitStatus(node, Node.CONDITION, ))
        return false;

    //節點入AQS鎖的等待隊列
    Node p = enq(node);
    int ws = p.waitStatus;
    //設定為Sinal狀态,并進行unpark
    if (ws >  || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
        LockSupport.unpark(node.thread);
    return true;
}
           

signal的效果是将頭結點加入Lock的等待隊列,并通知那個線程啟動。那個線程在執行await()會由于while (!isOnSyncQueue(node)) 而跳出循環。

signalAll會把ConditionObject隊列中的所有節點都移入AQS的等待隊列并喚醒他們。