天天看點

并發工具類:Condition如何實作條件通知?

并發工具類:Condition如何實作條件通知?

AQS如何實作等待通知?

在進行線程間的通信時,可以用基于Object對象的wait和notify方法實作等待/通知機制

基于Object實作等待/通知機制的相關方法

方法名稱 描述
notify() 通知一個在對象上等待的線程,使其從wait()方法傳回
notifyAll() 通知所有等待在該對象上的線程
wait() 調用該方法的線程進入WAITING狀态,隻有等待另外線程的通知或被中斷才會傳回,需要注意,調用wait()方法後,會釋放對象的鎖
wait(long) 逾時等待一段時間,這裡的參數是毫秒,也就是等待長達n毫秒,如果沒有通知就逾時傳回
wait(long, int) 對于逾時時間更細粒度的控制,可以達到納秒

用例子示範一下用法

public class WaitNotify {

    private static Object lock = new Object();

    public static void main(String[] args) throws InterruptedException {
        Thread waitThread = new Thread(new Wait());
        waitThread.start();
        TimeUnit.SECONDS.sleep(1);
        Thread notifyThread = new Thread(new Notify());
        notifyThread.start();
    }

    private static class Wait implements Runnable {

        @Override
        public void run() {
            synchronized (lock) {
                try {
                    System.out.println("lock wait start");
                    lock.wait();
                    System.out.println("lock wait end");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                // 條件滿足,完成工作
                System.out.println("run finish");
            }
        }
    }

    private static class Notify implements Runnable {

        @Override
        public void run() {
            synchronized (lock) {
                System.out.println("lock notify start");
                lock.notify();
                System.out.println("lock notify end");
            }
        }
    }
}      
lock wait start
lock notify start
lock notify end
lock wait end
run finish      

wait()方法被執行後,鎖被自動釋放,但notify()方法被執行後,鎖卻不自動釋放,必須執行完notify()方法所在的同步synchronized代碼塊後才釋放鎖

在AQS相關類中怎麼實作這種等待/通知機制呢?

答案是Condition,Condition是一個接口,AbstractQueuedSynchronizer中有一個内部類實作了這個接口,一個Condition就代表一個條件隊列

基于Condition實作等待/通知機制

和synchronized一樣,調用await和signal方法時,必須獲得與Condition相關的鎖

Condition的主要方法有2個

  1. await,将線程放入等待隊列進行等待,當其他線程調用signal或者interrupt方法時會被喚醒
  2. signal,喚醒等待隊列中的線程,将其放入同步隊列,獲得鎖後開始執行

Conditon使用例子如下

public class WaitNotifyUseCondition {

    private static ReentrantLock lock = new ReentrantLock();
    private static Condition conditionA  = lock.newCondition();
    private static Condition conditionB = lock.newCondition();

    public static void main(String[] args) throws InterruptedException {
        Thread waitThreadA = new Thread(new WaitA());
        waitThreadA.start();
        Thread waitThreadB = new Thread(new WaitB());
        waitThreadB.start();
        TimeUnit.SECONDS.sleep(2);
        lock.lock();
        try {
            conditionA.signal();
        } finally {
            lock.unlock();
        }
    }

    private static class WaitA implements Runnable {

        @Override
        public void run() {
            lock.lock();
            try {
                System.out.println("WaitA begin wait");
                conditionA.await();
                System.out.println("WaitA end wait");
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
            }
        }
    }

    private static class WaitB implements Runnable {

        @Override
        public void run() {
            lock.lock();
            try {
                System.out.println("WaitB begin wait");
                conditionB.await();
                System.out.println("WaitB end wait");
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
            }
        }
    }
}      
WaitA begin wait
WaitB begin wait
WaitA end wait      

WaitThreadB因為沒有被通知,一直阻塞

可以看到Condition比Object的等待通知更強大,因為它可以實作條件通知

其實原因也和簡單,基于Object的等待通知隻有一個等待隊列,而在AQS中有多個等待隊列,一個Condition類代表一個條件隊列

同步隊列和等待隊列

并發工具類:Condition如何實作條件通知?

在之前的AQS文章中,我們提到了同步隊列,本節我們又提到了等待隊列,那他們兩者是如何協同工作的?

AbstractQueuedSynchronizer内部維護着一個同步隊列(雙向連結清單實作),多個條件隊列(單向連結清單實作),條件隊列由AbstractQueuedSynchronizer的内部類ConditionObject來維護,new一個ConditonObject ,則多一個條件隊列,當一個線程執行await方法是,會把當線程包裝成一個Node節點,放到執行await方法的ConditionObject的條件隊列中,釋放鎖并被阻塞,當執行signal方式時,會把條件隊列的第一個節點移除,并轉移到同步隊列中,擷取到鎖即可繼續執行

ConditionObject 是AbstractQueuedSynchronizer的一個内部類,用來實作條件隊列,屬性如下

public class ConditionObject implements Condition, java.io.Serializable {

  // 條件隊列的頭節點
  private transient Node firstWaiter;
  
  // 條件隊列的尾節點
  private transient Node lastWaiter;

  public ConditionObject() { }

  // 阻塞過程中不響應中斷,僅設定标志位,讓之後的方法處理
  private static final int REINTERRUPT =  1;
  
  // 阻塞過程中響應中斷,并throw InterruptedException
  private static final int THROW_IE    = -1;

}      

假如在阻塞過程中發生了中斷,REINTERRUPT标志了中斷發生在 signalled之後,

THROW_IE标志了中斷發生在 signalled之前,進而決定采用那種方式響應中斷

await阻塞線程

将線程放入等待隊列,并且阻塞線程。當調用signal的時候,喚醒線程,進入同步隊列擷取鎖

// ConditionObject
public final void await() throws InterruptedException {
  if (Thread.interrupted())
    throw new InterruptedException();
  // 目前線程加入等待隊列
  Node node = addConditionWaiter();
  // 挂起線程前,必須釋放目前鎖,這裡調用ReentrantLock釋放鎖的邏輯
  int savedState = fullyRelease(node);
  // 标志位
  int interruptMode = 0;
  // 判斷目前線程是否在同步隊列中,如果不在同步隊列,則直接挂起線程
  while (!isOnSyncQueue(node)) {
    // 阻塞
    LockSupport.park(this);
    // 線程被喚醒,線程節點從條件隊列移除,并放到放到同步隊列,或被中斷
    if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
      break;
  }
  // 喚醒之後進入同步隊列,競争擷取鎖
  // 擷取鎖的過程中有中斷,并且标志位不是(響應中斷)
  if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
    interruptMode = REINTERRUPT;
  if (node.nextWaiter != null) // clean up if cancelled
    // 清除等待隊列中不是等待狀态的節點
    unlinkCancelledWaiters();
  // 阻塞中被中斷過,則進行中斷
  if (interruptMode != 0)
    // 根據标志位,決定對中斷的處理方式
    reportInterruptAfterWait(interruptMode);
}      

增加等待節點的邏輯,每次增加節點都會剔除非condition的節點

// ConditionObject
private Node addConditionWaiter() {
  Node t = lastWaiter;
  // If lastWaiter is cancelled, clean out.
  if (t != null && t.waitStatus != Node.CONDITION) {
    // 清除等待隊列中取消狀态的節點
    unlinkCancelledWaiters();
    t = lastWaiter;
  }
  Node node = new Node(Thread.currentThread(), Node.CONDITION);
  // 連結清單還沒有初始化
  if (t == null)
    firstWaiter = node;
  else
    t.nextWaiter = node;
  lastWaiter = node;
  return node;
}      

釋放鎖

// AbstractQueuedSynchronizer
final int fullyRelease(Node node) {
  boolean failed = true;
  try {
    int savedState = getState();
    if (release(savedState)) {
      failed = false;
      return savedState;
    } else {
      // 釋放鎖失敗
      throw new IllegalMonitorStateException();
    }
  } finally {
    // 釋放鎖失敗後,将目前節點狀态設定為CANCELLED
    // 後序會被清理出條件隊列
    if (failed)
      node.waitStatus = Node.CANCELLED;
  }
}      

判斷節點是否在同步隊列

// AbstractQueuedSynchronizer
final boolean isOnSyncQueue(Node node) {
  // 節點在條件隊列
  // 同步隊列中節點的狀态 隻能為0、SIGNAL、PROPAGATE 和 CANCELLED 其中之一
  if (node.waitStatus == Node.CONDITION || node.prev == null)
    return false;
  // 如果後繼節點不為null,則表明節點在同步隊列上
  // 因為條件隊列使用的是nextWaiter指向後繼節點的
  // 條件隊列上節點的next均為null
  if (node.next != null) // If has successor, it must be on queue
    return true;
  
  // 走到這一步,說明node.prev!=null && node.next=null
  // 但這并不能說明node在同步隊列中,因為節點在入隊過程中
  // 是先設定node.prev後設定node.next的(詳見addWaiter方法)
  // 有可能CAS設定尾節點失敗,導緻沒有加入隊列
  // 是以從尾到頭周遊一遍
  return findNodeFromTail(node);
}      
// AbstractQueuedSynchronizer
private boolean findNodeFromTail(Node node) {
  Node t = tail;
  for (;;) {
    if (t == node)
      return true;
    if (t == null)
      return false;
    t = t.prev;
  }
}      

檢測線程在等待期間是否發生中斷

// ConditionObject
// Checks for interrupt, returning THROW_IE if interrupted
// before signalled, REINTERRUPT if after signalled, or
// 0 if not interrupted.
private int checkInterruptWhileWaiting(Node node) {
  return Thread.interrupted() ?
    (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
    0;
}      
// AbstractQueuedSynchronizer
final boolean transferAfterCancelledWait(Node node) {
  // signalled之前發生中斷,因為signalled之後會将會将節點狀态從CONDITION 設定為0
  if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
    enq(node);
    return true;
  }

  // signalled之後發生中斷
  // 如果節點還沒有被放入同步隊列,則放棄目前CPU資源
  // 讓其他任務執行
  while (!isOnSyncQueue(node))
    Thread.yield();
  return false;
}      

清除等待隊列中取消狀态的節點

// ConditionObject
private void unlinkCancelledWaiters() {
  Node t = firstWaiter;
  // 指向尾節點
  Node trail = null;
  while (t != null) {
    Node next = t.nextWaiter;
    if (t.waitStatus != Node.CONDITION) {
      t.nextWaiter = null;
      // 隻有頭節點的狀态不是CONDITION才會執行到這一步
      if (trail == null)
        firstWaiter = next;
      else
        trail.nextWaiter = next;
      // 周遊完連結清單,設定尾節點
      if (next == null)
        lastWaiter = trail;
    }
    else
      trail = t;
    t = next;
  }
}      

響應中斷的方式

interruptMode=THROW_IE(-1):await退出時,直接抛出InterruptedException

interruptMode=REINTERRUPT(1):await退出時,重置中斷标記位

// ConditionObject
private void reportInterruptAfterWait(int interruptMode)
  throws InterruptedException {
  if (interruptMode == THROW_IE)
    // 直接響應中斷
    throw new InterruptedException();
  else if (interruptMode == REINTERRUPT)
    // 重置中斷标記位
    selfInterrupt();
}      
// AbstractQueuedSynchronizer
static void selfInterrupt() {
  Thread.currentThread().interrupt();
}      

signal,喚醒等待時間最長的線程

// ConditionObject
public final void signal() {
  // 目前線程沒有擷取到鎖
  if (!isHeldExclusively())
    throw new IllegalMonitorStateException();
  // 喚醒等待隊列中的頭結點
  Node first = firstWaiter;
  if (first != null)
    doSignal(first);
}      
// ConditionObject
private void doSignal(Node first) {
  do {
    // 将同步隊列的頭結點,設定為目前頭結點的下一個節點
    // 如果頭節點的下一個節點為null,則設定尾節點為null
    if ( (firstWaiter = first.nextWaiter) == null)
      lastWaiter = null;
    // 将first節點從條件隊列中移除
    first.nextWaiter = null;
    // 通知第一個非CANCELLED節點被喚醒,或者周遊完,退出
  } while (!transferForSignal(first) &&
       (first = firstWaiter) != null);
}      

喚醒第一個非CANCELLED節點

// 将節點從條件隊列放入同步隊列,true為成功 
// AbstractQueuedSynchronizer
final boolean transferForSignal(Node node) {
   
  // 通過CAS将節點的狀态從CONDITION設定為0
  // 如果設定失敗,說明這個節點狀态為CANCELLED
  if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
    return false;

  // 加入同步隊列,并傳回前繼節點
  Node p = enq(node);
  int ws = p.waitStatus;
  // 前繼節點為CANCELLED狀态,或者設定SIGNAL狀态失敗
  if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
    // 喚醒線程
    LockSupport.unpark(node.thread);
  return true;
}      

參考部落格