天天看點

【多線程】ReentrantLock--公平鎖源碼分析

ReentrantLock lock = new ReentrantLock(true); 
lock.lock();
           

調用lock()方法實際調用sync内部類的lock方法,Sync類的的lock方法為抽象方法,實際調用其子類的lock方法,由于建立的是公平鎖,是以,最終調用FairSync的lock方法

public void lock() {//ReentrantLock#lock     
    sync.lock();//調用公平鎖的lock 
}
           

FairSync的lock方法中調用了acquire方法,嘗試擷取鎖,acquire()是AQS(AbstractQueuedSynchronizer)中的實作

Sync繼承自AQS,FairSync繼承自Sync

final void lock() {//FairSync#lock
    acquire(1);
}
           

AQS的acquire方法,先嘗試去擷取一次鎖,如果擷取失敗,将該線程封裝為node節點,添加到AQS阻塞隊列的隊尾,并以自旋的方式去嘗試擷取鎖,如果在擷取鎖的過程中出現異常,自我中斷(selfInterrupt())

public final void acquire(int arg) {//arg=1
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        //acquireQueued的傳回值實際是該線程在阻塞中是否有被中斷,如果被中斷了,傳回true,此時再進行中斷
        selfInterrupt();
}
           

先看selfInterrupt(),調用了目前線程的interrupt()方法進行中斷

private static void selfInterrupt() {
    Thread.currentThread().interrupt();
}
           

再回過頭看acquire方法中的tryAcquire方法,該方法用來先去嘗試能否擷取鎖,該方法是需要AQS的子類去重寫的,AQS中的此方法,隻抛出UnsupportedOperationException異常

FairSync#tryAcquire,先判斷資源狀态,如果未被持有,則再判斷是否有線程在等待資源,如果是,擷取鎖失敗,如果沒有線程等待,嘗試持有;如果資源以及被線程持有,判斷持有者是不是自己,如果是自己,重入數+1,否則擷取失敗

protected final boolean tryAcquire(int acquires) {//acquires=1
final Thread current = Thread.currentThread();//擷取目前線程
    int c = getState();//擷取資源狀态
    if (c == 0) {//資源沒有被鎖定
        if (!hasQueuedPredecessors() &&//判斷在此線程前是否有等待時間更長的線程,因為是公平鎖,是以如果有線程在配對等待擷取鎖,則該線程不能立即獲得鎖
            compareAndSetState(0, acquires)) {//CAS操作,如果沒有等待的線程,嘗試去擷取鎖
            setExclusiveOwnerThread(current);//沒有等待線程,目前線程擷取了鎖,那麼将資源持有者設定為此線程
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) { //資源被鎖定,判斷目前線程是否是鎖持有者,如果是,重入數+1
        int nextc = c + acquires;
        if (nextc < 0)
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;//資源狀态不為0,其持有者也不是該線程,則擷取失敗
}
           

下面是判斷線程前是否有其他線程等待資源的方法:hasQueuedPredecessors()

/**
 * Queries whether any threads have been waiting to acquire longer
 * than the current thread.
 */
public final boolean hasQueuedPredecessors() {
    // The correctness of this depends on head being initialized
    // before tail and on head.next being accurate if the current
    // thread is first in queue.
    Node t = tail; //tail阻塞隊列的隊尾節點
    Node h = head;//head:阻塞隊列的隊首節點
    Node s;
    // h != t 頭節點和隊尾節點不是同一個(隻有當阻塞隊列中沒有節點時,tail和head都指向null)
    //(s = h.next) == null 判斷頭節點的下一個節點是否存在
    //s.thread != Thread.currentThread() 如果頭結點的下一個節點操作,判斷這個節點裡的線程是否為目前線程
    //AQS中的阻塞隊列是以Node為節點的雙連結清單結構
    //其頭節點是一個僞節點,頭節點的下一個節點即隊首節點是存放第一個阻塞線程的節點
    //是以源碼中用頭節點的next節點中的線程與目前線程做比較
    return h != t &&
        ((s = h.next) == null || s.thread != Thread.currentThread());
}
           

到此,tryAcquire()的邏輯已全部分析,接下載下傳再看如果擷取鎖失敗後的處理

擷取鎖失敗,先将線程包裝成node節點,添加到阻塞線程的隊尾

/**
 * Creates and enqueues node for current thread and given mode.
 */
private Node addWaiter(Node mode) {//AbstractQueuedSynchronizer#addWaiter
    Node node = new Node(Thread.currentThread(), mode);//将線程包裝成Node對象
    // Try the fast path of enq; backup to full enq on failure
    Node pred = tail;//擷取隊尾節點
    if (pred != null) {//如果CLH隊列不為空,将新節點加入到隊尾
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    //隻有當阻塞隊列為空時,才會調用此方法來添加節點
    enq(node);//如果CLH隊列為空,自旋的方式添加節點(head節點和該線程節點)
    return node;
}
           
/**
 * Inserts node into queue, initializing if necessary. See picture above.
 */
private Node enq(final Node node) {//AbstractQueuedSynchronizer#enq
    for (;;) {
        Node t = tail;
        if (t == null) { // Must initialize
        //第一次周遊
        //判斷尾節點為空,說明整個隊列是空的,添加頭節點,将隊頭和隊尾都指向此節點
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            //第二次周遊
            //新增節點的pre節點指向目前隊列的隊尾節點
            //将自己設定為隊尾節點,将隊尾節點的next節點指向自己
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}
           

将線程添加到隊尾後,接下來線程以自旋的方式嘗試擷取鎖

/**
 * Acquires in exclusive uninterruptible mode for thread already in
 * queue. Used by condition wait methods as well as acquire.
 */
final boolean acquireQueued(final Node node, int arg) {//AbstractQueuedSynchronizer#acquireQueued
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();//擷取目前線程節點的上一個節點
            if (p == head && tryAcquire(arg)) {
                //判斷他的上一個節點是否是頭節點,如果是,就再次嘗試擷取鎖
                //如果擷取鎖成功了,把自己置為頭節點
                //上面有提到,頭節點是僞節點,隊列中真正的第一個阻塞線程實際在第二個節點
                //是以,判斷如果自己的上一個節點是頭節點,說明自己就是第一個被阻塞的線程,就可以去擷取鎖了
                setHead(node);//prev,thread的引用都指向null
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            //不是頭節點,或者擷取鎖失敗了,進入一下邏輯
            //1.確定這個節點前的節點的狀态是SIGNAL,這樣自己才能安心的被阻塞,因為會被前一個節點叫醒
            //2.調用LockSupport.park對線程阻塞
            //3.線上程被喚醒時,傳回這個線程的中斷狀态
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                //如果線程在阻塞期間被中斷後,不響應中斷,隻是設定一個是否中斷的變量為true
                //最後在成功擷取鎖後傳回這個中斷狀态給其調用者
                interrupted = true;
        }
    } finally {
        //最後在傳回的時候判斷下擷取鎖的這個自旋過程中是否有發生異常,如果有,則将節點從隊列中移除
        if (failed)
            cancelAcquire(node);
    }
}
           

做阻塞前的保障工作

/**
 * Checks and updates status for a node that failed to acquire.
 * Returns true if thread should block. This is the main signal
 * control in all acquire loops.  Requires that pred == node.prev. 
 */
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL)
    //判斷上個節點的狀态,隻有在狀态是SIGNAL的時候,才能確定上一個線程用完鎖後會通知自己來擷取鎖
        /*
         * This node has already set status asking a release
         * to signal it, so it can safely park.
         */
        return true;
    if (ws > 0) {
        //ws > 0,即CANCEL狀态,将自己前面所有是此狀态的線程節點全部移除隊列
        /*
         * Predecessor was cancelled. Skip over predecessors and
         * indicate retry.
         */
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        /*
         * waitStatus must be 0 or PROPAGATE.  Indicate that we
         * need a signal, but don't park yet.  Caller will need to
         * retry to make sure it cannot acquire before parking.
         */
         //如果是其他的狀态,将節點的狀态改為SIGNAL
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}
           

阻塞線程

/**
 * Convenience method to park and then check if interrupted
 *
 * @return {@code true} if interrupted
 */
private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this);//此方法調用後,線程會在這裡阻塞,直到上一個線程喚醒它
    return Thread.interrupted();
}
           

再回頭看acquireQueued()的方法中的cancelAcquire()方法,此方法我的了解是在acquireQueued()出現異常時才會調用,因為如果正常執行,當輪到此線程擷取鎖,并成功的擷取鎖後,會将failed置為false,這樣的話是不會觸發cancelAcquire()方法的,而且此方法的出口,隻有成功擷取鎖後的一個出口

/**
 * Cancels an ongoing attempt to acquire.
 *
 * @param node the node
 */
private void cancelAcquire(Node node) {
    // Ignore if node doesn't exist
    if (node == null)
        return;

    node.thread = null;//釋放線程引用
    Node pred = node.prev;
    while (pred.waitStatus > 0)//讓節點前所有CANCEL狀态的節點退出隊列
        node.prev = pred = pred.prev;
    Node predNext = pred.next;
    node.waitStatus = Node.CANCELLED;
    //如果該節點在隊尾,直接将隊尾置為他前面第一個不是CANCEL的節點
    if (node == tail && compareAndSetTail(node, pred)) {
        compareAndSetNext(pred, predNext, null);
    } else {
        int ws;
        //pred != head 他前面第一個不是CANCEL的節點不是頭節點
        //(ws = pred.waitStatus) == Node.SIGNA 他前面第一個不是CANCEL的節點的狀态是SIGNAL 
        //ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL) 他前面第一個不是CANCEL的節點的狀态不是SIGNAL則置為SIGNAL 
        //pred.thread != null 他前面第一個不是CANCEL的節點中有持有線程
         if (pred != head &&
            ((ws = pred.waitStatus) == Node.SIGNAL ||
             (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
            pred.thread != null) {
            //如果pred不是頭節點,且保證他的狀态是SIGNAL,且他持有線程,将pred.next的與node節點的next節點連接配接
            Node next = node.next;
            if (next != null && next.waitStatus <= 0)
                compareAndSetNext(pred, predNext, next);
        } else {
            unparkSuccessor(node);
        }

        node.next = node; // help GC
    }
}
           

喚醒隊列中的第一個線程

/**
     * Wakes up node's successor, if one exists.
     *
     * @param node the node
     */
    private void unparkSuccessor(Node node) {
        /*
         * If status is negative (i.e., possibly needing signal) try
         * to clear in anticipation of signalling.  It is OK if this
         * fails or if status is changed by waiting thread.
         */
        int ws = node.waitStatus;
        //把節點狀态改為0--初始狀态
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);

        /*
         * Thread to unpark is held in successor, which is normally
         * just the next node.  But if cancelled or apparently null,
         * traverse backwards from tail to find the actual
         * non-cancelled successor.
         */
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            //檢查節點下一個節點,如果為null,或者狀态是CANCEL,從隊尾開始,找到第一個有效的節點
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);//喚醒隊列的第一個線程
    }
           

到此,公平鎖的lock()方法全部結束

總述

調用公平鎖的lock方法,實際調用ReentrantLock中FairSync的lock()方法

他會先去調用tryLock()方法看能否先擷取鎖,如果擷取不到,将線程添加到CLH隊列裡,以自旋的方式再去擷取鎖,擷取成功後判斷自己是否被中斷過,然後再次中斷自己

在自旋的過程中為了保證自己能被成功喚醒,他回去判斷自己前面節點的狀态,如果是SIGNAL,那麼自己會被成功喚醒,如果是CANCEL狀态,需要将節點從隊列中移除,如果是其他的狀态,将狀态置為SIGNAL

如果在擷取鎖的過程中出現了異常,将其從隊列中移除,如果它的頭節點,需要将它的下一個節點線程喚醒

繼續閱讀