文章目錄
- 一,condition簡介
- 二,使用
- 1.簡單使用
- 2.基于condition的有界隊列demo
- 三,實作分析
- 1.await()
- 2.signal()
一,condition簡介
任何一個java對象都擁有一對螢幕方法(定義在Object上)主要包括:wait()/wait(long timeOut),notify(),notifyAll(),這些方法和synchronized配合實作等待/通知機制
Condition接口也提供了類似的螢幕方法,配合Lock實作等待/通知機制
二,使用
1.簡單使用
- 調用lock.newCondition()獲得condition對象
- 配合lock使用
- 支援響應中斷和對中斷不敏感的進入等待狀态
- 支援喚醒一個或多個線程
- 支援目前線程釋放鎖并進入逾時等待狀态
public class ConditionDemo {
private Lock lock = new ReentrantLock();
private Condition condition = lock.newCondition();
/**
* 線程等待(對中斷敏感)
* <h> condition.await() </h>
* <p>
* 等待:
* 目前線程進入等待狀态直到被通知或被中斷
* 喚醒:
* 1.signal(),signalAll()
* 2.其他線程調用interrupt()中斷該線程(對中斷敏感)
* </p>
*/
public void awaitThread(){
//執行condition相關方法先擷取到鎖(lock)
lock.lock();
try{
//執行condition相關方法
condition.await();
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
//釋放鎖
lock.unlock();
}
}
/**
* 線程等待(對中斷不敏感)
* <h> condition.awaitUninterruptibly() </h>
* <p>
* 等待:
* 目前線程進入等待狀态直到被通知
* 喚醒:
* 1.signal(),signalAll()
* </p>
*/
public void awaitUninterruptiblyThread(){
//執行condition相關方法先擷取到鎖(lock)
lock.lock();
try{
//執行condition相關方法
condition.awaitUninterruptibly();
} catch (Exception e) {
e.printStackTrace();
}finally {
//釋放鎖
lock.unlock();
}
}
/**
* 喚醒目前等待的線程
* <h> condition.signal() </h>
*/
public void signalThread(){
lock.lock();
try {
condition.signal();
}catch (Exception e){
e.printStackTrace();
}finally {
lock.unlock();
}
}
/**
* 喚醒所有等待的線程
* <h> condition.signalAll() </h>
*/
public void signalAllThread(){
lock.lock();
try {
condition.signalAll();
}catch (Exception e){
e.printStackTrace();
}finally {
lock.unlock();
}
}
}
2.基于condition的有界隊列demo
public class BoundedQueue {
//基于數組的簡單有界隊列
private Integer[] array ;
/**
* <params>
* addIndex : 添加索引
* removeIndex : 移除索引
* count :目前隊列元素數量
* </params>
*/
private int addIndex,removeIndex,count;
private Lock lock = new ReentrantLock();
//當隊列滿時的等待/通知機制
private Condition fullCondition = lock.newCondition();
//當隊列空時的等待/通知機制
private Condition emptyCondition = lock.newCondition();
public BoundedQueue(int size){
array = new Integer[size];
}
/**
* 向隊列添加一個元素,當隊列滿時阻塞線程,不能添加
* @param i
* @throws InterruptedException
*/
public void add(int i) throws InterruptedException {
lock.lock();
try{
//判斷隊列是否已滿,有空位才能跳出循環完成添加
while(count == array.length){
fullCondition.await();
}
array[addIndex] = i;
//如果剛好達到隊列滿時,重置
if(++addIndex == array.length){
addIndex = 0;
}
++count;
fullCondition.signal();
}finally {
lock.unlock();
}
}
/**
* 移除隊列元素,當隊列是空時,線程被阻塞
* @return
*/
public Integer remove(){
lock.lock();
try{
while(count == 0){
emptyCondition.awaitUninterruptibly();
}
Integer x = array[removeIndex];
array[removeIndex] = null;
if(++removeIndex == array.length){
removeIndex = 0;
}
--count;
emptyCondition.signal();
return x;
}finally {
lock.unlock();
}
}
}
三,實作分析
每個condition對象都包含一個等待隊列,這個等待隊列是實作等待/通知機制的重點
1.await()
源碼:
public final void await() throws InterruptedException {
//不響應中斷
if (Thread.interrupted())
throw new InterruptedException();
//構造節點,加入到等待隊列中
Node node = addConditionWaiter();
//釋放該節點對應線程的同步狀态
int savedState = fullyRelease(node);
int interruptMode = 0;
//判斷節點是否還在同步隊列
while (!isOnSyncQueue(node)) {
//在同步隊列則阻塞該線程
LockSupport.park(this);
//如果中斷則跳出循環
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
//在被别的線程喚醒後, 将剛剛這個節點放到 AQS 隊列中.接下來就是那個節點的事情了,比如搶鎖.
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
2.signal()
public final void signal() {
//檢查是否擷取了鎖
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
//從等待隊列移除首節點
Node first = firstWaiter;
if (first != null)
doSignal(first);
}
private void doSignal(Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
final boolean transferForSignal(Node node) {
//CAS設定節點狀态
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
//進入同步隊列
Node p = enq(node);
int ws = p.waitStatus;
//喚醒該節點對應線程
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}