天天看点

线程安全(六)线程池系列文章目录0.前言1.Executor 与ExecutorService接口2.AbstractExecutorService类3.ThreadPoolExecutor4.常见线程池5.线程池关键属性

系列文章目录

线程安全(一)java对象头分析以及锁状态

线程安全(二)java中的CAS机制

线程安全(三)实现方法sychronized与ReentrantLock(阻塞同步)

线程安全(四)Java内存模型与volatile关键字

线程安全(五)线程状态和线程创建

线程安全(六)线程池

线程安全(七)ThreadLocal和java的四种引用

线程安全(八)Semaphore

线程安全(九)CyclicBarrier

线程安全(十)AQS(AbstractQueuedSynchronizer)

0.前言

为什么要使用线程池?

直接使用线程会有线程重复的创建和销毁

优点:

  • 降低内存消耗,避免线程重复的创建和销毁
  • 可以创建核心线程数,可直接使用,不需要再创建
  • 显性的管理监控线程,防止创建过多

先说结果

线程安全(六)线程池系列文章目录0.前言1.Executor 与ExecutorService接口2.AbstractExecutorService类3.ThreadPoolExecutor4.常见线程池5.线程池关键属性

再看类图

线程安全(六)线程池系列文章目录0.前言1.Executor 与ExecutorService接口2.AbstractExecutorService类3.ThreadPoolExecutor4.常见线程池5.线程池关键属性

1.Executor 与ExecutorService接口

1.1.Executor接口

// 提交任务的办法
void execute(Runnable command);
           

1.2.ExecutorService接口

// 关闭线程池,已提交的任务继续执行,不接受继续提交新任务
    void shutdown();

    // 关闭线程池,尝试停止正在执行的所有任务,不接受继续提交新任务
    // 它和前面的方法相比,加了一个单词“now”,区别在于它会去停止当前正在进行的任务
    List<Runnable> shutdownNow();

    // 线程池是否已关闭
    boolean isShutdown();

    // 如果调用了 shutdown() 或 shutdownNow() 方法后,所有任务结束了,那么返回true
    // 这个方法必须在调用shutdown或shutdownNow方法之后调用才会返回true
    boolean isTerminated();

    // 等待所有任务完成,并设置超时时间
    // 我们这么理解,实际应用中是,先调用 shutdown 或 shutdownNow,
    // 然后再调这个方法等待所有的线程真正地完成,返回值意味着有没有超时
    boolean awaitTermination(long timeout, TimeUnit unit)
            throws InterruptedException;

    // 提交一个 Callable 任务
    <T> Future<T> submit(Callable<T> task);

    // 提交一个 Runnable 任务,第二个参数将会放到 Future 中,作为返回值,
    // 因为 Runnable 的 run 方法本身并不返回任何东西
    <T> Future<T> submit(Runnable task, T result);

    // 提交一个 Runnable 任务
    Future<?> submit(Runnable task);

    // 执行所有任务,返回 Future 类型的一个 list
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
            throws InterruptedException;

    // 也是执行所有任务,但是这里设置了超时时间
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                  long timeout, TimeUnit unit)
            throws InterruptedException;

    // 只有其中的一个任务结束了,就可以返回,返回执行完的那个任务的结果
    <T> T invokeAny(Collection<? extends Callable<T>> tasks)
            throws InterruptedException, ExecutionException;

    // 同上一个方法,只有其中的一个任务结束了,就可以返回,返回执行完的那个任务的结果,
    // 不过这个带超时,超过指定的时间,抛出 TimeoutException 异常
    <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                    long timeout, TimeUnit unit)
            throws InterruptedException, ExecutionException, TimeoutException;
           

2.AbstractExecutorService类

//  构建RunnableFuture
	protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
        return new FutureTask<T>(runnable, value);
    }

    protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
        return new FutureTask<T>(callable);
    }

   // 提交任务
    public Future<?> submit(Runnable task) {
        if (task == null) throw new NullPointerException();
        // 1. 将任务包装成 FutureTask
        RunnableFuture<Void> ftask = newTaskFor(task, null);
        execute(ftask);
        return ftask;
    }

   // 提交任务
    public <T> Future<T> submit(Runnable task, T result) {
        if (task == null) throw new NullPointerException();
        // 1. 将任务包装成 FutureTask
        RunnableFuture<T> ftask = newTaskFor(task, result);
        execute(ftask);
        return ftask;
    }

   // 提交任务
    public <T> Future<T> submit(Callable<T> task) {
        if (task == null) throw new NullPointerException();
        // 1. 将任务包装成 FutureTask
        RunnableFuture<T> ftask = newTaskFor(task);
        execute(ftask);
        return ftask;
    }
           

不难看出,上述只是实现提交任务,没有真正开启线程,真正运行的还是excute方法。

3.ThreadPoolExecutor

3.1.属性

32位的整数,前3位用于存放线程池状态,低29位标识线程数。
// 原子性操作变量,CAS的体现
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

// 29位
private static final int COUNT_BITS = Integer.SIZE - 3;

// 线程池的最大线程数
111 11111111111111111111111111(29个1)
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

// runState存储在高阶位
// 运行:接受新任务并处理队列任务 
111 00000000000000000000000000000(111后29个0)
private static final int RUNNING    = -1 << COUNT_BITS;

// 关机:不接受新任务,但处理队列任务 
000 00000000000000000000000000000
private static final int SHUTDOWN   =  0 << COUNT_BITS;

// 停止:不接受新任务,不处理排队任务,并中断正在进行的任务 
001 00000000000000000000000000000(1后29个0)
private static final int STOP       =  1 << COUNT_BITS;

// 整理:所有任务已经终止
010 00000000000000000000000000000(10后29个0)
private static final int TIDYING    =  2 << COUNT_BITS;

// terminated() 方法结束后,已完成 
011 00000000000000000000000000000(11后29个0)
private static final int TERMINATED =  3 << COUNT_BITS;
           

3.2.线程状态方法

// CAPACITY值为:11111111111111111111111111111
// ~CAPACITY值为:11100000000000000000000000000000

// &操作,位运算结果将低29位置0了,而高3位还是保持c的值,得到是runState的值
private static int runStateOf(int c)     { return c & ~CAPACITY; }
// 这个方法用于取出workerCount的值 

// &操作,位运算结果将高3位置0了,保留参数c的低29位,得到workerCount的值
private static int workerCountOf(int c)  { return c & CAPACITY; }

// 之前说过 32为2进制数,前三位存的是线程状态,后29位存的是线程数,这个运算使将他俩合在一起 取rs(runState)的前三位和wc(workerCount)的后29位
private static int ctlOf(int rs, int wc) { return rs | wc; }
// c是否小于s 
private static boolean runStateLessThan(int c, int s) {
        return c < s;
    }
// c是否大于等于s
private static boolean runStateAtLeast(int c, int s) {
        return c >= s;
    }
// c是否小于SHUTDOEN 即是否在run 只有RUNNING状态会小于0
private static boolean isRunning(int c) {
        return c < SHUTDOWN;
    }
// 线程号+1
private boolean compareAndIncrementWorkerCount(int expect) {
        return ctl.compareAndSet(expect, expect + 1);
    }
// 线程号-1
 private boolean compareAndDecrementWorkerCount(int expect) {
        return ctl.compareAndSet(expect, expect - 1);
    }
// 一个线程突然终止 减少线程号
private void decrementWorkerCount() {
        do {} while (! compareAndDecrementWorkerCount(ctl.get()));
    }
           

3.2.相关属性

// 队列 用于保存任务并传递给工作线程的队列
private final BlockingQueue<Runnable> workQueue;
// 锁
private final ReentrantLock mainLock = new ReentrantLock();
// 包含所有工作线程,只有持有mainLock的才能进入
private final HashSet<Worker> workers = new HashSet<Worker>();
// 等待条件支持awaittermination
private final Condition termination = mainLock.newCondition();
//最大的池大小
private int largestPoolSize;
//完成任务的计数器。仅在工作线程终止时更新。只有在mainlock访问
private long completedTaskCount;
// 线程工厂
private volatile ThreadFactory threadFactory;
// 饱和和关闭时使用
private volatile RejectedExecutionHandler handler;
// 线程从队列中获取任务的超时时间,也就是说如果线程空闲超过这个时间就会终止。
private volatile long keepAliveTime;
// 如果为false(默认),即使空闲时,核心线程仍然保持生存状态。
// 如果为true,核心线程使用keepalivetime超时等待工作。
private volatile boolean allowCoreThreadTimeOut;
// 核心线程数量
private volatile int corePoolSize;
// 最大线程数。注意,实际最大值是由容量内部限定的。
private volatile int maximumPoolSize;
// 默认拒绝 程序
private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();
//
private static final RuntimePermission shutdownPerm = new RuntimePermission("modifyThread");
// 终结器
private final AccessControlContext acc;
           

3.3.构造方法

// 核心线程数,最大线程数,保持时间,单位,队列
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), defaultHandler);
    }

// 核心线程数,最大线程数,保持时间,单位,队列,线程工厂
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             threadFactory, defaultHandler);
    }

// 核心线程数,最大线程数,保持时间,单位,队列, handler
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              RejectedExecutionHandler handler) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), handler);
    }

// 核心线程数,最大线程数,保持时间,单位,队列,线程工厂,handler
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.acc = System.getSecurityManager() == null ?
                null :
                AccessController.getContext();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }
           

3.4. void execute(Runnable command)

//
    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        /*
         * 1.如果少于核心线程数的线程在运行,启动一个新线程,原子性检查运行状态和线程号,
         * 通过返回false来防止错误的新建线程
         * 2. 如果一个任务可以成功地排队,那么我们仍然需要仔细检查是否应该添加一个线程。
         * (自从上次检查结束后,有的线程有的已经死亡)或者自从进入这个方法后池就关闭了。
         *所以我们重新检查状态,如果停止,需要回滚入队,如果没有, 或启动一个新线程。
         * 3.如果我们不能对任务进行队列,那么我们尝试添加一个新线程。
         * 如果失败,我们知道我们被关闭或饱和,所以任务倍拒绝。
         // 获取线程状态和线程数
        int c = ctl.get();
        // 活动线程数 小与 核心线程数,直接启动新的线程,在增加时会原子性检测
        // 1,小于核心线程新建线程
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        // 2.线程池处于运行状态,并且队列未满
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            // 不是runState 则从workQueue中移除任务并拒绝
            if (! isRunning(recheck) && remove(command))
	            // 采用线程池指定的策略拒绝任务
                reject(command);
            // 线程池处于RUNNING状态 || 线程池处于非RUNNING状态但是任务移除失败 
            else if (workerCountOf(recheck) == 0)
            // 这行代码是为了SHUTDOWN状态下没有活动线程了,但是队列里还有任务没执行这种特殊情况。  
             // 添加一个null任务是因为SHUTDOWN状态下,线程池不再接受新任务  
                addWorker(null, false);
              // 两种情况:  
                   // 1.非RUNNING状态拒绝新的任务  
                   // 2.队列满了启动新的线程失败(workCount > maximumPoolSize)
        }
        // 如果 workQueue 队列满了,/ 以 maximumPoolSize 为界创建新的 worker,
        // 如果失败,说明当前线程数已经达到 maximumPoolSize,执行拒绝策略
        else if (!addWorker(command, false))
            reject(command);
    }
           

3.5. void shutdown()

// 会将runState置为SHUTDOWN,会终止所有空闲的线程。
public void shutdown() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {   
            checkShutdownAccess();
             // 线程池状态设为SHUTDOWN,如果已经至少是这个状态那么则直接返回  
            advanceRunState(SHUTDOWN);
            // 注意这里是中断所有空闲的线程:runWorker中等待的线程被中断 → 进入processWorkerExit →  
	        // tryTerminate方法中会保证队列中剩余的任务得到执行。
            interruptIdleWorkers();
            onShutdown(); // hook for ScheduledThreadPoolExecutor
        } finally {
            mainLock.unlock();
        }
        tryTerminate();
    }

private void interruptIdleWorkers(boolean onlyOne) {  
    final ReentrantLock mainLock = this.mainLock;  
    mainLock.lock();  
    try {  
        for (Worker w : workers) {  
            Thread t = w.thread;  
   // w.tryLock能获取到锁,说明该线程没有在运行,因为runWorker中执行任务会先lock,  
          // 因此保证了中断的肯定是空闲的线程。  
            if (!t.isInterrupted() && w.tryLock()) {  
                try {  
                    t.interrupt();  
                } catch (SecurityException ignore) {  
                } finally {  
                    w.unlock();  
                }  
            }  
            if (onlyOne)  
                break;  
        }  
    } finally {  
        mainLock.unlock();  
    }  
} 
           

3.6. List shutdownNow()

public List<Runnable> shutdownNow() {
        List<Runnable> tasks;
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            checkShutdownAccess();
            // STOP状态:不再接受新任务且不再执行队列中的任务。
            advanceRunState(STOP);
            // 中断所有线程
            interruptWorkers();
            // 返回队列中还没有被执行的任务。
            tasks = drainQueue();
        } finally {
            mainLock.unlock();
        }
        tryTerminate();
        return tasks;
    }

 void interruptIfStarted() {  
            Thread t;  
			// 初始化时state == -1  
            if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {  
                try {  
                    t.interrupt();  
                } catch (SecurityException ignore) {  
                }  
            }  
        }
           

3.7. void runWorker(Worker w)

// 第一次启动会执行初始化传进来的任务firstTask;
// 然后会从workQueue中取任务执行,如果队列为空则等待keepAliveTime这么长时间。
final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        // Worker的构造函数中抑制了线程中断setState(-1),所以这里需要unlock从而允许中断  
        w.unlock(); // allow interrupts
        // 为true的情况:1.执行任务抛出异常;2.被中断。
        boolean completedAbruptly = true;
        try {
	        // 如果getTask返回null那么getTask中会将workerCount递减,如果异常了这个递减操作会在processWorkerExit中处理
            while (task != null || (task = getTask()) != null) {
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
	                // 任务执行前可以插入一些处理,子类重载该方法
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
	                    // 执行用户任务
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
	                    // 和beforeExecute一样,留给子类去重载
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
	        // 结束线程的一些清理工作
            processWorkerExit(w, completedAbruptly);
        }
    }
           

3.8. boolean addWorker(Runnable firstTask, boolean core)

// 第一个为当前任务,第二个为是否为核心线程,是用corePoolSize ,false用maximumPoolSize
private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            // 这条语句等价:rs >= SHUTDOWN && (rs != SHUTDOWN || firstTask != null ||  
            // workQueue.isEmpty())  
            // 满足下列调价则直接返回false,线程创建失败:  
            // rs > SHUTDOWN:STOP || TIDYING || TERMINATED 此时不再接受新的任务,且所有任务执行结束  
            // rs = SHUTDOWN:firtTask != null 此时不再接受任务,但是仍然会执行队列中的任务  
            // rs = SHUTDOWN:firtTask == null见execute方法的addWorker(null,  
            // false),任务为null && 队列为空  
            // 最后一种情况也就是说SHUTDONW状态下,如果队列不为空还得接着往下执行,为什么?add一个null任务目的到底是什么?  
            // 看execute方法只有workCount==0的时候firstTask才会为null结合这里的条件就是线程池SHUTDOWN了不再接受新任务  
            // 但是此时队列不为空,那么还得创建线程把任务给执行完才行
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;
			// 走到这的情形:  
            // 1.线程池状态为RUNNING  
            // 2.SHUTDOWN状态,但队列中还有任务需要执行 
            for (;;) {
                int wc = workerCountOf(c);
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                // 原子操作递增workCount  

                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                // 如果线程池的状态发生变化则重试
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }
		// wokerCount递增成功  
        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                // 并发的访问线程池workers对象必须加锁
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int rs = runStateOf(ctl.get());
					// RUNNING状态 || SHUTDONW状态下清理队列中剩余的任务 
                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                // 启动新添加的线程,这个线程首先执行firstTask,然后不停的从队列中取任务执行  
                // 当等待keepAlieTime还没有任务执行则该线程结束。见runWoker和getTask方法的代码。
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
	        // 线程启动失败,则从wokers中移除w并递减wokerCount
            if (! workerStarted)
	            // 递减wokerCount会触发tryTerminate方法
                addWorkerFailed(w);
        }
        return workerStarted;
    }
           

3.9 invokeAll(Collection<? extends Callable> tasks,long timeout, TimeUnit unit)

批量执行线程任务时设置的的超时时间(常用于Future,需要拿到所有线程返回结果的值)

public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                         long timeout, TimeUnit unit)
        throws InterruptedException {
        if (tasks == null)
            throw new NullPointerException();
        long nanos = unit.toNanos(timeout);
        ArrayList<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
        boolean done = false;
        try {
            for (Callable<T> t : tasks)
                futures.add(newTaskFor(t));

            final long deadline = System.nanoTime() + nanos;
            final int size = futures.size();

            // Interleave time checks and calls to execute in case
            // executor doesn't have any/much parallelism.
            for (int i = 0; i < size; i++) {
            	// 所有任务执行execute,同步执行
                execute((Runnable)futures.get(i));
                nanos = deadline - System.nanoTime();
                if (nanos <= 0L)
                    return futures;
            }

            for (int i = 0; i < size; i++) {
            	// 判断每个future是否执行完成
                Future<T> f = futures.get(i);
                if (!f.isDone()) {
                    if (nanos <= 0L)
                        return futures;
                    try {
                    	// 在限定时间内拿到结果
                        f.get(nanos, TimeUnit.NANOSECONDS);
                    } catch (CancellationException ignore) {
                    } catch (ExecutionException ignore) {
                    } catch (TimeoutException toe) {
                        return futures;
                    }
                    // 根据目前已用时间 更新下个future的限定时间
                    nanos = deadline - System.nanoTime();
                }
            }
            done = true;
            return futures;
        } finally {
           // 只要非完成的,就会中断所有线程
            if (!done)
                for (int i = 0, size = futures.size(); i < size; i++)
                    futures.get(i).cancel(true);
        }
    }
           

4.常见线程池

Executors是一个工具类,生成几种常见的线程池

线程安全(六)线程池系列文章目录0.前言1.Executor 与ExecutorService接口2.AbstractExecutorService类3.ThreadPoolExecutor4.常见线程池5.线程池关键属性

附:图片来源于网络,侵删

4.1.newFixedThreadPool()

生成一个固定大小的线程池(核心线程数和最大线程数是一样的,超出核心线程数的新提交的任务都会加入无界队列)

因为是无界队列,所以最大线程数其实用不到

创建一个线程池,该线程池重用在共享无界队列上运行的固定数量的线程。在任何时候,最多 nThreads 个线程将是活动的处理任务。如果在所有线程都处于活动状态时提交了其他任务,它们将在队列中等待,直到有线程可用。如果任何线程在关闭之前的执行过程中由于失败而终止,如果需要执行后续任务,新的线程将取代它。池中的线程将一直存在,直到显式关闭。
public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}
           

4.2.newSingleThreadExecutor()

生成只有一个线程的固定线程池,设置线程数为 1

因为是无界队列,所以最大线程数其实用不到

创建一个使用单个工作线程在无界队列上运行的 Executor。 (但请注意,如果该单线程在关闭前的执行过程中因故障而终止,如果需要执行后续任务,则新线程将取代它。)任务保证按顺序执行,并且不会有超过一个任务处于活动状态在任何给定时间。
public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}
           

4.3.newCachedThreadPool()

生成一个需要的时候就创建新的线程,同时可以复用之前创建的线程(如果这个线程当前没有任务)的线程池。

SynchronousQueue内部没有数据的存储空间,任何入队的线程都会阻塞,直到有线程来出队,可以保证如果加入一个数据,必须要remove掉,才能再次计入

创建一个线程池,根据需要创建新线程,但在可用时将重用以前构造的线程。这些池通常会提高执行许多短期异步任务的程序的性能。如果可用,对执行的调用将重用以前构造的线程。如果没有可用的现有线程,将创建一个新线程并将其添加到池中。六十秒内未使用的线程将被终止并从缓存中删除。因此,保持空闲足够长时间的池不会消耗任何资源。
public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}
           

4.4.newScheduledThreadPool()

定期或延时执行

创建一个线程池,可以安排命令在给定延迟后运行,或定期执行
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
        return new ScheduledThreadPoolExecutor(corePoolSize);
    }
           

5.线程池关键属性

5.1.线程池创建参数

  • corePoolSize(核心线程数)
  • maximumPoolSize(最大线程数)
  • workQueue(队列)
  • keepAliveTime(空闲时间)
  • rejectedExecutionHandler(拒绝策略)

    corePoolSize 到 maximumPoolSize 之间的线程会被回收,当然 corePoolSize 的线程也可以通过设置而得到回收(allowCoreThreadTimeOut(true))。

    workQueue 用于存放任务,添加任务的时候,如果当前线程数超过了 corePoolSize,那么往该队列中插入任务,线程池中的线程会负责到队列中拉取任务。

    keepAliveTime 用于设置空闲时间,如果线程数超出了 corePoolSize,并且有些线程的空闲时间超过了这个值,会执行关闭这些线程的操作

    rejectedExecutionHandler 用于处理当线程池不能执行此任务时的情况。

  • AbortPolicy(默认):丢弃任务并抛出RejectedExecutionException异常。
  • DiscardPolicy:丢弃任务,但是不抛出异常。
  • DiscardOldestPolicy:丢弃队列最前面的任务,然后重新提交被拒绝的任务。
  • CallerRunsPolicy:由调用线程(提交任务的线程)处理该任务。