天天看點

并發程式設計5-AQS的Condition實作原理

在經典的生産者-消費者模式中,可以使用Object.wait()和Object.notify()阻塞和喚醒線程,但是這樣的處理下隻能有一個等待隊列。在可重入鎖ReentrantLock中,使用AQS的condition可以實作設定多個等待隊列,使用Lock.newCondition就可以生成一個等待隊列,相比較來說這種方式就很靈活。

本篇文章将介紹Condition的實作原理和基本使用方法,基本過程如下:

1、Condition提供了await()方法将目前線程阻塞,并提供signal()方法支援另外一個線程将已經阻塞的線程喚醒。

2、Condition需要結合Lock使用

3、線程調用await()方法前必須擷取鎖,調用await()方法時,将線程構造成節點加入等待隊列,同時釋放鎖,并挂起目前線程

4、其他線程調用signal()方法前也必須擷取鎖,當執行signal()方法時将等待隊列的節點移入到同步隊列,當線程退出臨界區釋放鎖的時候,喚醒同步隊列的首個節點

并發程式設計5-AQS的Condition實作原理

關鍵源碼分析:

1、等待隊列使用連結清單結構

/** First node of condition queue. */
private transient Node firstWaiter;//首個等待節點
/** Last node of condition queue. */
private transient Node lastWaiter;//最後一個等待節點
           

2、目前線程(線程A)調用await阻塞目前線程

public final void await() throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        //将目前線程封裝成Node加入到等待隊列尾部
        Node node = addConditionWaiter();
        //釋放鎖
        int savedState = fullyRelease(node);
        int interruptMode = 0;
        //關鍵代碼!!!
        //判斷目前節點是否已經在同步隊列中,如果是則退出循環,如果不是就阻塞目前線程
        //即其他線程如果發出了signal信号,會把等待隊列的線程移入同步隊列(aqs的同步隊列),此時就會退出循環,進入下面的重新擷取鎖的acquireQueued
        while (!isOnSyncQueue(node)) {
            LockSupport.park(this);
            if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                break;
        }
        //其他發出signal信号的線程釋放鎖之後,該線程被喚醒并重新競争鎖
        //acquireQueued這個方法的解析在 文章 《[并發程式設計4-AQS同步器原理](https://mp.csdn.net/mdeditor/89761057#)》中有介紹
        if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
            interruptMode = REINTERRUPT;
        if (node.nextWaiter != null) // clean up if cancelled
            unlinkCancelledWaiters();
        if (interruptMode != 0)
            reportInterruptAfterWait(interruptMode);
    }
           
//線程加入等待隊列尾部
    private Node addConditionWaiter() {
        Node t = lastWaiter;
        // If lastWaiter is cancelled, clean out.
        if (t != null && t.waitStatus != Node.CONDITION) {//清除cancell态的節點
            unlinkCancelledWaiters();
            t = lastWaiter;//t指向最後一個狀态正确的節點
        }
        Node node = new Node(Thread.currentThread(), Node.CONDITION);//把線程建構成一個新的節點
        if (t == null)//清單為空,初始化為第一個節點
            firstWaiter = node;
        else   //不為空,則加入到等待隊列尾部
            t.nextWaiter = node;
        lastWaiter = node;
        return node;
    }  
           

3、其他線程(線程B)調用signal/signalAll方法,将等待隊列的節點移入同步隊列(signalAll隻是循環執行signal而已),signal調用doSignal

private void doSignal(Node first) {
        do {
            if ( (firstWaiter = first.nextWaiter) == null)
                lastWaiter = null;
            first.nextWaiter = null;//得到firstWaiter
        } while (!transferForSignal(first) &&
                 (first = firstWaiter) != null);
    }
  //将節點從等待隊列移入同步隊列
  final boolean transferForSignal(Node node) {
    /*
     * If cannot change waitStatus, the node has been cancelled.
     */
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
        return false;//cas節點狀态錯誤,說明已經cancell了,直接傳回false

    /*
     * Splice onto queue and try to set waitStatus of predecessor to
     * indicate that thread is (probably) waiting. If cancelled or
     * attempt to set waitStatus fails, wake up to resync (in which
     * case the waitStatus can be transiently and harmlessly wrong).
     */
    Node p = enq(node);//加入同步隊列
    int ws = p.waitStatus;
    //設定前置節點狀态為signal,可重入鎖那篇文章分析過,為了喚醒線程而設定
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
        LockSupport.unpark(node.thread);//特殊情況下喚醒線程并重新同步,一般情況下這裡不會執行
    return true;
}
           

4、線程(線程B)發出signal信号之後,退出臨界區釋放鎖(代碼在《并發程式設計4-AQS同步器原理》中有詳細分析,這裡僅貼出關鍵代碼)

public final boolean release(int arg) {
        if (tryRelease(arg)) { // tryRelease() 嘗試釋放目前線程的同步狀态(鎖)
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);//釋放鎖成功後,找到AQS新的頭結點(本例是指線程B),并喚醒它
            return true;
        }
        return false;
    }
           

總結

1、condition提供了類似object.wait和notify的線程通信機制,但是condition支援多個等待隊列,使用上更加靈活

2、condition的await和signal的通信機制是juc中有界隊列的實作基礎,而有界隊列又是線程池實作的基礎,常用于生産者-消費者模式

3、condition依賴于鎖而存在。

繼續閱讀