天天看点

线程池的简单实现线程池的简单实现前言一、简单实现线程池二、线程池是重点总结

线程池的简单实现

文章目录

  • 线程池的简单实现
  • 前言
  • 一、简单实现线程池
    • 1、都要有哪些类
    • 2、各个类的职责
      • (1)核心线程的类
      • (2)临时线程的类
      • (3)线程池类
      • (4)实现
  • 二、线程池是重点
  • 总结

前言

上一篇学习了定时器的简单实现和线程池的基础知识,线程池是面试热点问题,基础知识一定要掌握,比如什么是线程池、为什么会有线程池、线程池的工作流程、及其内部的一些机制,此外还要理解线程池构造方法形参列表各个参数的含义及作用。掌握了这些关于线程池的基础知识后,就来考虑一下如何简单的实现线程池吧

一、简单实现线程池

👉在这篇文章👈的后半部分,我详细的描述了一些关于线程池的基础知识,链接放在这儿,等我忘记了好去复习复习。

1、都要有哪些类

Java作为面向对象的高级语言,类和对象应该是其最大的特点,有时候一个功能还没实现,就先定义出一大堆的类。那么要实现我们的线程池,应该具有类呢?

从线程池的内部处理流程来看,在任务不断提交过程中,核心线程、阻塞队列、临时线程都可以处理任务,所以我们应该暂时需要核心线程的类和临时线程的类,阻塞队列可以直接使用阻塞队列数据结构实现,还有就是线程池类。

2、各个类的职责

(1)核心线程的类

核心线程就是负责执行任务的,正常情况下不会让它终止,所以该类的任务就是死循环处理任务。

(2)临时线程的类

临时线程也是负责执行任务的,但是又最重要的一点就是临时线程在空闲状态下有存活时常的限制,所以在临时线程类中,除了执行任务之外,还得判断该线程是否到达空闲状态的最长存活时间。

(3)线程池类

调用该类的角色向向线程池类中提交任务,线程池这个类负责根据当前线程池内线程的情况采取一定的措施:

  • 当前核心线程数量小于核心线程数量上限时创建核心线程去执行任务;
  • 核心线程数量到达上限时将任务放进阻塞队列;
  • 阻塞队列已满时创建临时线程执行任务;
  • 总线程数量到达上限时采取指定的拒绝策略;

(4)实现

核心线程类:

/**核心线程类(一个正式员工线程要完成的工作)
 * @author Lvvvvvv
 * @date 2022/07/30 15:16
 **/
public class CoreJob implements Runnable {
    // 需要阻塞队列
    private final BlockingQueue<Runnable> workQueue;
    private Runnable firstCommand;

    CoreJob(BlockingQueue<Runnable> workQueue, Runnable firstCommand) {
        this.workQueue = workQueue;
        this.firstCommand = firstCommand;
    }

    @Override
    public void run() {
        try {
            firstCommand.run();     // 优先先把刚提交的任务先做掉了
            firstCommand = null;    // 这里设置 null 的意思是,不影响 firstCommand 对象被 GC 时的回收

            while (!Thread.interrupted()) {
                Runnable command = workQueue.take();
                command.run();
            }
        } catch (InterruptedException ignored) {}
    }
}

           

临时线程类:

/**临时线程类(一个临时员工线程要完成的工作)
 * @author Lvvvvvv
 * @date 2022/07/30 15:23
 **/
public class TemporaryJob implements Runnable {
    // 需要阻塞队列
    private final BlockingQueue<Runnable> workQueue;
    private final long keepAliveTime;
    private final TimeUnit unit;
    private Runnable firstCommand;

    TemporaryJob(long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, Runnable firstCommand) {
        this.keepAliveTime = keepAliveTime;
        this.unit = unit;
        this.workQueue = workQueue;
        this.firstCommand = firstCommand;
    }

    @Override
    public void run() {
        try {
            firstCommand.run();     // 优先先把刚提交的任务先做掉了
            firstCommand = null;    // 这里设置 null 的意思是,不影响 firstCommand 对象被 GC 时的回收

            // 一旦超过一定时间没有任务,临时工是需要退出的
            // 1. keepAliveTime + unit 记录起来
            // 2. 怎么就知道超过多久没有任务了?如果一定时间内都无法从队列中取出来任务,则认为摸鱼时间够了
            while (!Thread.interrupted()) {
//                Runnable command = workQueue.take();
                Runnable command = workQueue.poll(keepAliveTime, unit);
                if (command == null) {
                    // 说明,没有取到任务
                    // 说明超时时间已到
                    // 说明该线程已经 keepAliveTime + unit 时间没有工作了
                    // 所以,可以退出了
                    break;
                }
                command.run();
            }
        } catch (InterruptedException ignored) {}
    }
}

           

线程池类:

/** 线程池类(线程池对象)
 * @author Lvvvvvv
 * @date 2022/07/30 14:42
 **/
public class MyThreadPoolExecutor implements Executor {
    // 创建线程的工厂对象
    private final ThreadFactory threadFactory;

    // 临时工摸鱼的时间上限
    private final long keepAliveTime;
    private final TimeUnit unit;

    // 当前正式员工的数量
    private int currentCoreSize;

    // 正式员工的数量上限
    private final int corePoolSize;

    // 当前临时员工的数量
    private int currentTemporarySize;

    // 临时员工的数量上限
    private final int temporaryPoolSize;

    // 传递任务的阻塞队列
    private final BlockingQueue<Runnable> workQueue;

    public MyThreadPoolExecutor(int corePoolSize,
                                int maximumPoolSize,
                                long keepAliveTime,
                                TimeUnit unit,
                                BlockingQueue<Runnable> workQueue,
                                ThreadFactory threadFactory,
                                RejectedExecutionHandler handler) {
        this.corePoolSize = corePoolSize;
        this.temporaryPoolSize = maximumPoolSize - corePoolSize;
        this.workQueue = workQueue;
        this.threadFactory = threadFactory;
        this.keepAliveTime = keepAliveTime;
        this.unit = unit;
    }

    // 向线程池中提交任务
    @Override
    public void execute(Runnable command) {
        // 1. 如果正式员工的数量还低于正式员工的数量上限,则优先创建正式员工处理任务
        // 1.1 需要管理,当前正式员工有多少,正式员工的数量上限有多少?
        if (currentCoreSize < corePoolSize) {
            // 优先创建正式员工进行处理
            // 创建一个线程,这个线程中的任务就是不断地取任务-做任务,但是不需要考虑退出的问题

            //下面这两行就是,通过实现Runnable接口,覆写run方法,然后实例化Runnable对象(job),利用Runnable对象去构建一个线程
            CoreJob job = new CoreJob(workQueue, command);//例化Runnable对象(job)
            //Thread thread = new Thread(job);//这是不用线程工厂创建线程
            Thread thread = threadFactory.newThread(job);//这是用线程工厂创建的线程
            String name = String.format("正式员工-%d", currentCoreSize);
            thread.setName(name);

            thread.start();

            // 只是两种不同的策略,没有谁是正确的说法
            // 1. 把 command 放到队列中;command 的执行次序是在队列已有的任务之后
            // 2. 创建正式员工的时候,就把 command 提交给正式员工,让 command 优先执行
            // 我们这里采用第二种方案,主要原因就是 java 官方的就是使用的第二种策略

            currentCoreSize++;
            return;
        }

        // 走到这里,说明正式员工的数量 == 正式员工的上限了
        // 2. 优先把任务放入队列中,如果放入成功,execute 执行结束,否则还需要继续
        // 2.1 需要一个阻塞队列
//        workQueue.put(command); // 带阻塞的放入,是否满足这里的需求?
                                // 我们这里希望的是立即得到结果
        boolean success = workQueue.offer(command);
        if (success == true) {
            // 说明放入队列成功
            return;
        }

        // 队列也已经放满了
        // 3. 继续判断,临时工的数量有没有到上限,如果没有到达,创建新的临时工来处理
        if (currentTemporarySize < temporaryPoolSize) {
            // 创建临时工进行处理
            TemporaryJob job = new TemporaryJob(keepAliveTime, unit, workQueue, command);
//            Thread thread = new Thread(job);    // 不使用工厂创建的线程
            Thread thread = threadFactory.newThread(job);   // thread 代表的就是临时员工
            String name = String.format("临时员工-%d", currentTemporarySize);
            thread.setName(name);

            thread.start();

            currentTemporarySize++;
            return;
        }

        // 4. 执行拒绝策略
        // 为了实现方便,暂时不考虑其他策略
        throw new RejectedExecutionException();
    }
}

           

主方法测试一下吧🔨

/**
 * @author Lvvvvvv
 * @date 2022/07/30 15:32
 **/
public class Main {
    //Task:用户提交的一次具体任务
    static class Task implements Runnable {
        @Override
        public void run() {
            try {
                //让任务处于休眠状态,sleep()方法使得该任务不到1000s不会停下来
                // 也是为了方便观察现象,没有什么特殊含义
                TimeUnit.SECONDS.sleep(1000);
            } catch (InterruptedException ignored) {}
        }
    }

    //线程工厂
    static class MyThreadFactory implements ThreadFactory {
        @Override
        public Thread newThread(Runnable r) {
            return new Thread(r);
        }
    }

    public static void main(String[] args) {
        BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(5);
        // 同时最多有 15 个任务
        // 3 个正式的
        // 5 个队列中
        // 7 个临时的
        // 提交第 16(从0开始计算,第16个的下标为15) 个任务时就会出现拒绝服务
        MyThreadPoolExecutor executor = new MyThreadPoolExecutor(
                3, 10, 10, TimeUnit.SECONDS,
                workQueue,
                new MyThreadFactory(),
                new ThreadPoolExecutor.AbortPolicy()
        );
        for (int i = 0; i < 150; i++) {
            System.out.println("提交任务: " + i);
            //Task:用户提交的一次具体任务
            Task task = new Task();
            //让线程池对象去执行用户提交的任务
            executor.execute(task);
        }
    }
}

           

在我们的定义中,线程池中最多有3个核心线程,10个总线程,所以可以有7个临时线程,再加上队列中可以放5个任务,采取拒绝策略。所以当我们不断提交任务时,提交到第16个任务,就会出现拒绝该任务的现象,并且之前提交的任务都没有执行完,所以线程池中有10个线程,3个核心线程和7个临时线程。

看看运行结果:

线程池的简单实现线程池的简单实现前言一、简单实现线程池二、线程池是重点总结

运行结果与我们的预期一致。再用 jconsole 看看线程情况:

线程池的简单实现线程池的简单实现前言一、简单实现线程池二、线程池是重点总结

如上图所示,此时线程池中有10个线程,与我们的预期结果一致。

线程池的简单实现就先简简单单做到这里,这是线程不安全的,还有改进的空间,等我再往后学一学,再完善一个线程安全版本的。

二、线程池是重点

重要的事情说三遍,【线程池很重要,面试经常问】*️⃣3️⃣。

一定要掌握的哦🙇‍♀️

总结

上一篇和这一篇,重点掌握线程池的知识,拜拜🙋‍♀️