前言
我們開發的時候常常會遇到多線程事務的問題。以為添加了@Transactional注解就行了,其實你加了注解之後會發現事務失效。
原因:資料庫連接配接spring是放在threadLocal裡面,多線程場景下,拿到的資料庫連接配接是不一樣的,即是屬于不同僚務。
本文是基于springboot的@Async注解開啟多線程,,并通過自定義注解和AOP實作的多線程事務,避免繁瑣的手動送出/復原事務 (CV即用、參數齊全、無需配置)
一、springboot多線程(聲明式)的使用方法?
1、springboot提供了注解@Async來使用線程池,具體使用方法如下:
(1) 在啟動類(配置類)添加@EnableAsync來開啟線程池
(2) 在需要開啟子線程的方法上添加注解@Async
注意: 架構預設 -----> 來一個請求開啟一個線程,在高并發下容易記憶體溢出
是以使用時需要配置自定義線程池,如下:
@Configuration
@EnableAsync
public class ThreadPoolTaskConfig {
@Bean("threadPoolTaskExecutor")//自定義線程池名稱
public ThreadPoolTaskExecutor threadPoolTaskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
//線程池建立的核心線程數,線程池維護線程的最少數量,即使沒有任務需要執行,也會一直存活
executor.setCorePoolSize(16);
//如果設定allowCoreThreadTimeout=true(預設false)時,核心線程會逾時關閉
//executor.setAllowCoreThreadTimeOut(true);
//阻塞隊列 當核心線程數達到最大時,新任務會放在隊列中排隊等待執行
executor.setQueueCapacity(124);
//最大線程池數量,當線程數>=corePoolSize,且任務隊列已滿時。線程池會建立新線程來處理任務
//任務隊列已滿時, 且當線程數=maxPoolSize,,線程池會拒絕處理任務而抛出異常
executor.setMaxPoolSize(64);
//當線程空閑時間達到keepAliveTime時,線程會退出,直到線程數量=corePoolSize
//允許線程空閑時間30秒,當maxPoolSize的線程在空閑時間到達的時候銷毀
//如果allowCoreThreadTimeout=true,則會直到線程數量=0
executor.setKeepAliveSeconds(30);
//spring 提供的 ThreadPoolTaskExecutor 線程池,是有setThreadNamePrefix() 方法的。
//jdk 提供的ThreadPoolExecutor 線程池是沒有 setThreadNamePrefix() 方法的
executor.setThreadNamePrefix("自定義線程池-");
// rejection-policy:拒絕政策:當線程數已經達到maxSize的時候,如何處理新任務
// CallerRunsPolicy():交由調用方線程運作,比如 main 線程;如果添加到線程池失敗,那麼主線程會自己去執行該任務,不會等待線程池中的線程去執行, (個人推薦)
// AbortPolicy():該政策是線程池的預設政策,如果線程池隊列滿了丢掉這個任務并且抛出RejectedExecutionException異常。
// DiscardPolicy():如果線程池隊列滿了,會直接丢掉這個任務并且不會有任何異常
// DiscardOldestPolicy():丢棄隊列中最老的任務,隊列滿了,會将最早進入隊列的任務删掉騰出空間,再嘗試加入隊列
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
//設定線程池關閉的時候等待所有任務都完成再繼續銷毀其他的Bean,這樣這些異步任務的銷毀就會先于Redis線程池的銷毀
executor.setWaitForTasksToCompleteOnShutdown(true);
//設定線程池中任務的等待時間,如果超過這個時候還沒有銷毀就強制銷毀,以確定應用最後能夠被關閉,而不是阻塞住。
executor.setAwaitTerminationSeconds(60);
executor.initialize();
return executor;
}
}
開啟子線程方法: 在需要開啟線程的方法上添加 注解@Async("threadPoolTaskExecutor")即可,其中注解中的參數為自定義線程池的名稱。
二、自定義注解實作多線程事務控制
1.自定義注解
本文是使用了兩個注解共同作用實作的,主線程當做協調者,各子線程作為參與者
package com.example.anno;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/**
* 多線程事務注解: 主事務
*
* @author zlj
* @since 2022/11/3
*/
@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
public @interface MainTransaction {
int value();//子線程數量
}
package com.example.anno;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/**
* 多線程事務注解: 子事務
*
* @author zlj
* @since 2022/11/3
*/
@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
public @interface SonTransaction {
String value() default "";
}
解釋:
兩個注解都是用在方法上的,須配合@Transactional(rollbackFor = Exception.class)一起使用
@MainTransaction注解 用在調用方,其參數為必填,參數值為本方法中調用的方法開啟的線程數,如:在這個方法中調用的方法中有2個方法用@Async注解開啟了子線程,則參數為@MainTransaction(2),另外如果未使用@MainTransaction注解,則直接已無多線程事務執行(不影響方法的單線程事務)
@SonTransaction注解 用在被調用方(開啟線程的方法),無需傳入參數
2.AOP内容
代碼如下:
package com.example.aop;
import com.baomidou.mybatisplus.core.toolkit.CollectionUtils;
import com.example.anno.MainTransaction;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.springframework.stereotype.Component;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.TransactionDefinition;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.support.DefaultTransactionDefinition;
import javax.annotation.Resource;
import java.util.HashMap;
import java.util.Map;
import java.util.Vector;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* 多線程事務
*
* @author zlj
* @since 2022/11/3
*/
@Aspect
@Component
public class TransactionAop {
//用來存儲各線程計數器資料(每次執行後會從map中删除)
private static final Map<String, Object> map = new HashMap<>();
@Resource
private PlatformTransactionManager transactionManager;
@Around("@annotation(mainTransaction)")
public void mainIntercept(ProceedingJoinPoint joinPoint, MainTransaction mainTransaction) throws Throwable {
//目前線程名稱
Thread thread = Thread.currentThread();
String threadName = thread.getName();
//初始化計數器
CountDownLatch mainDownLatch = new CountDownLatch(1);
CountDownLatch sonDownLatch = new CountDownLatch(mainTransaction.value());//@MainTransaction注解中的參數, 為子線程的數量
// 用來記錄子線程的運作狀态,隻要有一個失敗就變為true
AtomicBoolean rollBackFlag = new AtomicBoolean(false);
// 用來存每個子線程的異常,把每個線程的自定義異常向vector的首位置插入,其餘異常向末位置插入,避免線程不安全,是以使用vector代替list
Vector<Throwable> exceptionVector = new Vector<>();
map.put(threadName + "mainDownLatch", mainDownLatch);
map.put(threadName + "sonDownLatch", sonDownLatch);
map.put(threadName + "rollBackFlag", rollBackFlag);
map.put(threadName + "exceptionVector", exceptionVector);
try {
joinPoint.proceed();//執行方法
} catch (Throwable e) {
exceptionVector.add(0, e);
rollBackFlag.set(true);//子線程復原
mainDownLatch.countDown();//放行所有子線程
}
if (!rollBackFlag.get()) {
try {
// sonDownLatch等待,直到所有子線程執行完插入操作,但此時還沒有送出事務
sonDownLatch.await();
mainDownLatch.countDown();// 根據rollBackFlag狀态放行子線程的await處,告知是復原還是送出
} catch (Exception e) {
rollBackFlag.set(true);
exceptionVector.add(0, e);
}
}
if (CollectionUtils.isNotEmpty(exceptionVector)) {
map.remove(threadName + "mainDownLatch");
map.remove(threadName + "sonDownLatch");
map.remove(threadName + "rollBackFlag");
map.remove(threadName + "exceptionVector");
throw exceptionVector.get(0);
}
}
@Around("@annotation(com.huigu.common.anno.SonTransaction)")
public void sonIntercept(ProceedingJoinPoint joinPoint) throws Throwable {
Object[] args = joinPoint.getArgs();
Thread thread = (Thread) args[args.length - 1];
String threadName = thread.getName();
CountDownLatch mainDownLatch = (CountDownLatch) map.get(threadName + "mainDownLatch");
if (mainDownLatch == null) {
//主事務未加注解時, 直接執行子事務
joinPoint.proceed();//這裡最好的方式是:交由上面的thread來調用此方法,但我沒有找尋到對應api,隻能直接放棄事務, 歡迎大神來優化, 留言分享
return;
}
CountDownLatch sonDownLatch = (CountDownLatch) map.get(threadName + "sonDownLatch");
AtomicBoolean rollBackFlag = (AtomicBoolean) map.get(threadName + "rollBackFlag");
Vector<Throwable> exceptionVector = (Vector<Throwable>) map.get(threadName + "exceptionVector");
//如果這時有一個子線程已經出錯,那目前線程不需要執行
if (rollBackFlag.get()) {
sonDownLatch.countDown();
return;
}
DefaultTransactionDefinition def = new DefaultTransactionDefinition();// 開啟事務
def.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW);// 設定事務隔離級别
TransactionStatus status = transactionManager.getTransaction(def);
try {
joinPoint.proceed();//執行方法
sonDownLatch.countDown();// 對sonDownLatch-1
mainDownLatch.await();// 如果mainDownLatch不是0,線程會在此阻塞,直到mainDownLatch變為0
// 如果能執行到這一步說明所有子線程都已經執行完畢判斷如果atomicBoolean是true就復原false就送出
if (rollBackFlag.get()) {
transactionManager.rollback(status);
} else {
transactionManager.commit(status);
}
} catch (Throwable e) {
exceptionVector.add(0, e);
// 復原
transactionManager.rollback(status);
// 并把狀态設定為true
rollBackFlag.set(true);
mainDownLatch.countDown();
sonDownLatch.countDown();
}
}
}
擴充說明: CountDownLatch是什麼?
一個同步輔助類
- 建立對象時: 用給定的數字初始化 CountDownLatch
- countDown() 方法: 使計數減1
- await() 方法: 阻塞目前線程, 直至目前計數到達零。
本文中:
用 計數 1 初始化的 mainDownLatch 當作一個簡單的開/關鎖存器,或入口:在通過調用 countDown() 的線程打開入口前,所有調用 await 的線程都一直在入口處等待。
用 子線程數量 初始化的 sonDownLatch 可以使一個線程在 N 個線程完成某項操作之前一直等待,或者使其在某項操作完成 N 次之前一直等待。
3、注解使用Demo
任務方法:
package com.example.demo.service;
import com.example.demo.anno.SonTransaction;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
/**
* @author zlj
* @since 2022/11/14
*/
@Service
public class SonService {
/**
* 參數說明: 以下4個方法參數和此相同
*
* @param args 業務中需要傳遞的參數
* @param thread 調用者的線程, 用于aop擷取參數, 不建議以方法重寫的方式簡略此參數,
* 在調用者方法中可以以此參數為辨別計算子線程的個數作為注解參數,避免線程參數計算錯誤導緻鎖表
* 傳參時參數固定為: Thread.currentThread()
*/
@Transactional(rollbackFor = Exception.class)
@Async("threadPoolTaskExecutor")
@SonTransaction
public void sonMethod1(String args, Thread thread) {
System.out.println(args + "開啟了線程");
}
@Transactional(rollbackFor = Exception.class)
@Async("threadPoolTaskExecutor")
@SonTransaction
public void sonMethod2(String args1, String args2, Thread thread) {
System.out.println(args1 + "和" + args2 + "開啟了線程");
}
@Transactional(rollbackFor = Exception.class)
@Async("threadPoolTaskExecutor")
@SonTransaction
public void sonMethod3(String args, Thread thread) {
System.out.println(args + "開啟了線程");
}
//sonMethod4方法沒有使用線程池
@Transactional(rollbackFor = Exception.class)
public void sonMethod4(String args) {
System.out.println(args + "沒有開啟線程");
}
}
調用方:
package com.example.demo.service;
import com.example.demo.anno.MainTransaction;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import javax.annotation.Resource;
/**
* @author zlj
* @since 2022/11/14
*/
@Service
public class MainService {
@Resource
private SonService sonService;
@MainTransaction(3)//調用的方法中sonMethod1/sonMethod2/sonMethod3使用@Async開啟了線程, 是以參數為: 3
@Transactional(rollbackFor = Exception.class)
public void test1() {
sonService.sonMethod1("路飛", Thread.currentThread());
sonService.sonMethod2("索隆", "山治", Thread.currentThread());
sonService.sonMethod3("娜美", Thread.currentThread());
sonService.sonMethod4("羅賓");
}
/*
* 有的業務中存在if的多種可能, 每一種走向調用的方法(開啟線程的方法)數量如果不同, 這時可以選擇放棄使用@MainTransaction注解避免鎖表
* 這時候如果發生異常會導緻多線程不能同時復原, 可根據業務自己權衡是否使用
*/
@Transactional(rollbackFor = Exception.class)
public void test2() {
sonService.sonMethod1("路飛", Thread.currentThread());
sonService.sonMethod2("索隆", "山治", Thread.currentThread());
sonService.sonMethod3("娜美", Thread.currentThread());
sonService.sonMethod4("羅賓");
}
}