天天看點

為源碼寫注釋: ReentrantLock

ReentrantLock:

先簡單講下ReentrantLock裡面的成員變量。

(1)int state:用于分辨目前鎖是否已經被鎖上

1)state=0: 未上鎖

2)state>=1:已上鎖,并且state>=1時記錄的時重入鎖的次數

(2)Node head:引用始終指向獲得了鎖的節點,它不會被取消。acquire操作成功就表示獲得了鎖,acquire過程中如果中斷,那麼acquire就失敗了,這時候head就會指向下一個節點。

(3)Node tail:尾節點,用于線程快速如隊列

1. 構造器

先闡述公平鎖和不公平鎖的定義

公平鎖(Fair):加鎖前檢查是否有排隊等待的線程,優先排隊等待的線程,先來先得

非公平鎖(Nonfair):加鎖時不考慮排隊等待問題,直接嘗試擷取鎖,擷取不到自動到隊尾等待

(1)預設是構造一個不公平的鎖。

public ReentrantLock(){
    sync = new NonfairSync();
}
           

(2)為true時,構造公平鎖;為false時,構造不公平鎖。

public ReentrantLock(boolean fair){
    sync = fair ? new FairSync() : new NonfairSync();
}
           

2. lock方法

lock方法針對 fair 和 nonfair 是有不同的實作的,在下面的代碼的實作在于 sync 是屬于哪個鎖

public void lock() {
        sync.lock();
 }
           

(1)對于NonfairSync.lock() 的實作如下

1) 上鎖接口

final void lock() {
        //compareAndSetState,使用cas線程安全的把state的值更改為1, 當state=0時,代表沒有上鎖
    if (compareAndSetState(, ))
            //設定鎖的專屬對象是目前線程
            setExclusiveOwnerThread(Thread.currentThread());
    else
            //如果cas操作失敗時,調用acquire方法嘗試上鎖
            acquire();
}
           

2) 申請鎖

public final void acquire(int arg) {
    // tryAcquire 嘗試快速上鎖 =>3)
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) //快速上鎖失敗後調用acquireQueued方法把目前線程加入隊列,并傳回acquireQueued中是否産生過攔截
        //acquireQueued中産生攔截,則調用線程中斷方法
        selfInterrupt();
}
           

非公平鎖

1)嘗試快速上鎖

protected final boolean tryAcquire(int acquires) {
    return nonfairTryAcquire(acquires);
}
           
final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    //state=0時代表未上鎖
    if (c == ) {
        //可上鎖時,使用cas操作,線程安全的設定目前鎖狀态
        if (compareAndSetState(, acquires)) {
            //設定鎖所屬線程 為目前線程
            setExclusiveOwnerThread(current);
            return true;
        }
    }

    //目前線程就是擁有鎖的線程,是以為重入鎖
    else if (current == getExclusiveOwnerThread()) {
        //重入鎖記錄 上鎖次數
        int nextc = c + acquires;
        //上鎖次數溢出,重入次數太多,在改變狀态之前抛出異常以確定鎖的狀态是正确的
        if (nextc < ) // overflow
            throw new Error("Maximum lock count exceeded");
        //修改鎖狀态,不需要cas,因為屬于同一個線程
        setState(nextc);
        return true;
    }
    return false;
   }
           

2) addWaiter: 在目前等待隊列添加成員

private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; backup to full enq on failure
    Node pred = tail;
    //如果尾部節點存在
    if (pred != null) {
        node.prev = pred;
        // cas線程安全 設定隊列尾部等待線程為目前線程
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    //如果尾部節點不存在,則通過死循環插入隊列
    enq(node);
    return node;
}
           

3) 插入目前線程到 鎖等待隊列上

private Node enq(final Node node) {
    //直到成功才結束
    for (;;) {
        Node t = tail;
        // 尾部節點為空,case線程安全的設定頭節點為目前節點,同時設定尾節點。不然,繼續在尾部添加node
        if (t == null) { // Must initialize
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}
           

4) 在所等待隊列中配置設定鎖

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        // 是否産生中斷
        boolean interrupted = false;
        for (;;) {
            // 擷取候鎖隊列的前一個節點
            final Node p = node.predecessor();
            // 如果node節點的前一個節點p == head,并且tryAcquire為目前線程拿到鎖,則配置設定鎖成功
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            // 請求鎖失敗時,使用shouldParkAfterFailedAcquire判斷是否要中斷目前線程,需要中斷目前線程則調用parkAndCheckInterrupt産生一次中斷
        /**線程的thread.interrupt()方法是中斷線程,将會設定該線程的中斷狀态位,即設定為true,中斷的結果線程是死亡、還是等待新的任務或是繼續運作至下一步,就取決于這個程式本身。線程會不時地檢測這個中斷标示位,以判斷線程是否應該被中斷(中斷标示值是否為true)。它并不像stop方法那樣會中斷一個正在運作的線程。*/
            if (shouldParkAfterFailedAcquire(p, node) 
                && parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        //  配置設定失敗,要取消目前線程的鎖請求
        if (failed)
            cancelAcquire(node);
    }
}
           

5) 請求鎖失敗時,是否中斷目前線程,這裡首先要了解Node的waitStatus的定義

class:AbstractQueuedSynchronizer.Node

/** waitStatus value to indicate thread has cancelled:目前線程已登出 */
static final int CANCELLED =  ;
/** waitStatus value to indicate successor's thread needs unparking:目前線程的繼承者(後繼線程)需要喚醒*/
static final int SIGNAL    = -;
/** waitStatus value to indicate thread is waiting on condition:目前線程在等待Condition喚醒*/
static final int CONDITION = -;
/**
 * waitStatus value to indicate the next acquireShared should:暫時不看
 * unconditionally propagate
 */
static final int PROPAGATE = -;

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        //前一個線程的狀态
        int ws = pred.waitStatus;
        //目前線程需要喚醒
        if (ws == Node.SIGNAL)
            /*
             * This node has already set status asking a release
             * to signal it, so it can safely park.
             */
            return true;
        //前一個線程已經登出,是以在前一個線程以前往前尋找沒有登出的線程,并且把找到的線程的下個節點設定為目前線程
        if (ws > ) {
            /*
             * Predecessor was cancelled. Skip over predecessors and
             * indicate retry.
             */
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > );
            pred.next = node;
        } else {
            /* 前一個線程等待狀态為-2或-3,cas線程安全的設定前一個線程的狀态為SIGNAL,即目前線程需要喚醒
             * waitStatus must be 0 or PROPAGATE.  Indicate that we
             * need a signal, but don't park yet.  Caller will need to
             * retry to make sure it cannot acquire before parking.
             */
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }
           

6) 中斷并且檢查線程有沒有中斷

private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }
           

7) 登出線程對鎖的請求

/**
     * Cancels an ongoing attempt to acquire.
     *
     * @param node the node
     */
    private void cancelAcquire(Node node) {
        // Ignore if node doesn't exist
        if (node == null)
            return;

        node.thread = null;
        // 向前回溯,跳過取消的前繼節點,直到找到一個沒有取消的節點(注意:這裡跳過的節點都是無效節點,其實可以從隊列中移除),并使目前節點的前繼節點指向它,此時找到的那個節點的後繼節點沒有改動
        // Skip cancelled predecessors
        Node pred = node.prev;
        while (pred.waitStatus > )
            node.prev = pred = pred.prev;

        // predNext is the apparent node to unsplice. CASes below will
        // fail if not, in which case, we lost race vs another cancel
        // or signal, so no further action is necessary.
        Node predNext = pred.next;

        // Can use unconditional write instead of CAS here.
        // After this atomic step, other Nodes can skip past us.
        // Before, we are free of interference from other threads.
        //設定目前節點為取消狀态
        node.waitStatus = Node.CANCELLED;

        //目前節點就是尾節點,則直接清除尾節點,設定前一個節點的後繼節點為null,取消完成
        // If we are the tail, remove ourselves.
        if (node == tail && compareAndSetTail(node, pred)) {
            compareAndSetNext(pred, predNext, null);
        } else {
            // 如果後繼節點需要喚醒,先從設定前繼節點的指向入手。如果前繼節點不是頭節點時,因為前繼節點之後到目前節點(node)直接的節點都是被跳過了(節點已取消),是以如果pred的waitStatus如果是SIGNAL狀态,意味着node的下個節點會喚醒,是以把pred的下個節點設定為node的下個節點,同時也完成了node節點的取消操作。
            // If successor needs signal, try to set pred's next-link
            // so it will get one. Otherwise wake it up to propagate.
            int ws;
            if (pred != head &&
                ((ws = pred.waitStatus) == Node.SIGNAL ||
                 (ws <=  && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
                pred.thread != null) {
                Node next = node.next;
                if (next != null && next.waitStatus <= )
                    compareAndSetNext(pred, predNext, next);
            } else {
                //上面嘗試沒完成後,直接喚醒後繼節點
                unparkSuccessor(node);
            }

            node.next = node; // help GC
        }
    }
           

8). next指向非null的下一個節點,在同步隊列中等待的節點,入隊操作時設定了前一個節點的next值,這樣可以在釋放鎖時,通知下一個節點來擷取鎖

private void unparkSuccessor(Node node) {
    /*
     * If status is negative (i.e., possibly needing signal) try
     * to clear in anticipation of signalling.  It is OK if this
     * fails or if status is changed by waiting thread.
     */
    int ws = node.waitStatus;
    if (ws < )
        compareAndSetWaitStatus(node, ws, );

    /*
     * Thread to unpark is held in successor, which is normally
     * just the next node.  But if cancelled or apparently null,
     * traverse backwards from tail to find the actual
     * non-cancelled successor.
     */
    Node s = node.next;
    //找到後面第一個沒有被取消的節點
    if (s == null || s.waitStatus > ) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= )
                s = t;
    }
    if (s != null)
        //喚醒後繼線程
        LockSupport.unpark(s.thread);
}
           

FairSync:公平鎖

(1) 上鎖

final void lock() {
    //和非公平鎖(NonfairSync)不一樣的是,沒有快速上鎖(搶鎖)的機制(compareAndSetState)
    //acquire 和 NonfairSync 一樣,都是通過 AbstractQueuedSynchronizer 的acquire 去實作上鎖
    //但是對于tryAcquire的實作機制又有不同
    acquire();
}
           

(2)嘗試擷取鎖

protected final boolean tryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == ) {
        //和NonfairSync不同的是,多了一個 hasQueuedPredecessors 的判斷目前隊列是否有等待更久的線程
        if (!hasQueuedPredecessors() &&
            // cas 更改鎖狀态,傳回true 則成功擷取鎖 
            compareAndSetState(, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < )
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}
           

(3)判斷目前隊列是否有 正在工作的節點 或 等待更久的線程,有就傳回true

public final boolean hasQueuedPredecessors() {
    // The correctness of this depends on head being initialized
    // before tail and on head.next being accurate if the current
    // thread is first in queue.
    Node t = tail; // Read fields in reverse initialization order
    Node h = head;
    Node s;
    return h != t &&
        ((s = h.next) == null || s.thread != Thread.currentThread());
}