天天看點

AQS分析Condition等待隊列

AQS是AbstractQueneSynchronizer抽象類

  1. 封裝了Node節點,Node是AQS的靜态内部類,Node節點有兩個有參構造方法Node(thread , node){}建立雙向連結清單AQS隊列,Node( thread , int waitstatus){}建立的單向連結清單,等待隊列
  2. 封裝了ConditionObject,CoditionObject是AQS的内部類,實作了Condition接口
  3. 包含屬性,head(AQS頭節點)、tail(AQS尾節點)、state()、exclusiveOwnableThread(父類屬性)等等
Condition核心方法await();、signal();

使用生産者,消費者模型,對condition進行分析

生産者Producer

public class Producer  implements Runnable {
	private Queue<String> queue;
	private int maxSize;
	private Lock lock ;
	private Condition condition;
	public Producer(Queue<String> queue, int maxSize, Lock lock, Condition condition) {
		super();
		this.queue = queue;
		this.maxSize = maxSize;
		this.lock = lock;
		this.condition = condition;
	}
	@Override
	public void run() {
		int i = 0;
		while (true) {
			i++;
			lock.lock();
			while (queue.size()==maxSize) {
				System.out.println("消息已滿,請消費消息....");
				try {
					condition.await();
				} catch (InterruptedException e) {
					// TODO Auto-generated catch block
					e.printStackTrace();
				}
			}
			try {
				Thread.currentThread().sleep(1000);
			} catch (InterruptedException e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
			}
			System.out.println("生産第"+i+"條消息");
			queue.add(i+"");
			condition.signal();
			lock.unlock();
		}
		
		
	}
           

消費者Consumer

public class Consumer  implements Runnable {
	private Queue<String> queue;
	private int maxSize;
	private Lock lock ;
	private Condition condition;
	public Consumer(Queue<String> queue, int maxSize, Lock lock, Condition condition) {
		super();
		this.queue = queue;
		this.maxSize = maxSize;
		this.lock = lock;
		this.condition = condition;
	}
	@Override
	public void run() {
		int i = 0;
		while (true) {
			i++;
			lock.lock();
			while (queue.isEmpty()) {
				System.out.println("消息已清空,等待生産消息");
				try {
					condition.await();
				} catch (InterruptedException e) {
					// TODO Auto-generated catch block
					e.printStackTrace();
				}
			}
			try {
				Thread.currentThread().sleep(1000);
			} catch (InterruptedException e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
			}
			System.out.println("消費第"+i+"條消息");
			queue.remove();
			condition.signal();
			lock.unlock();
		}
	}

}
           

線程啟動

public static void main(String[] args) {
	Queue< String> queue= new LinkedList<String>();
	Lock lock = new  ReentrantLock();
	Condition condition = lock.newCondition();
	int maxSize = 6;
	Consumer consumer = new Consumer(queue, maxSize, lock, condition);
	Producer producer = new Producer(queue, maxSize, lock, condition);
	Thread pThread = new Thread(producer);
	Thread cThread = new Thread(consumer);
	pThread.start();
	cThread.start();	
}
           

消費者消費消息,隊裡無消息,滿足條件,進入await()方法,使得目前線程釋放所有資源,然後阻塞,檢視源碼

public final void await() throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
            Node node = addConditionWaiter();//傳回Node單向連結清單節點,将目前線程加入等待隊列
            int savedState = fullyRelease(node);//釋放目前的鎖,得到鎖的狀态,并喚醒 AQS 隊列中的一個線程
            int interruptMode = 0;
            //如果目前節點沒有在同步隊列上,即還沒有被 signal,則将目前線程阻塞
            while (!isOnSyncQueue(node)) {//判斷這個節點是否在 AQS 隊列上,第一次判斷的是 false,因為前面已經釋放鎖了
                LockSupport.park(this);//通過 park 挂起目前線程
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
            }
  			// 當這個線程醒來,會嘗試拿鎖, 當 acquireQueued 傳回 false 就是拿到鎖了.
 			// interruptMode != THROW_IE -> 表示這個線程沒有成功将 node 入隊,但 signal 執行了 enq 方法讓其入隊了.
			// 将這個變量設定成 REINTERRUPT.
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
                // 如果 node 的下一個等待者不是 null, 則進行清理,清理 Condition 隊列上的節點. 
				// 如果是 null ,就沒有什麼好清理的了.
            if (node.nextWaiter != null) // clean up if cancelled
                unlinkCancelledWaiters();
                // 如果線程被中斷了,需要抛出異常.或者什麼都不做
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
        }
           

addConditionWaiter();//傳回Node單向連結清單節點,将目前線程加入等待隊列

private Node addConditionWaiter() {
            Node t = lastWaiter;
            // If lastWaiter is cancelled, clean out.
            // 如 果 lastWaiter 不 等 于 空 并 且waitStatus 不等于 CONDITION 時,把沖好這個節點從連結清單中移除
            if (t != null && t.waitStatus != Node.CONDITION) {
                unlinkCancelledWaiters();
                t = lastWaiter;
            }
            //建構一個 Node,waitStatus=CONDITION。這裡的連結清單是一個單向的,是以相比 AQS 來說會簡單很多
            Node node = new Node(Thread.currentThread(), Node.CONDITION);
            if (t == null)
                firstWaiter = node;
            else
                t.nextWaiter = node;
            lastWaiter = node;
            return node;
        }
           

執行完 addConditionWaiter 這個方法之後,就會産生一個 condition 隊列

fullRelease,釋放鎖,如果目前鎖存在多次重入,那麼在這個方法中隻需要釋放一次就會把所有的重入次數歸零

final int fullyRelease(Node node) {
        boolean failed = true;
        try {
            int savedState = getState();//獲得重入的次數
            if (release(savedState)) {//釋放鎖并且喚醒下一個同步隊列中的線程
                failed = false;
                return savedState;
            } else {
                throw new IllegalMonitorStateException();
            }
        } finally {
            if (failed)
                node.waitStatus = Node.CANCELLED;
        }
    }
           

isOnSyncQueue(node)//判斷這個節點是否在 AQS 隊列上,第一次判斷的是 false,因為前面已經釋放鎖了

final boolean isOnSyncQueue(Node node) {
        if (node.waitStatus == Node.CONDITION || node.prev == null)
            return false;
        if (node.next != null) // If has successor, it must be on queue
            return true;
        return findNodeFromTail(node);
    }
           

正常情況下await()做了這三個基本操作

首先建立Node節點,将目前線程封裝到一個單向連結清單等待隊列中

然後釋放所有資源

最後阻塞處在等待隊列中的目前線程,等待被喚醒

Consumer線程阻塞了,那麼Producer可以搶占鎖了,進入他的同步方法塊,然後Producer調用了signal()方法,喚醒Consumer處于等待隊列的線程,來看看源碼是怎麼實作的

public final void signal() {
           if (!isHeldExclusively())//先判斷目前線程是否獲得了鎖,這個判斷比較簡單,直接用獲得鎖的線程和目前線程相比即可
               throw new IllegalMonitorStateException();
           Node first = firstWaiter;// 拿到 Condition 隊列上第一個節點
           if (first != null)
               doSignal(first);
        }
           

Condition.doSignal

對 condition 隊列中從首部開始的第一個 condition 狀态的節點,執行 transferForSignal 操作,将 node 從 condition隊列中轉換到 AQS 隊列中,同時修改 AQS 隊列中原先尾節點的狀态

private void doSignal(Node first) {
            do {
                if ( (firstWaiter = first.nextWaiter) == null)
                    lastWaiter = null;// 将 next 節點設定成 null
                first.nextWaiter = null;
            } while (!transferForSignal(first) &&
                     (first = firstWaiter) != null);
        }
           

AQS.transferForSignal

該方法先是 CAS 修改了節點狀态,如果成功,就将這個節點放到 AQS 隊列中,然後喚醒這個節點上的線程。
           
final boolean transferForSignal(Node node) {
		//更新節點的狀态為 0,如果更新失敗,隻有一種可能就是節點被 CANCELLED 了,如果是CANCELLED 節點直接傳回false
        if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
            return false;
        Node p = enq(node);//調用 enq,把目前節點添加到AQS 隊列。并且傳回傳回按目前節點的上一個節點,也就是原tail 節點
        int ws = p.waitStatus;
        // 如果上一個節點的狀态被取消了, 或者嘗試設定上一個節點的狀态為 SIGNAL 失敗了
        //(SIGNAL 表示: 他的 next 節點需要停止阻塞),
        if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
            LockSupport.unpark(node.thread); // 喚醒節點上的線程.
        return true;//如果 node 的 prev 節點已經是signal 狀态,那麼被阻塞線程的喚醒工作由 AQS 隊列來完成
    }
           

執行完 doSignal 以後,會把 condition 隊列中的節點轉移到 aqs 隊列上,這個時候會判斷 Consumer的 prev 節點也就是 head 節點的 waitStatus,如果大于 0 或者設定 SIGNAL 失敗,表示節點被設定成了 CANCELLED 狀态。這個時候會喚醒Consumer這個線程。否則就基于 AQS 隊列的機制來喚醒,也就是等到 Producer 釋放鎖之後來喚醒 Consumer

前面在分析 await 方法時,線程會被阻塞。而通過 signal被喚醒之後又繼續回到上次執行的邏輯

if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
            }
           

如果目前線程被中斷,則調用transferAfterCancelledWait方法判斷後續的處理,應該是抛出InterruptedException還是重新中斷。這裡需要注意的地方是,如果第一次 CAS 失敗了,則不能判斷目前線程是先進行了中斷還是先進行了 signal 方法的調用,可能是先執行了 signal 然後中斷,也可能是先執行了中斷,後執行了 signal,當然,這兩個操作肯定是發生在 CAS 之前。這時需要做的就是等待目前線程的 node被添加到 AQS 隊列後,也就是 enq 方法傳回後,傳回false 告訴checkInterruptWhileWaiting 方法傳回REINTERRUPT(1),後續進行重新中斷。

private int checkInterruptWhileWaiting(Node node) {
            return Thread.interrupted() ?
                (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
                0;
        }
           

簡單來說,該方法的傳回值代表目前線程是否在 park 的時候被中斷喚醒,如果為 true 表示中斷在 signal 調用之前,signal 還未執行,那麼這個時候會根據 await 的語義,在 await 時遇到中斷需要抛出interruptedException,傳回 true 就是告訴checkInterruptWhileWaiting 傳回 THROW_IE(-1)。如果傳回 false,否則表示 signal 已經執行過了,隻需要重新響應中斷即可

final boolean transferAfterCancelledWait(Node node) {
		//使用 cas 修改節點狀态,如果還能修改成功,說明線程被中斷時,signal 還沒有被調用。
	// 這裡有一個知識點,就是線程被喚醒,并不一定是在 java 層面執行了locksupport.unpark,也可能是調用了線程
		//的 interrupt()方法,這個方法會更新一個中斷辨別,并且會喚醒處于阻塞狀态下的線程。
        if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
            enq(node);//如果 cas 成功,則把node 添加到 AQS 隊列
            return true;
        } 
        //如果 cas 失敗,則判斷目前 node 是否已經在 AQS 隊列上,如果不在,則讓給其他線程執行
		//當 node 被觸發了 signal 方法時,node 就會被加到 aqs 隊列上
        while (!isOnSyncQueue(node))//循環檢測 node 是否已經成功添加到 AQS 隊列中。如果沒有,則通過 yield
            Thread.yield();
        return false;
    }
           

acquireQueued

這個方法在講 aqs 的時候說過,是的目前被喚醒的節點ThreadA 去搶占同步鎖。并且要恢複到原本的重入次數狀态。調用完這個方法之後,AQS 隊列的狀态如下将 head 節點的 waitStatus 設定為-1,Signal 狀态。

reportInterruptAfterWait

根據 checkInterruptWhileWaiting 方法傳回的中斷辨別來進行中斷上報。

如果是 THROW_IE,則抛出中斷異常

如果是 REINTERRUPT,則重新響應中斷

private void reportInterruptAfterWait(int interruptMode)
            throws InterruptedException {
            if (interruptMode == THROW_IE)
                throw new InterruptedException();
            else if (interruptMode == REINTERRUPT)
                selfInterrupt();
        }
           

流程圖:

AQS分析Condition等待隊列

await 和 signal 的總結

線程 awaitThread 先通過 lock.lock()方法擷取鎖成功後調用了 condition.await 方法進入等待隊列,而另一個線程 signalThread 通過 lock.lock()方法擷取鎖成功後調用了 condition.signal 或者 signalAll 方法,使得線程awaitThread 能夠有機會移入到同步隊列中,當其他線程釋放 lock 後使得線程 awaitThread 能夠有機會擷取lock,進而使得線程 awaitThread 能夠從 await 方法中退出執行後續操作。如果 awaitThread 擷取 lock 失敗會直接進入到同步隊列。

阻塞:await()方法中,線上程釋放鎖資源之後,如果節點不在 AQS 等待隊列,則阻塞目前線程,如果在等待隊列,則自旋等待嘗試擷取鎖

釋放:signal()後,節點會從 condition 隊列移動到 AQS等待隊列,則進入正常鎖的擷取流程