天天看点

并发编程-深入分析线程池原理

作者:程序猿凯撒

什么是线程池

线程池是一种用于管理和复用线程的机制,可以有效地管理线程的创建、执行和销毁。使用线程池可以减少线程创建和销毁的开销,并能够提供对并发任务的调度和执行的控制。

Java提供了ExecutorService接口和一些实现类来实现线程池。以下是几个常用的线程池实现类:

  1. ThreadPoolExecutor:ThreadPoolExecutor是一个灵活的线程池实现,可以通过构造函数参数来指定线程池的核心线程数、最大线程数、线程空闲时间等参数。
  2. Executors.newFixedThreadPool(int nThreads):newFixedThreadPool方法返回一个固定大小的线程池,其中线程数固定为指定的数量。
  3. Executors.newCachedThreadPool():newCachedThreadPool方法返回一个根据需要创建线程的线程池。线程池会根据任务的数量自动调整线程的数量。
  4. Executors.newSingleThreadExecutor():newSingleThreadExecutor方法返回一个只有一个线程的线程池,用于按顺序执行任务。

这些线程池实现类都实现了ExecutorService接口,提供了一些常用的方法来管理线程池和执行任务,例如submit用于提交任务,shutdown用于关闭线程池等。

使用线程池的主要优点是:

  • 重用线程:线程池会复用已创建的线程,避免了频繁创建和销毁线程的开销。
  • 控制并发数:可以限制线程池中线程的数量,控制并发执行的任务数,避免资源耗尽。
  • 提供任务调度和执行控制:可以提交任务并由线程池负责调度和执行任务,可以设置任务的优先级、超时等属性。

使用线程池可以提高多线程应用程序的性能和稳定性,并且能够更好地管理线程资源。

线程池的使用示例

这里就举一个使用Executors.newFixedThreadPool()创建线程池的例子

java复制代码package org.example;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Main {
    public static void main(String[] args) {
        // 创建固定大小为3的线程池
        ExecutorService executorService = Executors.newFixedThreadPool(3);

        // 提交任务给线程池
        for (int i = 0; i < 10; i++) {
            Runnable task = new MyTask(i);
            executorService.execute(task);
        }

        // 关闭线程池
        executorService.shutdown();
    }
}

class MyTask implements Runnable {
    private int taskId;

    public MyTask(int taskId) {
        this.taskId = taskId;
    }

    @Override
    public void run() {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        System.out.println(Thread.currentThread().getName() + "---" + "Task " + taskId + " is running.");
        // 执行任务的代码
    }
}
           
并发编程-深入分析线程池原理

可以发现使用都是这三个线程

线程池的实现思路

线程池的实现原理基于以下几个关键组件:

  1. 任务队列(Task Queue):用于存储提交给线程池的任务。当线程池中的线程空闲时,会从任务队列中获取任务进行执行。
  2. 线程池管理器(ThreadPool Manager):负责线程池的创建、初始化、销毁等操作,以及对线程池中的线程进行管理。
  3. 工作线程(Worker Threads):线程池中的线程,用于执行提交的任务。线程池会创建一定数量的工作线程,并将任务分配给它们执行。
  4. 饱和策略(Saturation Policy):用于决定当任务队列已满且无法继续接受新任务时,线程池应该采取的策略。常见的饱和策略有直接提交、丢弃、丢弃最旧任务和调用者运行等。

线程池的工作流程如下:

  1. 创建线程池:通过线程池的工厂方法或自定义方式创建线程池,并指定线程池的参数,如核心线程数、最大线程数、线程空闲时间等。
  2. 初始化线程:线程池在创建后,会初始化一定数量的核心线程,这些线程处于等待任务的状态。
  3. 提交任务:应用程序通过调用线程池的任务提交方法(如submit()或execute())将任务提交给线程池。
  4. 任务队列:线程池维护一个任务队列,用于存储已提交但尚未执行的任务。
  5. 任务调度和执行:线程池根据任务调度策略从任务队列 中选择任务,并将任务分配给合适的线程执行。任务可以在核心线程、非核心线程或等待队列中等待执行。
  6. 线程复用:线程池会复用已创建的线程来执行多个任务,避免了线程的频繁创建和销毁开销。执行完一个任务后,线程会被重新分配给下一个任务。
  7. 动态调整线程数量:线程池根据当前的工作负载情况和线程池参数,动态调整线程的数量。当任务量增加时,可以创建新的线程来处理;当任务量减少时,可以销毁多余的线程。
  8. 线程超时:对于一些线程池实现,当线程在一定时间内没有任务可执行时,超过指定的线程空闲时间,线程可能会被终止和移除。
  9. 错误处理:线程池可以提供错误处理机制,当任务抛出异常时,可以进行相应的处理,如记录日志或通知应用程序。
  10. 关闭线程池:当应用程序不再需要线程池时,应调用线程池的关闭方法来关闭线程池。关闭线程池会停止接受新的任务,并等待已提交的任务执行完成。可以选择使用不同的关闭方法,如shutdown()方法来等待任务执行完成,或shutdownNow()方法来立即停止执行中的任务并关闭线程池。

线程池的实现原理可以提高线程的利用率、减少线程创建和销毁的开销,并提供对并发任务的管理和调度。通过合理配置线程池的参数,可以控制并发线程的数量,避免资源耗尽,并提高系统的性能和稳定性。

源码分析

构造方法源码

arduino复制代码public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}
           
ini复制代码public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.acc = System.getSecurityManager() == null ?
            null :
            AccessController.getContext();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}
           

这里比较简单就是初始化一些初始值,需要注意的是这里使用到了阻塞队列,主要用于存储任务使用

execute源码分析

scss复制代码public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    //这里需要注意这个ctl,高三位标识当前线程池的状态
    //剩下的是线程数workerCountOf其实就是一个位运算,取得线程数
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
        //当前工作的线程比核心线程少则创建一个线程
        //这里可以看出线程池中的线程并不是线程池初始化的时候创建的
        //而是使用懒加载的一种方式创建线程
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    if (isRunning(c) && workQueue.offer(command)) {
    //2.核心池已满,但任务队列未满,添加到队列中
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            //再次检查线程池的状态
            reject(command);
        else if (workerCountOf(recheck) == 0)
            //说明线程已经被销毁了需要新建线程
            addWorker(null, false);
    }
    //3.核心池已满,队列已满,试着创建一个新线程
    else if (!addWorker(command, false))
       //这都失败了则可以直接拒绝任务了
       reject(command);
    }
           

线程池的状态

这里我们可以看一下线程池的状态

arduino复制代码private static final int COUNT_BITS = Integer.SIZE - 3; // 32 -3 = 29
//这个就表示最大线程数
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

// runState is stored in the high-order bits
//可以发现状态都是左移29位,也就印证了上文说了高三位存储的是线程池的状态
//接收任务并且执行队列中的任务
private static final int RUNNING    = -1 << COUNT_BITS; 
//不接收任务但是会执行队列中的任务
private static final int SHUTDOWN   =  0 << COUNT_BITS; 
//不接收任务不执行队列中的任务且会中断执行中的任务
private static final int STOP       =  1 << COUNT_BITS; 
//所有的任务都已经结束,线程数量为 0,处于该状态的线程池即将调用  terminated()方法
private static final int TIDYING    =  2 << COUNT_BITS;
//terminated()方法执行完成
private static final int TERMINATED =  3 << COUNT_BITS;
           

状态变化流程图:

并发编程-深入分析线程池原理

addWorker源码分析

ini复制代码
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
//获取当前状态
int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        //状态是STOP TIDYING TERMINATED会直接返回false
        //如果是SHUTDOWN则会判断firstTask是否是null,并且任务队列总还有任务的时候
        是可以允许继续添加线程的(应为要等任务全部执行完成)
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;

        for (;;) {
            //工作线程数
            int wc = workerCountOf(c);
            //判断工作线城市是否已经大于等于默认的最大线程数
            //或者是判断是否大于核心线程数或者我们设置的最大线程数
            (这里是根据创建类型判断取值的也就是core)
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            if (compareAndIncrementWorkerCount(c))
               //上面的条件都通过了则会通过CAS修改线程数如果成功了则会跳出循环
                break retry;
            c = ctl.get();  // Re-read ctl
            if (runStateOf(c) != rs)
            //没有成功的话继续重试
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }

    //工作线程开始的标志
    boolean workerStarted = false;
    //工作线程添加完成的标志
    boolean workerAdded = false;
    Worker w = null;
    try {
        //新创建一个Worker
        w = new Worker(firstTask);
        //获取对应的线程
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            //这里需要加锁
            mainLock.lock();
            try {
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                int rs = runStateOf(ctl.get());
                //如果是运行状态或者说是SHUTDOWN状态但是firstTask == null是可以添加线程的
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    //添加到线程池中
                    workers.add(w);
                    int s = workers.size();
                    //如果集合中的工作线程数大于最大线程数,
                    这个最大线程数表示线程池曾经出现过的最大线程数
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                        
                    //这里就表示添加成功了
                    workerAdded = true;
                }
            } finally {
                //解锁
                mainLock.unlock();
            }
            if (workerAdded) {
                //最后启动线程
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted)
            //如果失败了需要移除对应的worker并且数量递减
            addWorkerFailed(w);
    }
    return workerStarted;

}

           

这里我们可以看一下Worker的数据结构

arduino复制代码private final class Worker
    extends AbstractQueuedSynchronizer
    implements Runnable
{
    private static final long serialVersionUID = 6138294804551838833L;
    //真正工作的线程
    final Thread thread;
    //需要执行的任务
    Runnable firstTask;
    volatile long completedTasks;

    Worker(Runnable firstTask) {
        setState(-1); // inhibit interrupts until runWorker
        this.firstTask = firstTask;
        this.thread = getThreadFactory().newThread(this);
    }

    /** Delegates main run loop to outer runWorker  */
    public void run() {
        runWorker(this);
    }
    ......
  }
           

可以发现Worker继承了AQS并且实现了Runnable,在构造方法中会将对应state置为-1并且会使用ThreadFactory创建一个新的线程,需要注意的是传入的Runnable就是this,意思就是一旦线程start则会执行Worker中的run方法,上文中我们也知道添加成功之后会直线线程start,也就是说会走到run方法执行runWorker对应的源码如下:

ini复制代码final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        //这个循环直到任务队列中的任务全部执行完了才会停止(getTask获取任务队列中的任务)
        while (task != null || (task = getTask()) != null) {
            w.lock();
            //线程池状态时STOP的时候需要终止任务
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
            //这里默认是没有实现的,在一些特定的场景中我们可以自己继承 
            ThreadpoolExecutor 自己重写
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    //这里还是执行任务了
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    //这里默认也是没有实现
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                //最后将完成任务书加一
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        //执行到这里说明任务队列中也没有可以执行的任务了所以这里会
        将入参 worker 从数组 workers 里删除掉;
        processWorkerExit(w, completedAbruptly);
    }
}
           

注:为什么没有使用可重入锁ReentrantLock,而是使用AQS,为的就是实现不可重入的特性去反应线程现在的执行状态

最后再看一下getTask方法

getTask

java复制代码private Runnable getTask() {
    //超时标志
    boolean timedOut = false; // Did the last poll() time out?

    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        //如果当前的状态
        //1. 线程池状态为  shutdown,且workQueue 为空
        (反映了 shutdown 状态的线程池还是要执行 workQueue 中剩余的任务的)
        //2. 线程池状态为  stop(shutdownNow()会导致变成 STOP)(此时不用考虑  workQueue的情况)
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            //返回null之后runWorker会回收Worker,这个线程就会被释放掉
            return null;
        }

        int wc = workerCountOf(c);

        // Are workers subject to culling?
        // timed 变量用于判断是否需要进行超时控制
        // allowCoreThreadTimeOut 默认是 false,也就是核心线程不允许进行超时
        // wc > corePoolSize,表示当前线程池中的线程数量大于核心线程数量
        // 对于超过核心线程数量的这些线程,需要进行超时控制
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
        //线程数量超过  maximumPoolSize 
        //timed && timedOut 如果为 true,表示当前操作需要进行超时控制
        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                //此时也是worker数量减一然后返回null 外层调用方法会释放worker
                return null;
            continue;
        }

        try {
            //据timed来判断,如果为true,则通过阻塞队列poll方法进行超时控制,
            如果在 keepaliveTime 时间内没有获取到任务,则返回null
            否则通过 take 方法阻塞式获取队列中的任务
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                //如果拿到了则返回任务继续执行
                return r;
            //拿不到标志timedOut位true. 下一次循环会返回null
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}
           

可以发现这个方法中有两个重要的方法就是workQueue.poll()和workQueue.take(),他们分别表示超时等待还是说无限等待

workQueue.poll()

ini复制代码public E poll(long timeout, TimeUnit unit) throws InterruptedException {
    E x = null;
    int c = -1;
    long nanos = unit.toNanos(timeout);
    final AtomicInteger count = this.count;
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lockInterruptibly();
    try {
        while (count.get() == 0) {//这里表示任务数量
            //第一次进来肯定不是0
            //第二次进来就为0所以会直接返回null
            //当然如果第二次循环这个时候任务数量不是0则不会走到这个循环
            //而是返回任务执行任务
            if (nanos <= 0)
                return null;
            //这个时候就等待对应的时间,等待完成之后nanos=0
     
            nanos = notEmpty.awaitNanos(nanos);
        }
        //任务出队
        x = dequeue();
        //任务数量递减
        c = count.getAndDecrement();
        if (c > 1)
            //如果此时还有任务则通知(这个时候等待的线程会收到通知并唤醒线程)
            notEmpty.signal();
    } finally {
        takeLock.unlock();
    }
    if (c == capacity)
        signalNotFull();
    return x;
}
           

可以发现这个方法就是等待一定的时间如果一直没有任务则当前线程会直接被销毁掉

workQueue.take()

ini复制代码public E take() throws InterruptedException {
    E x;
    int c = -1;
    final AtomicInteger count = this.count;
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lockInterruptibly();
    try {
        while (count.get() == 0) {
            notEmpty.await();
        }
        x = dequeue();
        c = count.getAndDecrement();
        if (c > 1)
            notEmpty.signal();
    } finally {
        takeLock.unlock();
    }
    if (c == capacity)
        signalNotFull();
    return x;
}
           

这里比较简单,就是只要当前的任务数量是0就会等待,直到调用signal方法唤醒当前线程

整体逻辑流程图

并发编程-深入分析线程池原理
并发编程-深入分析线程池原理

作者:Potato_土豆

链接:https://juejin.cn/post/7243725147109916731

继续阅读