開發過程中我們會遇到很多使用線程池的場景,例如異步短信通知,異步發郵件,異步記錄記錄檔,異步處理批量Excel解析。這些異步處理的場景我們都可以把它放線上程池中去完成,當然還有很多場景也都可以使用線程池,掌握線程池後開發中自己靈活應用。
例如在生成訂單的時候給使用者發送短信,生成訂單的結果不應該被發送短信的成功與否所左右,也就是說生成訂單這個主操作是不依賴于發送短信這個操作,我們就可以把發送短信這個操作置為異步操作。當然也有的小夥伴會說我使用多線程不就行了,為啥還要使用線程池,那我就先聊一下線程和線程池的優缺點。
使用線程的缺點:
1:每次new Thread對象的時候,建立對象這樣性能很差。
2:線程缺乏管理,有可能無限建立線程,這樣可能造成系統資源的浪費或者OOM(記憶體溢出)。
使用線程池的優點:
1:重用存在的線程,減少線程的建立,性能良好。
2:可以有效的控制最大的線程并發數,提高系統資源的使用率。
說完上面就知道使用線程池有多好了吧,那知道了線程池的好處,我們怎樣使用線程池呢?好了重點對象出現了【PS 對象出現了汪汪汪】。
這個時候可能會有小夥伴疑問為什麼要先聊線程池呢?Spring的異步處理寫的很好直接用不就完事了,因為線程池和Spring的異步處理有着千絲萬縷的關系,仔細看就知道了。
Java中使用線程池,那就要深刻了解大名鼎鼎的ThreadPoolExecutor對象。那怎麼建立這個對象的,請看給的源碼So Easy (學會了建立對象,同僚再不擔心你的學習能力了,廣告詞)
java複制代碼/**
* Creates a new {@code ThreadPoolExecutor}
* @param corePoolSize the number of threads to keep in the pool, even
* if they are idle, unless {@code allowCoreThreadTimeOut} is set
* @param maximumPoolSize the maximum number of threads to allow in the
* pool
* @param keepAliveTime when the number of threads is greater than
* the core, this is the maximum time that excess idle threads
* will wait for new tasks before terminating.
* @param unit the time unit for the {@code keepAliveTime} argument
* @param workQueue the queue to use for holding tasks before they are
* executed. This queue will hold only the {@code Runnable}
* tasks submitted by the {@code execute} method.
*/
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
很多人一看就是運用幾個參數建立對象,确實不難。但是這幾個參數的表達的意思懂嗎,看英文确實有點不懂,好了那我就仔細聊聊這幾個參數,繼續學英語【這是真正學英語,不是電視劇中的學英語】
1:corePoolSize,線程池中的核心線程,當送出一個新的任務時候,線程池會建立一個新的線程執行任務,直到目前的線程數等于corePoolSize;如果目前線程數為corePoolSize,繼續送出新的任務到阻塞隊列中,等待被執行。
2:maximumPoolSize,線程池中允許的最大的線程數,如果阻塞隊列滿了,繼續送出新的任務,則建立新的線程執行任務。前提是目前線程數小于maximumPoolSize。
3:keepAliveTime,線程池維護線程所允許的時間,當線程池中的數量大于corePoolSize時候,如果沒有任務送出,核心線程外的線程不會被立即銷毀,而是等待時間超過了keepAliveTime。
4:unit,keepAliveTime的時間機關。
5:workQueue:用來儲存等待被執行任務的阻塞隊列,且任務必須實作Runable接口。
6:threadFactory,他是threadFactory類型的變量,用來建立線程,預設使用Executors.defaultThreadFactory()來建立線程。
7:handler:線程池的飽和政策,當阻塞隊列滿了,且沒有空閑的工作線程,如果繼續送出任務,必須采取一種政策處理該任務,線程池提供了4種政策:
7.1、AbortPolicy:直接抛出異常,預設政策;
7.2、CallerRunsPolicy:用調用者所在的線程來執行任務;
73、DiscardOldestPolicy:丢棄阻塞隊列中靠最前的任務,并執行目前任務;
7.4、DiscardPolicy:直接丢棄任務;
上面的4種政策都是ThreadPoolExecutor的内部類。
當然也可以根據應用場景實作RejectedExecutionHandler接口,自定義飽和政策。
好了看到上面的解釋應該比較懂了吧,如果不懂那我再畫一張圖,幫你更好的了解線程池的工作原理,如下圖:
看了上圖如果還不懂,那我就給你上個代碼,保證你看懂了【PS因為我剛開始就給你說了So Easy,不騙你】(呸呸呸,咋有點渣)。
scss複制代碼public class ThreadPoolTest {
public static void main(String[] args) {
ThreadPoolExecutor pools = createPool();
int activeCount = -1;
int queueSize = -1;
while (true) {
if (activeCount != pools.getActiveCount() || queueSize != pools.getQueue().size()) {
System.out.println("活躍的線程的個數:" + pools.getActiveCount());
System.out.println("隊列中線程的個數:" + pools.getQueue().size());
System.out.println("最大的線程的個數" + pools.getMaximumPoolSize());
activeCount = pools.getActiveCount();
queueSize = pools.getQueue().size();
System.out.println("=========================================");
}
}
}
// 建立線程池,通過更改線程池的參數友善你更好的了解線程池
// 其中第六個參數你也可以改成ThreadPoolExecutor預設的:Executors.defaultThreadFactory()
private static ThreadPoolExecutor createPool() {
ThreadPoolExecutor pools = new ThreadPoolExecutor(1, 2, 30,
TimeUnit.SECONDS, new ArrayBlockingQueue<>(1),
r -> {
Thread t = new Thread(r);
return t;
}, new ThreadPoolExecutor.AbortPolicy());
System.out.println("The PoolExecutor is create done");
pools.execute(() -> {
sleep(100);
});
//這個裡面就可以寫自己的業務
pools.execute(() -> {
sleep(10);
});
pools.execute(() -> {
sleep(10);
});
return pools;
}
private static void sleep(int seconds) {
try {
System.out.println(" " + Thread.currentThread().getName() + " ");
TimeUnit.SECONDS.sleep(seconds);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
好了了解了線程池,那就引入本文的重點Spring的異步處理。如果是使用Spring Boot項目那隻需要2個注解就能搞定了。如下:
第一步加@EnableAsync注解,如下圖:
第二步在要使用的方法上加@Async注解,如下:
然後就可以直接使用了,如下是運作結果,加了Async注解和沒加注解出來的名字不一樣,有興趣的小夥伴可以試一下沒加注解列印出來的是什麼名字:
當然可能使用Spring Boot版本不同,列印出來的線程名稱可能會有點不同。這個時候可能會有小夥伴說這使用也太簡單了,講上面的線程池沒有啊。
繼續看,容我仔細說。我們先看下面2個注解
less複制代碼@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(AsyncConfigurationSelector.class)
public @interface EnableAsync {
/**
* Indicate the 'async' annotation type to be detected at either class
* or method level.
* 預設情況下,要開啟異步操作,要在相應的方法或者類上加上@Async注解
*/
Class<? extends Annotation> annotation() default Annotation.class;
/**
* Indicate whether subclass-based (CGLIB) proxies are to be created as opposed
* to standard Java interface-based proxies.
* true表示啟用CGLIB代理
*/
boolean proxyTargetClass() default false;
/**
* Indicate the order in which the {@link AsyncAnnotationBeanPostProcessor}
* should be applied.
* 直接定義:它的執行順序(因為可能有多個@EnableXXX)
*/
int order() default Ordered.LOWEST_PRECEDENCE;
}
@Target({ElementType.TYPE, ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface Async {
/**
* A qualifier value for the specified asynchronous operation(s).
* 這個value值是用來指定執行器的
*/
String value() default "";
}
最重要的還是上面的@Import注解導入的類:AsyncConfigurationSelector。這種方式我以前的文章說過很多次了,如果看過我以前寫的文章的,對這種導入應該很熟悉,是以我直接說這個類的作用了。這個類幫我們導入了ProxyAsyncConfiguration這個類,然後又幫我們注入了AsyncAnnotationBeanPostProcessor這個類。它就是和@Async比較相關的一個類了。從上的源碼可議看出,支援@Asycn注解異步處理我們寫的業務處理方法,交給了AnnotationAsyncExecutionInterceptor。具體的實作功能交給了它的繼承類AsyncExecutionInterceptor。由于主要功能處理都在AsyncExecutionInterceptor這個類中是以我主要聊這個類了。
首先是這個方法:
less複制代碼@Override
@Nullable
// 見名知意就知道這個是擷取線程池的方法,
// 這個厲害了。如果父類傳回的defaultExecutor 為null,
// 那就new一個SimpleAsyncTaskExecutor作為預設的執行器,是以我們上文中
// 如果沒有指定線程池,那麼就預設給我們一個預設的:SimpleAsyncTaskExecuto
protected Executor getDefaultExecutor(@Nullable BeanFactory beanFactory) {
Executor defaultExecutor = super.getDefaultExecutor(beanFactory);
return (defaultExecutor != null ? defaultExecutor : new SimpleAsyncTaskExecutor());
}
先簡單說一下這個預設的線程池,看完這個預設的線程池解釋就知道我最開始為什麼要先說一下線程池了。
SimpleAsyncTaskExecutor:異步執行使用者任務的SimpleAsyncTaskExecutor。每次執行使用者送出給它的任務時,它會啟動新的線程,并允許開發者控制并發線程的上限(concurrencyLimit),進而起到一定的資源節流作用。預設時,concurrencyLimit取值為-1,即不啟用資源節流,是以它不是真的線程池,這個類不重用線程,每次調用都會建立一個新的線程(是以建議我們在使用@Aysnc的時候,自己配置一個線程池,節約資源)
然後看擷取預設線程池的方法,這個方法很牛,先看代碼後面解釋為什麼牛【PS裡面中文都是添加的,老外們不會中文】。
kotlin複制代碼protected Executor getDefaultExecutor(@Nullable BeanFactory beanFactory) {
if (beanFactory != null) {
// 這個處理很有意思,它是用用的try catch的技巧去處理的
try {
// 如果容器記憶體在唯一的TaskExecutor(子類),就直接傳回了
return beanFactory.getBean(TaskExecutor.class);
}
catch (NoUniqueBeanDefinitionException ex) {
// 這是出現了多個TaskExecutor類型的話,那就按照名字去拿 `taskExecutor`且是Executor類型
try {
return beanFactory.getBean(DEFAULT_TASK_EXECUTOR_BEAN_NAME, Executor.class);
}
// 如果再沒有找到,也不要報錯,而是接下來建立一個預設的處理器
// 這裡輸出一個info資訊
catch (NoSuchBeanDefinitionException ex2) {
}
}
catch (NoSuchBeanDefinitionException ex) {
try {
return beanFactory.getBean(DEFAULT_TASK_EXECUTOR_BEAN_NAME, Executor.class);
}
catch (NoSuchBeanDefinitionException ex2) {
}
// 這裡還沒有擷取到,就放棄。用本地預設的executor吧~~~
// 子類可以去複寫此方法,發現為null的話可議給一個預設值~~~~比如`AsyncExecutionInterceptor`預設給的就是`SimpleAsyncTaskExecutor`作為執行器的
// Giving up -> either using local default executor or none at all...
}
}
return null;
}
好了看了擷取預設線程池的方法了,對我們後面配置程式中自己的線程池就有了很大的幫助了,慢慢知道我一開始為啥要先聊線程池了嗎,放心不會騙你的【PS呸呸呸 這句話說的是不是有點渣)
然後我們再聊線程異步執行的方法如下:
這個三個步驟就是執行異步的核心我會一個一個說:
determineAsyncExecutor方法
kotlin複制代碼/**
* Determine the specific executor to use when executing the given method.
* Should preferably return an {@link AsyncListenableTaskExecutor} implementation.
* @return the executor to use (or {@code null}, but just if no default executor is available)
*/
@Nullable
protected AsyncTaskExecutor determineAsyncExecutor(Method method) {
// 如果緩存中能夠找到該方法對應的執行器,就立馬傳回了
AsyncTaskExecutor executor = this.executors.get(method);
if (executor == null) {
Executor targetExecutor;
// 抽象方法:AnnotationAsyncExecutionInterceptor有實作。
// 就是@Async注解的value值
String qualifier = getExecutorQualifier(method);
// 現在知道@Async直接的value值的作用了吧。就是制定執行此方法的執行器的(容器内執行器的Bean的名稱)
// 當然有可能為null。注意此處是支援@Qualified注解标注在類上來區分Bean的
// 注意:此處targetExecutor仍然可能為null
// 使用自定義線程池d額時候Async注解的value值最好加上線程池的名稱
if (StringUtils.hasLength(qualifier)) {
targetExecutor = findQualifiedExecutor(this.beanFactory, qualifier);
}
else {
targetExecutor = this.defaultExecutor.get();
}
if (targetExecutor == null) {
return null;
}
executor = (targetExecutor instanceof AsyncListenableTaskExecutor ?
(AsyncListenableTaskExecutor) targetExecutor : new TaskExecutorAdapter(targetExecutor));
this.executors.put(method, executor);
}
return executor;
}
好了上面的代碼中提到AnnotationAsyncExecutionInterceptor這個類的getExecutorQualifier方法了,這個方法也是極其重要的點,是以我直接拉出來,如下:
typescript複制代碼/**
* Return the qualifier or bean name of the executor to be used when executing the
* given method, specified via {@link Async#value} at the method or declaring
* class level. If {@code @Async} is specified at both the method and class level, the
* method's {@code #value} takes precedence (even if empty string, indicating that
* the default executor should be used preferentially).
*/
@Override
@Nullable
protected String getExecutorQualifier(Method method) {
// Maintainer's note: changes made here should also be made in
// AnnotationAsyncExecutionAspect#getExecutorQualifier
// 可以見它就是去方法拿到@Async的value值。
// 這下知道配置這個注解的作用了吧。
Async async = AnnotatedElementUtils.findMergedAnnotation(method, Async.class);
if (async == null) {
async = AnnotatedElementUtils.findMergedAnnotation(method.getDeclaringClass(), Async.class);
}
return (async != null ? async.value() : null);
}
根據這個@Async的配置,會得到具體的Executor,也就是線程池如下:
好了那擷取到異步執行的線程池,那就開始執行具體的方法了,這個不聊了也就是我們寫的業務方法。執行完方法後幹嘛呢?當然就是第三步驟處理傳回值啊,如下:
kotlin複制代碼/**
* Delegate for actually executing the given task with the chosen executor.
* 用標明的執行者實際執行給定任務
*/
@Nullable
protected Object doSubmit(Callable<Object> task, AsyncTaskExecutor executor, Class<?> returnType) {
//根據不同的傳回值類型,來采用不同的方案去異步執行,但是執行器都是executor
if (CompletableFuture.class.isAssignableFrom(returnType)) {
return CompletableFuture.supplyAsync(() -> {
try {
return task.call();
}
catch (Throwable ex) {
throw new CompletionException(ex);
}
}, executor);
}
// // ListenableFuture接口繼承自Future 是Spring自己擴充的一個接口。
else if (ListenableFuture.class.isAssignableFrom(returnType)) {
return ((AsyncListenableTaskExecutor) executor).submitListenable(task);
}
// 普通的submit
else if (Future.class.isAssignableFrom(returnType)) {
return executor.submit(task);
}
else {
// 沒有傳回值的情況下 也用sumitt送出,按時傳回null
executor.submit(task);
return null;
}
一共四個分支,前面三個都是判斷是否是 Future 類型的。而我們的程式走到了最後的一個 else,含義就是如果傳回值不是 Future 類型的。直接把任務 submit 到線程池之後,就傳回了一個 null。這可不得爆出空指針異常嗎?但是源碼為什麼隻支援 void 和 Future 的傳回類型?
因為底層的線程池隻支援這兩種類型的傳回。隻是Spring的做法稍微有點坑,直接把其他的傳回類型的傳回值都處理為 null 了。
好了Spring處理異步的過程都說了,我們也看到Spring的異步處理器不是太好,需要我們自己配置預設的線程池,還有如果程式中有傳回結果一定要記得把傳回結果用Futrue封裝一下,要不然寫出來的程式可能出現空指針的情況【PS你已經是一個成熟的開發了,要記得自己避免空指針。嘿嘿又一句廣告詞】。
那我就把Spring異步處理程式優化一點,自定義自己的異步的線程池如下圖,不貼代碼了,這個很重要要不要複制粘了自己多敲點代碼吧:
結果:
PS :使用自定義線程池的時候@Async注解的value記得加上線程池的名稱,但是線程池不能濫用,但是一個項目裡面是可以有多個自定義線程池的。根據你的業務場景來劃分。比如舉個簡單的例子,業務主流程上可以用一個線程池,但是當主流程中的某個環節出問題了,假設需要發送預警短信。發送預警短信的這個操作,就可以用另外一個線程池來做。
線程池的那些參數具體配置多少,需要自己根據伺服器的配置,通路的使用者量等等其他的一些資訊來進行配置,我隻是讓大家了解線程池參數表達的意義,讓大家自己配置線程池參數更加友善。
作者:爛豬皮
連結:https://juejin.cn/post/7232137028497113149