天天看點

SpringBoot中使用ThreadPoolExecutor和ThreadPoolTaskExecutor線程池的方法和差別

        Java中經常用到多線程來處理業務。在多線程的使用中,非常的不建議使用單純的Thread或者實作Runnable接口的方式來建立線程,因為這樣的線程建立及銷毀勢必會造成耗費資源、線程上下文切換問題,同時建立過多的線程也可能會引發資源耗盡的風險,對線程的管理非常的不友善。是以在使用多線程的時候,日常開發中我們經常引入的是線程池,利用線程池十分友善的對線程任務進行管理。

        這裡主要對線程池ThreadPoolExecutor和ThreadPoolTaskExecutor進行對比與使用見解。

一、ThreadPoolExecutor

該圖是它的繼承關系

SpringBoot中使用ThreadPoolExecutor和ThreadPoolTaskExecutor線程池的方法和差別

 它的構造方法為

public ThreadPoolExecutor(int coreSize,
        int maxSize,
        long keepAliveTime,
        TimeUnit unit,
        BlockingQueue<Runnable> workQueue,
        ThreadFactory threadFactory,
        RejectedExectionHandler handler);      

幾個參數的含義分别是:

coreSize:核心線程數,也是線程池中常駐的線程數

maxSize:最大線程數,在核心線程數的基礎上可能會額外增加一些非核心線程,需要注意的是隻有當workQueue隊列填滿時才會建立多于​​

​核心線程數的線程​

​​ keepAliveTime:非核心線程的空閑時間超過keepAliveTime就會被自動終止回收掉

unit:keepAliveTime的時間機關

workQueue:用于儲存任務的隊列,可以為直接送出隊列、無界任務隊列、有界任務隊列、優先任務隊列類型之一,當池子裡的工作線程數大于​

​核心線程數​

​時,這時新進來的任務會被放到隊列中

threadFactory:執行程式建立新線程時使用的工廠

handler:線程池無法繼續接收任務是的拒絕政策

workQueue任務隊列

        workQueue任務隊列可以為直接送出隊列、無界任務隊列、有界任務隊列、優先任務隊列類型之一,示例如下

例1:直接送出隊列

SynchronousQueue它沒有容量,每執行一個插入操作就會阻塞,需要再執行一個删除操作才會被喚醒,反之每一個删除操作也都要等待對應的插入操作

        當建立的線程數大于最大線程數時,會直接執行設定好的拒絕政策

new ThreadPoolExecutor(1,
        2,
        1000,
        TimeUnit.MILLISECONDS,
        new SynchronousQueue<Runnable>(),
        Executors.defaultThreadFactory(),
        new ThreadPoolExecutor.AbortPolicy()
    );      

例2:有界的任務隊列

ArrayBlockingQueue有界的任務隊列。如果有新的任務需要執行時,線程池會建立新的線程,知道建立的線程數量達到核心線程數時,則會将新的任務加入到等待的隊列中。如果等待的隊列已滿,則會繼續建立線程,直到線程數量達到設定的最大線程數,如果建立的線程數大于了最大線程數,則執行拒絕政策。

new ThreadPoolExecutor(
        1, 
        2, 
        1000, 
        TimeUnit.MILLISECONDS, 
        new ArrayBlockingQueue<Runnable>(10), 
        Executors.defaultThreadFactory(), 
        new ThreadPoolExecutor.AbortPolicy()
    );      

 例3:無界的任務隊列

LinkedBlockingQueue無界的任務隊列,線程池的任務隊列可以無限制的添加新的任務,而線程池建立的最大線程數就是設定的核心線程數量,也就是說在這種情況下,就算你設定了最大線程數也是無效的,哪怕你的任務隊列中緩存了很多未執行的任務,當線程池的線程數達到​

​corePoolSize​

​後,就不會再增加了;若後續有新的任務加入,則直接進入隊列等待,當使用這種任務隊列模式時,一定要注意你任務送出與處理之間的協調與控制,不然會出現隊列中的任務由于無法及時處理導緻一直增長,直到最後資源耗盡的問題。

new ThreadPoolExecutor(
        1, 
        2, 
        1000, 
        TimeUnit.MILLISECONDS, 
        new LinkedBlockingQueue<Runnable>(),
        Executors.defaultThreadFactory(),
        new ThreadPoolExecutor.AbortPolicy()
    );      

 例4:優先任務隊列

        PriorityBlockingQueue優先任務隊列,線程池的線程數一直為設定的核心線程數個,無論添加多少個任務,線程池建立的線程數也不會超過你設定的核心線程數,隻不過PriorityBlockingQueue隊列内的任務可以自定義隊則根據任務的優先級順序進行執行,不同于其它隊列是按照先進先出的規則處理的

new ThreadPoolExecutor(1,
        2,
        1000,
        TimeUnit.MILLISECONDS,
        new PriorityBlockingQueue<Runnable>(),
        Executors.defaultThreadFactory(),
        new ThreadPoolExecutor.AbortPolicy()
    );      

線程池拒絕政策

AbortPolicy

       直接抛出異常阻止系統正常工作

CallerRunsPolicy

       隻要線程池未關閉,該政策直接在調用者線程中,運作目前被丢棄的任務

DiscardOldestPolicy

       丢棄最老的一個請求,嘗試再次送出目前任務

DiscardPolicy

       丢棄無法處理的任務,不給予任何處理

除上述拒絕政策外,可以實作RejectedExecutionHandler接口,自定義拒絕政策

new ThreadPoolExecutor(
                1,
                2,
                1000,
                TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<Runnable>(5),
                Executors.defaultThreadFactory(),
                new RejectedExecutionHandler() {
                    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor)                             
                    {
                        System.out.println(r.toString() + "執行了拒絕政策");
                    }
                });      

ThreadPoolExecutor工作流程

        當一個新的任務送出給線程池時,線程池的處理步奏:

1、首先判斷核心線程數是否已滿,如果沒滿則調用一個線程處理Task任務,如果已滿則執行步奏2;

2、這時會判斷阻塞隊列是否已滿,如果阻塞隊列沒滿,就将Task任務加入到阻塞隊列中等待執行,如果阻塞隊列已滿,則執行步奏3;

3、判斷是否大于最大線程數,如果小于最大線程數,則建立線程執行Task任務,如果大于最大線程數,則執行步驟4;

4、這時會使用淘汰政策來處理無法執行的Task任務

ThreadpoolExecutor線程池的使用

書寫一個配置類,在配置類中定義一個bean,如下

import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;


@Configuration
@Slf4j
@EnableAsync
public class ExecutorConfig {

    @Bean
    public ThreadPoolExecutor asyncExecutor(){
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                10,
                20,
                1000,
                TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<Runnable>(5),
                new ThreadFactory() {
                    @Override
                    public Thread newThread(Runnable r) {
                        System.out.println("線程"+r.hashCode()+"建立");
                        //線程命名
                        Thread th = new Thread(r,"threadPool"+r.hashCode());
                        return th;
                    }
                },
                new ThreadPoolExecutor.CallerRunsPolicy()
        ){
            @Override
            protected void beforeExecute(Thread t, Runnable r) {
                System.out.println("準備執行:"+ ((ThreadTask)r).getTaskName());
                super.beforeExecute(t, r);
            }

            @Override
            protected void afterExecute(Runnable r, Throwable t) {
                System.out.println("執行完畢:"+((ThreadTask)r).getTaskName());
                super.afterExecute(r, t);
            }

            @Override
            protected void terminated() {
                System.out.println("線程池退出");
                super.terminated();
            }
        };
        return executor;
    }

}      

說明:

  1. ​beforeExecute​

    ​:線程池中任務運作前執行
  2. ​afterExecute​

    ​:線程池中任務運作完畢後執行
  3. ​terminated​

    ​:線程池退出後執行

代碼中的ThreadTask如下,此處可根據自己需求進行代碼編寫

public class ThreadTask implements Runnable {

    private String taskName;

    public String getTaskName() {
        return taskName;
    }

    public void setTaskName(String taskName) {
        this.taskName = taskName;
    }

    public ThreadTask(String name) {
        this.setTaskName(name);
    }

    public void run() {
        //輸出執行線程的名稱
        System.out.println("TaskName"+this.getTaskName()+"---ThreadName:"+Thread.currentThread().getName());
    }
}      

二、ThreadPoolTaskExecutor

ThreadPoolTaskExecutor這個類是Spring-Context支援的一個,專門用于Spring環境的線程池。其底層是在ThreadPoolExecutor的基礎上包裝的一層,使得Spring的整合更加友善

繼承關系如下

SpringBoot中使用ThreadPoolExecutor和ThreadPoolTaskExecutor線程池的方法和差別

 其成員變量如ThreadPoolExecutor,有核心線程數、最大線程數、keepAliveTIme、逾時時間機關、隊列、線程建立工廠、拒絕政策

SpringBoot中使用ThreadPoolExecutor和ThreadPoolTaskExecutor線程池的方法和差別

檢視它的源碼如下

SpringBoot中使用ThreadPoolExecutor和ThreadPoolTaskExecutor線程池的方法和差別

 可以看出,它依賴的還是ThreadPoolExecutor,并且注意它直接設定了keepAliveTime的時間機關

它的隊列、拒絕政策通ThreadPoolExecutor一緻

ThreadPoolTaskExecutor的使用

書寫一個配置類,在配置類中對線程池ThreadPoolTaskExecutor進行配置

import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.ThreadPoolExecutor;


@Configuration
@Slf4j
@EnableAsync
public class ExecutorConfig {

    @Bean
    public ThreadPoolTaskExecutor asyncExecutor() {
        log.info("start asyncServiceExecutor");
        ThreadPoolTaskExecutor executor = new VisiableThreadPoolTaskExecutor();
        //配置核心線程數
        executor.setCorePoolSize(10);
        //配置最大線程數
        executor.setMaxPoolSize(20);
        //配置隊列大小
        executor.setQueueCapacity(100);
        //配置keepAliveTime
        executor.setKeepAliveSeconds(10);
        //配置線程池中的線程的名稱字首
        executor.setThreadNamePrefix("async-service-");
        //拒絕政策
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        //執行初始化
        executor.initialize();
        return executor;
    }

}      

配置類中的VisiableThreadPoolTaskExecutor()類擴充了ThreadPoolTaskExecutor,對線程執行前後各階段做了補充操作,類似于上面ThreadPoolExecutor中的​

​beforeExecute、afterExecute等操作,具體代碼如下​

import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.util.concurrent.ListenableFuture;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;

@Slf4j
public class VisiableThreadPoolTaskExecutor extends ThreadPoolTaskExecutor {

    private void showThreadPoolInfo(String prefix) {
        ThreadPoolExecutor threadPoolExecutor = getThreadPoolExecutor();

        if (null == threadPoolExecutor) {
            return;
        }

        log.info("{}, {},taskCount [{}], completedTaskCount [{}], activeCount [{}], queueSize [{}]",
                this.getThreadNamePrefix(),
                prefix,
                threadPoolExecutor.getTaskCount(),
                threadPoolExecutor.getCompletedTaskCount(),
                threadPoolExecutor.getActiveCount(),
                threadPoolExecutor.getQueue().size());
    }

    @Override
    public void execute(Runnable task) {
        showThreadPoolInfo("1. do execute");
        super.execute(task);
    }

    @Override
    public void execute(Runnable task, long startTimeout) {
        showThreadPoolInfo("2. do execute");
        super.execute(task, startTimeout);
    }

    @Override
    public Future<?> submit(Runnable task) {
        showThreadPoolInfo("1. do submit");
        return super.submit(task);
    }

    @Override
    public <T> Future<T> submit(Callable<T> task) {
        showThreadPoolInfo("2. do submit");
        return super.submit(task);
    }

    @Override
    public ListenableFuture<?> submitListenable(Runnable task) {
        showThreadPoolInfo("1. do submitListenable");
        return super.submitListenable(task);
    }

    @Override
    public <T> ListenableFuture<T> submitListenable(Callable<T> task) {
        showThreadPoolInfo("2. do submitListenable");
        return super.submitListenable(task);
    }
}      

三、線程池在接口中的具體使用

 上述描述中,最終書寫了一個配置類,對線程池進行了配置,定義了一個bean對象,那麼在具體接口中該怎麼使用,如下所示

1、建立controller層,書寫接口入口,調用server層代碼

import com.smile.syncproject.service.AsyncService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("async")
@Slf4j
public class AsyncController {

    @Autowired
    private AsyncService asyncService;

    @RequestMapping("test")
    public String test() {
        log.info("start submit");

        //調用service層的任務
        asyncService.executeAsync();

        log.info("end submit");

        return "success";
    }

}      

 2、在service層實作層進行線程池的使用

        通過注解@Async

@Async("asyncServiceExecutor")

注解内的值就是上面定義好的配置類中的bean的名稱。如果有多個線程池,就需要在定義不同bean的時候指定其name了

import com.smile.syncproject.service.AsyncService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;

@Service
@Slf4j
public class AsyncServiceImpl implements AsyncService {

    @Override
    @Async("asyncServiceExecutor")
    public void executeAsync() {
        log.info("start executeAsync");
        try{
            Thread.sleep(1000);
        }catch(Exception e){
            e.printStackTrace();
        }
        log.info("end executeAsync");
    }

}      

四、其它

1、線程池大小的設定