天天看点

Phaser分析基本概念使用结构图代码例子

基本概念

注释:A reusablesynchronization barrier

可重用的同步屏障。英文翻译Phaser:['feɪzə]阶段

基本功能:实现多阶段多任务,不同阶段阻塞控制,同一阶段多个任务阻塞控制。

相关机制:

  • 注册机制:在任何时刻添加注册,也就是添加注册的state数量,

构造函数中可初始化为state个,可以通过register();执行一次,state+1.

也可以在任务到达同步点,执行arriveAndDeregister(),执行一次state-1.

  • 同步机制,和“一拨任务等另一波任务的CountDownLatch”还有“一拨任务内互等的CyclicBarrier”相似。

包括基本的API

arriveAndAwaitAdvance (),到达屏障,阻塞等待所有任务都到达屏障,arrived计数+1。

arrive(),到达屏障,不阻塞,arrived+1

register(),注册的phase数+1,这个是限制每一个阶段的state数。

arriveAndDeregister,到达屏障,阻塞并等待所有任务都到达屏障,通知注册的phase数-1。

awaitAdvance(int phase),阻塞并等待某个阶段的所有任务都完成。

可复写的方法onAdvance(),该方法有两参数,分别是phase阶段数,registeredParties当前阶段的注册数。每当某个阶段所有任务都到达,这个方法会被唤醒首先调用。

该方法返回boolean值,返回true说明所有任务已终结,可以自己设计终结条件,返回false说明任务haizia

一些get方法获取阶段数,阶段的注册phase数,Tostring方法。

使用结构图

Phaser分析基本概念使用结构图代码例子

代码例子

分两个阶段;第一个阶段3个任务,第二个阶段3个任务,在main里进行阻塞再进行下一段。

public class MoreTaskMoreStage {

	/**
	 * @param args
	 */
	public static void main(String[] args) {
		// TODO Auto-generated method stub
		final Phaser phaser = new Phaser(3){

			@Override
			protected boolean onAdvance(int phase, int registeredParties) {
				// TODO Auto-generated method stub
				System.out.println("advance  "+this.toString()); 
				if(phase==0){
					System.out.println("advance 第一阶段结束 ,isTerminated is false 整个阶段还未结束"); 
					return false;
				}
				else if(phase==1){
					System.out.println("advace 第二阶段结束 ,isTerminated is false 整个阶段已结束"); 
					return true;
				}
				else return registeredParties==0;
			}
			
		};
		for(int i=0;i<3;i++){
			new Thread(new Task1(i,phaser)).start();
			try {  
                Thread.sleep(1000);  
            } catch(InterruptedException e) {  
                // NOP  
            } 
		}
		phaser.awaitAdvance(phaser.getPhase());	//main线程一同等待
		System.out.println("main : 第一阶段阻塞结束");  
		System.out.println("main  "+phaser.toString()); 
		phaser.register();//第二阶段任务增加了一个,共4个,任务线程3个,main一个
		for(int i=0;i<3;i++)
			new Thread(new Task2(i,phaser)).start();		
		while(!phaser.isTerminated()){
			phaser.arriveAndAwaitAdvance();	
			System.out.println("main :第二阶段结束");  
		}
	}
	public static class Task1 implements Runnable{
		private final int id;  
        private final Phaser phaser;  
        public Task1(int _id,Phaser _phaser){
        	id=_id;
        	phaser=_phaser;
        }
		@Override
		public void run() {
			// TODO Auto-generated method stub
			System.out.println("任务1:  starting thread, id: " + id+"  模拟执行2秒,后阻塞等待该阶段全部任务完成");  
			try {  
                Thread.sleep(2000);  
            } catch(InterruptedException e) {  
                // NOP  
            } 
			phaser.arrive();	
			System.out.println("任务1:  不阻塞");  
			System.out.println("任务1:  "+phaser.toString()); 
		}		
	}
	public static class Task2 implements Runnable{
		private final int id;  
        private final Phaser phaser;  
        public Task2(int _id,Phaser _phaser){
        	id=_id;
        	phaser=_phaser;
        }
		@Override
		public void run() {
			// TODO Auto-generated method stub
			System.out.println("任务2,    starting thread, id: " + id+"  模拟执行1秒,后阻塞等待该阶段全部任务完成");  
			try {  
                Thread.sleep(1000);  
            } catch(InterruptedException e) {  
                // NOP  
            } 
			phaser.arriveAndAwaitAdvance();
		}	
	}
}