Jdk1.6 JUC Source Code Analysis (20) - Executors

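Author: 大飛

Overview:

  • Executors is a utility helper class in the JUC package. It provides a set of factory methods and utility methods for ExecutorService, ScheduledExecutorService, ThreadFactory, and Callable.

Source code analysis:

  • First, the factory methods for ExecutorService: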
public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }
           

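       From the analysis of ThreadPoolExecutor earlier in this series:

              1. This method creates a thread pool whose core pool size and maximum pool size are equal, and whose task queue is unbounded.
              2. Core threads do not time out by default, so the keep-alive parameters have no effect.
              3. If a worker thread dies for any reason before the pool is shut down, the pool automatically adds a new worker thread to replace it.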
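
       The first point can be checked directly. The following is a minimal sketch, not part of the JDK source: the class name FixedPoolDemo and the method describe are made up for illustration, and the cast relies on newFixedThreadPool returning a plain ThreadPoolExecutor, as the listing above shows.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;

// Hypothetical demo class, not part of the JDK.
class FixedPoolDemo {
    /** Describes the configuration of a pool created by newFixedThreadPool(nThreads). */
    static String describe(int nThreads) {
        ExecutorService service = Executors.newFixedThreadPool(nThreads);
        try {
            // The factory returns a ThreadPoolExecutor directly, so this cast succeeds.
            ThreadPoolExecutor pool = (ThreadPoolExecutor) service;
            // A LinkedBlockingQueue built without a capacity holds up to Integer.MAX_VALUE items.
            boolean unbounded = pool.getQueue() instanceof LinkedBlockingQueue
                    && pool.getQueue().remainingCapacity() == Integer.MAX_VALUE;
            return "core=" + pool.getCorePoolSize()
                    + " max=" + pool.getMaximumPoolSize()
                    + " unboundedQueue=" + unbounded;
        } finally {
            service.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(describe(4)); // prints "core=4 max=4 unboundedQueue=true"
    }
}
```

       Because the queue never fills up, the pool never grows past its core size, which is why core and maximum are simply set to the same value.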

public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>(),
                                      threadFactory);
    }
           

       Same as the previous method, except that the ThreadFactory can be customized.  

public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }
           

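       This factory method looks similar to newFixedThreadPool(1), with one difference: the returned executor cannot be reconfigured (for example, its core pool size cannot be increased at run time), because the method returns a wrapper class rather than a ThreadPoolExecutor instance: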

static class FinalizableDelegatedExecutorService
        extends DelegatedExecutorService {
        FinalizableDelegatedExecutorService(ExecutorService executor) {
            super(executor);
        }
        protected void finalize()  {
            super.shutdown(); // Shut down the thread pool when the wrapper is garbage collected.
        }
    }
    /**
     * Wrapper class that delegates to the inner ExecutorService and exposes only the methods defined by ExecutorService.
     */
    static class DelegatedExecutorService extends AbstractExecutorService {
        private final ExecutorService e;
        DelegatedExecutorService(ExecutorService executor) { e = executor; }
        public void execute(Runnable command) { e.execute(command); }
        public void shutdown() { e.shutdown(); }
        public List<Runnable> shutdownNow() { return e.shutdownNow(); }
        public boolean isShutdown() { return e.isShutdown(); }
        public boolean isTerminated() { return e.isTerminated(); }
        public boolean awaitTermination(long timeout, TimeUnit unit)
            throws InterruptedException {
            return e.awaitTermination(timeout, unit);
        }
        public Future<?> submit(Runnable task) {
            return e.submit(task);
        }
        public <T> Future<T> submit(Callable<T> task) {
            return e.submit(task);
        }
        public <T> Future<T> submit(Runnable task, T result) {
            return e.submit(task, result);
        }
        public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
            throws InterruptedException {
            return e.invokeAll(tasks);
        }
        public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                             long timeout, TimeUnit unit)
            throws InterruptedException {
            return e.invokeAll(tasks, timeout, unit);
        }
        public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
            throws InterruptedException, ExecutionException {
            return e.invokeAny(tasks);
        }
        public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                               long timeout, TimeUnit unit)
            throws InterruptedException, ExecutionException, TimeoutException {
            return e.invokeAny(tasks, timeout, unit);
        }
    }
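
       The practical effect of the wrapper is that callers cannot cast the result back to ThreadPoolExecutor and call its tuning methods. A small sketch of that difference follows; SingleThreadWrapperDemo and isReconfigurable are names invented for this example.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;

// Hypothetical demo class, not part of the JDK.
class SingleThreadWrapperDemo {
    /** Returns true if the service exposes ThreadPoolExecutor's tuning methods via a cast. */
    static boolean isReconfigurable(ExecutorService service) {
        try {
            return service instanceof ThreadPoolExecutor;
        } finally {
            service.shutdown();
        }
    }

    public static void main(String[] args) {
        // A fixed pool of one thread is a ThreadPoolExecutor and can still be resized.
        System.out.println(isReconfigurable(Executors.newFixedThreadPool(1)));     // prints "true"
        // The single-thread executor is wrapped, so the cast is not possible.
        System.out.println(isReconfigurable(Executors.newSingleThreadExecutor())); // prints "false"
    }
}
```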
           
public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>(),
                                    threadFactory));
    }
           

       Same as the previous method, except that the ThreadFactory can be customized.  

public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }
           

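       From the analysis of ThreadPoolExecutor earlier in this series:

              1. This method creates a thread pool with a core pool size of 0, an effectively unlimited maximum pool size (Integer.MAX_VALUE), and a synchronous queue (which has no actual capacity) as its task queue.
              2. For each new task, a new worker thread is created to handle it if no idle thread is available. By default, a worker thread that stays idle for more than 60 seconds times out and is reclaimed.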
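
       Both points can be observed from the outside. The sketch below is illustrative only: CachedPoolDemo and its two methods are hypothetical names, and the tasks block on a latch so that every worker stays busy while the pool size is read.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the JDK.
class CachedPoolDemo {
    /** Reads back the settings passed to the ThreadPoolExecutor constructor. */
    static String describe() {
        ThreadPoolExecutor pool = (ThreadPoolExecutor) Executors.newCachedThreadPool();
        try {
            return "core=" + pool.getCorePoolSize()
                    + " unlimitedMax=" + (pool.getMaximumPoolSize() == Integer.MAX_VALUE)
                    + " keepAliveSeconds=" + pool.getKeepAliveTime(TimeUnit.SECONDS)
                    + " synchronousQueue=" + (pool.getQueue() instanceof SynchronousQueue);
        } finally {
            pool.shutdown();
        }
    }

    /** Submits tasks that all block, then reports how many workers the pool created. */
    static int poolSizeWithBusyWorkers(int tasks) {
        ThreadPoolExecutor pool = (ThreadPoolExecutor) Executors.newCachedThreadPool();
        final CountDownLatch release = new CountDownLatch(1);
        try {
            for (int i = 0; i < tasks; i++) {
                pool.execute(new Runnable() {
                    public void run() {
                        try {
                            release.await(); // keep this worker busy
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        }
                    }
                });
            }
            // No worker is idle, so each execute() call had to start a new thread.
            return pool.getPoolSize();
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(describe());
        System.out.println(poolSizeWithBusyWorkers(3)); // prints "3"
    }
}
```

       Since nothing bounds the number of threads, a burst of long-running tasks can create a very large number of workers, which is worth keeping in mind when choosing this factory method.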

public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>(),
                                      threadFactory);
    }
           

       Same as the previous method, except that the ThreadFactory can be customized.  

public static ExecutorService unconfigurableExecutorService(ExecutorService executor) {
        if (executor == null)
            throw new NullPointerException();
        return new DelegatedExecutorService(executor);
    }
           

       DelegatedExecutorService was shown above. This method wraps an ExecutorService into one that cannot be reconfigured.  
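
       A typical use is handing a pool to other code without letting that code change its configuration. The following sketch assumes a hypothetical class UnconfigurableDemo; tasks submitted through the wrapper still run on the underlying pool.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;

// Hypothetical demo class, not part of the JDK.
class UnconfigurableDemo {
    /** Returns "<wrapper is a ThreadPoolExecutor?>:<task result>". */
    static String runThroughWrapper() {
        ThreadPoolExecutor pool = (ThreadPoolExecutor) Executors.newFixedThreadPool(2);
        // The wrapper only exposes the ExecutorService interface.
        ExecutorService wrapped = Executors.unconfigurableExecutorService(pool);
        try {
            String result = wrapped.submit(new Callable<String>() {
                public String call() {
                    return "done";
                }
            }).get();
            return (wrapped instanceof ThreadPoolExecutor) + ":" + result;
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            wrapped.shutdown(); // delegates to pool.shutdown()
        }
    }

    public static void main(String[] args) {
        System.out.println(runThroughWrapper()); // prints "false:done"
    }
}
```

       Whoever keeps the original ThreadPoolExecutor reference can still reconfigure it; the wrapper only restricts the code that receives the wrapped instance.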

  • Next, the factory methods for ScheduledExecutorService:
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
        return new ScheduledThreadPoolExecutor(corePoolSize);
    }

    public static ScheduledExecutorService newScheduledThreadPool(
            int corePoolSize, ThreadFactory threadFactory) {
        return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
    }
           

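       From the analysis of ScheduledThreadPoolExecutor earlier in this series:

              1. These methods create a ScheduledThreadPoolExecutor with the given (core) pool size. Because its internal task queue is unbounded, the maximum pool size has no effect, even though the class extends ThreadPoolExecutor.  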
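
       For reference, a minimal usage sketch of such a pool is shown below. The class ScheduledPoolDemo and its methods are hypothetical; the example only schedules a one-shot delayed task and reads back the core pool size.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the JDK.
class ScheduledPoolDemo {
    /** Schedules a task that returns the given value after delayMillis and waits for it. */
    static String delayedValue(final String value, long delayMillis) {
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2);
        try {
            ScheduledFuture<String> future = scheduler.schedule(new Callable<String>() {
                public String call() {
                    return value;
                }
            }, delayMillis, TimeUnit.MILLISECONDS);
            return future.get(); // blocks until the delay has elapsed and the task has run
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            scheduler.shutdown();
        }
    }

    /** Shows that the factory argument becomes the core pool size. */
    static int corePoolSize(int requested) {
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(requested);
        try {
            // The factory returns a ScheduledThreadPoolExecutor directly, so the cast succeeds.
            return ((ScheduledThreadPoolExecutor) scheduler).getCorePoolSize();
        } finally {
            scheduler.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(delayedValue("tick", 50)); // prints "tick"
        System.out.println(corePoolSize(3));          // prints "3"
    }
}
```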

public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
        return new DelegatedScheduledExecutorService
            (new ScheduledThreadPoolExecutor(1));
    }

    public static ScheduledExecutorService newSingleThreadScheduledExecutor(ThreadFactory threadFactory) {
        return new DelegatedScheduledExecutorService
            (new ScheduledThreadPoolExecutor(1, threadFactory));
    }
           

        These are similar to newScheduledThreadPool(1), but the result cannot be reconfigured (in practice it looks more like an enhanced Timer):

static class DelegatedScheduledExecutorService
            extends DelegatedExecutorService
            implements ScheduledExecutorService {
        private final ScheduledExecutorService e;
        DelegatedScheduledExecutorService(ScheduledExecutorService executor) {
            super(executor);
            e = executor;
        }
        public ScheduledFuture<?> schedule(Runnable command, long delay,  TimeUnit unit) {
            return e.schedule(command, delay, unit);
        }
        public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
            return e.schedule(callable, delay, unit);
        }
        public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay,  long period, TimeUnit unit) {
            return e.scheduleAtFixedRate(command, initialDelay, period, unit);
        }
        public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay,  long delay, TimeUnit unit) {
            return e.scheduleWithFixedDelay(command, initialDelay, delay, unit);
        }
    }
           
public static ScheduledExecutorService unconfigurableScheduledExecutorService(ScheduledExecutorService executor) {
        if (executor == null)
            throw new NullPointerException();
        return new DelegatedScheduledExecutorService(executor);
    }
           

       This wraps a ScheduledExecutorService into one that cannot be reconfigured.  
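
       To illustrate the Timer-like use of the single-thread variant, here is a short sketch. SingleScheduledDemo, runsPeriodically, and isConfigurable are names made up for this example; the 10 ms period and the 5 second wait are arbitrary values.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the JDK.
class SingleScheduledDemo {
    /** Runs a periodic task and reports whether it fired at least the given number of times. */
    static boolean runsPeriodically(int times) {
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        final CountDownLatch ticks = new CountDownLatch(times);
        try {
            timer.scheduleAtFixedRate(new Runnable() {
                public void run() {
                    ticks.countDown();
                }
            }, 0, 10, TimeUnit.MILLISECONDS);
            return ticks.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            timer.shutdownNow(); // cancels the periodic task
        }
    }

    /** The wrapper hides ScheduledThreadPoolExecutor, so this returns false. */
    static boolean isConfigurable() {
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        try {
            return timer instanceof ScheduledThreadPoolExecutor;
        } finally {
            timer.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(runsPeriodically(3));
        System.out.println(isConfigurable()); // prints "false"
    }
}
```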

  • Next, the factory methods for ThreadFactory:
public static ThreadFactory defaultThreadFactory() {
        return new DefaultThreadFactory();
    }
           

       Returns a DefaultThreadFactory instance. ThreadPoolExecutor and ScheduledThreadPoolExecutor use this factory by default when no ThreadFactory is specified explicitly at construction. Its implementation:

static class DefaultThreadFactory implements ThreadFactory {
        static final AtomicInteger poolNumber = new AtomicInteger(1);
        final ThreadGroup group;
        final AtomicInteger threadNumber = new AtomicInteger(1);
        final String namePrefix;
        DefaultThreadFactory() {
            SecurityManager s = System.getSecurityManager();
            group = (s != null)? s.getThreadGroup() :
                                 Thread.currentThread().getThreadGroup();
            namePrefix = "pool-" +
                          poolNumber.getAndIncrement() +
                         "-thread-";
        }
        public Thread newThread(Runnable r) {
            Thread t = new Thread(group, r,
                                  namePrefix + threadNumber.getAndIncrement(),
                                  0);
            if (t.isDaemon())
                t.setDaemon(false);
            if (t.getPriority() != Thread.NORM_PRIORITY)
                t.setPriority(Thread.NORM_PRIORITY);
            return t;
        }
    }
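
       The properties set in newThread can be inspected on a thread produced by the factory. In this sketch, DefaultFactoryDemo is a hypothetical class; the pool number in the thread name depends on how many default factories the JVM has already created, so the example matches it with a pattern instead of a fixed value.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;

// Hypothetical demo class, not part of the JDK.
class DefaultFactoryDemo {
    /** Describes the first thread created by a fresh default thread factory. */
    static String describeFirstThread() {
        ThreadFactory factory = Executors.defaultThreadFactory();
        Thread t = factory.newThread(new Runnable() {
            public void run() {
                // nothing to do; the thread is only inspected, never started
            }
        });
        // Names follow "pool-N-thread-M"; M starts at 1 for each factory instance.
        boolean nameMatches = t.getName().matches("pool-\\d+-thread-1");
        return "nameMatches=" + nameMatches
                + " daemon=" + t.isDaemon()
                + " normPriority=" + (t.getPriority() == Thread.NORM_PRIORITY);
    }

    public static void main(String[] args) {
        System.out.println(describeFirstThread());
        // prints "nameMatches=true daemon=false normPriority=true"
    }
}
```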
           
public static ThreadFactory privilegedThreadFactory() {
        return new PrivilegedThreadFactory();
    }
    static class PrivilegedThreadFactory extends DefaultThreadFactory {
        private final ClassLoader ccl;
        private final AccessControlContext acc;
        PrivilegedThreadFactory() {
            super();
            this.ccl = Thread.currentThread().getContextClassLoader();
            this.acc = AccessController.getContext();
            acc.checkPermission(new RuntimePermission("setContextClassLoader"));
        }
        public Thread newThread(final Runnable r) {
            return super.newThread(new Runnable() {
                public void run() {
                    AccessController.doPrivileged(new PrivilegedAction<Object>() {
                        public Object run() {
                            Thread.currentThread().setContextClassLoader(ccl);
                            r.run();
                            return null;
                        }
                    }, acc);
                }
            });
        }
    }
           

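       The factories returned by privilegedThreadFactory and defaultThreadFactory create threads with the same settings. The only difference is that threads created by PrivilegedThreadFactory use the same access control context and context class loader as the thread that called privilegedThreadFactory.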
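
       The class loader half of that behavior can be sketched without the security APIs. The example below is a simplified illustration, not the JDK implementation: it deliberately leaves out the AccessControlContext handling, and ContextClassLoaderFactoryDemo, capturingFactory, and workerSeesCreatorLoader are hypothetical names.

```java
import java.net.URL;
import java.net.URLClassLoader;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical demo class, not part of the JDK.
class ContextClassLoaderFactoryDemo {
    /** Captures the caller's context class loader and installs it in every new thread. */
    static ThreadFactory capturingFactory() {
        final ClassLoader ccl = Thread.currentThread().getContextClassLoader();
        final ThreadFactory delegate = Executors.defaultThreadFactory();
        return new ThreadFactory() {
            public Thread newThread(final Runnable r) {
                return delegate.newThread(new Runnable() {
                    public void run() {
                        Thread.currentThread().setContextClassLoader(ccl);
                        r.run();
                    }
                });
            }
        };
    }

    /** Checks that a worker sees the loader that was current when the factory was created. */
    static boolean workerSeesCreatorLoader() {
        Thread current = Thread.currentThread();
        ClassLoader original = current.getContextClassLoader();
        ClassLoader marker = new URLClassLoader(new URL[0], original);
        ThreadFactory factory;
        current.setContextClassLoader(marker);
        try {
            factory = capturingFactory(); // captures 'marker'
        } finally {
            current.setContextClassLoader(original);
        }
        final AtomicReference<ClassLoader> seen = new AtomicReference<ClassLoader>();
        Thread worker = factory.newThread(new Runnable() {
            public void run() {
                seen.set(Thread.currentThread().getContextClassLoader());
            }
        });
        worker.start();
        try {
            worker.join();
        } catch (InterruptedException e) {
            current.interrupt();
        }
        return seen.get() == marker;
    }

    public static void main(String[] args) {
        System.out.println(workerSeesCreatorLoader()); // prints "true"
    }
}
```

       The point of capturing at factory creation time is that worker threads are often started much later, from a thread whose context class loader may differ from the one the application intended.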

  • Finally, the utility methods for Callable:
public static <T> Callable<T> callable(Runnable task, T result) {
        if (task == null)
            throw new NullPointerException();
        return new RunnableAdapter<T>(task, result);
    }
           

       Wraps a Runnable and a result value into a Callable. The adapter class it returns has appeared earlier in this series:

static final class RunnableAdapter<T> implements Callable<T> {
        final Runnable task;
        final T result;
        RunnableAdapter(Runnable  task, T result) {
            this.task = task;
            this.result = result;
        }
        public T call() {
            task.run();
            return result;
        }
    }
           
public static Callable<Object> callable(Runnable task) {
        if (task == null)
            throw new NullPointerException();
        return new RunnableAdapter<Object>(task, null);
    }
           

       An overload of the method above; the result defaults to null.
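
       A short sketch of both overloads in use follows. RunnableAdapterDemo and callBoth are hypothetical names; the Runnable appends to a StringBuilder so that the example can show it actually ran on each call.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;

// Hypothetical demo class, not part of the JDK.
class RunnableAdapterDemo {
    /** Calls both adapters and returns "<side effects>:<first result>:<second result>". */
    static String callBoth() {
        final StringBuilder log = new StringBuilder();
        Runnable task = new Runnable() {
            public void run() {
                log.append("ran");
            }
        };
        // The adapter runs the task, then returns the fixed result.
        Callable<String> withResult = Executors.callable(task, "ok");
        // Without a result argument, call() returns null.
        Callable<Object> withoutResult = Executors.callable(task);
        try {
            String first = withResult.call();
            Object second = withoutResult.call();
            return log + ":" + first + ":" + second;
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(callBoth()); // prints "ranran:ok:null"
    }
}
```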

         There are also utility methods that deal with access control and class loader settings:

public static Callable<Object> callable(final PrivilegedAction<?> action) {
        if (action == null)
            throw new NullPointerException();
        return new Callable<Object>() {
	    public Object call() { return action.run(); }};
    }

    public static Callable<Object> callable(final PrivilegedExceptionAction<?> action) {
        if (action == null)
            throw new NullPointerException();
	return new Callable<Object>() {
	    public Object call() throws Exception { return action.run(); }};
    }

    public static <T> Callable<T> privilegedCallable(Callable<T> callable) {
        if (callable == null)
            throw new NullPointerException();
        return new PrivilegedCallable<T>(callable);
    }

    public static <T> Callable<T> privilegedCallableUsingCurrentClassLoader(Callable<T> callable) {
        if (callable == null)
            throw new NullPointerException();
        return new PrivilegedCallableUsingCurrentClassLoader<T>(callable);
    }
    static final class PrivilegedCallable<T> implements Callable<T> {
        private final AccessControlContext acc;
        private final Callable<T> task;
        private T result;
        private Exception exception;
        PrivilegedCallable(Callable<T> task) {
            this.task = task;
            this.acc = AccessController.getContext();
        }
        public T call() throws Exception {
            AccessController.doPrivileged(new PrivilegedAction<T>() {
                    public T run() {
                        try {
                            result = task.call();
                        } catch (Exception ex) {
                            exception = ex;
                        }
                        return null;
                    }
                }, acc);
            if (exception != null)
                throw exception;
            else
                return result;
        }
    }

    static final class PrivilegedCallableUsingCurrentClassLoader<T> implements Callable<T> {
        private final ClassLoader ccl;
        private final AccessControlContext acc;
        private final Callable<T> task;
        private T result;
        private Exception exception;
        PrivilegedCallableUsingCurrentClassLoader(Callable<T> task) {
            this.task = task;
            this.ccl = Thread.currentThread().getContextClassLoader();
            this.acc = AccessController.getContext();
            acc.checkPermission(new RuntimePermission("getContextClassLoader"));
            acc.checkPermission(new RuntimePermission("setContextClassLoader"));
        }
        public T call() throws Exception {
            AccessController.doPrivileged(new PrivilegedAction<T>() {
                    public T run() {
                        Thread t = Thread.currentThread();
                        try {
                            ClassLoader cl = t.getContextClassLoader();
                            if (ccl == cl) {
                                result = task.call();
                            } else {
                                t.setContextClassLoader(ccl);
                                try {
                                    result = task.call();
                                } finally {
                                    t.setContextClassLoader(cl);
                                }
                            }
                        } catch (Exception ex) {
                            exception = ex;
                        }
                        return null;
                    }
                }, acc);
            if (exception != null)
                throw exception;
            else
                return result;
        }
    }
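
       As the listing shows, the two callable overloads for PrivilegedAction and PrivilegedExceptionAction simply forward to action.run() without calling AccessController. The sketch below exercises only those two adapters; PrivilegedActionCallableDemo and callBoth are names invented for the example.

```java
import java.io.IOException;
import java.security.PrivilegedAction;
import java.security.PrivilegedExceptionAction;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;

// Hypothetical demo class, not part of the JDK.
class PrivilegedActionCallableDemo {
    /** Returns "<result of the action>:<exception type thrown by the exception action>". */
    static String callBoth() {
        Callable<Object> fromAction = Executors.callable(new PrivilegedAction<String>() {
            public String run() {
                return "action";
            }
        });
        Callable<Object> failing = Executors.callable(new PrivilegedExceptionAction<String>() {
            public String run() throws Exception {
                throw new IOException("boom");
            }
        });
        String first;
        try {
            first = String.valueOf(fromAction.call());
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
        String second;
        try {
            second = String.valueOf(failing.call());
        } catch (Exception e) {
            // The checked exception from run() propagates out of call() unchanged.
            second = e.getClass().getSimpleName();
        }
        return first + ":" + second;
    }

    public static void main(String[] args) {
        System.out.println(callBoth()); // prints "action:IOException"
    }
}
```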
           

           That completes the analysis of the Executors code!

           See also:
           Jdk1.6 JUC Source Code Analysis (17) - ThreadPoolExecutor
           Jdk1.6 JUC Source Code Analysis (19) - ScheduledThreadPoolExecutor