天天看點

20 - Lock-Condition 的等待通知

Lock-Condition 的等待通知

      • 1. condition 的使用
        • 1.2 等待方法
        • 1.2 喚醒方法
        • 1.3 使用舉例
      • 2. condition 與 wait / notify
      • 3. 源碼分析
        • 3.1 條件隊列
        • 3.2 await
        • 3.3 signal
        • 3.4 過程總結
      • 4. 生産者消費者
      • 5. 總結

  在上一篇文章中,我們講到 Java SDK 并發包裡的 Lock 有别于 synchronized 隐式鎖的三個特性:能夠響應中斷、支援逾時和非阻塞地擷取鎖。那今天我們接着再來詳細聊聊 Java SDK 并發包裡的 Condition,Condition 實作了管程模型裡面的條件變量。

  在《07-管程:并發程式設計的萬能鑰匙》裡我們提到過 Java 語言内置的管程裡隻有一個條件變量,而 Lock&Condition 實作的管程是支援多個條件變量的,這是二者的一個重要差別。

  在很多并發場景下,支援多個條件變量能夠讓我們的并發程式可讀性更好,實作起來也更容易。例如,實作一個阻塞隊列,就需要兩個條件變量。

  

1. condition 的使用

1.2 等待方法

// 目前線程進入等待狀态,如果其他線程調用 condition 的 signal 或者 signalAll 方法
// 并且目前線程擷取 Lock 從 await 方法傳回,如果在等待狀态中被中斷會抛出被中斷異常
void await() throws InterruptedException

// 目前線程進入等待狀态直到被通知,中斷或者逾時
long awaitNanos(long nanosTimeout)

// 同第二個方法,支援自定義時間機關
boolean await(long time, TimeUnit unit)throws InterruptedException

// 目前線程進入等待狀态直到被通知,中斷或者到了某個時間
boolean awaitUntil(Date deadline) throws InterruptedException
           

  

1.2 喚醒方法

// 喚醒一個等待在 condition 上的線程,将該線程從等待隊列中轉移到同步隊列中,
// 如果在同步隊列中能夠競争到 Lock 則可以從等待方法中傳回
void signal()

// 與 1 的差別在于能夠喚醒所有等待在 condition 上的線程
void signalAll()
           

  

1.3 使用舉例

public class TestCondition {

    static ReentrantLock lock = new ReentrantLock();
    static Condition condition = lock.newCondition();
    static volatile boolean flag = false;

    public static void main(String[] args) {

        new Thread(() -> {
            awaiter();
        }, "等待線程").start();

        new Thread(() -> {
            signal();
        }, "喚醒線程").start();

    }

    public static void awaiter() {
        lock.lock();
        try {
            while (!flag) {
                System.out.println(Thread.currentThread().getName() 
                + ": 條件不滿足,等待...");
                try {
                    condition.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            System.out.println(Thread.currentThread().getName() 
            + ": 條件已滿足,接收資料...");
        } finally {
            lock.unlock();
        }
    }

    public static void signal() {
        lock.lock();
        try {
            flag = true;
            System.out.println(Thread.currentThread().getName() 
            + ": 條件準備完成,喚醒等待線程...");
            condition.signalAll();
        } finally {
            lock.unlock();
        }
    }

}

# 運作結果如下:
等待線程: 條件不滿足,等待...
喚醒線程: 條件準備完成,喚醒等待線程...
等待線程: 條件已滿足,接收資料...
           

  

2. condition 與 wait / notify

  Object 的 wait 和 notify/notify 是與 synchronized 配合完成線程間的等待/通知機制,是屬于 Java 底層級别的。而 Condition 是語言級别的,具有更高的可控制性和擴充性。具體表現如下:

  1. wait/notify 方式是響應中斷的,當線程處于 Object.wait()的等待狀态中,線程中斷會抛出中斷異常;Condition 有響應中斷和不響應中斷模式可以選擇;
  2. wait/notify 方式一個 synchronized 鎖隻有一個等待隊列;一個 Lock 鎖可以根據不同的條件,new 多個 Condition 對象,每個對象包含一個等待隊列。
需要注意的是,Condition 同 wait/notify 一樣,在等待與喚醒方法使用之前必須擷取到該鎖。

  

3. 源碼分析

需要在了解 AQS 及 ReentrantLock 基礎上閱讀本文源碼。《15 - AQS 源碼分析》《16 - ReentrantLock 可重入鎖》

3.1 條件隊列

首先看 Condition 對象的建立:

ReentrantLock lock = new ReentrantLock();
Condition condition = lock.newCondition();

/**
 * Returns a {@link Condition} instance for use with this
 * {@link Lock} instance.
 *
 * <p>The returned {@link Condition} instance supports the same
 * usages as do the {@link Object} monitor methods ({@link
 * Object#wait() wait}, {@link Object#notify notify}, and {@link
 * Object#notifyAll notifyAll}) when used with the built-in
 * monitor lock.
 *
 * <ul>
 *
 * <li>If this lock is not held when any of the {@link Condition}
 * {@linkplain Condition#await() waiting} or {@linkplain
 * Condition#signal signalling} methods are called, then an {@link
 * IllegalMonitorStateException} is thrown.
 *
 * <li>When the condition {@linkplain Condition#await() waiting}
 * methods are called the lock is released and, before they
 * return, the lock is reacquired and the lock hold count restored
 * to what it was when the method was called.
 *
 * <li>If a thread is {@linkplain Thread#interrupt interrupted}
 * while waiting then the wait will terminate, an {@link
 * InterruptedException} will be thrown, and the thread's
 * interrupted status will be cleared.
 *
 * <li> Waiting threads are signalled in FIFO order.
 *
 * <li>The ordering of lock reacquisition for threads returning
 * from waiting methods is the same as for threads initially
 * acquiring the lock, which is in the default case not specified,
 * but for <em>fair</em> locks favors those threads that have been
 * waiting the longest.
 *
 * </ul>
 *
 * @return the Condition object
 */
public Condition newCondition() {
    return sync.newCondition();
}

abstract static class Sync extends AbstractQueuedSynchronizer {
	final ConditionObject newCondition() {
        return new ConditionObject();
    }
}
           

  建立的 Condition 對象其實就是 ConditionObject 對象,ConditionObject 是 AbstractQueuedSynchronizer(AQS)的内部類,實作了 Condition 接口。

  每個 ConditionObject 對象都有一個條件等待隊列,用于儲存在該 Condition 對象上等待的線程。條件等待隊列是一個單向連結清單,結點用的 AQS 的 Node 類,每個結點包含線程、next 結點、結點狀态。ConditionObject 通過持有頭尾指針類管理條件隊列。

20 - Lock-Condition 的等待通知
注意區分 AQS 的同步隊列和 Condition 的條件隊列:
  1. 線程搶鎖失敗時進入 AQS 同步隊列,AQS 同步隊列中的線程都是等待着随時準備搶鎖的;
  2. 線程因為沒有滿足某一條件而調用 condition.await()方法之後進入 Condition 條件隊列,Condition 條件隊列中的線程隻能等着,沒有擷取鎖的機會;
  3. 當條件滿足後調用 condition.signal()線程被喚醒,那麼線程就從 Condition 條件隊列移除,進入 AQS 同步隊列,被賦予搶鎖繼續執行的機會。

條件隊列源碼:

/**
 * Condition implementation for a {@link
 * AbstractQueuedSynchronizer} serving as the basis of a {@link
 * Lock} implementation.
 *
 * <p>Method documentation for this class describes mechanics,
 * not behavioral specifications from the point of view of Lock
 * and Condition users. Exported versions of this class will in
 * general need to be accompanied by documentation describing
 * condition semantics that rely on those of the associated
 * {@code AbstractQueuedSynchronizer}.
 *
 * <p>This class is Serializable, but all fields are transient,
 * so deserialized conditions have no waiters.
 */
public class ConditionObject implements Condition, java.io.Serializable {
    private static final long serialVersionUID = 1173984872572414699L;
    /** First node of condition queue. */
    private transient Node firstWaiter;
    /** Last node of condition queue. */
    private transient Node lastWaiter;

    /**
     * Creates a new {@code ConditionObject} instance.
     */
    public ConditionObject() { }

	/**
     * Adds a new waiter to wait queue.
     * 入隊操作
     * @return its new wait node
     */
    private Node addConditionWaiter() {
        Node t = lastWaiter;
        // If lastWaiter is cancelled, clean out.
        // 如果尾結點取消等待了,将其清除出去,
        // 并檢查整個條件隊列将已取消的所有結點清除
        if (t != null && t.waitStatus != Node.CONDITION) {
        	// 這個方法會周遊整個條件隊列,然後會将已取消的所有結點清除出隊列
            unlinkCancelledWaiters();
            t = lastWaiter;
        }
		
		// 将目前線程構造成結點,加入隊尾
        Node node = new Node(Thread.currentThread(), Node.CONDITION);
        if (t == null)
            firstWaiter = node;
        else
            t.nextWaiter = node;
        lastWaiter = node; // 維護尾結點指針
        return node;
    }

	/**
     * Unlinks cancelled waiter nodes from condition queue.
     * Called only while holding lock. This is called when
     * cancellation occurred during condition wait, and upon
     * insertion of a new waiter when lastWaiter is seen to have
     * been cancelled. This method is needed to avoid garbage
     * retention in the absence of signals. So even though it may
     * require a full traversal, it comes into play only when
     * timeouts or cancellations occur in the absence of
     * signals. It traverses all nodes rather than stopping at a
     * particular target to unlink all pointers to garbage nodes
     * without requiring many re-traversals during cancellation
     * storms.
     * 周遊整個條件隊列,清除已取消等待的結點
     */
    private void unlinkCancelledWaiters() {
        Node t = firstWaiter;
        Node trail = null; // 用于儲存前一個結點
        while (t != null) {
            Node next = t.nextWaiter;
            if (t.waitStatus != Node.CONDITION) {
            	// t結點狀态不是Node.CONDITION,說明已經取消等待,删除
                t.nextWaiter = null;
                if (trail == null)
                    firstWaiter = next;
                else
                    trail.nextWaiter = next;
                if (next == null)
                    lastWaiter = trail;
            }
            else
                trail = t; // 下次循環中t結點的前一個結點
            t = next;
        }
    }

	static final class Node {
	    volatile Thread thread;// 每一個節點對應一個線程
	    Node nextWaiter;// next結點
	    volatile int waitStatus;// 結點狀态
	    static final int CONDITION = -2;// 結點狀态:目前節點進入等待隊列中
	 ...
	}
}
           

  

3.2 await

  當調用 condition.await()方法後會使得線程進入到條件隊列,此時線程将被阻塞。當調用 condition.signal()方法後,線程從條件隊列進入 AQS 同步隊列排隊等鎖。線程在 AQS 中發生的事情這裡就不介紹了,不明白的可以看下《15 - AQS 源碼分析》。

/**
 * 目前線程被阻塞,并加入條件隊列
 * 線程在AQS同步隊列中被喚醒後嘗試擷取鎖
 */
public final void await() throws InterruptedException {
    // 響應打斷
    if (Thread.interrupted())
        throw new InterruptedException();

    // 将目前線程構造成結點,加入條件隊列隊尾,上文詳細分析了該方法
    Node node = addConditionWaiter();

    // 釋放鎖,線程阻塞前必須将鎖釋放,下文詳解fullyRelease()方法
    int savedState = fullyRelease(node);
    int interruptMode = 0;

    /*
     * 1.isOnSyncQueue()檢查node是否在AQS同步隊列中,不在同步隊列中傳回false,
     * 		下文詳解isOnSyncQueue()方法
     * 2.如果node不在AQS同步隊列中,将目前線程阻塞
     * 3.當其他代碼調用signal()方法,線程進入AQS同步隊列後被喚醒,
     * 	繼續從這裡阻塞的地方開始執行
     * 4.注意這裡while循環的自旋,線程被喚醒以後還要再檢查一下node是否在AQS同步隊列中
     */
    while (!isOnSyncQueue(node)) { // 檢查node是否在AQS同步隊列中
        LockSupport.park(this);    // 阻塞,線程被喚醒後從這裡開始執行
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }

    /*
     * 到這裡,是目前線程在AQS同步隊列中被喚醒了,嘗試擷取鎖
     * acquireQueued()方法搶鎖,搶不到鎖就在同步隊列中阻塞
     * acquireQueued()方法是AQS文章中詳細重點講解過的這裡不詳細分析了
     */
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null) // clean up if cancelled
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}

fullyRelease()方法:
/**
 * 将node線程的鎖全部釋放
 * “全部”是指多次重入的情況,這裡一次全部釋放
 */
final int fullyRelease(Node node) {
    boolean failed = true;
    try {
        int savedState = getState();// 鎖狀态
        if (release(savedState)) {// 釋放鎖
            failed = false;
            return savedState;
        } else {
            throw new IllegalMonitorStateException();
        }
    } finally {
        if (failed)
            node.waitStatus = Node.CANCELLED;
    }
}

isOnSyncQueue()方法:
/**
 * 檢查node是否在AQS同步隊列中,在同步隊列中傳回true
 */
final boolean isOnSyncQueue(Node node) {
    // 狀态為Node.CONDITION條件等待狀态,肯定是在條件隊列中,而不在同步隊列中
    if (node.waitStatus == Node.CONDITION || node.prev == null)
        return false;
    // 如果node已經有後繼節點next,那肯定是在同步隊列了
    if (node.next != null)
        return true;
    // 周遊同步隊列,檢視是否有與node相等的結點
    return findNodeFromTail(node);
}

/**
 * 從同步隊列的隊尾開始從後往前周遊找,如果找到相等的,說明在同步隊列,
 * 否則就是不在同步隊列
 */
private boolean findNodeFromTail(Node node) {
    Node t = tail;
    for (;;) {
        if (t == node)
            return true;
        if (t == null)
            return false;
        t = t.prev;
    }
}
           

  

3.3 signal

  調用 condition.signal()方法後,線程從 Condition 條件隊列移除,進入 AQS 同步隊列排隊等鎖。

注意:正常情況下 signal 隻是将線程從 Condition 條件隊列轉移到 AQS 同步隊列,并沒有喚醒線程。線程的喚醒時機是 AQS 中線程的前驅節點釋放鎖之後。
public final void signal() {
    // 驗證目前線程持有鎖才能調用該方法
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    if (first != null)
        doSignal(first);
}

/**
 * 從條件隊列隊頭往後周遊,找出第一個需要轉移的結點node,将node從條件隊列轉移到AQS同步隊列
 * 為什麼需要周遊找?因為前有些線程會取消等待,但是可能還在條件隊列中
 */
private void doSignal(Node first) {
    do {
        // 将first中條件隊列中移除,将first的next結點作為頭結點指派給firstWaiter
        if ( (firstWaiter = first.nextWaiter) == null)
            lastWaiter = null;
        first.nextWaiter = null;

    /*
     * transferForSignal()将first結點加入AQS同步隊列
     * 如果first結點加入同步隊列失敗,是因為first結點取消了Node.CONDITION狀态,
     * 	原因在下面transferForSignal()的講解中說明
     * 如果first結點加入同步隊列失敗,那麼選擇first後面的第一個結點進行轉移,
     * 	依此類推
     */
    } while (!transferForSignal(first) &&    // 将first結點加入AQS同步隊列
    		// first結點加入同步隊列失敗,選擇first後面的結點進行轉移
             (first = firstWaiter) != null); 
}

/**
 * 将結點轉移到同步隊列
 * @return true-代表成功轉移;false-代表在signal之前,節點已經取消等待了
 */
final boolean transferForSignal(Node node) {
    /*
     * CAS設定結點狀态
     * CAS失敗說明此node的waitStatus已不是Node.CONDITION,說明節點已經取消。
     * 	既然已經取消,也就不需要轉移了,方法傳回,轉移後面一個節點
     * CAS失敗為什麼不是其他線程搶先操作了呢?因為這裡還持有lock獨占鎖,
     * 	隻有目前線程可以通路。
     */
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
        return false;

    Node p = enq(node);// 自旋進入同步隊列的隊尾
    int ws = p.waitStatus;

    // 正常情況下不會走這裡,這裡是前驅節點取消或者 CAS 失敗的情況
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
        LockSupport.unpark(node.thread);
    return true;
}


static final class Node {
    volatile Thread thread;// 每一個結點對應一個線程
    Node nextWaiter;// next結點

    volatile int waitStatus;// 結點狀态
    static final int CONDITION = -2;// 結點狀态:目前結點進入等待隊列中
}
           

  

3.4 過程總結

整個 Lock 等待通知的過程如下:

  1. ReentrantLock lock = new ReentrantLock();建立 lock 鎖,對應生成 AQS 同步隊列,一個 ReentrantLock 鎖對應一個 AQS 同步隊列;
  2. Condition condition = lock.newCondition();建立 condition,對應生成 condition 條件隊列;
  3. 線程 A 調用condition.await();,線程 A 阻塞并加入 condition 同步隊列;
  4. 線程 B 調用condition.signal();,線程 A 阻塞從 condition1 同步隊列轉移到 AQS 同步隊列的隊尾;
  5. 當 AQS 隊列中線程 A 的前驅節點線程執行完并釋放鎖時,将線程 A 喚醒;
  6. 線程 A 被喚醒之後搶鎖,執行邏輯代碼。

      

4. 生産者消費者

  下面使用生産者消費者模式模拟一個阻塞隊列:

public class BlockQueue<T> {

    private Object[] datas;
    private int size;
    private int capacity;
    private Lock lock;
    private Condition putCondition;
    private Condition takeconditon;

    public BlockQueue(int capacity) {
        this.datas = new Object[capacity];
        this.size = 0;
        this.capacity = capacity;
        this.lock = new ReentrantLock();
        this.putCondition = lock.newCondition(); // 空了
        this.takeconditon = lock.newCondition(); // 滿了
    }

    public void put(T t) throws Exception {
        lock.lock();
        try {
            while (size >= capacity) {
                System.out.println(Thread.currentThread().getName() 
                + " 隊列已滿");
                putCondition.await();
            }
            System.out.println(Thread.currentThread().getName() 
            + " 隊列添加資料:" + t);
            datas[size++] = t;
            takeconditon.signalAll();
        } finally {
            lock.unlock();
        }
    }

    public T take() throws Exception {
        lock.lock();
        try {
            while (size <= 0) {
                System.out.println(Thread.currentThread().getName() 
                + " 隊列已空");
                takeconditon.await();
            }
            T value = (T) datas[--size];
            System.out.println(Thread.currentThread().getName() 
            + " 隊列擷取資料:" + value);
            putCondition.signalAll();
            return value;
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) {
        BlockQueue<Integer> queue = new BlockQueue<>(5);
        for (int i = 0; i < 10; i++) {
            new Thread(() -> {
                try {
                    queue.put(new Random().nextInt(1000));
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }, "put 線程").start();
        }

        for (int i = 0; i < 10; i++) {
            new Thread(() -> {
                try {
                    queue.take();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }, "take 線程").start();
        }
    }
}

# 運作結果如下:
put 線程 隊列添加資料:828
put 線程 隊列添加資料:91
put 線程 隊列添加資料:750
put 線程 隊列添加資料:168
put 線程 隊列添加資料:658
put 線程 隊列已滿
put 線程 隊列已滿
put 線程 隊列已滿
put 線程 隊列已滿
put 線程 隊列已滿
take 線程 隊列擷取資料:658
put 線程 隊列添加資料:50
put 線程 隊列已滿
put 線程 隊列已滿
put 線程 隊列已滿
put 線程 隊列已滿
take 線程 隊列擷取資料:50
take 線程 隊列擷取資料:168
put 線程 隊列添加資料:599
put 線程 隊列添加資料:207
put 線程 隊列已滿
put 線程 隊列已滿
take 線程 隊列擷取資料:207
take 線程 隊列擷取資料:599
take 線程 隊列擷取資料:750
take 線程 隊列擷取資料:91
put 線程 隊列添加資料:548
put 線程 隊列添加資料:684
take 線程 隊列擷取資料:684
take 線程 隊列擷取資料:548
take 線程 隊列擷取資料:828
           

  

5. 總結

  Object 的 wait 和 notify/notify 是與 synchronized 配合完成線程間的等待/通知機制,而 Condition 與 Lock 配合完成等待通知機制。

  Condition 比 wait 和 notify 具有更高的可控制性和擴充性,一個 Lock 鎖可以有多個 Condition 條件,此外 Condition 還有響應中斷和不響應中斷模式可以選擇。Condition 的使用與 wait/notify 一樣,在等待與喚醒方法使用之前必須擷取到鎖。

  Condition 的實作原理:每個 condition 都有一個條件隊列,調用 condition.await()方法将線程阻塞後線程就進入了條件隊列,調用 condition.sigal()方法後線程從 condition 條件隊列轉移到 AQS 同步隊列等鎖,該線程的前一節點釋放鎖之後會喚醒該線程搶鎖執行。

  Condition 多用于實作的生産者消費者問題

繼續閱讀