天天看点

Java 并发编程之任务取消(四)采用newTaskFor来封装非标准的取消

采用newTaskFor来封装非标准的取消

我们可以通过newTaskFor方法来进一步优化ReaderThread中封装非标准取消的技术(ReaderThread见上一篇)。这是在Java 6k ThreadPoolExecutor中的新增功能,当把一个Callable提交给ExecutorService时,Submit方法会返回一个Future,我们可以通过这个Future来取消任务。

举栗子时间 :

import java.util.concurrent.Callable;
import java.util.concurrent.RunnableFuture;

public interface CancellableTask<T> extends Callable<T> {
	void cancel();

	RunnableFuture<T> newTask();
}

           

CancellableTask继承了Callable<T>。 Callable是一个可以回调的Runnable

在新的类里声明了取消和创建新任务的接口

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class CancellingExecutor extends ThreadPoolExecutor {

	public CancellingExecutor(int corePoolSize, int maximumPoolSize,
			long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
		super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
		// TODO Auto-generated constructor stub
	}

	protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
		if (callable instanceof CancellableTask) {
			return ((CancellableTask<T>) callable).newTask();
		} else {
			return super.newTaskFor(callable);
		}
	}
}
           

CancellingExecutor: CancellableTask的执行器,它重写了ThreadPoolExecutor中的newTaskFor方法。判断如果传入的参数是由CancellableTask类声明的,那么就执行CancellableTask对象的自己的创建新任务的方法。否则还是调用父类的工厂方法

import java.io.IOException;
import java.net.Socket;
import java.util.concurrent.FutureTask;
import java.util.concurrent.RunnableFuture;

public abstract class SocketUsingTask<T> implements CancellableTask<T> {
	private Socket socket;

	private synchronized void setSocket(Socket s) {
		this.socket = s;
	}

	public synchronized void cancel() {

		try {
			if (socket != null)
				socket.close();
		} catch (IOException ignored) {
			// TODO Auto-generated catch block

		}

	}

	@Override
	public RunnableFuture<T> newTask() {
		// TODO Auto-generated method stub
		return new FutureTask<T>(this) {
			public boolean cancel(boolean mayInterruptIfRunning) {
				try {
					SocketUsingTask.this.cancel();
				} finally {
					return super.cancel(mayInterruptIfRunning);
				}
			}
		};
	}
}
           

然后我们新建一个示例类。它实现了自己的cancel方法,在这个方法里关闭了socket。如果SocketUsingTask通过其自己的Future来取消,那么底层的套接字将被关闭并且线程将被中断,因此它提高了任务 对取消操作的响应性,不仅能够在可中断方法 的同时确保响应取消操作,而且还能调用 可阻调 的套接字I/O方法 、。