Jdk1.6 JUC源碼解析(20)-Executors
作者:大飛
功能簡介:
- Executors是JUC包提供的一個工具性質的幫助類,它針對ExecutorService、ScheduledExecutorService、ThreadFactory和Callable提供了一系列工廠方法和工具方法。
源碼分析:
- 首先看下針對ExecutorService提供的一些工廠方法:
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
通過之前文章中對ThreadPoolExecutor的分析可知:
1.這個方法建立了一個核心線程數量和最大線程數量一緻的,并且任務隊列是無界隊列的線程池。 2.由于預設核心線程不會逾時,是以逾時相關的參數也沒有意義。 3.如果線上程關閉之前,一個工作線程由于某種原因挂了,那麼線程池會自動補上一個新的工作線程。
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory);
除了能定制ThreadFactory之外,和上個方法一樣。
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
這個工廠方法看上去有點類似newFixedThreadPool(1) ,但有一點兒差別,這個不能重新調整配置(比如動态增大核心線程數量)了,由于方法内傳回的不是ThreadPoolExecutor執行個體,而是一個包裝類:
static class FinalizableDelegatedExecutorService
extends DelegatedExecutorService {
FinalizableDelegatedExecutorService(ExecutorService executor) {
super(executor);
}
protected void finalize() {
super.shutdown();//被垃圾回收時,關閉線程池。
}
}
/**
* 包裝類,方法代理到内部的ExecutorService,隻暴漏ExecutorService定義的方法。
*/
static class DelegatedExecutorService extends AbstractExecutorService {
private final ExecutorService e;
DelegatedExecutorService(ExecutorService executor) { e = executor; }
public void execute(Runnable command) { e.execute(command); }
public void shutdown() { e.shutdown(); }
public List<Runnable> shutdownNow() { return e.shutdownNow(); }
public boolean isShutdown() { return e.isShutdown(); }
public boolean isTerminated() { return e.isTerminated(); }
public boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException {
return e.awaitTermination(timeout, unit);
}
public Future<?> submit(Runnable task) {
return e.submit(task);
}
public <T> Future<T> submit(Callable<T> task) {
return e.submit(task);
}
public <T> Future<T> submit(Runnable task, T result) {
return e.submit(task, result);
}
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException {
return e.invokeAll(tasks);
}
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException {
return e.invokeAll(tasks, timeout, unit);
}
public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException {
return e.invokeAny(tasks);
}
public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
return e.invokeAny(tasks, timeout, unit);
}
}
public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory));
}
除了能定制ThreadFactory之外,和上個方法一樣。
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
通過之前文章中對ThreadPoolExecutor的分析可知: 1.這個方法建立了一個核心線程數量為0,最大線程(可以認為)無上限,并且任務隊列是同步隊列(無實際容量)的線程池。 2.針對每一個新任務,如果目前沒有空閑線程,都會建立一個新的工作線程來處理任務。工作線程預設空閑超過60秒逾時被回收。
public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
threadFactory);
除了能定制ThreadFactory之外,和上個方法一樣。
public static ExecutorService unconfigurableExecutorService(ExecutorService executor) {
if (executor == null)
throw new NullPointerException();
return new DelegatedExecutorService(executor);
}
DelegatedExecutorService這面已經看到過,這個方法就相當于将一個ExecutorService包裝成一個不可配置的ExecutorService。
- 繼續看下針對ScheduledExecutorService提供的一些工廠方法:
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
public static ScheduledExecutorService newScheduledThreadPool(
int corePoolSize, ThreadFactory threadFactory) {
return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
}
通過之前文章中對ScheduledThreadPoolExecutor的分析可知:
1.這個方法建立了一個給定(核心)線程數量的ScheduledThreadPoolExecutor(由于其内部的任務隊列是無界的,是以盡管繼承自ThreadPoolExecutor,但最大線程數量無意義)。
public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
return new DelegatedScheduledExecutorService
(new ScheduledThreadPoolExecutor(1));
}
public static ScheduledExecutorService newSingleThreadScheduledExecutor(ThreadFactory threadFactory) {
return new DelegatedScheduledExecutorService
(new ScheduledThreadPoolExecutor(1, threadFactory));
}
類似newScheduledThreadPool(1) (這個其實看起來更像是一個加強版的Timer),但不能調整配置:
static class DelegatedScheduledExecutorService
extends DelegatedExecutorService
implements ScheduledExecutorService {
private final ScheduledExecutorService e;
DelegatedScheduledExecutorService(ScheduledExecutorService executor) {
super(executor);
e = executor;
}
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
return e.schedule(command, delay, unit);
}
public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
return e.schedule(callable, delay, unit);
}
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
return e.scheduleAtFixedRate(command, initialDelay, period, unit);
}
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
return e.scheduleWithFixedDelay(command, initialDelay, delay, unit);
}
}
public static ScheduledExecutorService unconfigurableScheduledExecutorService(ScheduledExecutorService executor) {
if (executor == null)
throw new NullPointerException();
return new DelegatedScheduledExecutorService(executor);
}
相當于将一個ScheduledExecutorService包裝成一個不可配置的ScheduledExecutorService。
- 再看下針對ThreadFactory提供的一些工廠方法:
public static ThreadFactory defaultThreadFactory() {
return new DefaultThreadFactory();
}
傳回一個DefaultThreadFactory執行個體。在建立ThreadPoolExecutor和ScheduledThreadPoolExecutor時如果沒有顯式指定ThreadFactory,會預設使用這個,看下實作:
static class DefaultThreadFactory implements ThreadFactory {
static final AtomicInteger poolNumber = new AtomicInteger(1);
final ThreadGroup group;
final AtomicInteger threadNumber = new AtomicInteger(1);
final String namePrefix;
DefaultThreadFactory() {
SecurityManager s = System.getSecurityManager();
group = (s != null)? s.getThreadGroup() :
Thread.currentThread().getThreadGroup();
namePrefix = "pool-" +
poolNumber.getAndIncrement() +
"-thread-";
}
public Thread newThread(Runnable r) {
Thread t = new Thread(group, r,
namePrefix + threadNumber.getAndIncrement(),
0);
if (t.isDaemon())
t.setDaemon(false);
if (t.getPriority() != Thread.NORM_PRIORITY)
t.setPriority(Thread.NORM_PRIORITY);
return t;
}
}
public static ThreadFactory privilegedThreadFactory() {
return new PrivilegedThreadFactory();
}
static class PrivilegedThreadFactory extends DefaultThreadFactory {
private final ClassLoader ccl;
private final AccessControlContext acc;
PrivilegedThreadFactory() {
super();
this.ccl = Thread.currentThread().getContextClassLoader();
this.acc = AccessController.getContext();
acc.checkPermission(new RuntimePermission("setContextClassLoader"));
}
public Thread newThread(final Runnable r) {
return super.newThread(new Runnable() {
public void run() {
AccessController.doPrivileged(new PrivilegedAction<Object>() {
public Object run() {
Thread.currentThread().setContextClassLoader(ccl);
r.run();
return null;
}
}, acc);
}
});
}
}
privilegedThreadFactory和defaultThreadFactory傳回的工廠類會建立設定相同的Thread,隻是PrivilegedThreadFactory建立的Thread會使用和目前線程(建立線程)相同的通路控制和類加載器。
- 最後看下針對Callable提供的一些工具方法:
public static <T> Callable<T> callable(Runnable task, T result) {
if (task == null)
throw new NullPointerException();
return new RunnableAdapter<T>(task, result);
}
将一個Runnable和一個傳回值包裝成一個Callable,傳回的這個适配類之前也見過:
static final class RunnableAdapter<T> implements Callable<T> {
final Runnable task;
final T result;
RunnableAdapter(Runnable task, T result) {
this.task = task;
this.result = result;
}
public T call() {
task.run();
return result;
}
}
public static Callable<Object> callable(Runnable task) {
if (task == null)
throw new NullPointerException();
return new RunnableAdapter<Object>(task, null);
}
上面方法的重載版本,傳回值預設為null。
當然也會涉及到有通路控制和類加載器設定的工具方法:
public static Callable<Object> callable(final PrivilegedAction<?> action) {
if (action == null)
throw new NullPointerException();
return new Callable<Object>() {
public Object call() { return action.run(); }};
}
public static Callable<Object> callable(final PrivilegedExceptionAction<?> action) {
if (action == null)
throw new NullPointerException();
return new Callable<Object>() {
public Object call() throws Exception { return action.run(); }};
}
public static <T> Callable<T> privilegedCallable(Callable<T> callable) {
if (callable == null)
throw new NullPointerException();
return new PrivilegedCallable<T>(callable);
}
public static <T> Callable<T> privilegedCallableUsingCurrentClassLoader(Callable<T> callable) {
if (callable == null)
throw new NullPointerException();
return new PrivilegedCallableUsingCurrentClassLoader<T>(callable);
}
static final class PrivilegedCallable<T> implements Callable<T> {
private final AccessControlContext acc;
private final Callable<T> task;
private T result;
private Exception exception;
PrivilegedCallable(Callable<T> task) {
this.task = task;
this.acc = AccessController.getContext();
}
public T call() throws Exception {
AccessController.doPrivileged(new PrivilegedAction<T>() {
public T run() {
try {
result = task.call();
} catch (Exception ex) {
exception = ex;
}
return null;
}
}, acc);
if (exception != null)
throw exception;
else
return result;
}
}
static final class PrivilegedCallableUsingCurrentClassLoader<T> implements Callable<T> {
private final ClassLoader ccl;
private final AccessControlContext acc;
private final Callable<T> task;
private T result;
private Exception exception;
PrivilegedCallableUsingCurrentClassLoader(Callable<T> task) {
this.task = task;
this.ccl = Thread.currentThread().getContextClassLoader();
this.acc = AccessController.getContext();
acc.checkPermission(new RuntimePermission("getContextClassLoader"));
acc.checkPermission(new RuntimePermission("setContextClassLoader"));
}
public T call() throws Exception {
AccessController.doPrivileged(new PrivilegedAction<T>() {
public T run() {
Thread t = Thread.currentThread();
try {
ClassLoader cl = t.getContextClassLoader();
if (ccl == cl) {
result = task.call();
} else {
t.setContextClassLoader(ccl);
try {
result = task.call();
} finally {
t.setContextClassLoader(cl);
}
}
} catch (Exception ex) {
exception = ex;
}
return null;
}
}, acc);
if (exception != null)
throw exception;
else
return result;
}
}
Executors的代碼解析完畢! 參見: Jdk1.6 JUC源碼解析(17)-ThreadPoolExecutor Jdk1.6 JUC源碼解析(19)-ScheduledThreadPoolExecutor