Java中經常用到多線程來處理業務。在多線程的使用中,非常的不建議使用單純的Thread或者實作Runnable接口的方式來建立線程,因為這樣的線程建立及銷毀勢必會造成耗費資源、線程上下文切換問題,同時建立過多的線程也可能會引發資源耗盡的風險,對線程的管理非常的不友善。是以在使用多線程的時候,日常開發中我們經常引入的是線程池,利用線程池十分友善的對線程任務進行管理。
這裡主要對線程池ThreadPoolExecutor和ThreadPoolTaskExecutor進行對比與使用見解。
一、ThreadPoolExecutor
該圖是它的繼承關系
它的構造方法為
public ThreadPoolExecutor(int coreSize,
int maxSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExectionHandler handler);
幾個參數的含義分别是:
coreSize:核心線程數,也是線程池中常駐的線程數
maxSize:最大線程數,在核心線程數的基礎上可能會額外增加一些非核心線程,需要注意的是隻有當workQueue隊列填滿時才會建立多于
核心線程數的線程
keepAliveTime:非核心線程的空閑時間超過keepAliveTime就會被自動終止回收掉
unit:keepAliveTime的時間機關
workQueue:用于儲存任務的隊列,可以為直接送出隊列、無界任務隊列、有界任務隊列、優先任務隊列類型之一,當池子裡的工作線程數大于
核心線程數
時,這時新進來的任務會被放到隊列中
threadFactory:執行程式建立新線程時使用的工廠
handler:線程池無法繼續接收任務是的拒絕政策
workQueue任務隊列
workQueue任務隊列可以為直接送出隊列、無界任務隊列、有界任務隊列、優先任務隊列類型之一,示例如下
例1:直接送出隊列
SynchronousQueue它沒有容量,每執行一個插入操作就會阻塞,需要再執行一個删除操作才會被喚醒,反之每一個删除操作也都要等待對應的插入操作
當建立的線程數大于最大線程數時,會直接執行設定好的拒絕政策
new ThreadPoolExecutor(1,
2,
1000,
TimeUnit.MILLISECONDS,
new SynchronousQueue<Runnable>(),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy()
);
例2:有界的任務隊列
ArrayBlockingQueue有界的任務隊列。如果有新的任務需要執行時,線程池會建立新的線程,知道建立的線程數量達到核心線程數時,則會将新的任務加入到等待的隊列中。如果等待的隊列已滿,則會繼續建立線程,直到線程數量達到設定的最大線程數,如果建立的線程數大于了最大線程數,則執行拒絕政策。
new ThreadPoolExecutor(
1,
2,
1000,
TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<Runnable>(10),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy()
);
例3:無界的任務隊列
LinkedBlockingQueue無界的任務隊列,線程池的任務隊列可以無限制的添加新的任務,而線程池建立的最大線程數就是設定的核心線程數量,也就是說在這種情況下,就算你設定了最大線程數也是無效的,哪怕你的任務隊列中緩存了很多未執行的任務,當線程池的線程數達到
corePoolSize
後,就不會再增加了;若後續有新的任務加入,則直接進入隊列等待,當使用這種任務隊列模式時,一定要注意你任務送出與處理之間的協調與控制,不然會出現隊列中的任務由于無法及時處理導緻一直增長,直到最後資源耗盡的問題。
new ThreadPoolExecutor(
1,
2,
1000,
TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy()
);
例4:優先任務隊列
PriorityBlockingQueue優先任務隊列,線程池的線程數一直為設定的核心線程數個,無論添加多少個任務,線程池建立的線程數也不會超過你設定的核心線程數,隻不過PriorityBlockingQueue隊列内的任務可以自定義隊則根據任務的優先級順序進行執行,不同于其它隊列是按照先進先出的規則處理的
new ThreadPoolExecutor(1,
2,
1000,
TimeUnit.MILLISECONDS,
new PriorityBlockingQueue<Runnable>(),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy()
);
線程池拒絕政策
AbortPolicy
直接抛出異常阻止系統正常工作
CallerRunsPolicy
隻要線程池未關閉,該政策直接在調用者線程中,運作目前被丢棄的任務
DiscardOldestPolicy
丢棄最老的一個請求,嘗試再次送出目前任務
DiscardPolicy
丢棄無法處理的任務,不給予任何處理
除上述拒絕政策外,可以實作RejectedExecutionHandler接口,自定義拒絕政策
new ThreadPoolExecutor(
1,
2,
1000,
TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<Runnable>(5),
Executors.defaultThreadFactory(),
new RejectedExecutionHandler() {
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor)
{
System.out.println(r.toString() + "執行了拒絕政策");
}
});
ThreadPoolExecutor工作流程
當一個新的任務送出給線程池時,線程池的處理步奏:
1、首先判斷核心線程數是否已滿,如果沒滿則調用一個線程處理Task任務,如果已滿則執行步奏2;
2、這時會判斷阻塞隊列是否已滿,如果阻塞隊列沒滿,就将Task任務加入到阻塞隊列中等待執行,如果阻塞隊列已滿,則執行步奏3;
3、判斷是否大于最大線程數,如果小于最大線程數,則建立線程執行Task任務,如果大于最大線程數,則執行步驟4;
4、這時會使用淘汰政策來處理無法執行的Task任務
ThreadpoolExecutor線程池的使用
書寫一個配置類,在配置類中定義一個bean,如下
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@Configuration
@Slf4j
@EnableAsync
public class ExecutorConfig {
@Bean
public ThreadPoolExecutor asyncExecutor(){
ThreadPoolExecutor executor = new ThreadPoolExecutor(
10,
20,
1000,
TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<Runnable>(5),
new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
System.out.println("線程"+r.hashCode()+"建立");
//線程命名
Thread th = new Thread(r,"threadPool"+r.hashCode());
return th;
}
},
new ThreadPoolExecutor.CallerRunsPolicy()
){
@Override
protected void beforeExecute(Thread t, Runnable r) {
System.out.println("準備執行:"+ ((ThreadTask)r).getTaskName());
super.beforeExecute(t, r);
}
@Override
protected void afterExecute(Runnable r, Throwable t) {
System.out.println("執行完畢:"+((ThreadTask)r).getTaskName());
super.afterExecute(r, t);
}
@Override
protected void terminated() {
System.out.println("線程池退出");
super.terminated();
}
};
return executor;
}
}
說明:
-
:線程池中任務運作前執行beforeExecute
-
:線程池中任務運作完畢後執行afterExecute
-
:線程池退出後執行terminated
代碼中的ThreadTask如下,此處可根據自己需求進行代碼編寫
public class ThreadTask implements Runnable {
private String taskName;
public String getTaskName() {
return taskName;
}
public void setTaskName(String taskName) {
this.taskName = taskName;
}
public ThreadTask(String name) {
this.setTaskName(name);
}
public void run() {
//輸出執行線程的名稱
System.out.println("TaskName"+this.getTaskName()+"---ThreadName:"+Thread.currentThread().getName());
}
}
二、ThreadPoolTaskExecutor
ThreadPoolTaskExecutor這個類是Spring-Context支援的一個,專門用于Spring環境的線程池。其底層是在ThreadPoolExecutor的基礎上包裝的一層,使得Spring的整合更加友善
繼承關系如下
其成員變量如ThreadPoolExecutor,有核心線程數、最大線程數、keepAliveTIme、逾時時間機關、隊列、線程建立工廠、拒絕政策
檢視它的源碼如下
可以看出,它依賴的還是ThreadPoolExecutor,并且注意它直接設定了keepAliveTime的時間機關
它的隊列、拒絕政策通ThreadPoolExecutor一緻
ThreadPoolTaskExecutor的使用
書寫一個配置類,在配置類中對線程池ThreadPoolTaskExecutor進行配置
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.ThreadPoolExecutor;
@Configuration
@Slf4j
@EnableAsync
public class ExecutorConfig {
@Bean
public ThreadPoolTaskExecutor asyncExecutor() {
log.info("start asyncServiceExecutor");
ThreadPoolTaskExecutor executor = new VisiableThreadPoolTaskExecutor();
//配置核心線程數
executor.setCorePoolSize(10);
//配置最大線程數
executor.setMaxPoolSize(20);
//配置隊列大小
executor.setQueueCapacity(100);
//配置keepAliveTime
executor.setKeepAliveSeconds(10);
//配置線程池中的線程的名稱字首
executor.setThreadNamePrefix("async-service-");
//拒絕政策
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
//執行初始化
executor.initialize();
return executor;
}
}
配置類中的VisiableThreadPoolTaskExecutor()類擴充了ThreadPoolTaskExecutor,對線程執行前後各階段做了補充操作,類似于上面ThreadPoolExecutor中的
beforeExecute、afterExecute等操作,具體代碼如下
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.util.concurrent.ListenableFuture;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
@Slf4j
public class VisiableThreadPoolTaskExecutor extends ThreadPoolTaskExecutor {
private void showThreadPoolInfo(String prefix) {
ThreadPoolExecutor threadPoolExecutor = getThreadPoolExecutor();
if (null == threadPoolExecutor) {
return;
}
log.info("{}, {},taskCount [{}], completedTaskCount [{}], activeCount [{}], queueSize [{}]",
this.getThreadNamePrefix(),
prefix,
threadPoolExecutor.getTaskCount(),
threadPoolExecutor.getCompletedTaskCount(),
threadPoolExecutor.getActiveCount(),
threadPoolExecutor.getQueue().size());
}
@Override
public void execute(Runnable task) {
showThreadPoolInfo("1. do execute");
super.execute(task);
}
@Override
public void execute(Runnable task, long startTimeout) {
showThreadPoolInfo("2. do execute");
super.execute(task, startTimeout);
}
@Override
public Future<?> submit(Runnable task) {
showThreadPoolInfo("1. do submit");
return super.submit(task);
}
@Override
public <T> Future<T> submit(Callable<T> task) {
showThreadPoolInfo("2. do submit");
return super.submit(task);
}
@Override
public ListenableFuture<?> submitListenable(Runnable task) {
showThreadPoolInfo("1. do submitListenable");
return super.submitListenable(task);
}
@Override
public <T> ListenableFuture<T> submitListenable(Callable<T> task) {
showThreadPoolInfo("2. do submitListenable");
return super.submitListenable(task);
}
}
三、線程池在接口中的具體使用
上述描述中,最終書寫了一個配置類,對線程池進行了配置,定義了一個bean對象,那麼在具體接口中該怎麼使用,如下所示
1、建立controller層,書寫接口入口,調用server層代碼
import com.smile.syncproject.service.AsyncService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
@RequestMapping("async")
@Slf4j
public class AsyncController {
@Autowired
private AsyncService asyncService;
@RequestMapping("test")
public String test() {
log.info("start submit");
//調用service層的任務
asyncService.executeAsync();
log.info("end submit");
return "success";
}
}
2、在service層實作層進行線程池的使用
通過注解@Async
@Async("asyncServiceExecutor")
注解内的值就是上面定義好的配置類中的bean的名稱。如果有多個線程池,就需要在定義不同bean的時候指定其name了
import com.smile.syncproject.service.AsyncService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
@Service
@Slf4j
public class AsyncServiceImpl implements AsyncService {
@Override
@Async("asyncServiceExecutor")
public void executeAsync() {
log.info("start executeAsync");
try{
Thread.sleep(1000);
}catch(Exception e){
e.printStackTrace();
}
log.info("end executeAsync");
}
}
四、其它
1、線程池大小的設定