天天看點

深入了解Condition

前言

建議先看一下這篇分享,深入了解AbstractQueuedSynchronizer,這篇文章主要介紹了AQS的同步隊列實作,而本篇文章主要介紹AQS條件隊列的實作

在進行線程間的通信時,當我們使用synchronized時,可以用基于Object對象的wait和notify方法實作等待/通知機制,但是在AQS相關類中怎麼實作這種等待/通知機制呢?答案是Condition,Condition是一個接,AbstractQueuedSynchronizer中有一個内部類實作了這個接口

基于Object實作等待/通知機制的相關方法

深入了解Condition

舉個例子

public class WaitNotify {

    // 代碼來自《Java并發程式設計的藝術》
    static boolean flag = true;
    static Object lock = new Object();

    public static void main(String[] args) throws InterruptedException {
        Thread waitThread = new Thread(new Wait(), "WaitThread");
        waitThread.start();
        TimeUnit.SECONDS.sleep(1);
        Thread notifyThread = new Thread(new Notify(), "notifyThread");
        notifyThread.start();
    }

    static class Wait implements Runnable {

        @Override
        public void run() {
            synchronized (lock) {
                // 條件不滿足時,繼續wait,同時釋放了lock的鎖
                while (flag) {
                    try {
                        System.out.println(Thread.currentThread() + " flag is true. await @ "
                        + new SimpleDateFormat("HH:mm:ss").format(new Date()));
                        lock.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                // 條件滿足,完成工作
                System.out.println(Thread.currentThread() + " flag is false. running @ "
                + new SimpleDateFormat("HH:mm:ss").format(new Date()));
            }
        }
    }

    static class Notify implements Runnable {

        @Override
        public void run() {
            synchronized (lock) {
                System.out.println(Thread.currentThread() + " hold lock. notify @ "
                        + new SimpleDateFormat("HH:mm:ss").format(new Date()));
                lock.notifyAll();
                flag = false;
                // 暫停5秒
                SleepUtils.second(5);
            }
            synchronized (lock) {
                System.out.println(Thread.currentThread() + " hold lock again. sleep @ "
                        + new SimpleDateFormat("HH:mm:ss").format(new Date()));
                SleepUtils.second(5);
            }
        }
    }

    static class SleepUtils {
        public static void second(int n) {
            try {
                TimeUnit.SECONDS.sleep(n);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}           

複制

Thread[WaitThread,5,main] flag is true. await @ 00:05:55
Thread[notifyThread,5,main] hold lock. notify @ 00:05:55
Thread[notifyThread,5,main] hold lock again. sleep @ 00:06:00
Thread[WaitThread,5,main] flag is false. running @ 00:06:05           

複制

這裡有幾個需要注意的點

  1. 第三行和第四行的順序有可能颠倒,因為是競争擷取鎖的
  2. wait()方法被執行後,鎖被自動釋放,但notify()方法被執行後,鎖卻不自動釋放 ,必須執行完notify()方法所在的同步synchronized代碼塊後才釋放鎖

基于Condition實作等待/通知機制(包含了Condition接口的所有方法)

深入了解Condition

Conditon使用例子如下,可以實作條件性的通知

static ReentrantLock lock = new ReentrantLock();
    static Condition conditionA  = lock.newCondition();
    static Condition conditionB = lock.newCondition();

    public static void main(String[] args) throws InterruptedException {
        Thread waitThreadA = new Thread(new WaitA(), "WaitThreadA");
        waitThreadA.start();
        Thread waitThreadB = new Thread(new WaitB(), "WaitThreadB");
        waitThreadB.start();
        TimeUnit.SECONDS.sleep(2);
        lock.lock();
        try {
            conditionA.signal();
        } finally {
            lock.unlock();
        }
    }

    static class WaitA implements Runnable {

        @Override
        public void run() {
            lock.lock();
            try {
                System.out.println(Thread.currentThread() + " begin await @ "
                        + new SimpleDateFormat("HH:mm:ss").format(new Date()));
                conditionA.await();
                System.out.println(Thread.currentThread() + " end await @ "
                        + new SimpleDateFormat("HH:mm:ss").format(new Date()));
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
            }
        }
    }

    static class WaitB implements Runnable {

        @Override
        public void run() {
            lock.lock();
            try {
                System.out.println(Thread.currentThread() + " begin await @ "
                        + new SimpleDateFormat("HH:mm:ss").format(new Date()));
                conditionB.await();
                System.out.println(Thread.currentThread() + " end await @ "
                        + new SimpleDateFormat("HH:mm:ss").format(new Date()));
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
            }
        }
    }
}           

複制

Thread[WaitThreadA,5,main] begin await @ 00:49:57
Thread[WaitThreadB,5,main] begin await @ 00:49:57
Thread[WaitThreadA,5,main] end await @ 00:49:59           

複制

WaitThreadB因為沒有被通知,一直阻塞 ,這裡說一下Condition的大概實作,AQS内部維護着一個同步隊列(雙向連結清單實作),多個條件隊列(單向連結清單實作),條件隊列由AQS的内部類ConditionObject來維護,new一個ConditonObject ,則多一個條件隊列,當一個線程執行await方法是,會把當線程包裝成一個Node節點,放到執行await方法的ConditionObject的條件隊列中,釋放鎖并被阻塞,當執行signal方式時,會把條件隊列的第一個節點移除,并轉移到同步隊列中,擷取到鎖即可繼續執行

源碼

基于jdk1.8.0_20 ,Object的螢幕方法和Condition接口的對比

深入了解Condition

ConditionObject 是AQS的一個内部類,用來實作條件隊列,屬性如下

public class ConditionObject implements Condition, java.io.Serializable {

    // 條件隊列的頭節點
    private transient Node firstWaiter;

    // 條件隊列的尾節點
    private transient Node lastWaiter;

    public ConditionObject() { }

    // 阻塞過程中不響應中斷,僅設定标志位,讓之後的方法處理
    private static final int REINTERRUPT =  1;

    // 阻塞過程中響應中斷,并throw InterruptedException
    private static final int THROW_IE    = -1;
}           

複制

假如在阻塞過程中發生了中斷,REINTERRUPT标志了中斷發生在 signalled之後,THROW_IE标志了中斷發生在 signalled之前,進而決定采用那種方式響應中斷

來看await方法

// ConditionObject
public final void await() throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    // 目前線程加入等待隊列
    Node node = addConditionWaiter();
    // 釋放鎖
    int savedState = fullyRelease(node);
    // 标志位
    int interruptMode = 0;
    // 判斷節點是否在同步隊列中,即是否被喚醒
    while (!isOnSyncQueue(node)) {
        // 阻塞
        LockSupport.park(this);
        // 線程被喚醒,線程節點從條件隊列移除,并放到放到同步隊列,或被中斷
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    // 喚醒之後競争擷取鎖
    // 擷取鎖的過程中有中斷,并且标志位不是(響應中斷)
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null) // clean up if cancelled
        // 清除等待隊列中不是等待狀态的節點
        unlinkCancelledWaiters();
    // 阻塞中被中斷過,則進行中斷
    if (interruptMode != 0)
        // 根據标志位,決定對中斷的處理方式
        reportInterruptAfterWait(interruptMode);
}           

複制

将目前線程包裝成Node節點,并放入等待隊列

// ConditionObject
private Node addConditionWaiter() {
    Node t = lastWaiter;
    // If lastWaiter is cancelled, clean out.
    if (t != null && t.waitStatus != Node.CONDITION) {
        // 清除等待隊列中取消狀态的節點
        unlinkCancelledWaiters();
        t = lastWaiter;
    }
    Node node = new Node(Thread.currentThread(), Node.CONDITION);
    // 連結清單還沒有初始化
    if (t == null)
        firstWaiter = node;
    else
        t.nextWaiter = node;
    lastWaiter = node;
    return node;
}           

複制

釋放鎖

// AbstractQueuedSynchronizer
final int fullyRelease(Node node) {
    boolean failed = true;
    try {
        int savedState = getState();
        if (release(savedState)) {
            failed = false;
            return savedState;
        } else {
            // 釋放鎖失敗
            throw new IllegalMonitorStateException();
        }
    } finally {
        // 釋放鎖失敗後,将目前節點狀态設定為CANCELLED
        // 後序會被清理出條件隊列
        if (failed)
            node.waitStatus = Node.CANCELLED;
    }
}           

複制

判斷節點是否在同步隊列

// AbstractQueuedSynchronizer
final boolean isOnSyncQueue(Node node) {
    // 節點在條件隊列
    // 同步隊列中節點的狀态 隻能為0、SIGNAL、PROPAGATE 和 CANCELLED 其中之一
    if (node.waitStatus == Node.CONDITION || node.prev == null)
        return false;
    // 如果後繼節點不為null,則表明節點在同步隊列上
    // 因為條件隊列使用的是nextWaiter指向後繼節點的
    // 條件隊列上節點的next均為null
    if (node.next != null) // If has successor, it must be on queue
        return true;

    // 走到這一步,說明node.prev!=null && node.next=null
    // 但這并不能說明node在同步隊列中,因為節點在入隊過程中
    // 是先設定node.prev後設定node.next的(詳見addWaiter方法)
    // 有可能CAS設定尾節點失敗,導緻沒有加入隊列
    // 是以從尾到頭周遊一遍
    return findNodeFromTail(node);
}           

複制

// AbstractQueuedSynchronizer
private boolean findNodeFromTail(Node node) {
    Node t = tail;
    for (;;) {
        if (t == node)
            return true;
        if (t == null)
            return false;
        t = t.prev;
    }
}           

複制

檢測線程在等待期間是否發生中斷

// ConditionObject
// Checks for interrupt, returning THROW_IE if interrupted
// before signalled, REINTERRUPT if after signalled, or
// 0 if not interrupted.
private int checkInterruptWhileWaiting(Node node) {
    return Thread.interrupted() ?
        (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
        0;
}           

複制

// AbstractQueuedSynchronizer
final boolean transferAfterCancelledWait(Node node) {
    // signalled之前發生中斷,因為signalled之後會将會将節點狀态從CONDITION 設定為0
    if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
        enq(node);
        return true;
    }

    // signalled之後發生中斷
    // 如果節點還沒有被放入同步隊列,則放棄目前CPU資源
    // 讓其他任務執行
    while (!isOnSyncQueue(node))
        Thread.yield();
    return false;
}           

複制

清除條件隊列中取消狀态的節點

// ConditionObject
private void unlinkCancelledWaiters() {
    Node t = firstWaiter;
    // 指向尾節點
    Node trail = null;
    while (t != null) {
        Node next = t.nextWaiter;
        if (t.waitStatus != Node.CONDITION) {
            t.nextWaiter = null;
            // 隻有頭節點的狀态不是CONDITION才會執行到這一步
            if (trail == null)
                firstWaiter = next;
            else
                trail.nextWaiter = next;
            // 周遊完連結清單,設定尾節點
            if (next == null)
                lastWaiter = trail;
        }
        else
            trail = t;
        t = next;
    }
}           

複制

響應中斷的方式

// ConditionObject
private void reportInterruptAfterWait(int interruptMode)
    throws InterruptedException {
    if (interruptMode == THROW_IE)
        // 直接響應中斷
        throw new InterruptedException();
    else if (interruptMode == REINTERRUPT)
        selfInterrupt();
}           

複制

// AbstractQueuedSynchronizer
static void selfInterrupt() {
    Thread.currentThread().interrupt();
}           

複制

來看signal,喚醒等待時間最長的線程

// ConditionObject
public final void signal() {
    // 目前線程沒有擷取到鎖
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    // 喚醒等待隊列中的頭結點
    Node first = firstWaiter;
    if (first != null)
        doSignal(first);
}           

複制

// ConditionObject
private void doSignal(Node first) {
    do {
        // 将同步隊列的頭結點,設定為目前頭結點的下一個節點
        // 如果頭節點的下一個節點為null,則設定尾節點為null
        if ( (firstWaiter = first.nextWaiter) == null)
            lastWaiter = null;
        // 将first節點從條件隊列中移除
        first.nextWaiter = null;
        // 通知第一個非CANCELLED節點被喚醒,或者周遊完,退出
    } while (!transferForSignal(first) &&
             (first = firstWaiter) != null);
}           

複制

// 将節點從條件隊列放入同步隊列,true為成功 
// AbstractQueuedSynchronizer
final boolean transferForSignal(Node node) {

    // 通過CAS将節點的狀态從CONDITION設定為0
    // 如果設定失敗,說明這個節點狀态為CANCELLED
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
        return false;

    // 加入同步隊列,并傳回前繼節點
    Node p = enq(node);
    int ws = p.waitStatus;
    // 前繼節點為CANCELLED狀态,或者設定SIGNAL狀态失敗
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
        // 喚醒線程
        LockSupport.unpark(node.thread);
    return true;
}           

複制

signalAll和signal實作類似,差別如下,signal将等待隊列中的一個非CANCELLED節點放到同步隊列,而signalAll是将等待隊列中的所有非CANCELLED節點放到同步隊列中

參考書籍

《Java并發程式設計的藝術》