天天看点

Java 并发编程之任务取消(七)只运行一次的服务ShutdownNow的局限性

只运行一次的服务

以检查是否有新邮件为例

采用AtomicBoolean而不使用volatile类型的boolean，是因为要从内部的Runnable中访问hasNewMail标志，被捕获的局部变量必须是final类型、不能被重新赋值，所以改用AtomicBoolean来承载这份可变状态。

/**
 * Checks every host in parallel for new mail, waiting up to the given timeout
 * for all checks to complete.
 *
 * An AtomicBoolean is used instead of a volatile boolean because the flag is
 * read and written from inside an anonymous Runnable: a captured local must be
 * final (hence unmodifiable), so mutable state is carried by the AtomicBoolean.
 *
 * @param hosts   mail hosts to poll
 * @param timeout maximum time to wait for all checks after shutdown
 * @param unit    time unit of {@code timeout}
 * @return true if at least one host reported new mail before the deadline
 * @throws InterruptedException if interrupted while awaiting termination
 */
boolean checkMain(Set<String> hosts, long timeout, TimeUnit unit) throws InterruptedException {
		ExecutorService exec = Executors.newCachedThreadPool();
		final AtomicBoolean hasNewMail = new AtomicBoolean(false);
		try {
			// `host` must be final so the anonymous Runnable can capture it
			// (required before Java 8; harmless afterwards).
			for (final String host : hosts) {
				exec.execute(new Runnable() {

					@Override
					public void run() {
						if (checkMail(host)) {
							hasNewMail.set(true);
						}
					}
				});
			}
		} finally {
			// Always initiate shutdown, then bound the wait; tasks still
			// running at the deadline simply do not contribute to the result.
			exec.shutdown();
			exec.awaitTermination(timeout, unit);
		}
		return hasNewMail.get();
	}
           

ShutdownNow的局限性

上次说shutdownNow会返回所有尚未启动的Runnable，但它无法返回正在执行中的Runnable。解决办法是封装ExecutorService，并让execute记录哪些任务是在关闭后被取消的；前提是任务在返回时必须保持线程的中断状态。

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.AbstractExecutorService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

/**
 * An {@link ExecutorService} decorator that records which submitted tasks were
 * cancelled at shutdown: a task whose thread is interrupted while the executor
 * is shutting down is remembered, so callers can retrieve work that was in
 * progress when {@link #shutdownNow()} was invoked.
 *
 * All lifecycle methods delegate to the wrapped executor. (The original code
 * left them as stubs returning false/null, which silently broke the tracking
 * in {@link #execute}, since {@code isShutdown()} never returned true.)
 */
public class TrackingExecutor extends AbstractExecutorService {
	private final ExecutorService exec;
	// Tasks whose threads were interrupted by shutdownNow() before completing.
	// Synchronized set: written from pool threads, read after termination.
	private final Set<Runnable> tasksCancelledAtShutdown = Collections
			.synchronizedSet(new HashSet<Runnable>());

	public TrackingExecutor(ExecutorService exec) {
		super();
		this.exec = exec;
	}

	/**
	 * Returns the tasks that were running when the executor was shut down
	 * forcibly and did not complete.
	 *
	 * @return a snapshot copy of the cancelled tasks
	 * @throws IllegalStateException if the executor has not terminated yet
	 */
	public List<Runnable> getCancelledTasks() {
		if (!exec.isTerminated()) {
			// Fixed message: the original said "terminated", but this is
			// thrown precisely when the executor has NOT yet terminated.
			throw new IllegalStateException("Executor is not yet terminated");
		}
		return new ArrayList<Runnable>(tasksCancelledAtShutdown);
	}

	@Override
	public void execute(final Runnable command) {
		exec.execute(new Runnable() {
			public void run() {
				try {
					command.run();
				} finally {
					// If the executor is shutting down and this thread was
					// interrupted, the task was cancelled by shutdownNow();
					// record the ORIGINAL command so callers can resubmit it.
					if (isShutdown() && Thread.currentThread().isInterrupted()) {
						tasksCancelledAtShutdown.add(command);
					}
				}
			}
		});
	}

	// ---- Lifecycle methods: pure delegation to the wrapped executor. ----

	@Override
	public void shutdown() {
		exec.shutdown();
	}

	@Override
	public List<Runnable> shutdownNow() {
		return exec.shutdownNow();
	}

	@Override
	public boolean isShutdown() {
		return exec.isShutdown();
	}

	@Override
	public boolean isTerminated() {
		return exec.isTerminated();
	}

	@Override
	public boolean awaitTermination(long timeout, TimeUnit unit)
			throws InterruptedException {
		return exec.awaitTermination(timeout, unit);
	}

}
           

把ExecutorService的其余生命周期方法直接委托给被包装的exec即可。下面以网页爬虫为例，演示TrackingExecutor的用法。

import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * A restartable web crawler built on {@link TrackingExecutor}: when stopped,
 * both the not-yet-started tasks (returned by shutdownNow) and the tasks that
 * were interrupted mid-flight (reported by getCancelledTasks) are saved back
 * into {@code urltoCrawl} so a later start() can resume where it left off.
 */
public abstract class WebCrawler {
	// volatile: written by start()/stop() (synchronized), read by pool threads
	// in submitCrawlTask(); null while the crawler is stopped.
	private volatile TrackingExecutor exec;
	private final Set<URL> urltoCrawl = new HashSet<URL>();

	public static void main(String[] args) {
		// Demo subclass: processPage just echoes the host and re-emits one link.
		WebCrawler webc = new WebCrawler() {

			@Override
			protected List<URL> processPage(URL url) {
				System.out.println(url.getHost());
				List<URL> url2 = new ArrayList<URL>();
				try {
					url2.add(new URL("http://www.baidu.com"));
				} catch (MalformedURLException e) {
					e.printStackTrace();
				}
				return url2;
			}
		};
		try {
			webc.urltoCrawl.add(new URL("http://www.baidu.com"));
		} catch (MalformedURLException e) {
			e.printStackTrace();
		}
		webc.start();

		try {
			Thread.sleep(1000);
			webc.stop();
		} catch (InterruptedException e) {
			// Restore the interrupt status instead of swallowing it.
			Thread.currentThread().interrupt();
			e.printStackTrace();
		}
	}

	/** Starts crawling every saved URL; clears the pending set once submitted. */
	public synchronized void start() {
		exec = new TrackingExecutor(Executors.newCachedThreadPool());
		for (URL url : urltoCrawl) {
			submitCrawlTask(url);
		}
		urltoCrawl.clear();
	}

	/**
	 * Stops crawling, saving both the never-started tasks and the tasks that
	 * were cancelled while running, so start() can resume them later.
	 */
	public synchronized void stop() throws InterruptedException {
		try {
			saveUncrawled(exec.shutdownNow());
			if (exec.awaitTermination(2000, TimeUnit.MILLISECONDS)) {
				saveUncrawled(exec.getCancelledTasks());
			}
		} finally {
			exec = null;
		}
	}

	/** Fetches the page and returns the links found on it. */
	protected abstract List<URL> processPage(URL url);

	private void saveUncrawled(List<Runnable> uncrawled) {
		for (Runnable runnable : uncrawled) {
			urltoCrawl.add(((CrawlTask) runnable).getPage());
		}
	}

	private void submitCrawlTask(URL u) {
		// Snapshot the volatile field: pool threads may call this while stop()
		// is nulling out exec; the original dereferenced exec directly and
		// could NPE. Links discovered during shutdown are simply dropped
		// (they are recovered via saveUncrawled anyway).
		TrackingExecutor e = exec;
		if (e != null) {
			e.execute(new CrawlTask(u));
		}
		// NOTE(review): e.execute may still throw RejectedExecutionException
		// if shutdown races this submit — acceptable here, as the enclosing
		// CrawlTask is then recorded as cancelled; confirm if that matters.
	}

	private class CrawlTask implements Runnable {
		private final URL url;

		public CrawlTask(URL url) {
			super();
			this.url = url;
		}

		@Override
		public void run() {
			for (URL link : processPage(url)) {
				// Bail out promptly when cancelled via interruption.
				if (Thread.currentThread().isInterrupted())
					return;
				submitCrawlTask(link);
			}
		}

		public URL getPage() {
			return url;
		}
	}
}
           

上面程序中抽象方法processPage里的代码只是测试用的。正确的实现应当抓取该URL页面中的所有链接（取出页面里所有的&lt;a href即可），这并不难，但不属于本篇的知识点。有兴趣的可以看看元素选择器，或者研究一下开源的爬虫项目Crawler。

执行stop方法后会调用saveUncrawled方法收集被取消的任务（包括尚未启动和执行中被中断的）。之后无论是要保存到文件还是做其他处理，只需把相应代码添加到saveUncrawled方法内即可。