事务的执行
当代理对象的方法被调用时,最终会调用到TransactionInterceptor的invoke()方法上面。对于为什么会调用到invoke()方法的小伙伴,需要取了解一下动态代理的原理
@Override
@Nullable
public Object invoke(MethodInvocation invocation) throws Throwable {
// Work out the target class: may be {@code null}.
// The TransactionAttributeSource should be passed the target class
// as well as the method, which may be from an interface.
Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);
// Adapt to TransactionAspectSupport's invokeWithinTransaction...
// 意思是最终会适配到TransactionAspectSupport.invokeWithinTransaction方法
return invokeWithinTransaction(invocation.getMethod(), targetClass, invocation::proceed);
}
TransactionAspectSupport.invokeWithinTransaction()
@Nullable
protected Object invokeWithinTransaction(Method method, @Nullable Class<?> targetClass,
final InvocationCallback invocation) throws Throwable {
// 获取AnnotationTransactionAttributeSource,这个是从外面设置进TransactionInterceptor的。
// If the transaction attribute is null, the method is non-transactional.
TransactionAttributeSource tas = getTransactionAttributeSource();
// 获取Transactional注解参数
final TransactionAttribute txAttr = (tas != null ? tas.getTransactionAttribute(method, targetClass) : null);
// 从容器中获取TransactionManager实例
final TransactionManager tm = determineTransactionManager(txAttr);
if (this.reactiveAdapterRegistry != null && tm instanceof ReactiveTransactionManager) {
ReactiveTransactionSupport txSupport = this.transactionSupportCache.computeIfAbsent(method, key -> {
if (KotlinDetector.isKotlinType(method.getDeclaringClass()) && KotlinDelegate.isSuspend(method)) {
throw new TransactionUsageException(
"Unsupported annotated transaction on suspending function detected: " + method +
". Use TransactionalOperator.transactional extensions instead.");
}
ReactiveAdapter adapter = this.reactiveAdapterRegistry.getAdapter(method.getReturnType());
if (adapter == null) {
throw new IllegalStateException("Cannot apply reactive transaction to non-reactive return type: " +
method.getReturnType());
}
return new ReactiveTransactionSupport(adapter);
});
return txSupport.invokeWithinTransaction(
method, targetClass, invocation, txAttr, (ReactiveTransactionManager) tm);
}
// TransactionManager转换成PlatformTransactionManager类型
PlatformTransactionManager ptm = asPlatformTransactionManager(tm);
// 获取方法身份,类似com.github.AccountServiceImpl.save。
// 因为txAttr是RuleBasedTransactionAttribute类型,所以最终返回ClassUtils.getQualifiedMethodName()返回【类名+"."+方法名】
final String joinpointIdentification = methodIdentification(method, targetClass, txAttr);
// 如果是声明式事务
if (txAttr == null || !(ptm instanceof CallbackPreferringPlatformTransactionManager)) {
// Standard transaction demarcation with getTransaction and commit/rollback calls.
// 获取事务,这里面就要考虑spring的事务传播机制
// *****这是重点*****这是重点*****这是重点*****这是重点*****这是重点*****这是重点*****
TransactionInfo txInfo = createTransactionIfNecessary(ptm, txAttr, joinpointIdentification);
Object retVal;
try {
// 执行目标方法
// This is an around advice: Invoke the next interceptor in the chain.
// This will normally result in a target object being invoked.
retVal = invocation.proceedWithInvocation();
}
catch (Throwable ex) {
// target invocation exception
// 遇到异常的情况如何处理事务,要结合事务的传播机制来看。
completeTransactionAfterThrowing(txInfo, ex);
throw ex;
}
finally {
// 清除当前事务的相关信息。把当前线程中TransactionInfo设置成oldTransactionInfo
cleanupTransactionInfo(txInfo);
}
// 假如返回值是Try<Integer> result = Try.of(() -> 1 / 0)这样的类型,那么就要检测它是否会抛异常
if (retVal != null && vavrPresent && VavrDelegate.isVavrTry(retVal)) {
// Set rollback-only in case of Vavr failure matching our rollback rules...
TransactionStatus status = txInfo.getTransactionStatus();
if (status != null && txAttr != null) {
// 真正地执行后获取结果。异常后设置status里面的属性值rollbackOnly=true
retVal = VavrDelegate.evaluateTryFailure(retVal, txAttr, status);
}
}
// 最后就是准备提交事务。
commitTransactionAfterReturning(txInfo);
return retVal;
}
// 下面是编程式事务
else {
Object result;
final ThrowableHolder throwableHolder = new ThrowableHolder();
// It's a CallbackPreferringPlatformTransactionManager: pass a TransactionCallback in.
try {
result = ((CallbackPreferringPlatformTransactionManager) ptm).execute(txAttr, status -> {
TransactionInfo txInfo = prepareTransactionInfo(ptm, txAttr, joinpointIdentification, status);
try {
Object retVal = invocation.proceedWithInvocation();
if (retVal != null && vavrPresent && VavrDelegate.isVavrTry(retVal)) {
// Set rollback-only in case of Vavr failure matching our rollback rules...
retVal = VavrDelegate.evaluateTryFailure(retVal, txAttr, status);
}
return retVal;
}
catch (Throwable ex) {
if (txAttr.rollbackOn(ex)) {
// A RuntimeException: will lead to a rollback.
if (ex instanceof RuntimeException) {
throw (RuntimeException) ex;
}
else {
throw new ThrowableHolderException(ex);
}
}
else {
// A normal return value: will lead to a commit.
throwableHolder.throwable = ex;
return null;
}
}
finally {
cleanupTransactionInfo(txInfo);
}
});
}
catch (ThrowableHolderException ex) {
throw ex.getCause();
}
catch (TransactionSystemException ex2) {
if (throwableHolder.throwable != null) {
logger.error("Application exception overridden by commit exception", throwableHolder.throwable);
ex2.initApplicationException(throwableHolder.throwable);
}
throw ex2;
}
catch (Throwable ex2) {
if (throwableHolder.throwable != null) {
logger.error("Application exception overridden by commit exception", throwableHolder.throwable);
}
throw ex2;
}
// Check result state: It might indicate a Throwable to rethrow.
if (throwableHolder.throwable != null) {
throw throwableHolder.throwable;
}
return result;
}
}
总结下来,重点就下面几个方法
protected Object invokeWithinTransaction(Method method, @Nullable Class<?> targetClass,
final InvocationCallback invocation) throws Throwable {
// 调用目标方法前,根据情况确定要不要创建新的事务
TransactionInfo txInfo = createTransactionIfNecessary();
Object retVal;
try{
// 调用目标方法
retVal = invocation.proceedWithInvocation();
}catch(Throwable ex){
// 根据调用目标方法的实际情况确定是回滚还是提交,还是只设置回滚标记。里面最后一步还包括回收链接,恢复被挂起的事务等等
completeTransactionAfterThrowing(txInfo, ex);
throw ex;
}finally{
// 清除线程中当前事务信息
cleanupTransactionInfo(txInfo);
}
// 根据情况判断是提交还是回滚。里面最后一步还包括回收链接,恢复被挂起的事务等等
commitTransactionAfterReturning(txInfo);
return retVal;
}
createTransactionIfNecessary
protected TransactionInfo createTransactionIfNecessary(@Nullable PlatformTransactionManager tm,
@Nullable TransactionAttribute txAttr, final String joinpointIdentification) {
// 下面就是包装了一下
// If no name specified, apply method identification as transaction name.
if (txAttr != null && txAttr.getName() == null) {
txAttr = new DelegatingTransactionAttribute(txAttr) {
@Override
public String getName() {
return joinpointIdentification;
}
};
}
// *****这是重点,获取事务状态******
TransactionStatus status = null;
if (txAttr != null) {
if (tm != null) {
status = tm.getTransaction(txAttr);
}
else {
if (logger.isDebugEnabled()) {
logger.debug("Skipping transactional joinpoint [" + joinpointIdentification +
"] because no transaction manager has been configured");
}
}
}
// 这里就是再包装一层返回TransactionInfo对象
return prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);
}
@Override
public final TransactionStatus getTransaction(@Nullable TransactionDefinition definition)
throws TransactionException {
// Use defaults if no transaction definition given.
TransactionDefinition def = (definition != null ? definition : TransactionDefinition.withDefaults());
// 如果第一次调用的话,返回DataSourceTransactionObject实例,connectionHolder=null;
Object transaction = doGetTransaction();
boolean debugEnabled = logger.isDebugEnabled();
// 如果有connectionHolder,并且事务是激活状态。说明这不是第一次进代理方法,那么就需要结合当前的传播机制来处理当前的事务
if (isExistingTransaction(transaction)) {
// Existing transaction found -> check propagation behavior to find out how to behave.
// *******************这是重点,需要结合事务传播机制来看*******************
return handleExistingTransaction(def, transaction, debugEnabled);
}
// 检测超时时间是否合法
// Check definition settings for new transaction.
if (def.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) {
throw new InvalidTimeoutException("Invalid transaction timeout", def.getTimeout());
}
// 下面都是第一次调用。说明之前不存在任何事务。
// No existing transaction found -> check propagation behavior to find out how to proceed.
// 当传播机制是PROPAGATION_MANDATORY,表示当前必须存在一个事务,否则抛出异常
if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
throw new IllegalTransactionStateException(
"No existing transaction found for transaction marked with propagation 'mandatory'");
}
// 当第一次调用时,如果传播机制是以下三种的话,那么都需要新建事务
else if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
//PROPAGATION_NESTED是嵌套事务,这个事务能否成功提交取决于外部事务是否成功
// 如果内部事务成功,外部事务失败,那么一起回滚
// 如果内部事务成功,外部事务成功,那么一起提交
// 如果内部事务失败,外部事务失败,那么一起回滚
// 如果内部事务失败,外部事务成功,那么内部事务回滚,外部事务提交
def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
// 挂起上一个事务,目前不存在上一个事务,那么直接传null
SuspendedResourcesHolder suspendedResources = suspend(null);
if (debugEnabled) {
logger.debug("Creating new transaction with name [" + def.getName() + "]: " + def);
}
try {
// 开启新事务
return startTransaction(def, transaction, debugEnabled, suspendedResources);
}
catch (RuntimeException | Error ex) {
resume(null, suspendedResources);
throw ex;
}
}
// 剩下的传播机制,都创建空事务
else {
// Create "empty" transaction: no actual transaction, but potentially synchronization.
if (def.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT && logger.isWarnEnabled()) {
logger.warn("Custom isolation level specified but no actual transaction initiated; " +
"isolation level will effectively be ignored: " + def);
}
boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
// 创建一个默认的TransactionStatus,里面的事务为空,同时更新当前线程的信息
return prepareTransactionStatus(def, null, true, newSynchronization, debugEnabled, null);
}
}
// 开启新事务
private TransactionStatus startTransaction(TransactionDefinition definition, Object transaction,
boolean debugEnabled, @Nullable SuspendedResourcesHolder suspendedResources) {
boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
// 创建一个新事务, transaction里面的connectionHolder=null
DefaultTransactionStatus status = newTransactionStatus(
definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
// 开启新事务
doBegin(transaction, definition);
// 设置当前事务的事务同步管理器。这个事务同步管理器其实就是把针对当前事务的配置参数和当前线程绑在一起,通过TransactionSynchronizationManager管理
prepareSynchronization(status, definition);
return status;
}
@Override
protected void doBegin(Object transaction, TransactionDefinition definition) {
DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
Connection con = null;
try {
// 因为是新的事务,所以需要创建新的链接Connection
if (!txObject.hasConnectionHolder() ||
txObject.getConnectionHolder().isSynchronizedWithTransaction()) {
// 获取新的Connection
Connection newCon = obtainDataSource().getConnection();
if (logger.isDebugEnabled()) {
logger.debug("Acquired Connection [" + newCon + "] for JDBC transaction");
}
// 把新的Connection设置进transaction中
txObject.setConnectionHolder(new ConnectionHolder(newCon), true);
}
txObject.getConnectionHolder().setSynchronizedWithTransaction(true);
con = txObject.getConnectionHolder().getConnection();
// 设置readOnly和isolationLevel
Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition);
txObject.setPreviousIsolationLevel(previousIsolationLevel);
txObject.setReadOnly(definition.isReadOnly());
// Switch to manual commit if necessary. This is very expensive in some JDBC drivers,
// so we don't want to do it unnecessarily (for example if we've explicitly
// configured the connection pool to set it already).
// 如果是自动提交,切换到手动提交
if (con.getAutoCommit()) {
txObject.setMustRestoreAutoCommit(true);
if (logger.isDebugEnabled()) {
logger.debug("Switching JDBC Connection [" + con + "] to manual commit");
}
con.setAutoCommit(false);
}
// 如果是readOnly,通过Statement执行SET TRANSACTION READ ONLY
prepareTransactionalConnection(con, definition);
txObject.getConnectionHolder().setTransactionActive(true);
int timeout = determineTimeout(definition);
if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) {
txObject.getConnectionHolder().setTimeoutInSeconds(timeout);
}
// Bind the connection holder to the thread.
if (txObject.isNewConnectionHolder()) {
// 把新创建出来的ConnectionHolder绑定到当前线程中
TransactionSynchronizationManager.bindResource(obtainDataSource(), txObject.getConnectionHolder());
}
}
catch (Throwable ex) {
// 出异常了就释放链接
if (txObject.isNewConnectionHolder()) {
DataSourceUtils.releaseConnection(con, obtainDataSource());
txObject.setConnectionHolder(null, false);
}
throw new CannotCreateTransactionException("Could not open JDBC Connection for transaction", ex);
}
}
前面已经创建过事务的情况:
private TransactionStatus handleExistingTransaction(
TransactionDefinition definition, Object transaction, boolean debugEnabled)
throws TransactionException {
// 那么遇到不允许有事务的传播机制,就直接抛异常
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NEVER) {
throw new IllegalTransactionStateException(
"Existing transaction found for transaction marked with propagation 'never'");
}
// PROPAGATION_NOT_SUPPORTED就是以非事务的方式执行,那么我们就需要先把当前的事务挂起来
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NOT_SUPPORTED) {
if (debugEnabled) {
logger.debug("Suspending current transaction");
}
// 挂起当前事务
// 1.把当前线程的connectionHolder解绑并返回connectionHolder
// 2.取出当前事务同步器中的所有信息,并重置线程同步器信息
// 3.把connectionHolder和相关信息封装成一个SuspendedResourcesHolder对象返回
Object suspendedResources = suspend(transaction);
boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
// 创建一个DefaultTransactionStatus返回
return prepareTransactionStatus(
definition, null, false, newSynchronization, debugEnabled, suspendedResources);
}
// 如果是PROPAGATION_REQUIRES_NEW,那么必须要创建一个新的事务
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW) {
if (debugEnabled) {
logger.debug("Suspending current transaction, creating new transaction with name [" +
definition.getName() + "]");
}
// 挂起当前事务
SuspendedResourcesHolder suspendedResources = suspend(transaction);
try {
// 开启新事务
return startTransaction(definition, transaction, debugEnabled, suspendedResources);
}
catch (RuntimeException | Error beginEx) {
resumeAfterBeginException(transaction, suspendedResources, beginEx);
throw beginEx;
}
}
// 如果是PROPAGATION_NESTED,那么就是嵌套事务。就要使用回滚点
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
if (!isNestedTransactionAllowed()) {
throw new NestedTransactionNotSupportedException(
"Transaction manager does not allow nested transactions by default - " +
"specify 'nestedTransactionAllowed' property with value 'true'");
}
if (debugEnabled) {
logger.debug("Creating nested transaction with name [" + definition.getName() + "]");
}
if (useSavepointForNestedTransaction()) {
// Create savepoint within existing Spring-managed transaction,
// through the SavepointManager API implemented by TransactionStatus.
// Usually uses JDBC 3.0 savepoints. Never activates Spring synchronization.
// 同样的还是创建一个DefaultTransactionStatus对象
DefaultTransactionStatus status =
prepareTransactionStatus(definition, transaction, false, false, debugEnabled, null);
// 创建回滚点后返回
status.createAndHoldSavepoint();
return status;
}
else {
// Nested transaction through nested begin and commit/rollback calls.
// Usually only for JTA: Spring synchronization might get activated here
// in case of a pre-existing JTA transaction.
// 针对JTA事务,创建一个新的事务来处理
return startTransaction(definition, transaction, debugEnabled, null);
}
}
// Assumably PROPAGATION_SUPPORTS or PROPAGATION_REQUIRED.
if (debugEnabled) {
logger.debug("Participating in existing transaction");
}
if (isValidateExistingTransaction()) {
if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT) {
Integer currentIsolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();
if (currentIsolationLevel == null || currentIsolationLevel != definition.getIsolationLevel()) {
Constants isoConstants = DefaultTransactionDefinition.constants;
throw new IllegalTransactionStateException("Participating transaction with definition [" +
definition + "] specifies isolation level which is incompatible with existing transaction: " +
(currentIsolationLevel != null ?
isoConstants.toCode(currentIsolationLevel, DefaultTransactionDefinition.PREFIX_ISOLATION) :
"(unknown)"));
}
}
if (!definition.isReadOnly()) {
if (TransactionSynchronizationManager.isCurrentTransactionReadOnly()) {
throw new IllegalTransactionStateException("Participating transaction with definition [" +
definition + "] is not marked as read-only but existing transaction is");
}
}
}
// 剩下的传播机制,还是当前事务
boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
return prepareTransactionStatus(definition, transaction, false, newSynchronization, debugEnabled, null);
}
总结一下:
1.createTransactionIfNecessary():当代理对象方法被调用时,获取TransactionAttribute、TransactionManager以及目标方法名joinpointIdentification,利用这三个参数来创建事务TransactionInfo
- 1.1.tm.getTransaction():获取事务
- 1.1.1:doGetTransaction():创建新的DataSourceTransactionObject,并从当前线程中查找connectionHolder,不管有没有找到,都设置进DataSourceTransactionObject对象中。
- 1.1.2:isExistingTransaction():判断是否已经存在事务,其实就是判断DataSourceTransactionObject中是否有connectionHolder,并且事务是否是激活的。
- 1.1.3:handleExistingTransaction():如果已经存在事务。-----------这是逻辑分支------------
- 1.1.3.1:如果存在事务,当传播机制是PROPAGATION_NEVER,那么直接抛异常。
- 1.1.3.2:如果存在事务,当传播机制是PROPAGATION_NOT_SUPPORTED,那么直接挂起当前事务。创建DefaultTransactionStatus对象,并且更新当前线程事务信息。
- 1.1.3.3:如果存在事务,当传播机制是PROPAGATION_REQUIRES_NEW,那么先挂起当前事务,并且创建新的连接设置进新事务中,并通过新创建的transaction封装一个DefaultTransactionStatus对象,同时更新当前线程中事务信息。
- 1.1.3.4:如果存在事务,当传播机制是PROPAGATION_NESTED,那么直接封装一个DefaultTransactionStatus对象,更新当前线程事务信息,并设置保存点后返回。
- 1.1.3.5:如果存在事务,当传播机制是PROPAGATION_SUPPORTS、PROPAGATION_MANDATORY、PROPAGATION_REQUIRED,那么直接创建一个DefaultTransactionStatus对象,更新当前线程的事务信息后返回。
- 1.1.4:如果不存在事务,传播机制是PROPAGATION_MANDATORY,就抛异常。该传播机制的要求是当前一定要存在事务才能继续执行,否则就抛异常。-----------这是逻辑分支------------
- 1.1.5:如果不存在事务,传播机制是PROPAGATION_REQUIRED、PROPAGATION_REQUIRES_NEW、PROPAGATION_NESTED,那么意味着就需要自己创建事务。-----------这是逻辑分支------------
- 1.1.5.1:suspend(null):挂起空事务。就是把当前线程中的事务相关信息都取出来封装到一个SuspendedResourcesHolder对象中并返回。并且重置当前线程中事务相关信息,通过TransactionSynchronizationManager来设置。
- 1.1.5.2:startTransaction():开启新事务
- 1.1.5.2.1:doBegin():创建新的ConnectionHolder,并设置进transaction中,开启手动提交,最后绑定到当前线程中
- 1.1.5.2.2:prepareSynchronization():创建DefaultTransactionStatus对象,更新当前线程中的信息。因为此时已经创建了新的事务了。
- 1.1.6:如果不存在事务,传播机制是PROPAGATION_SUPPORTS、PROPAGATION_NOT_SUPPORTED、PROPAGATION_NEVER,那么就直接创建一个DefaultTransactionStatus对象返回。同时更新一下当前线程中的信息。-----------这是逻辑分支------------
- 1.2.prepareTransactionInfo():把TransactionAttribute、TransactionManager以及目标方法名joinpointIdentification打包成TransactionInfo对象,并且把TransactionStatus设置进去,最后把oldTransactionInfo解绑并取出来保存,把最新的TransactionInfo和当前线程绑定
protected static final class TransactionInfo {
private void bindToThread() {
// 从当前线程中取出来,然后把新的TransactionInfo和当前线程绑定
this.oldTransactionInfo = transactionInfoHolder.get();
transactionInfoHolder.set(this);
}
}
其实总结一下上面的逻辑,createTransactionIfNecessary()最终创建了一个TransactionInfo对象,但是整个过程重点在于通过getTransaction()方法获取TransactionStatus对象,创建这个对象重点取决于创建Transaction,然后这个对象里面其实是包含了最关键的connectionHolder。
protected void completeTransactionAfterThrowing(@Nullable TransactionInfo txInfo, Throwable ex) {
if (txInfo != null && txInfo.getTransactionStatus() != null) {
if (logger.isTraceEnabled()) {
logger.trace("Completing transaction for [" + txInfo.getJoinpointIdentification() +
"] after exception: " + ex);
}
if (txInfo.transactionAttribute != null && txInfo.transactionAttribute.rollbackOn(ex)) {
try {
// 如果这个抛出的异常正好匹配上Transaction注解上指定的异常,那么执行回滚操作
txInfo.getTransactionManager().rollback(txInfo.getTransactionStatus());
}
catch (TransactionSystemException ex2) {
logger.error("Application exception overridden by rollback exception", ex);
ex2.initApplicationException(ex);
throw ex2;
}
catch (RuntimeException | Error ex2) {
logger.error("Application exception overridden by rollback exception", ex);
throw ex2;
}
}
else {
// We don't roll back on this exception.
// Will still roll back if TransactionStatus.isRollbackOnly() is true.
try {
// 如果异常匹配不上,那么还是依然执行提交。如果TransactionStatus里面的状态被标记为需要回滚的话,那么还是依然回滚,否则就提交
txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());
}
catch (TransactionSystemException ex2) {
logger.error("Application exception overridden by commit exception", ex);
ex2.initApplicationException(ex);
throw ex2;
}
catch (RuntimeException | Error ex2) {
logger.error("Application exception overridden by commit exception", ex);
throw ex2;
}
}
}
}
private void processRollback(DefaultTransactionStatus status, boolean unexpected) {
try {
boolean unexpectedRollback = unexpected;
try {
triggerBeforeCompletion(status);
// 检查是否有保存点,如果有保存点,就重置到保存点
if (status.hasSavepoint()) {
if (status.isDebug()) {
logger.debug("Rolling back transaction to savepoint");
}
// 重置到保存点
status.rollbackToHeldSavepoint();
}
// 如果是新的事务,那么就直接回滚
else if (status.isNewTransaction()) {
if (status.isDebug()) {
logger.debug("Initiating transaction rollback");
}
// 其实是拿出里面的ConnectionHolder来调用rollback
doRollback(status);
}
else {
// Participating in larger transaction
// 如果不是新的事务,那么为了避免多次回滚,只需要将当前的rollback标记设置成true,这样的话,当上层代理catch这个抛出去的异常时,会检查rollback标记,最后才会回滚。
if (status.hasTransaction()) {
if (status.isLocalRollbackOnly() || isGlobalRollbackOnParticipationFailure()) {
if (status.isDebug()) {
logger.debug("Participating transaction failed - marking existing transaction as rollback-only");
}
doSetRollbackOnly(status);
}
else {
if (status.isDebug()) {
logger.debug("Participating transaction failed - letting transaction originator decide on rollback");
}
}
}
else {
logger.debug("Should roll back transaction but cannot - no transaction available");
}
// Unexpected rollback only matters here if we're asked to fail early
if (!isFailEarlyOnGlobalRollbackOnly()) {
unexpectedRollback = false;
}
}
}
catch (RuntimeException | Error ex) {
triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
throw ex;
}
triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);
// Raise UnexpectedRollbackException if we had a global rollback-only marker
if (unexpectedRollback) {
throw new UnexpectedRollbackException(
"Transaction rolled back because it has been marked as rollback-only");
}
}
finally {
// 解绑connectionHolder,重置connectionHolder,并且放回链接池。把挂起的事务恢复。
cleanupAfterCompletion(status);
}
}
protected void commitTransactionAfterReturning(@Nullable TransactionInfo txInfo) {
if (txInfo != null && txInfo.getTransactionStatus() != null) {
if (logger.isTraceEnabled()) {
logger.trace("Completing transaction for [" + txInfo.getJoinpointIdentification() + "]");
}
// 其实在这里面还是会判断TransactionStatus里面是否有回滚状态,如果有的话,依然会回滚,否则就提交
txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());
}
}
- 如果捕捉到异常,那么直接进入到completeTransactionAfterThrowing()方法中。从completeTransactionAfterThrowing()方法出来,把异常抛出去之前,还要执行finally中的cleanupTransactionInfo()方法
- completeTransactionAfterThrowing()方法:
- 1.判断当前异常是否匹配注解设置的异常,是的话,进入
txInfo.getTransactionManager().rollback(txInfo.getTransactionStatus());
- 1.1:判断是否有保存点,关联到传播机制为PROPAGATION_NESTED,如果有保存点,那么直接回滚到保存点
- 1.2:判断是否是新事务,关联到传播机制为PROPAGATION_REQUIRES_NEW,那么直接执行回滚操作
- 1.3:判断不是新事务,那么直接设置rollback标记,避免多次回滚。因为异常会一直往外抛,这个时候只需要设置回滚标记即可。
- 1.4:最后都会cleanupAfterCompletion():修改当前线程中的事务信息,恢复挂起的事务。解绑并重置connectionHolder,放回链接池。
- 2.如果没有匹配到这个异常的话,那么进入
txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());
- 2.1:判断事务是否被标记为回滚,那么说明内层事务之前已经设置了回滚标记,此时当前事务是新事务的话,就直接回滚。修改当前线程中的事务信息,恢复挂起的事务。解绑并重置connectionHolder,放回链接池。
- 2.2:如果当前事务被标记回滚,并且不是新事务的话,那么继续标记回滚,并且抛出异常。修改当前线程中的事务信息,恢复挂起的事务。解绑并重置connectionHolder,放回链接池。
- 2.3:如果不满足以上条件,那么直接提交。如果有保存点,那么直接释放保存点。如果是新事务,直接提交。如果提交失败,那么回滚。最后进行清理工作。修改当前线程中的事务信息,恢复挂起的事务。解绑并重置connectionHolder,放回链接池。