天天看点

java多线程知多少

作者:肖文英4353

线程池的状态

java多线程知多少

The workerCount is the number of workers that have been permitted to start and not permitted to stop

RUNNING: Accept new tasks and process queued tasks

SHUTDOWN: Don't accept new tasks, but process queued tasks

STOP: Don't accept new tasks, don't process queued tasks,and interrupt in-progress tasks

TIDYING: All tasks have terminated, workerCount is zero, the thread transitioning to state TIDYING will run the terminated() hook method

TERMINATED: terminated() has completed

RUNNING -> SHUTDOWN

On invocation of shutdown(), perhaps implicitly in finalize()

(RUNNING or SHUTDOWN) -> STOP

On invocation of shutdownNow()

SHUTDOWN -> TIDYING

When both queue(任务队列) and pool(线程池) are empty

STOP -> TIDYING

When pool is empty

TIDYING -> TERMINATED

When the terminated() hook method has completed

shutdown 方法将线程池状态置为 SHUTDOWN,线程池并不会立即停止,要等正在执行和队列里等待的任务执行完才会停止。

看到调用 shutdownNow 后,第一个任务0正在睡眠(sleep)的时候,触发了 interrupt 中断,之前等待的任务1-5被从队列中清除并返回,之后的任务被拒绝。该方法是通过 interrupt 方法去终止正在运行的任务的,因此无法响应 interrupt 中断的任务可能不会被终止。所以,该方法是无法保证一定能终止任务的。

所以 shutdownNow 方法将线程池状态置为 STOP,试图让线程池立刻停止,但不一定能保证立即停止,要等所有正在执行的任务(不能被 interrupt 中断的任务)执行完才能停止。

awaitTermination 的功能如下:

阻塞当前线程,等已提交和正在执行的任务都执行完,解除阻塞;当等待超过设置的时间,检查线程池是否停止,如果停止返回 true,否则返回 false,并解除阻塞

awaitTermination 和 shutdown 执行时都会申请锁,awaitTermination 需要在 shutdown 调用后调用,否则会知道超时后才会返回

参考 线程池相关-shutdown、shutdownNow和awaitTermination

线程池shutdown与shutdownNow有什么区别?

看代码主要三个区别:

1 shutdown会把线程池的状态改为SHUTDOWN,而shutdownNow把当前线程池状态改为STOP

2 shutdown只会中断所有空闲的线程,而shutdownNow会中断所有的线程。

3 shutdown返回方法为空,会将当前任务队列中的所有任务执行完毕;而shutdownNow把任务队列中的所有任务都取出来返回。

java多线程知多少
java多线程知多少
java多线程知多少
java多线程知多少
java多线程知多少

默认线程池可监控属性

taskCount: 线程池待执行的任务数量,从同步队列获取

completedTaskCount: 已经完成的任务数量 不是精确值

largestPoolSize: 线程池中曾经创建的最大的线程数量

getPoolSize: 线程池的线程数量(正在运行+空闲的)

getActiveCount: 活动的线程数(只有正在运行的)

java多线程知多少
java多线程知多少

默认扩展方法

protected void beforeExecute(Thread t, Runnable r) { } // task.run方法之前执行

protected void afterExecute(Runnable r, Throwable t) { } // task执行完之后,不管有没有异常都会执行

protected void terminated() { } //线程池终止时运行

在方法中创建并使用线程池

A pool that is no longer referenced in a program AND has no remaining threads will be shutdown automatically

allowCoreThreadTimeOut

java多线程知多少

这个参数默认值为:false

设置成true还是false需要根据你具体的业务判断,如果该业务需要执行的次数并不多,采用多线程只是为了缩短执行的时间,那么可以设置成true,毕竟用完后很长时间才会用到,线程干耗着也是耗费资源的。

但是如果是需要较高并发执行的业务,那么可以设置为false,保留着线程,避免每次都得创建线程耗费资源。

创建后的初始线程数为 0

默认情况下,线程池创建后的初始线程数为 0,当有任务到来就会创建一个线程去执行任务,当线程池中的线程数目达到 corePoolSize,就会把到达的任务放到缓存队列当中。

java多线程知多少

自定义线程名称

java多线程知多少

spring安全关闭线程池

spring线程池参数 setWaitForTasksToCompleteOnShutdown(true)该方法就是这里的关键,用来设置线程池关闭的时候等待所有任务都完成再继续销毁其他的Bean,这样这些异步任务的销毁就会先于Redis线程池的销毁。同时,这里还设置了setAwaitTerminationSeconds(60),该方法用来设置线程池中任务的等待时间,如果超过这个时候还没有销毁就强制销毁,以确保应用最后能够被关闭,而不是阻塞住。

异常处理

当线程池中线程频繁出现未捕获的异常,那线程的复用率就大大降低了,需要不断地创建新线程。

1、线程池中线程中异常尽量手动捕获

2、通过设置ThreadFactory的UncaughtExceptionHandler可以对未捕获的异常做保底处理,通过execute提交任务,线程依然会中断,而通过submit提交任务,不会中断线程,线程异常会在get执行结果时抛出。

提交任务执行流程

public void execute(Runnable command) {
     if (command == null)
         throw new NullPointerException();
   
     int c = ctl.get();
     // 线程池当前线程数小于 corePoolSize 时进入if条件调用 addWorker 创建核心线程来执行任务
     if (workerCountOf(c) < corePoolSize) {
         if (addWorker(command, true))
             return;
         c = ctl.get();
     }
     // 线程池当前线程数大于或等于 corePoolSize ,就将任务添加到 workQueue 中
     if (isRunning(c) && workQueue.offer(command)) {
      // 获取到当前线程的状态,赋值给 recheck ,是为了重新检查状态
         int recheck = ctl.get();
         // 如果 isRunning 返回 false ,那就 remove 掉这个任务,然后执行拒绝策略,也就是回滚重新排队
         if (! isRunning(recheck) && remove(command))
             reject(command);
         // 线程池处于 running 状态,但是没有线程,那就创建线程执行任务
         else if (workerCountOf(recheck) == 0)
             addWorker(null, false);
     }
     // 如果任务放入 workQueue 失败,则尝试通过创建非核心线程来执行任务
     // 创建非核心线程失败,则说明线程池已经关闭或者已经饱和,会执行拒绝策略
     else if (!addWorker(command, false))
         reject(command);
 }           

创建工作线程流程

private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (; ; ) {
            int c = ctl.get();
            int rs = runStateOf(c);

            //情况1 rs >= SHUTDOWN && rs != SHUTDOWN
            //情况2 rs >= SHUTDOWN && firstTask != null (是正在提交的任务)
            //情况3 rs >= SHUTDOWN && workQueue.isEmpty()
            if (rs >= SHUTDOWN && 
                !(rs == SHUTDOWN && firstTask == null && !workQueue.isEmpty())) return false;

            for (; ; ) {
                int wc = workerCountOf(c);
                if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false;
                // 跳出两层for循环,执行后面的语句
                if (compareAndIncrementWorkerCount(c)) break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs) continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int rs = runStateOf(ctl.get());

                    //情况1 处于Running状态
                    //情况2 处于Shutdown状态 && 创建的线程用于执行从任务队列获取的任务
                    if (rs < SHUTDOWN || 
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize) largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (!workerStarted) addWorkerFailed(w);
        }
        return workerStarted;
    }           

线程执行任务流程

final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            while (task != null || (task = getTask()) != null) {
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP)))
                        && !wt.isInterrupted())
                    //先interrupt线程,然后线程执行遇到阻塞方法时会直接抛出InterruptedException
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);//回调方法
                    Throwable thrown = null;
                    try {
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x;
                        throw x;
                    } catch (Error x) {
                        thrown = x;
                        throw x;
                    } catch (Throwable x) {
                        thrown = x;
                        throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);//回调方法
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }           

从任务同步队列获取任务

private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?

        //for + cas
        for (; ; ) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                //为什么只减少了工作线程个数?因为如果获取任务为null,当前工作线程就会回收(workers.remove(w))
                decrementWorkerCount();
                return null;
            }

            int wc = workerCountOf(c);

            // Are workers subject to culling?
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

            //情况1 (wc > maximumPoolSize || (timed && timedOut) && wc >1 
            //情况2 (wc > maximumPoolSize || (timed && timedOut) && 任务队列为空
            if ((wc > maximumPoolSize || (timed && timedOut)) && 
                (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c)) return null;
                continue;
            }

            try {
                //大于核心线程数时会在指定时间内获取任务,超过指定时间就会返回空
                //正常情况,获取任务会一直阻塞,直到取到任务
                Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take();
                if (r != null) return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }           

参考 从源码分析几道必问线程池的面试题?

线程执行结束时处理逻辑

private void processWorkerExit(Worker w, boolean completedAbruptly) {
        if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
            decrementWorkerCount();

        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            completedTaskCount += w.completedTasks;
            workers.remove(w);
        } finally {
            mainLock.unlock();
        }

        tryTerminate();

        int c = ctl.get();
        if (runStateLessThan(c, STOP)) {
            // completedAbruptly=false 表示不是异常退出
            if (!completedAbruptly) {
                int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
                if (min == 0 && !workQueue.isEmpty()) min = 1;
                if (workerCountOf(c) >= min) return; // replacement not needed
            }
            //小于核心线程数时会再创建一个工作线程
            addWorker(null, false);
        }
    }