Spring Transaction Propagation Attributes
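| Propagation behavior | Description |
| --- | --- |
| PROPAGATION_REQUIRED | If no transaction exists, create a new one; if a transaction already exists, join it. This is Spring's default propagation type. |
| PROPAGATION_SUPPORTS | Support the current transaction; if none exists, execute non-transactionally. |
| PROPAGATION_MANDATORY | Use the current transaction; if none exists, throw an exception. |
| PROPAGATION_REQUIRES_NEW | Create a new transaction; if a transaction exists, suspend it. |
| PROPAGATION_NOT_SUPPORTED | Execute non-transactionally; if a transaction exists, suspend it. |
| PROPAGATION_NEVER | Execute non-transactionally; if a transaction exists, throw an exception. |
| PROPAGATION_NESTED | If a transaction exists, execute within a nested transaction. If none exists, behave like PROPAGATION_REQUIRED. |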
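The table can be read as a function of two inputs: the propagation type and whether a transaction already exists. The sketch below encodes that reading in plain Java. `PropagationType`, `PropagationOutcome`, and `PropagationRules` are hypothetical names made up for this illustration; they are not Spring classes, and the code only restates the table rather than reproducing Spring's implementation.

```java
/** Hypothetical enum mirroring the rows of the table; not a Spring type. */
enum PropagationType { REQUIRED, SUPPORTS, MANDATORY, REQUIRES_NEW, NOT_SUPPORTED, NEVER, NESTED }

/** Hypothetical enum naming the possible behaviors described in the table. */
enum PropagationOutcome {
    JOIN_EXISTING, START_NEW, SUSPEND_AND_START_NEW, SUSPEND_AND_RUN_WITHOUT_TX,
    RUN_WITHOUT_TX, START_NESTED, THROW_EXCEPTION
}

class PropagationRules {
    /** Maps a propagation type and the presence of a current transaction to a behavior. */
    static PropagationOutcome decide(PropagationType type, boolean transactionExists) {
        switch (type) {
            case REQUIRED:
                return transactionExists ? PropagationOutcome.JOIN_EXISTING : PropagationOutcome.START_NEW;
            case SUPPORTS:
                return transactionExists ? PropagationOutcome.JOIN_EXISTING : PropagationOutcome.RUN_WITHOUT_TX;
            case MANDATORY:
                return transactionExists ? PropagationOutcome.JOIN_EXISTING : PropagationOutcome.THROW_EXCEPTION;
            case REQUIRES_NEW:
                return transactionExists ? PropagationOutcome.SUSPEND_AND_START_NEW : PropagationOutcome.START_NEW;
            case NOT_SUPPORTED:
                return transactionExists
                        ? PropagationOutcome.SUSPEND_AND_RUN_WITHOUT_TX : PropagationOutcome.RUN_WITHOUT_TX;
            case NEVER:
                return transactionExists ? PropagationOutcome.THROW_EXCEPTION : PropagationOutcome.RUN_WITHOUT_TX;
            case NESTED:
                return transactionExists ? PropagationOutcome.START_NESTED : PropagationOutcome.START_NEW;
            default:
                throw new IllegalArgumentException("Unknown propagation type: " + type);
        }
    }
}
```

For instance, `PropagationRules.decide(PropagationType.REQUIRES_NEW, true)` yields `SUSPEND_AND_START_NEW`, matching the REQUIRES_NEW row.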
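Spring transactions are implemented with AOP. If we were to write our own AOP advice to control transactions, how would we do it?
// Pseudocode
public Object invokeWithinTransaction() throws Exception {
    // Begin the transaction
    connection.beginTransaction();
    try {
        // Invoke the target method via reflection
        Object result = invoke();
        // Commit the transaction
        connection.commit();
        return result;
    } catch (Exception e) {
        // Roll back when an exception occurs
        connection.rollback();
        throw e;
    }
}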
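To make the pseudocode concrete, here is a minimal sketch built on a JDK dynamic proxy. Everything in it is an assumption made for illustration: `RecordingConnection` is a fake connection that only records which calls were made, and `AccountService`, `SimpleAccountService`, and `TxProxyDemo` are hypothetical names. It shows the begin / invoke / commit-or-rollback shape only, and is not how Spring itself is written.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for a connection; it only records what was called. */
class RecordingConnection {
    final List<String> events = new ArrayList<>();
    void begin() { events.add("begin"); }
    void commit() { events.add("commit"); }
    void rollback() { events.add("rollback"); }
}

/** Hypothetical business interface used only for this illustration. */
interface AccountService {
    String transfer(boolean fail);
}

class SimpleAccountService implements AccountService {
    @Override
    public String transfer(boolean fail) {
        if (fail) {
            throw new IllegalStateException("transfer failed");
        }
        return "ok";
    }
}

class TxProxyDemo {
    /** Wraps the target so that every interface call runs between begin and commit/rollback. */
    @SuppressWarnings("unchecked")
    static <T> T wrap(T target, Class<T> iface, RecordingConnection connection) {
        InvocationHandler handler = (proxy, method, args) -> {
            connection.begin();
            try {
                // Invoke the real method via reflection
                Object result = method.invoke(target, args);
                connection.commit();
                return result;
            } catch (InvocationTargetException e) {
                // The target threw: roll back and rethrow the original exception
                connection.rollback();
                throw e.getCause();
            }
        };
        return (T) Proxy.newProxyInstance(iface.getClassLoader(), new Class<?>[] {iface}, handler);
    }
}
```

Calling `transfer(false)` through the proxy records `begin` then `commit`; calling `transfer(true)` records `begin` then `rollback` and rethrows the `IllegalStateException` to the caller.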
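Starting from the source code
Before reading the propagation-related source, let's get to know the core interfaces and classes of Spring's transaction management.
- TransactionDefinition: this interface defines all the attributes of a transaction (isolation level, propagation type, timeout, and so on). The @Transactional annotation we use in everyday development is ultimately converted into a TransactionDefinition.
- TransactionStatus: the state of a transaction. Taking the most common implementation, DefaultTransactionStatus, as an example, it holds the current transaction object, the savepoint, any suspended transaction, whether the transaction has completed, whether it is rollback-only, and so on.
- TransactionManager: an empty marker interface. The interfaces that directly extend it are PlatformTransactionManager (the one we normally use; DataSourceTransactionManager is the usual implementation for plain JDBC) and ReactiveTransactionManager (the reactive transaction manager, which is outside the scope of this article).
Given the two interfaces above, the job of a transaction manager (here PlatformTransactionManager) comes down to:
- Starting a transaction from a TransactionDefinition and returning a TransactionStatus.
- Committing or rolling back the transaction through the TransactionStatus (the Connection on which the transaction was started is usually reachable through the TransactionStatus).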
public interface PlatformTransactionManager extends TransactionManager {
TransactionStatus getTransaction(@Nullable TransactionDefinition definition)
throws TransactionException;
void commit(TransactionStatus status) throws TransactionException;
void rollback(TransactionStatus status) throws TransactionException;
}
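- **TransactionInterceptor**: the transaction interceptor and the core class of transactional AOP. It supports reactive transactions, callback-preferring transaction managers, and the standard transactions we use most often. For reasons of space, this article covers only the standard case.
Starting from TransactionInterceptor, the entry point of the transaction logic, let's walk through the core of Spring's transaction management and how propagation is implemented.
TransactionInterceptor
TransactionInterceptor implements MethodInterceptor (one way of implementing AOP advice).
Its core logic lives in the parent class TransactionAspectSupport, in the method TransactionAspectSupport::invokeWithinTransaction.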
protected Object invokeWithinTransaction(Method method, @Nullable Class<?> targetClass,
final InvocationCallback invocation) throws Throwable {
// If the transaction attribute is null, the method is non-transactional.
TransactionAttributeSource tas = getTransactionAttributeSource();
// Attributes of the current transaction; TransactionAttribute extends TransactionDefinition
final TransactionAttribute txAttr = (tas != null ? tas.getTransactionAttribute(method, targetClass) : null);
// The transaction attributes can name the transaction manager to use;
// if none is named, a usable TransactionManager is looked up in the Spring context
final TransactionManager tm = determineTransactionManager(txAttr);
// Reactive transaction handling omitted ...
PlatformTransactionManager ptm = asPlatformTransactionManager(tm);
final String joinpointIdentification = methodIdentification(method, targetClass, txAttr);
if (txAttr == null || !(ptm instanceof CallbackPreferringPlatformTransactionManager)) {
// Standard transaction demarcation with getTransaction and commit/rollback calls.
TransactionInfo txInfo = createTransactionIfNecessary(ptm, txAttr, joinpointIdentification);
Object retVal;
try {
// This is an around advice: Invoke the next interceptor in the chain.
// This will normally result in a target object being invoked.
// Run the next interceptor if there is one; the chain ends at the target method, i.e. our business code
retVal = invocation.proceedWithInvocation();
}
catch (Throwable ex) {
// target invocation exception
// When an exception is caught, complete the current transaction (commit or roll back)
completeTransactionAfterThrowing(txInfo, ex);
throw ex;
}
finally {
cleanupTransactionInfo(txInfo);
}
if (retVal != null && vavrPresent && VavrDelegate.isVavrTry(retVal)) {
// Set rollback-only in case of Vavr failure matching our rollback rules...
TransactionStatus status = txInfo.getTransactionStatus();
if (status != null && txAttr != null) {
retVal = VavrDelegate.evaluateTryFailure(retVal, txAttr, status);
}
}
// Commit or roll back, depending on the state of the transaction
commitTransactionAfterReturning(txInfo);
return retVal;
}
// CallbackPreferringPlatformTransactionManager handling omitted ...
}
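There is a lot of code here. Following the comments, the core logic boils down to:
- Obtain the transaction attributes and the transaction manager for the current method (with annotation-driven transactions, both can be specified through @Transactional).
- createTransactionIfNecessary: decide whether a transaction needs to be created.
- invocation.proceedWithInvocation: run the interceptor chain, which eventually invokes the target method.
- completeTransactionAfterThrowing: when an exception is thrown, complete the transaction (commit or roll back, depending on the rollback rules) and rethrow the exception.
- commitTransactionAfterReturning: judging by its name, this method commits the transaction, but the source shows that it can also roll back. What actually happens depends on the state of the current TransactionStatus, so a rollback can also be triggered by marking the TransactionStatus as rollback-only rather than only by throwing an exception. See AbstractPlatformTransactionManager::commit.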
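The last point, that a commit call can end in a rollback when the status is flagged, can be sketched in plain Java. `MiniStatus`, `MiniTxManager`, and `MiniTxRunner` are hypothetical, heavily simplified stand-ins invented for this example; in Spring the flag is set through `TransactionStatus.setRollbackOnly()`. Unlike Spring, this sketch rolls back on every runtime exception and ignores rollback rules.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

/** Hypothetical, simplified stand-in for TransactionStatus. */
class MiniStatus {
    private boolean rollbackOnly;
    void setRollbackOnly() { rollbackOnly = true; }
    boolean isRollbackOnly() { return rollbackOnly; }
}

/** Hypothetical, simplified stand-in for a transaction manager; it only logs what it does. */
class MiniTxManager {
    final List<String> log = new ArrayList<>();

    MiniStatus begin() {
        log.add("begin");
        return new MiniStatus();
    }

    /** Commits, unless the status was marked rollback-only, in which case it rolls back. */
    void commit(MiniStatus status) {
        if (status.isRollbackOnly()) {
            rollback(status);
            return;
        }
        log.add("commit");
    }

    void rollback(MiniStatus status) {
        log.add("rollback");
    }
}

class MiniTxRunner {
    /** Same shape as the interceptor flow: begin, run the work, then commit or roll back. */
    static <T> T execute(MiniTxManager tm, Function<MiniStatus, T> work) {
        MiniStatus status = tm.begin();
        T result;
        try {
            result = work.apply(status);
        } catch (RuntimeException ex) {
            // Simplification: always roll back on an exception
            tm.rollback(status);
            throw ex;
        }
        // No exception was thrown, yet this may still roll back if the flag is set
        tm.commit(status);
        return result;
    }
}
```

With this model, work that returns normally after calling `status.setRollbackOnly()` leaves `begin` followed by `rollback` in the log, even though no exception was thrown.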
Next, let's look at what createTransactionIfNecessary does.
TransactionAspectSupport::createTransactionIfNecessary
protected TransactionInfo createTransactionIfNecessary(@Nullable PlatformTransactionManager tm,
@Nullable TransactionAttribute txAttr, final String joinpointIdentification) {
// If no name specified, apply method identification as transaction name.
if (txAttr != null && txAttr.getName() == null) {
txAttr = new DelegatingTransactionAttribute(txAttr) {
@Override
public String getName() {
return joinpointIdentification;
}
};
}
TransactionStatus status = null;
if (txAttr != null) {
if (tm != null) {
// Start a transaction through the transaction manager
status = tm.getTransaction(txAttr);
}
else {
if (logger.isDebugEnabled()) {
logger.debug("Skipping transactional joinpoint [" + joinpointIdentification +
"] because no transaction manager has been configured");
}
}
}
return prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);
}
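The core logic of createTransactionIfNecessary:
- Start a transaction through the PlatformTransactionManager (the transaction manager).
- prepareTransactionInfo: prepare the transaction info. We will come back to what this does.
Next comes PlatformTransactionManager::getTransaction. The implementation to read is AbstractPlatformTransactionManager::getTransaction, which the built-in transaction managers share.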
public final TransactionStatus getTransaction(@Nullable TransactionDefinition definition)
throws TransactionException {
// Use defaults if no transaction definition given.
TransactionDefinition def = (definition != null ? definition : TransactionDefinition.withDefaults());
// Get the current transaction; this method is implemented by the subclasses of AbstractPlatformTransactionManager
Object transaction = doGetTransaction();
boolean debugEnabled = logger.isDebugEnabled();
// If a transaction already exists
if (isExistingTransaction(transaction)) {
// Existing transaction found -> check propagation behavior to find out how to behave.
return handleExistingTransaction(def, transaction, debugEnabled);
}
// Check definition settings for new transaction.
if (def.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) {
throw new InvalidTimeoutException("Invalid transaction timeout", def.getTimeout());
}
// PROPAGATION_MANDATORY requires that a transaction already exists
// No existing transaction found -> check propagation behavior to find out how to proceed.
if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
throw new IllegalTransactionStateException(
"No existing transaction found for transaction marked with propagation 'mandatory'");
}
// PROPAGATION_REQUIRED, PROPAGATION_REQUIRES_NEW, PROPAGATION_NESTED: create a transaction when none exists
else if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
SuspendedResourcesHolder suspendedResources = suspend(null);
if (debugEnabled) {
logger.debug("Creating new transaction with name [" + def.getName() + "]: " + def);
}
try {
// Start the transaction
return startTransaction(def, transaction, debugEnabled, suspendedResources);
}
catch (RuntimeException | Error ex) {
resume(null, suspendedResources);
throw ex;
}
}
else {
// Create "empty" transaction: no actual transaction, but potentially synchronization.
// Create an "empty" transaction: not a real transaction, but transaction synchronization may be activated
if (def.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT && logger.isWarnEnabled()) {
logger.warn("Custom isolation level specified but no actual transaction initiated; " +
"isolation level will effectively be ignored: " + def);
}
boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
return prepareTransactionStatus(def, null, true, newSynchronization, debugEnabled, null);
}
}
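There is a lot of code; focus on the commented parts:
- doGetTransaction obtains the current transaction.
- If a transaction already exists, handleExistingTransaction deals with it. We will get to that shortly.
Otherwise, the propagation type decides whether a transaction is started:
- If the propagation type is PROPAGATION_MANDATORY and no transaction exists, an exception is thrown.
- If the propagation type is PROPAGATION_REQUIRED, PROPAGATION_REQUIRES_NEW, or PROPAGATION_NESTED and no transaction exists, startTransaction creates one.
- In every other case (PROPAGATION_SUPPORTS, PROPAGATION_NOT_SUPPORTED, or PROPAGATION_NEVER), no real transaction is created, although transaction synchronization may still be activated.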
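How Spring manages the current transaction
Now for doGetTransaction and isExistingTransaction, which getTransaction calls as shown above. Both are implemented by the concrete TransactionManager. (handleExistingTransaction, by contrast, lives in AbstractPlatformTransactionManager itself and is covered further below.)
We take DataSourceTransactionManager, the manager commonly used with a plain JDBC DataSource, as the example.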
@Override
protected Object doGetTransaction() {
DataSourceTransactionObject txObject = new DataSourceTransactionObject();
txObject.setSavepointAllowed(isNestedTransactionAllowed());
ConnectionHolder conHolder =
(ConnectionHolder) TransactionSynchronizationManager.getResource(obtainDataSource());
txObject.setConnectionHolder(conHolder, false);
return txObject;
}
@Override
protected boolean isExistingTransaction(Object transaction) {
DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
return (txObject.hasConnectionHolder() && txObject.getConnectionHolder().isTransactionActive());
}
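Read together with AbstractPlatformTransactionManager::getTransaction, doGetTransaction really fetches the ConnectionHolder (and with it the Connection) bound to the current thread. Whether a transaction exists is decided by checking that the DataSourceTransactionObject holds a ConnectionHolder and that a transaction is active on it.
Let's look at how TransactionSynchronizationManager.getResource(obtainDataSource()) finds the current connection.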
TransactionSynchronizationManager::getResource
private static final ThreadLocal<Map<Object, Object>> resources =
new NamedThreadLocal<>("Transactional resources");
@Nullable
// TransactionSynchronizationManager::getResource
public static Object getResource(Object key) {
// DataSourceTransactionManager calls this method with the data source as the key
// TransactionSynchronizationUtils::unwrapResourceIfNecessary unwraps the key if it is a wrapper object;
// we can ignore that logic here
Object actualKey = TransactionSynchronizationUtils.unwrapResourceIfNecessary(key);
Object value = doGetResource(actualKey);
if (value != null && logger.isTraceEnabled()) {
logger.trace("Retrieved value [" + value + "] for key [" + actualKey + "] bound to thread [" +
Thread.currentThread().getName() + "]");
}
return value;
}
/**
* Actually check the value of the resource that is bound for the given key.
*/
@Nullable
private static Object doGetResource(Object actualKey) {
Map<Object, Object> map = resources.get();
if (map == null) {
return null;
}
Object value = map.get(actualKey);
// Transparently remove ResourceHolder that was marked as void...
if (value instanceof ResourceHolder && ((ResourceHolder) value).isVoid()) {
map.remove(actualKey);
// Remove entire ThreadLocal if empty...
if (map.isEmpty()) {
resources.remove();
}
value = null;
}
return value;
}
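This shows how DataSourceTransactionManager keeps a separate Connection per thread: a ThreadLocal holds a Map whose key is the data source object and whose value is the ConnectionHolder for that data source on the current thread.
After starting a transaction, DataSourceTransactionManager calls TransactionSynchronizationManager::bindResource to bind the Connection for that data source to the current thread.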
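The thread-bound map can be modelled in a few lines of plain Java. `ThreadBoundResources` is a hypothetical class written for this illustration and loosely follows the bind / get / unbind idea described above; it leaves out the key unwrapping, void-holder cleanup, and logging that the real TransactionSynchronizationManager performs. The helper `getFromOtherThread` exists only to show that a binding is invisible to other threads.

```java
import java.util.HashMap;
import java.util.Map;

/** Simplified, hypothetical model of a thread-bound resource registry. */
class ThreadBoundResources {
    private static final ThreadLocal<Map<Object, Object>> RESOURCES = new ThreadLocal<>();

    /** Binds a value (e.g. a connection) to the given key (e.g. a data source) for this thread. */
    static void bind(Object key, Object value) {
        Map<Object, Object> map = RESOURCES.get();
        if (map == null) {
            map = new HashMap<>();
            RESOURCES.set(map);
        }
        if (map.putIfAbsent(key, value) != null) {
            throw new IllegalStateException("Already a value bound for key " + key);
        }
    }

    /** Returns the value bound to the key on the current thread, or null. */
    static Object get(Object key) {
        Map<Object, Object> map = RESOURCES.get();
        return (map != null ? map.get(key) : null);
    }

    /** Removes and returns the binding; drops the whole ThreadLocal once the map is empty. */
    static Object unbind(Object key) {
        Map<Object, Object> map = RESOURCES.get();
        Object value = (map != null ? map.remove(key) : null);
        if (map != null && map.isEmpty()) {
            RESOURCES.remove();
        }
        return value;
    }

    /** Looks the key up from a freshly started thread, to show that bindings are per thread. */
    static Object getFromOtherThread(Object key) {
        Object[] seen = new Object[1];
        Thread worker = new Thread(() -> seen[0] = get(key));
        worker.start();
        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return seen[0];
    }
}
```

After `bind(dataSource, connection)`, `get(dataSource)` returns the connection on the binding thread, while `getFromOtherThread(dataSource)` returns null. This per-thread visibility is why a transaction started on one thread is not picked up by work handed off to another thread.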
AbstractPlatformTransactionManager::handleExistingTransaction
Going back, let's see how the case where a transaction already exists is handled.
private TransactionStatus handleExistingTransaction(
TransactionDefinition definition, Object transaction, boolean debugEnabled)
throws TransactionException {
// PROPAGATION_NEVER requires non-transactional execution, so throw an exception
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NEVER) {
throw new IllegalTransactionStateException(
"Existing transaction found for transaction marked with propagation 'never'");
}
// PROPAGATION_NOT_SUPPORTED: suspend the existing transaction and run non-transactionally
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NOT_SUPPORTED) {
if (debugEnabled) {
logger.debug("Suspending current transaction");
}
// Suspend the current transaction
Object suspendedResources = suspend(transaction);
boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
// Build a TransactionStatus that carries no transaction
return prepareTransactionStatus(
definition, null, false, newSynchronization, debugEnabled, suspendedResources);
}
// PROPAGATION_REQUIRES_NEW: suspend the existing transaction and start a new one
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW) {
if (debugEnabled) {
logger.debug("Suspending current transaction, creating new transaction with name [" +
definition.getName() + "]");
}
SuspendedResourcesHolder suspendedResources = suspend(transaction);
try {
return startTransaction(definition, transaction, debugEnabled, suspendedResources);
}
catch (RuntimeException | Error beginEx) {
resumeAfterBeginException(transaction, suspendedResources, beginEx);
throw beginEx;
}
}
// PROPAGATION_NESTED: nested transaction
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
if (!isNestedTransactionAllowed()) {
throw new NestedTransactionNotSupportedException(
"Transaction manager does not allow nested transactions by default - " +
"specify 'nestedTransactionAllowed' property with value 'true'");
}
if (debugEnabled) {
logger.debug("Creating nested transaction with name [" + definition.getName() + "]");
}
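// Transaction managers other than JTA typically implement nested transactions with savepoints
// Savepoint: a relational database transaction can set a savepoint and later roll back to it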
if (useSavepointForNestedTransaction()) {
// Create savepoint within existing Spring-managed transaction,
// through the SavepointManager API implemented by TransactionStatus.
// Usually uses JDBC 3.0 savepoints. Never activates Spring synchronization.
DefaultTransactionStatus status =
prepareTransactionStatus(definition, transaction, false, false, debugEnabled, null);
// Create the savepoint
status.createAndHoldSavepoint();
return status;
}
else {
// Nested transaction through nested begin and commit/rollback calls.
// Usually only for JTA: Spring synchronization might get activated here
// in case of a pre-existing JTA transaction.
return startTransaction(definition, transaction, debugEnabled, null);
}
}
// At this point the propagation type must be PROPAGATION_SUPPORTS or PROPAGATION_REQUIRED
// Assumably PROPAGATION_SUPPORTS or PROPAGATION_REQUIRED.
if (debugEnabled) {
logger.debug("Participating in existing transaction");
}
// Optionally check that this method's transaction definition is compatible with the existing transaction
if (isValidateExistingTransaction()) {
if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT) {
Integer currentIsolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();
if (currentIsolationLevel == null || currentIsolationLevel != definition.getIsolationLevel()) {
Constants isoConstants = DefaultTransactionDefinition.constants;
throw new IllegalTransactionStateException("Participating transaction with definition [" +
definition + "] specifies isolation level which is incompatible with existing transaction: " +
(currentIsolationLevel != null ?
isoConstants.toCode(currentIsolationLevel, DefaultTransactionDefinition.PREFIX_ISOLATION) :
"(unknown)"));
}
}
if (!definition.isReadOnly()) {
if (TransactionSynchronizationManager.isCurrentTransactionReadOnly()) {
throw new IllegalTransactionStateException("Participating transaction with definition [" +
definition + "] is not marked as read-only but existing transaction is");
}
}
}
boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
// Build a TransactionStatus that participates in the existing transaction without starting a new one
return prepareTransactionStatus(definition, transaction, false, newSynchronization, debugEnabled, null);
}
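There is a lot of code here; the comments above cover the logic. We have finally reached the long-awaited suspension and nested transactions, so let's look at how DataSourceTransactionManager implements them.
- Suspending a transaction: TransactionSynchronizationManager::unbindResource looks up the current ConnectionHolder by data source and removes it from resources. The removed holder is then kept in the TransactionStatus object.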
// DataSourceTransactionManager::doSuspend
@Override
protected Object doSuspend(Object transaction) {
DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
txObject.setConnectionHolder(null);
return TransactionSynchronizationManager.unbindResource(obtainDataSource());
}
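After the transaction commits or rolls back, AbstractPlatformTransactionManager::cleanupAfterCompletion rebinds the Connection cached in the TransactionStatus to resources.
- Nested transactions: implemented with relational database savepoints. On commit or rollback, if the current transaction status holds a savepoint, the savepoint is released or the transaction is rolled back to it. For the details, see AbstractPlatformTransactionManager::processRollback and AbstractPlatformTransactionManager::processCommit.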
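The savepoint idea behind nested transactions can be illustrated without a database. In the sketch below, `InMemoryTx` and `NestedTxDemo` are hypothetical classes invented for this example: a "transaction" is a list of pending writes and a savepoint is simply the number of writes made so far. With real JDBC the corresponding calls are `Connection.setSavepoint()`, `Connection.rollback(Savepoint)`, and `Connection.releaseSavepoint(Savepoint)`. One simplification to note: the helper swallows the inner exception, whereas in Spring the exception still reaches the caller, which has to catch it if the outer transaction is meant to continue.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical in-memory transaction; a savepoint is just the number of writes made so far. */
class InMemoryTx {
    private final List<String> pendingWrites = new ArrayList<>();

    void write(String row) {
        pendingWrites.add(row);
    }

    int createSavepoint() {
        return pendingWrites.size();
    }

    /** Discards every write made after the savepoint but keeps the transaction open. */
    void rollbackToSavepoint(int savepoint) {
        pendingWrites.subList(savepoint, pendingWrites.size()).clear();
    }

    /** Returns a copy of the writes that would be persisted by a commit. */
    List<String> commit() {
        return new ArrayList<>(pendingWrites);
    }
}

class NestedTxDemo {
    /**
     * Runs the inner work under a savepoint. A failure undoes only the inner writes.
     * Returns true if the inner work completed, false if it was rolled back.
     */
    static boolean runNested(InMemoryTx tx, Runnable innerWork) {
        int savepoint = tx.createSavepoint();
        try {
            innerWork.run();
            // Success: nothing to undo (a real database would release the savepoint here)
            return true;
        } catch (RuntimeException ex) {
            // Simplification: the exception is swallowed to keep the example short
            tx.rollbackToSavepoint(savepoint);
            return false;
        }
    }
}
```

If the outer code writes `outer-1`, runs a failing nested step that wrote `inner-1`, and then writes `outer-2`, the commit contains only `outer-1` and `outer-2`. This differs from PROPAGATION_REQUIRES_NEW, where the inner work runs in an independent transaction on a separate connection.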
That concludes the source analysis of transaction propagation.
prepareTransactionInfo
One question was left open above: what does prepareTransactionInfo do? Let's first look at the structure of TransactionInfo.
protected static final class TransactionInfo {
@Nullable
private final PlatformTransactionManager transactionManager;
@Nullable
private final TransactionAttribute transactionAttribute;
private final String joinpointIdentification;
@Nullable
private TransactionStatus transactionStatus;
@Nullable
private TransactionInfo oldTransactionInfo;
// ...
}
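Within Spring this class is used to pass state around internally. A ThreadLocal holds the most recent TransactionInfo, and from it the previous one can be reached through oldTransactionInfo. Every time createTransactionIfNecessary runs, a new TransactionInfo is created (whether or not a real transaction was started) and stored in the ThreadLocal; when the method completes, the ThreadLocal is reset to oldTransactionInfo. The result is a linked list, which lets Spring transactions nest to any depth, logically speaking.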
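The linked-list behaviour described above amounts to a per-thread stack. The following is a minimal sketch of that idea; `TxInfoNode` and its methods are hypothetical names chosen for this illustration and carry none of the real TransactionInfo fields apart from a joinpoint name and the link to the previous entry.

```java
/** Hypothetical, simplified model of the per-thread TransactionInfo chain. */
class TxInfoNode {
    private static final ThreadLocal<TxInfoNode> CURRENT = new ThreadLocal<>();

    final String joinpoint;
    private TxInfoNode previous;

    TxInfoNode(String joinpoint) {
        this.joinpoint = joinpoint;
    }

    /** Pushes this node: remembers the previous head and becomes the current one. */
    void bindToThread() {
        this.previous = CURRENT.get();
        CURRENT.set(this);
    }

    /** Pops this node: the previously current node (possibly none) becomes current again. */
    void restore() {
        if (this.previous != null) {
            CURRENT.set(this.previous);
        } else {
            CURRENT.remove();
        }
    }

    /** Name of the innermost transactional method on this thread, or null if there is none. */
    static String currentJoinpoint() {
        TxInfoNode node = CURRENT.get();
        return (node != null ? node.joinpoint : null);
    }

    /** Number of nested transactional method calls currently on this thread. */
    static int depth() {
        int depth = 0;
        for (TxInfoNode node = CURRENT.get(); node != null; node = node.previous) {
            depth++;
        }
        return depth;
    }
}
```

Binding an outer node and then an inner node gives a depth of 2 with the inner node current; restoring the inner node makes the outer node current again, which mirrors what happens when a nested transactional method returns.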
Tracing the source
@EnableTransactionManagement
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(TransactionManagementConfigurationSelector.class)
public @interface EnableTransactionManagement {
// ......
}
TransactionManagementConfigurationSelector
public class TransactionManagementConfigurationSelector extends AdviceModeImportSelector<EnableTransactionManagement> {
/**
* Returns {@link ProxyTransactionManagementConfiguration} or
* {@code AspectJ(Jta)TransactionManagementConfiguration} for {@code PROXY}
* and {@code ASPECTJ} values of {@link EnableTransactionManagement#mode()},
* respectively.
*/
@Override
protected String[] selectImports(AdviceMode adviceMode) {
switch (adviceMode) {
case PROXY:
return new String[] {AutoProxyRegistrar.class.getName(),
ProxyTransactionManagementConfiguration.class.getName()};
case ASPECTJ:
return new String[] {determineTransactionAspectClass()};
default:
return null;
}
}
}
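Taking PROXY mode (JDK dynamic proxies by default) as an example:
AutoProxyRegistrar: registers, with the BeanDefinitionRegistry, the BeanDefinition of the auto-proxy creator that proxy mode needs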
ProxyTransactionManagementConfiguration
@Configuration(proxyBeanMethods = false)
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public class ProxyTransactionManagementConfiguration extends AbstractTransactionManagementConfiguration {
@Bean(name = TransactionManagementConfigUtils.TRANSACTION_ADVISOR_BEAN_NAME)
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public BeanFactoryTransactionAttributeSourceAdvisor transactionAdvisor(
TransactionAttributeSource transactionAttributeSource, TransactionInterceptor transactionInterceptor) {
BeanFactoryTransactionAttributeSourceAdvisor advisor = new BeanFactoryTransactionAttributeSourceAdvisor();
advisor.setTransactionAttributeSource(transactionAttributeSource);
advisor.setAdvice(transactionInterceptor);
if (this.enableTx != null) {
advisor.setOrder(this.enableTx.<Integer>getNumber("order"));
}
return advisor;
}
@Bean
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public TransactionAttributeSource transactionAttributeSource() {
return new AnnotationTransactionAttributeSource();
}
@Bean
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public TransactionInterceptor transactionInterceptor(TransactionAttributeSource transactionAttributeSource) {
TransactionInterceptor interceptor = new TransactionInterceptor();
interceptor.setTransactionAttributeSource(transactionAttributeSource);
if (this.txManager != null) {
interceptor.setTransactionManager(this.txManager);
}
return interceptor;
}
}
TransactionInterceptor
public class TransactionInterceptor extends TransactionAspectSupport implements MethodInterceptor, Serializable {
/**
* Create a new TransactionInterceptor.
* @param ptm the default transaction manager to perform the actual transaction management
* @param tas the attribute source to be used to find transaction attributes
* @since 5.2.5
* @see #setTransactionManager
* @see #setTransactionAttributeSource
*/
public TransactionInterceptor(TransactionManager ptm, TransactionAttributeSource tas) {
setTransactionManager(ptm);
setTransactionAttributeSource(tas);
}
@Override
@Nullable
public Object invoke(MethodInvocation invocation) throws Throwable {
// Work out the target class: may be {@code null}.
// The TransactionAttributeSource should be passed the target class
// as well as the method, which may be from an interface.
Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);
// Adapt to TransactionAspectSupport's invokeWithinTransaction...
// Delegate to the parent class method
return invokeWithinTransaction(invocation.getMethod(), targetClass, invocation::proceed);
}
}
TransactionAspectSupport:
public abstract class TransactionAspectSupport implements BeanFactoryAware, InitializingBean {
@Nullable
private TransactionManager transactionManager;
@Nullable
private TransactionAttributeSource transactionAttributeSource;
@Nullable
private BeanFactory beanFactory;
@Nullable
protected Object invokeWithinTransaction(Method method, @Nullable Class<?> targetClass,
final InvocationCallback invocation) throws Throwable {
// If the transaction attribute is null, the method is non-transactional.
TransactionAttributeSource tas = getTransactionAttributeSource();
final TransactionAttribute txAttr = (tas != null ? tas.getTransactionAttribute(method, targetClass) : null);
final TransactionManager tm = determineTransactionManager(txAttr);
// Reactive transaction handling
if (this.reactiveAdapterRegistry != null && tm instanceof ReactiveTransactionManager) {
ReactiveTransactionSupport txSupport = this.transactionSupportCache.computeIfAbsent(method, key -> {
if (KotlinDetector.isKotlinType(method.getDeclaringClass()) && KotlinDelegate.isSuspend(method)) {
throw new TransactionUsageException(
"Unsupported annotated transaction on suspending function detected: " + method +
". Use TransactionalOperator.transactional extensions instead.");
}
ReactiveAdapter adapter = this.reactiveAdapterRegistry.getAdapter(method.getReturnType());
if (adapter == null) {
throw new IllegalStateException("Cannot apply reactive transaction to non-reactive return type: " +
method.getReturnType());
}
return new ReactiveTransactionSupport(adapter);
});
return txSupport.invokeWithinTransaction(
method, targetClass, invocation, txAttr, (ReactiveTransactionManager) tm);
}
// Imperative transaction handling
PlatformTransactionManager ptm = asPlatformTransactionManager(tm);
final String joinpointIdentification = methodIdentification(method, targetClass, txAttr);
if (txAttr == null || !(ptm instanceof CallbackPreferringPlatformTransactionManager)) {
// Standard transaction demarcation with getTransaction and commit/rollback calls.
TransactionInfo txInfo = createTransactionIfNecessary(ptm, txAttr, joinpointIdentification);
Object retVal;
try {
// This is an around advice: Invoke the next interceptor in the chain.
// This will normally result in a target object being invoked.
retVal = invocation.proceedWithInvocation();
}
catch (Throwable ex) {
// target invocation exception
completeTransactionAfterThrowing(txInfo, ex);
throw ex;
}
finally {
cleanupTransactionInfo(txInfo);
}
if (retVal != null && vavrPresent && VavrDelegate.isVavrTry(retVal)) {
// Set rollback-only in case of Vavr failure matching our rollback rules...
TransactionStatus status = txInfo.getTransactionStatus();
if (status != null && txAttr != null) {
retVal = VavrDelegate.evaluateTryFailure(retVal, txAttr, status);
}
}
commitTransactionAfterReturning(txInfo);
return retVal;
}
else {
Object result;
final ThrowableHolder throwableHolder = new ThrowableHolder();
// It's a CallbackPreferringPlatformTransactionManager: pass a TransactionCallback in.
try {
result = ((CallbackPreferringPlatformTransactionManager) ptm).execute(txAttr, status -> {
TransactionInfo txInfo = prepareTransactionInfo(ptm, txAttr, joinpointIdentification, status);
try {
Object retVal = invocation.proceedWithInvocation();
if (retVal != null && vavrPresent && VavrDelegate.isVavrTry(retVal)) {
// Set rollback-only in case of Vavr failure matching our rollback rules...
retVal = VavrDelegate.evaluateTryFailure(retVal, txAttr, status);
}
return retVal;
}
catch (Throwable ex) {
if (txAttr.rollbackOn(ex)) {
// A RuntimeException: will lead to a rollback.
if (ex instanceof RuntimeException) {
throw (RuntimeException) ex;
}
else {
throw new ThrowableHolderException(ex);
}
}
else {
// A normal return value: will lead to a commit.
throwableHolder.throwable = ex;
return null;
}
}
finally {
cleanupTransactionInfo(txInfo);
}
});
}
catch (ThrowableHolderException ex) {
throw ex.getCause();
}
catch (TransactionSystemException ex2) {
if (throwableHolder.throwable != null) {
logger.error("Application exception overridden by commit exception", throwableHolder.throwable);
ex2.initApplicationException(throwableHolder.throwable);
}
throw ex2;
}
catch (Throwable ex2) {
if (throwableHolder.throwable != null) {
logger.error("Application exception overridden by commit exception", throwableHolder.throwable);
}
throw ex2;
}
// Check result state: It might indicate a Throwable to rethrow.
if (throwableHolder.throwable != null) {
throw throwableHolder.throwable;
}
return result;
}
}
}