天天看點

Java源碼閱讀之ReentrantLock - lock和unLock方法

閱讀優秀的源碼是提升程式設計技巧的重要手段之一。

如有不對的地方,歡迎指正

轉載請注明出處

https://blog.lzoro.com

碎碎念

如果需要使用或者了解

ReentrantLock

,證明已經步入并發程式設計領域了,這裡理論基礎不多提,需要的自行查閱資料。

但是,相關術語還是要做一下描述的。

ReentrantLock:可重入鎖

AQS:AbstractQueuedSynchronized 抽象類,隊列式同步器

CAS:Compare and Swap, 比較并交換值

CLH隊列:The wait queue is a variant of a "CLH" (Craig, Landin, and
     * Hagersten) lock queue.

           

ReentrantLock

首先,貼圖大家感受一下。

Sync

其中

Sync

ReentrantLock

的抽象靜态内部類,提供了鎖的同步措施,具體實作有

NonFairSync

FairSync

,分别為公平和非公平鎖。

從圖中我們可以看出,

ReentrantLock

是實作了

Lock

接口和

Serializable

接口,

Serializable

是Java的序列化接口,這裡我們不多做讨論。

那麼,開始源碼的閱讀了~

首先,先看下

Lock

接口提供的方法(篇幅所限,這裡将源碼注釋去掉),大緻可分為三類:擷取鎖、釋放鎖、建立條件(可用于進階應用,如等待/喚醒)。

public interface Lock {

    /**
     * 擷取鎖,若擷取失敗則進行等待
     */
    void lock();
    
    /**
     * 可中斷鎖
     */
    void lockInterruptibly() throws InterruptedException;
    
    /**
     * 擷取鎖,立即傳回,成功傳回true,否則false
     */
    boolean tryLock();
    
    /**
     * 擷取鎖,若擷取失敗則在指定時間内等待,成功傳回true,否則false
     */
    boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
    
    /**
     * 釋放鎖
     */
    void unlock();
    
    /**
     * 建立條件,可用與進階應用
     */
    Condition newCondition();
}
           

接下來我們具體看下

ReentrantLock

的實作。

public void lock() {
        sync.lock();
    }
           

可以看到

ReentrantLock

的lock方法,是調用靜态内部類

sysc

的lock方法的,而

sync

lock

方法是抽象方法,具體的實作有兩個,

NonfairSync

(非公平鎖)和

FairSync

(公平鎖),我們先來看

NonFairSync

的實作

final void lock() {
    if (compareAndSetState(0, 1))
        setExclusiveOwnerThread(Thread.currentThread());
    else
        acquire(1);
}
           

compareAndSetState(0,1)

這個方法是由

sysn

的父類

AbstractQueuedSynchronizer

來實作的,也是我們通常說的

AQS

,而

compareAndSetState

方法的具體實作是由Unsafe提供的。

Unfafe

類的

compareAndSwap*

系列方法,是虛拟機的本地方法實作,具體的實作不在我們的讨論範圍内,簡單介紹一下作用,該方法的的作用如下:調用該方法時,若value值與expect值相等,則将value修改為update值,并傳回true;若value值與expect值不相等,那麼不做任何操作,并傳回false,這也就是我們常說的

CAS

操作,至于存在的

ABA

等問題和解決方案,有興趣的可以自己搜尋資料。

protected final boolean compareAndSetState(int expect, int update) {
    // See below for intrinsics setup to support this
    return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
           

lock

方法執行完

CAS

操作後

若得的一個true傳回,則會執行

setExclusiveOwnerThread(Thread.currentThread());

,該方法作用是為鎖設定獨占線程,其實也就是一個指派操作,如下:

protected final void setExclusiveOwnerThread(Thread thread) {
        exclusiveOwnerThread = thread;
    }
           

CAS

操作傳回一個false,則執行

acquire

方法

public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }
           

從上面源碼可以看出,若

tryAcquire

失敗并且

acquireQueued

傳回true中斷辨別的話,将會中斷目前線程。

我們先看一下

tryAcquire

,方法的作用大緻如下:判斷鎖的

state

值,若目前未有其他線程持有該鎖,則執行

CAS

操作,成功後則設定獨占線程;若發現該鎖已被線程持有,則判斷持有線程是不是目前線程,若是則允許重入,并判斷重入的次數是否超過限制,重入成功後修改

state

并傳回一個true布爾值。

protected final boolean tryAcquire(int acquires) {
    return nonfairTryAcquire(acquires);
}

final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        if (compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}
        
           

接下來看一下

acquireQueued

addWaiter

方法,作用描述如下,利用

addWaiter

方法,将目前線程作為一個節點

Node

加入的CLH隊列(The wait queue is a variant of a "CLH" (Craig, Landin, and

* Hagersten) lock queue),這裡加入隊列有一系列操作,包括尾部判斷,前置節點設定等,成功加入隊列後,會判斷是否有資格去競争擷取鎖,有則嘗試擷取鎖,成功後會傳回标志位。如果沒有資格,則判斷是否可以被阻塞,并做相關操作,具體請看注釋。

final boolean acquireQueued(final Node node, int arg) {
    //失敗标志
    boolean failed = true;
    try {
        //是否中斷标志
        boolean interrupted = false;
        for (;;) {
            //擷取前置節點
            final Node p = node.predecessor();
            //如果前置節點為首節點,并且目前線程能夠成功擷取鎖
            if (p == head && tryAcquire(arg)) {
                //将目前節點設定為首節點
                setHead(node);
                p.next = null; //help GC,前首節點出隊,幫助GC 
                failed = false;
                return interrupted;
            }
            //判斷是否可以阻塞線程并做相應操作,下面具體閱讀這幾個方法
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        //判斷是否擷取失敗
        if (failed)
            cancelAcquire(node);
    }
}


private Node addWaiter(Node mode) {
    //封裝成node
    Node node = new Node(Thread.currentThread(), mode);
    // 擷取隊列尾節點(作為目前節點的前置節點)
    Node pred = tail;
    //如果尾節點不為空
    if (pred != null) {
        //設定目前節點的前置節點
        node.prev = pred;
        //CAS操作,設定隊列尾部
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    //尾節點為null,調用enq方法
    enq(node);
    //傳回目前節點
    return node;
}

 private Node enq(final Node node) {
    
    for (;;) {
        Node t = tail;
        //尾節點為null
        if (t == null) { // Must initialize
            //通過CAS操作設定首節點
            if (compareAndSetHead(new Node()))
                //将首節點指派給尾節點(初始化)
                tail = head;
        } else {
            //設定目前節點的前置節點
            node.prev = t;
            //通過CAS操作設定尾節點
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

           

接下來是

shouldParkAfterFailedAcquire

parkAndCheckInterrupt

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    //擷取前置節點的等待狀态
    int ws = pred.waitStatus;
    //如果為SIGNAL
    if (ws == Node.SIGNAL)
        /*
         * This node has already set status asking a release
         * to signal it, so it can safely park.
         */
        //隻有目前置節點的狀态位SIGNAL的話,目前節點才能進入阻塞,并等待前置節點的喚醒
        return true;
    if (ws > 0) {
        /*
         * Predecessor was cancelled. Skip over predecessors and
         * indicate retry.
         */
        //如果前置節點為取消狀态,則不斷往前搜尋并找到SIGNAL狀态的節點,并加在其後面
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        /*
         * waitStatus must be 0 or PROPAGATE.  Indicate that we
         * need a signal, but don't park yet.  Caller will need to
         * retry to make sure it cannot acquire before parking.
         */
        //通過CAS操作設定前置節點的等待狀态位SIGNAL
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

/**
 * 如果上面的方法調用傳回true,則代表目前節點可以進入阻塞/等待
 */
private final boolean parkAndCheckInterrupt() {
    //通過LockSupport類的park方法來阻塞目前線程
    LockSupport.park(this);
    //被喚醒後,傳回中斷标志
    return Thread.interrupted();
}  

/**
 * 這裡的阻塞具體實作是JVM虛拟機的本地實作,有興趣者可以自行研究
 */
public static void park(Object blocker) {
    Thread t = Thread.currentThread();
    setBlocker(t, blocker);
    UNSAFE.park(false, 0L);
    setBlocker(t, null);
}
    
           

看到這裡,會有點困惑,如果沒有成功擷取到鎖的線程進入了阻塞狀态,那麼它什麼時候被喚醒呢?

這裡有一個不得不提的點,如果使用lock方法來進行加鎖,那麼必須成對地使用unlock來釋放鎖,否則容易導緻死鎖,一般都是在try-catch-finally進行鎖的釋放。

是以,等待線程的被喚醒是由持有鎖的線程調用

unlock

後觸發的。

接下來,從

unlock

入手來具體看下源碼,可以看到

unlock

方法是調用

sync.release(1)

實作的,還是以開頭的

NonFairSync

(非公平鎖)的實作來看,

① 解鎖
public void unlock() {
    sync.release(1);
}

② 釋放鎖
public final boolean release(int arg) {
    //判斷是否釋放成功
    if (tryRelease(arg)) {
        Node h = head;
        //判斷CLH隊列的首節點是否為null,并判斷等待狀态是否正常
        if (h != null && h.waitStatus != 0)
            //喚醒節點
            unparkSuccessor(h);
        return true;
    }
    return false;
}

③ 釋放鎖,并喚醒CLH隊列中的合法首節點
protected final boolean tryRelease(int releases) {
    //計算state和釋放數量的內插補點
    int c = getState() - releases;
    //判斷線程是否是鎖持有者
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    //初始化釋放結果
    boolean free = false;
    //如果目前線程未重入,釋放成功
    if (c == 0) {
        free = true;
        //釋放鎖持有的線程
        setExclusiveOwnerThread(null);
    }
    setState(c);
    return free;
}

④ 喚醒阻塞/等待的節點
private void unparkSuccessor(Node node) {
    /*
     * If status is negative (i.e., possibly needing signal) try
     * to clear in anticipation of signalling.  It is OK if this
     * fails or if status is changed by waiting thread.
     */
    //擷取節點等待狀态
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);

    /*
     * Thread to unpark is held in successor, which is normally
     * just the next node.  But if cancelled or apparently null,
     * traverse backwards from tail to find the actual
     * non-cancelled successor.
     */
    //擷取後置節點
    Node s = node.next;
    //後置節點為null或者為取消狀态
    if (s == null || s.waitStatus > 0) {
        s = null;
        //從尾部向前擷取到一個不為null且狀态不是取消的節點
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    //喚醒該節點
    if (s != null)
        LockSupport.unpark(s.thread);
}
    
           

被喚醒後的節點,傳回是否被中斷過的标志,在

acquireQueued

方法内繼續執行循環擷取鎖的流程。

到這裡,NonfairSync非公平鎖的分析基本上就告一段落了,而關于FairSync的公平機制,有興趣的可以去閱讀下,實作的機制大同小異。

以上,就是Java可重入鎖ReentrantLock的lock和unLock源碼分析,膜拜Java源碼大神。

總結

1、Lock提供

lock

lockInterruptibly

tryLock()

tryLock(long time, TimeUnit unit)

unlock

newCondition

五個方法;

2、

lock

unlock

必須成對調用;

3、ReentrantLock實作了Lock和Serializable兩個接口;

4、Sync是ReentrantLock的靜态内部類,提供了公平鎖(FairSync)和非公平鎖(NonFairSync)的實作。

5、CAS操作是基于JVM提供的本地方法實作。

6、待補充

賣萌

喜歡的不妨給個贊呗,溜了溜了。