天天看點

java多線程--公平鎖(三)

擷取公平鎖(基于JDK1.7.0_40)

通過前面“Java多線程系列--“JUC鎖”02之 互斥鎖ReentrantLock”的“示例1”,我們知道,擷取鎖是通過lock()函數。下面,我們以lock()對擷取公平鎖的過程進行展開。

1. lock()

lock()在ReentrantLock.java的FairSync類中實作,它的源碼如下:

final void lock() {
    acquire(1);
}
           

說明:“目前線程”實際上是通過acquire(1)擷取鎖的。

        這裡說明一下“1”的含義,它是設定“鎖的狀态”的參數。對于“獨占鎖”而言,鎖處于可擷取狀态時,它的狀态值是0;鎖被線程初次擷取到了,它的狀态值就變成了1。

        由于ReentrantLock(公平鎖/非公平鎖)是可重入鎖,是以“獨占鎖”可以被單個線程多次擷取,每擷取1次就将鎖的狀态+1。也就是說,初次擷取鎖時,通過acquire(1)将鎖的狀态值設為1;再次擷取鎖時,将鎖的狀态值設為2;依次類推...這就是為什麼擷取鎖時,傳入的參數是1的原因了。

        可重入就是指鎖可以被單個線程多次擷取。

2. acquire()

acquire()在AQS中實作的,它的源碼如下:

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}<span style="font-family:'Courier New' !important;color:#000000;font-size: 12px !important; line-height: 1.5 !important;"></span>
           

(01) “目前線程”首先通過tryAcquire()嘗試擷取鎖。擷取成功的話,直接傳回;嘗試失敗的話,進入到等待隊列排序等待(前面還有可能有需要該鎖的線程在等待)。

(02) “目前線程”嘗試失敗的情況下,先通過addWaiter(Node.EXCLUSIVE)來将“目前線程”加入到"CLH隊列(非阻塞的FIFO隊列)"末尾。CLH隊列就是線程等待隊列。

(03) 再執行完addWaiter(Node.EXCLUSIVE)之後,會調用acquireQueued()來擷取鎖。由于此時ReentrantLock是公平鎖,它會根據公平性原則來擷取鎖。

(04) “目前線程”在執行acquireQueued()時,會進入到CLH隊列中休眠等待,直到擷取鎖了才傳回!如果“目前線程”在休眠等待過程中被中斷過,acquireQueued會傳回true(目前線程在休眠過程中被中斷過,為什麼要傳回true,求解釋?),此時"目前線程"會調用selfInterrupt()來自己給自己産生一個中斷。至于為什麼要自己給自己産生一個中斷,後面再介紹。

上面是對acquire()的概括性說明。下面,我們将該函數分為4部分來逐漸解析。

一. tryAcquire()

二. addWaiter()

三. acquireQueued()

四. selfInterrupt()

一. tryAcquire()

1. tryAcquire()

公平鎖的tryAcquire()在ReentrantLock.java的FairSync類中實作,源碼如下:

protected final boolean tryAcquire(int acquires) {
    // 擷取“目前線程”
    final Thread current = Thread.currentThread();
    // 擷取“獨占鎖”的狀态
    int c = getState();
    // c=0意味着“鎖沒有被任何線程鎖擁有”,
    if (c == 0) {
        // 若“鎖沒有被任何線程鎖擁有”,
        // 則判斷“目前線程”是不是CLH隊列中的第一個線程線程,
        // 若是的話,則擷取該鎖,設定鎖的狀态,并切設定鎖的擁有者為“目前線程”。
        if (!hasQueuedPredecessors() &&
            compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {
        // 如果“獨占鎖”的擁有者已經為“目前線程”,
        // 則将更新鎖的狀态。
        int nextc = c + acquires;
        if (nextc < 0)
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}
           

說明:根據代碼,我們可以分析出,tryAcquire()的作用就是嘗試去擷取鎖。注意,這裡隻是嘗試!

         嘗試成功的話,傳回true;嘗試失敗的話,傳回false,後續再通過其它辦法來擷取該鎖。後面我們會說明,在嘗試失敗的情況下,是如何一步步擷取鎖的。

2. hasQueuedPredecessors()

hasQueuedPredecessors()在AQS中實作,源碼如下:

public final boolean hasQueuedPredecessors() {
    Node t = tail; 
    Node h = head;
    Node s;
    return h != t &&
        ((s = h.next) == null || s.thread != Thread.currentThread());
}<span style="font-family:'Courier New' !important;color:#000000;font-size: 12px !important; line-height: 1.5 !important;"></span>
           

說明: 通過代碼,能分析出,hasQueuedPredecessors() 是通過判斷"目前線程"是不是在CLH隊列的隊首,來傳回AQS中是不是有比“目前線程”等待更久的線程。下面對head、tail和Node進行說明。

3. Node的源碼

Node就是CLH隊列的節點。Node在AQS中實作,它的資料結構如下:

private transient volatile Node head;    // CLH隊列的隊首
private transient volatile Node tail;    // CLH隊列的隊尾

// CLH隊列的節點
static final class Node {
    static final Node SHARED = new Node();
    static final Node EXCLUSIVE = null;

    // 線程已被取消,對應的waitStatus的值
    static final int CANCELLED =  1;
    // “目前線程的後繼線程需要被unpark(喚醒)”,對應的waitStatus的值。
    // 一般發生情況是:目前線程的後繼線程處于阻塞狀态,而目前線程被release或cancel掉,是以需要喚醒目前線程的後繼線程。
    static final int SIGNAL    = -1;
    // 線程(處在Condition休眠狀态)在等待Condition喚醒,對應的waitStatus的值
    static final int CONDITION = -2;
    // (共享鎖)其它線程擷取到“共享鎖”,對應的waitStatus的值
    static final int PROPAGATE = -3;

    // waitStatus為“CANCELLED, SIGNAL, CONDITION, PROPAGATE”時分别表示不同狀态,
    // 若waitStatus=0,則意味着目前線程不屬于上面的任何一種狀态。
    volatile int waitStatus;

    // 前一節點
    volatile Node prev;

    // 後一節點
    volatile Node next;

    // 節點所對應的線程
    volatile Thread thread;

    // nextWaiter是“差別目前CLH隊列是 ‘獨占鎖’隊列 還是 ‘共享鎖’隊列 的标記”
    // 若nextWaiter=SHARED,則CLH隊列是“獨占鎖”隊列;
    // 若nextWaiter=EXCLUSIVE,(即nextWaiter=null),則CLH隊列是“共享鎖”隊列。
    Node nextWaiter;

    // “共享鎖”則傳回true,“獨占鎖”則傳回false。
    final boolean isShared() {
        return nextWaiter == SHARED;
    }

    // 傳回前一節點
    final Node predecessor() throws NullPointerException {
        Node p = prev;
        if (p == null)
            throw new NullPointerException();
        else
            return p;
    }

    Node() {    // Used to establish initial head or SHARED marker
    }

    // 構造函數。thread是節點所對應的線程,mode是用來表示thread的鎖是“獨占鎖”還是“共享鎖”。
    Node(Thread thread, Node mode) {     // Used by addWaiter
        this.nextWaiter = mode;
        this.thread = thread;
    }

    // 構造函數。thread是節點所對應的線程,waitStatus是線程的等待狀态。
    Node(Thread thread, int waitStatus) { // Used by Condition
        this.waitStatus = waitStatus;
        this.thread = thread;
    }
}
           

說明:

Node是CLH隊列的節點,代表“等待鎖的線程隊列”。

(01) 每個Node都會一個線程對應。

(02) 每個Node會通過prev和next分别指向上一個節點和下一個節點,這分别代表上一個等待線程和下一個等待線程。

(03) Node通過waitStatus儲存線程的等待狀态。

(04) Node通過nextWaiter來區分線程是“獨占鎖”線程還是“共享鎖”線程。如果是“獨占鎖”線程,則nextWaiter的值為EXCLUSIVE;如果是“共享鎖”線程,則nextWaiter的值是SHARED。

4. compareAndSetState()

compareAndSetState()在AQS中實作。它的源碼如下:

protected final boolean compareAndSetState(int expect, int update) {
    return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}<span style="font-family:'Courier New' !important;color:#000000;font-size: 12px !important; line-height: 1.5 !important;"></span>
           

說明: compareAndSwapInt() 是sun.misc.Unsafe類中的一個本地方法。對此,我們需要了解的是 compareAndSetState(expect, update) 是以原子的方式操作目前線程;若目前線程的狀态為expect,則設定它的狀态為update。

5. setExclusiveOwnerThread()

setExclusiveOwnerThread()在AbstractOwnableSynchronizer.java中實作,它的源碼如下:

// exclusiveOwnerThread是目前擁有“獨占鎖”的線程
private transient Thread exclusiveOwnerThread;
protected final void setExclusiveOwnerThread(Thread t) {
    exclusiveOwnerThread = t;
}<span style="font-family:'Courier New' !important;color:#000000;font-size: 12px !important; line-height: 1.5 !important;"></span>
           

說明:setExclusiveOwnerThread()的作用就是,設定線程t為目前擁有“獨占鎖”的線程。

6. getState(), setState()

getState()和setState()都在AQS中實作,源碼如下:

// 鎖的狀态
private volatile int state;
// 設定鎖的狀态
protected final void setState(int newState) {
    state = newState;
}
// 擷取鎖的狀态
protected final int getState() {
    return state;
}<span style="font-family:'Courier New' !important;color:#000000;font-size: 12px !important; line-height: 1.5 !important;"></span>
           

說明:state表示鎖的狀态,對于“獨占鎖”而已,state=0表示鎖是可擷取狀态(即,鎖沒有被任何線程鎖持有)。由于java中的獨占鎖是可重入的,state的值可以>1。

小結:tryAcquire()的作用就是讓“目前線程”嘗試擷取鎖。擷取成功傳回true,失敗則傳回false。

二. addWaiter(Node.EXCLUSIVE)

addWaiter(Node.EXCLUSIVE)的作用是,建立“目前線程”的Node節點,且Node中記錄“目前線程”對應的鎖是“獨占鎖”類型,并且将該節點添加到CLH隊列的末尾。

1.addWaiter()

addWaiter()在AQS中實作,源碼如下:

private Node addWaiter(Node mode) {
    // 建立一個Node節點,節點對應的線程是“目前線程”,“目前線程”的鎖的模型是mode。
    Node node = new Node(Thread.currentThread(), mode);
    Node pred = tail;
    // 若CLH隊列不為空,則将“目前線程”添加到CLH隊列末尾
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    // 若CLH隊列為空,則調用enq()建立CLH隊列,然後再将“目前線程”添加到CLH隊列中。
    enq(node);
    return node;
}<span style="font-family:'Courier New' !important;color:#000000;font-size: 12px !important; line-height: 1.5 !important;"></span>
           

說明:對于“公平鎖”而言,addWaiter(Node.EXCLUSIVE)會首先建立一個Node節點,節點的類型是“獨占鎖”(Node.EXCLUSIVE)類型。然後,再将該節點添加到CLH隊列的末尾。

2. compareAndSetTail()

compareAndSetTail()在AQS中實作,源碼如下:

private final boolean compareAndSetTail(Node expect, Node update) {
    return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
}<span style="font-family:'Courier New' !important;color:#000000;font-size: 12px !important; line-height: 1.5 !important;"></span>
           

說明:compareAndSetTail也屬于CAS函數,也是通過“本地方法”實作的。compareAndSetTail(expect, update)會以原子的方式進行操作,它的作用是判斷CLH隊列的隊尾是不是為expect,是的話,就将隊尾設為update。

3. enq()

enq()在AQS中實作,源碼如下:

private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) { // Must initialize
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}<span style="font-family:'Courier New' !important;color:#000000;font-size: 12px !important; line-height: 1.5 !important;"></span>
           

說明: enq()的作用很簡單。如果CLH隊列為空,則建立一個CLH表頭;然後将node添加到CLH末尾。否則,直接将node添加到CLH末尾。

小結:addWaiter()的作用,就是将目前線程添加到CLH隊列中。這就意味着将目前線程添加到等待擷取“鎖”的等待線程隊列中了。

三. acquireQueued()

前面,我們已經将目前線程添加到CLH隊列中了。而acquireQueued()的作用就是逐漸的去執行CLH隊列的線程,如果目前線程擷取到了鎖,則傳回;否則,目前線程進行休眠,直到喚醒并重新擷取鎖了才傳回。下面,我們看看acquireQueued()的具體流程。

1. acquireQueued()

acquireQueued()在AQS中實作,源碼如下:

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        // interrupted表示在CLH隊列的排程中,
        // “目前線程”在休眠時,有沒有被中斷過。
        boolean interrupted = false;
        for (;;) {
            // 擷取上一個節點。
            // node是“目前線程”對應的節點,這裡就意味着“擷取上一個等待鎖的線程”。
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}<span style="font-family:'Courier New' !important;color:#000000;font-size: 12px !important; line-height: 1.5 !important;"></span>
           

說明:acquireQueued()的目的是從隊列中擷取鎖。

2. shouldParkAfterFailedAcquire()

shouldParkAfterFailedAcquire()在AQS中實作,源碼如下:

// 傳回“目前線程是否應該阻塞”
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    // 前繼節點的狀态
    int ws = pred.waitStatus;
    // 如果前繼節點是SIGNAL狀态,則意味這目前線程需要被unpark喚醒。此時,傳回true。
    if (ws == Node.SIGNAL)
        return true;
    // 如果前繼節點是“取消”狀态,則設定 “目前節點”的 “目前前繼節點”  為  “‘原前繼節點’的前繼節點”。
    if (ws > 0) {
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        // 如果前繼節點為“0”或者“共享鎖”狀态,則設定前繼節點為SIGNAL狀态。
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}<span style="font-family:'Courier New' !important;color:#000000;font-size: 12px !important; line-height: 1.5 !important;"></span>
           

說明:

(01) 關于waitStatus請參考下表(中擴号内為waitStatus的值),更多關于waitStatus的内容,可以參考前面的Node類的介紹。

CANCELLED[1]  -- 目前線程已被取消
SIGNAL[-1]    -- “目前線程的後繼線程需要被unpark(喚醒)”。一般發生情況是:目前線程的後繼線程處于阻塞狀态,而目前線程被release或cancel掉,是以需要喚醒目前線程的後繼線程。
CONDITION[-2] -- 目前線程(處在Condition休眠狀态)在等待Condition喚醒
PROPAGATE[-3] -- (共享鎖)其它線程擷取到“共享鎖”
[0]           -- 目前線程不屬于上面的任何一種狀态。      

(02) shouldParkAfterFailedAcquire()通過以下規則,判斷“目前線程”是否需要被阻塞。

規則1:如果前繼節點狀态為SIGNAL,表明目前節點需要被unpark(喚醒),此時則傳回true。
規則2:如果前繼節點狀态為CANCELLED(ws>0),說明前繼節點已經被取消,則通過先前回溯找到一個有效(非CANCELLED狀态)的節點,并傳回false。
規則3:如果前繼節點狀态為非SIGNAL、非CANCELLED,則設定前繼的狀态為SIGNAL,并傳回false。      

如果“規則1”發生,即“前繼節點是SIGNAL”狀态,則意味着“目前線程”需要被阻塞。接下來會調用parkAndCheckInterrupt()阻塞目前線程,直到目前先被喚醒才從parkAndCheckInterrupt()中傳回。

3. parkAndCheckInterrupt())

parkAndCheckInterrupt()在AQS中實作,源碼如下:

private final boolean parkAndCheckInterrupt() {
    // 通過LockSupport的park()阻塞“目前線程”。
    LockSupport.park(this);
    // 傳回線程的中斷狀态。
    return Thread.interrupted();
}<span style="font-family:'Courier New' !important;color:#000000;font-size: 12px !important; line-height: 1.5 !important;"></span>
           

說明:parkAndCheckInterrupt()的作用是阻塞目前線程,并且傳回“線程被喚醒之後”的中斷狀态。

它會先通過LockSupport.park()阻塞“目前線程”,然後通過Thread.interrupted()傳回線程的中斷狀态。

這裡介紹一下線程被阻塞之後如何喚醒。一般有2種情況:

第1種情況:unpark()喚醒。“前繼節點對應的線程”使用完鎖之後,通過unpark()方式喚醒目前線程。

第2種情況:中斷喚醒。其它線程通過interrupt()中斷目前線程。

補充:LockSupport()中的park(),unpark()的作用 和 Object中的wait(),notify()作用類似,是阻塞/喚醒。

它們的用法不同,park(),unpark()是輕量級的,而wait(),notify()是必須先通過Synchronized擷取同步鎖。

關于LockSupport,我們會在之後的章節再專門進行介紹!

4. 再次tryAcquire()

了解了shouldParkAfterFailedAcquire()和parkAndCheckInterrupt()函數之後。我們接着分析acquireQueued()的for循環部分。

final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
    setHead(node);
    p.next = null; // help GC
    failed = false;
    return interrupted;
}<span style="font-family:'Courier New' !important;color:#000000;font-size: 12px !important; line-height: 1.5 !important;"></span>
           

說明:

(01) 通過node.predecessor()擷取前繼節點。predecessor()就是傳回node的前繼節點,若對此有疑惑可以檢視下面關于Node類的介紹。

(02) p == head && tryAcquire(arg)

       首先,判斷“前繼節點”是不是CHL表頭。如果是的話,則通過tryAcquire()嘗試擷取鎖。

       其實,這樣做的目的是為了“讓目前線程擷取鎖”,但是為什麼需要先判斷p==head呢?了解這個對了解“公平鎖”的機制很重要,因為這麼做的原因就是為了保證公平性!

       (a) 前面,我們在shouldParkAfterFailedAcquire()我們判斷“目前線程”是否需要阻塞;

       (b) 接着,“目前線程”阻塞的話,會調用parkAndCheckInterrupt()來阻塞線程。當線程被解除阻塞的時候,我們會傳回線程的中斷狀态。而線程被解決阻塞,可能是由于“線程被中斷”,也可能是由于“其它線程調用了該線程的unpark()函數”。

       (c) 再回到p==head這裡。如果目前線程是因為其它線程調用了unpark()函數而被喚醒,那麼喚醒它的線程,應該是它的前繼節點所對應的線程(關于這一點,後面在“釋放鎖”的過程中會看到)。 OK,是前繼節點調用unpark()喚醒了目前線程!

            此時,再來了解p==head就很簡單了:目前繼節點是CLH隊列的頭節點,并且它釋放鎖之後;就輪到目前節點擷取鎖了。然後,目前節點通過tryAcquire()擷取鎖;擷取成功的話,通過setHead(node)設定目前節點為頭節點,并傳回。

       總之,如果“前繼節點調用unpark()喚醒了目前線程”并且“前繼節點是CLH表頭”,此時就是滿足p==head,也就是符合公平性原則的。否則,如果目前線程是因為“線程被中斷”而喚醒,那麼顯然就不是公平了。這就是為什麼說p==head就是保證公平性!

小結:acquireQueued()的作用就是“目前線程”會根據公平性原則進行阻塞等待,直到擷取鎖為止;并且傳回目前線程在等待過程中有沒有并中斷過。

四. selfInterrupt()

selfInterrupt()是AQS中實作,源碼如下:

private static void selfInterrupt() {
    Thread.currentThread().interrupt();
}<span style="font-family:'Courier New' !important;color:#000000;font-size: 12px !important; line-height: 1.5 !important;"></span>
           

說明:selfInterrupt()的代碼很簡單,就是“目前線程”自己産生一個中斷。但是,為什麼需要這麼做呢?

這必須結合acquireQueued()進行分析。如果在acquireQueued()中,目前線程被中斷過,則執行selfInterrupt();否則不會執行。

在acquireQueued()中,即使是線程在阻塞狀态被中斷喚醒而擷取到cpu執行權利;但是,如果該線程的前面還有其它等待鎖的線程,根據公平性原則,該線程依然無法擷取到鎖。它會再次阻塞! 該線程再次阻塞,直到該線程被它的前面等待鎖的線程鎖喚醒;線程才會擷取鎖,然後“真正執行起來”!

也就是說,在該線程“成功擷取鎖并真正執行起來”之前,它的中斷會被忽略并且中斷标記會被清除! 因為在parkAndCheckInterrupt()中,我們線程的中斷狀态時調用了Thread.interrupted()。該函數不同于Thread的isInterrupted()函數,isInterrupted()僅僅傳回中斷狀态,而interrupted()在傳回目前中斷狀态之後,還會清除中斷狀态。 正因為之前的中斷狀态被清除了,是以這裡需要調用selfInterrupt()重新産生一個中斷!

小結:selfInterrupt()的作用就是目前線程自己産生一個中斷。

總結

再回過頭看看acquire()函數,它最終的目的是擷取鎖!

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}<span style="font-family:'Courier New' !important;color:#000000;font-size: 12px !important; line-height: 1.5 !important;"></span>
           

(01) 先是通過tryAcquire()嘗試擷取鎖。擷取成功的話,直接傳回;嘗試失敗的話,再通過acquireQueued()擷取鎖。

(02) 嘗試失敗的情況下,會先通過addWaiter()來将“目前線程”加入到"CLH隊列"末尾;然後調用acquireQueued(),在CLH隊列中排序等待擷取鎖,在此過程中,線程處于休眠狀态。直到擷取鎖了才傳回。 如果在休眠等待過程中被中斷過,則調用selfInterrupt()來自己産生一個中斷。