天天看點

Spring事務原理二(事務攔截邏輯)

事務的執行

當代理對象的方法被調用時,最終會調用到TransactionInterceptor的invoke()方法上面。對于為什麼會調用到invoke()方法的小夥伴,需要取了解一下動态代理的原理

@Override
@Nullable
public Object invoke(MethodInvocation invocation) throws Throwable {
// Work out the target class: may be {@code null}.
// The TransactionAttributeSource should be passed the target class
// as well as the method, which may be from an interface.
Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);

// Adapt to TransactionAspectSupport's invokeWithinTransaction...
// 意思是最終會适配到TransactionAspectSupport.invokeWithinTransaction方法
return invokeWithinTransaction(invocation.getMethod(), targetClass, invocation::proceed);
}
      

TransactionAspectSupport.invokeWithinTransaction()

@Nullable
  protected Object invokeWithinTransaction(Method method, @Nullable Class<?> targetClass,
      final InvocationCallback invocation) throws Throwable {
    // 擷取AnnotationTransactionAttributeSource,這個是從外面設定進TransactionInterceptor的。
    // If the transaction attribute is null, the method is non-transactional.
    TransactionAttributeSource tas = getTransactionAttributeSource();
    // 擷取Transactional注解參數
    final TransactionAttribute txAttr = (tas != null ? tas.getTransactionAttribute(method, targetClass) : null);
    // 從容器中擷取TransactionManager執行個體
    final TransactionManager tm = determineTransactionManager(txAttr);

    if (this.reactiveAdapterRegistry != null && tm instanceof ReactiveTransactionManager) {
      ReactiveTransactionSupport txSupport = this.transactionSupportCache.computeIfAbsent(method, key -> {
        if (KotlinDetector.isKotlinType(method.getDeclaringClass()) && KotlinDelegate.isSuspend(method)) {
          throw new TransactionUsageException(
              "Unsupported annotated transaction on suspending function detected: " + method +
              ". Use TransactionalOperator.transactional extensions instead.");
        }
        ReactiveAdapter adapter = this.reactiveAdapterRegistry.getAdapter(method.getReturnType());
        if (adapter == null) {
          throw new IllegalStateException("Cannot apply reactive transaction to non-reactive return type: " +
              method.getReturnType());
        }
        return new ReactiveTransactionSupport(adapter);
      });
      return txSupport.invokeWithinTransaction(
          method, targetClass, invocation, txAttr, (ReactiveTransactionManager) tm);
    }
    // TransactionManager轉換成PlatformTransactionManager類型
    PlatformTransactionManager ptm = asPlatformTransactionManager(tm);
    // 擷取方法身份,類似com.github.AccountServiceImpl.save。
    // 因為txAttr是RuleBasedTransactionAttribute類型,是以最終傳回ClassUtils.getQualifiedMethodName()傳回【類名+"."+方法名】
    final String joinpointIdentification = methodIdentification(method, targetClass, txAttr);
    // 如果是聲明式事務
    if (txAttr == null || !(ptm instanceof CallbackPreferringPlatformTransactionManager)) {
      // Standard transaction demarcation with getTransaction and commit/rollback calls.
      // 擷取事務,這裡面就要考慮spring的事務傳播機制
      // *****這是重點*****這是重點*****這是重點*****這是重點*****這是重點*****這是重點*****
      TransactionInfo txInfo = createTransactionIfNecessary(ptm, txAttr, joinpointIdentification);

      Object retVal;
      try {
        // 執行目标方法
        // This is an around advice: Invoke the next interceptor in the chain.
        // This will normally result in a target object being invoked.
        retVal = invocation.proceedWithInvocation();
      }
      catch (Throwable ex) {
        // target invocation exception
        // 遇到異常的情況如何處理事務,要結合事務的傳播機制來看。
        completeTransactionAfterThrowing(txInfo, ex);
        throw ex;
      }
      finally {
        // 清除目前事務的相關資訊。把目前線程中TransactionInfo設定成oldTransactionInfo
        cleanupTransactionInfo(txInfo);
      }
      // 假如傳回值是Try<Integer> result = Try.of(() -> 1 / 0)這樣的類型,那麼就要檢測它是否會抛異常
      if (retVal != null && vavrPresent && VavrDelegate.isVavrTry(retVal)) {
        // Set rollback-only in case of Vavr failure matching our rollback rules...
        TransactionStatus status = txInfo.getTransactionStatus();
        if (status != null && txAttr != null) {
          // 真正地執行後擷取結果。異常後設定status裡面的屬性值rollbackOnly=true
          retVal = VavrDelegate.evaluateTryFailure(retVal, txAttr, status);
        }
      }
      // 最後就是準備送出事務。
      commitTransactionAfterReturning(txInfo);
      return retVal;
    }
    // 下面是程式設計式事務
    else {
      Object result;
      final ThrowableHolder throwableHolder = new ThrowableHolder();

      // It's a CallbackPreferringPlatformTransactionManager: pass a TransactionCallback in.
      try {
        result = ((CallbackPreferringPlatformTransactionManager) ptm).execute(txAttr, status -> {
          TransactionInfo txInfo = prepareTransactionInfo(ptm, txAttr, joinpointIdentification, status);
          try {
            Object retVal = invocation.proceedWithInvocation();
            if (retVal != null && vavrPresent && VavrDelegate.isVavrTry(retVal)) {
              // Set rollback-only in case of Vavr failure matching our rollback rules...
              retVal = VavrDelegate.evaluateTryFailure(retVal, txAttr, status);
            }
            return retVal;
          }
          catch (Throwable ex) {
            if (txAttr.rollbackOn(ex)) {
              // A RuntimeException: will lead to a rollback.
              if (ex instanceof RuntimeException) {
                throw (RuntimeException) ex;
              }
              else {
                throw new ThrowableHolderException(ex);
              }
            }
            else {
              // A normal return value: will lead to a commit.
              throwableHolder.throwable = ex;
              return null;
            }
          }
          finally {
            cleanupTransactionInfo(txInfo);
          }
        });
      }
      catch (ThrowableHolderException ex) {
        throw ex.getCause();
      }
      catch (TransactionSystemException ex2) {
        if (throwableHolder.throwable != null) {
          logger.error("Application exception overridden by commit exception", throwableHolder.throwable);
          ex2.initApplicationException(throwableHolder.throwable);
        }
        throw ex2;
      }
      catch (Throwable ex2) {
        if (throwableHolder.throwable != null) {
          logger.error("Application exception overridden by commit exception", throwableHolder.throwable);
        }
        throw ex2;
      }

      // Check result state: It might indicate a Throwable to rethrow.
      if (throwableHolder.throwable != null) {
        throw throwableHolder.throwable;
      }
      return result;
    }
  }      

總結下來,重點就下面幾個方法

protected Object invokeWithinTransaction(Method method, @Nullable Class<?> targetClass,
      final InvocationCallback invocation) throws Throwable {
  // 調用目标方法前,根據情況确定要不要建立新的事務
  TransactionInfo txInfo = createTransactionIfNecessary();
  Object retVal;
  try{
    // 調用目标方法
    retVal = invocation.proceedWithInvocation();
  }catch(Throwable ex){
    // 根據調用目标方法的實際情況确定是復原還是送出,還是隻設定復原标記。裡面最後一步還包括回收連結,恢複被挂起的事務等等
    completeTransactionAfterThrowing(txInfo, ex);
    throw ex;
  }finally{
    // 清除線程中目前事務資訊
    cleanupTransactionInfo(txInfo);
  }
  // 根據情況判斷是送出還是復原。裡面最後一步還包括回收連結,恢複被挂起的事務等等
  commitTransactionAfterReturning(txInfo);
  return retVal;
}
      

createTransactionIfNecessary

protected TransactionInfo createTransactionIfNecessary(@Nullable PlatformTransactionManager tm,
      @Nullable TransactionAttribute txAttr, final String joinpointIdentification) {
  // 下面就是包裝了一下
    // If no name specified, apply method identification as transaction name.
    if (txAttr != null && txAttr.getName() == null) {
      txAttr = new DelegatingTransactionAttribute(txAttr) {
        @Override
        public String getName() {
          return joinpointIdentification;
        }
      };
    }
  // *****這是重點,擷取事務狀态******
    TransactionStatus status = null;
    if (txAttr != null) {
      if (tm != null) {
        status = tm.getTransaction(txAttr);
      }
      else {
        if (logger.isDebugEnabled()) {
          logger.debug("Skipping transactional joinpoint [" + joinpointIdentification +
              "] because no transaction manager has been configured");
        }
      }
    }
  // 這裡就是再包裝一層傳回TransactionInfo對象
    return prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);
  }

@Override
  public final TransactionStatus getTransaction(@Nullable TransactionDefinition definition)
      throws TransactionException {

    // Use defaults if no transaction definition given.
    TransactionDefinition def = (definition != null ? definition : TransactionDefinition.withDefaults());
    // 如果第一次調用的話,傳回DataSourceTransactionObject執行個體,connectionHolder=null;
    Object transaction = doGetTransaction();
    boolean debugEnabled = logger.isDebugEnabled();
    // 如果有connectionHolder,并且事務是激活狀态。說明這不是第一次進代理方法,那麼就需要結合目前的傳播機制來處理目前的事務
    if (isExistingTransaction(transaction)) {
      // Existing transaction found -> check propagation behavior to find out how to behave.
      // *******************這是重點,需要結合事務傳播機制來看*******************
      return handleExistingTransaction(def, transaction, debugEnabled);
    }
    // 檢測逾時時間是否合法
    // Check definition settings for new transaction.
    if (def.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) {
      throw new InvalidTimeoutException("Invalid transaction timeout", def.getTimeout());
    }
    // 下面都是第一次調用。說明之前不存在任何事務。

    // No existing transaction found -> check propagation behavior to find out how to proceed.
    // 當傳播機制是PROPAGATION_MANDATORY,表示目前必須存在一個事務,否則抛出異常
    if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
      throw new IllegalTransactionStateException(
          "No existing transaction found for transaction marked with propagation 'mandatory'");
    }
    // 當第一次調用時,如果傳播機制是以下三種的話,那麼都需要建立事務
    else if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
        def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
             //PROPAGATION_NESTED是嵌套事務,這個事務能否成功送出取決于外部事務是否成功
             // 如果内部事務成功,外部事務失敗,那麼一起復原
             // 如果内部事務成功,外部事務成功,那麼一起送出
             // 如果内部事務失敗,外部事務失敗,那麼一起復原
             // 如果内部事務失敗,外部事務成功,那麼内部事務復原,外部事務送出
        def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
      // 挂起上一個事務,目前不存在上一個事務,那麼直接傳null
      SuspendedResourcesHolder suspendedResources = suspend(null);
      if (debugEnabled) {
        logger.debug("Creating new transaction with name [" + def.getName() + "]: " + def);
      }
      try {
        // 開啟新事務
        return startTransaction(def, transaction, debugEnabled, suspendedResources);
      }
      catch (RuntimeException | Error ex) {
        resume(null, suspendedResources);
        throw ex;
      }
    }
    // 剩下的傳播機制,都建立空事務
    else {
      // Create "empty" transaction: no actual transaction, but potentially synchronization.
      if (def.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT && logger.isWarnEnabled()) {
        logger.warn("Custom isolation level specified but no actual transaction initiated; " +
            "isolation level will effectively be ignored: " + def);
      }
      boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
      // 建立一個預設的TransactionStatus,裡面的事務為空,同時更新目前線程的資訊
      return prepareTransactionStatus(def, null, true, newSynchronization, debugEnabled, null);
    }
  }
// 開啟新事務
private TransactionStatus startTransaction(TransactionDefinition definition, Object transaction,
      boolean debugEnabled, @Nullable SuspendedResourcesHolder suspendedResources) {
   boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
  // 建立一個新事務, transaction裡面的connectionHolder=null
  DefaultTransactionStatus status = newTransactionStatus(
         definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
  // 開啟新事務
   doBegin(transaction, definition);
  // 設定目前事務的事務同步管理器。這個事務同步管理器其實就是把針對目前事務的配置參數和目前線程綁在一起,通過TransactionSynchronizationManager管理
   prepareSynchronization(status, definition);
   return status;
}


@Override
  protected void doBegin(Object transaction, TransactionDefinition definition) {
    DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
    Connection con = null;

    try {
      // 因為是新的事務,是以需要建立新的連結Connection
      if (!txObject.hasConnectionHolder() ||
          txObject.getConnectionHolder().isSynchronizedWithTransaction()) {
        // 擷取新的Connection
        Connection newCon = obtainDataSource().getConnection();
        if (logger.isDebugEnabled()) {
          logger.debug("Acquired Connection [" + newCon + "] for JDBC transaction");
        }
        // 把新的Connection設定進transaction中
        txObject.setConnectionHolder(new ConnectionHolder(newCon), true);
      }

      txObject.getConnectionHolder().setSynchronizedWithTransaction(true);
      con = txObject.getConnectionHolder().getConnection();
      // 設定readOnly和isolationLevel
      Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition);
      txObject.setPreviousIsolationLevel(previousIsolationLevel);
      txObject.setReadOnly(definition.isReadOnly());

      // Switch to manual commit if necessary. This is very expensive in some JDBC drivers,
      // so we don't want to do it unnecessarily (for example if we've explicitly
      // configured the connection pool to set it already).
      // 如果是自動送出,切換到手動送出
      if (con.getAutoCommit()) {
        txObject.setMustRestoreAutoCommit(true);
        if (logger.isDebugEnabled()) {
          logger.debug("Switching JDBC Connection [" + con + "] to manual commit");
        }
        con.setAutoCommit(false);
      }
      // 如果是readOnly,通過Statement執行SET TRANSACTION READ ONLY
      prepareTransactionalConnection(con, definition);
      txObject.getConnectionHolder().setTransactionActive(true);

      int timeout = determineTimeout(definition);
      if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) {
        txObject.getConnectionHolder().setTimeoutInSeconds(timeout);
      }

      // Bind the connection holder to the thread.
      if (txObject.isNewConnectionHolder()) {
        // 把新建立出來的ConnectionHolder綁定到目前線程中
        TransactionSynchronizationManager.bindResource(obtainDataSource(), txObject.getConnectionHolder());
      }
    }

    catch (Throwable ex) {
      // 出異常了就釋放連結
      if (txObject.isNewConnectionHolder()) {
        DataSourceUtils.releaseConnection(con, obtainDataSource());
        txObject.setConnectionHolder(null, false);
      }
      throw new CannotCreateTransactionException("Could not open JDBC Connection for transaction", ex);
    }
  }      

前面已經建立過事務的情況:

private TransactionStatus handleExistingTransaction(
      TransactionDefinition definition, Object transaction, boolean debugEnabled)
      throws TransactionException {
  // 那麼遇到不允許有事務的傳播機制,就直接抛異常
    if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NEVER) {
      throw new IllegalTransactionStateException(
          "Existing transaction found for transaction marked with propagation 'never'");
    }
  // PROPAGATION_NOT_SUPPORTED就是以非事務的方式執行,那麼我們就需要先把目前的事務挂起來
    if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NOT_SUPPORTED) {
      if (debugEnabled) {
        logger.debug("Suspending current transaction");
      }
      // 挂起目前事務
      // 1.把目前線程的connectionHolder解綁并傳回connectionHolder
      // 2.取出目前事務同步器中的所有資訊,并重置線程同步器資訊
      // 3.把connectionHolder和相關資訊封裝成一個SuspendedResourcesHolder對象傳回
      Object suspendedResources = suspend(transaction);
      boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
      // 建立一個DefaultTransactionStatus傳回
      return prepareTransactionStatus(
          definition, null, false, newSynchronization, debugEnabled, suspendedResources);
    }
  // 如果是PROPAGATION_REQUIRES_NEW,那麼必須要建立一個新的事務
    if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW) {
      if (debugEnabled) {
        logger.debug("Suspending current transaction, creating new transaction with name [" +
            definition.getName() + "]");
      }
      // 挂起目前事務
      SuspendedResourcesHolder suspendedResources = suspend(transaction);
      try {
        // 開啟新事務
        return startTransaction(definition, transaction, debugEnabled, suspendedResources);
      }
      catch (RuntimeException | Error beginEx) {
        resumeAfterBeginException(transaction, suspendedResources, beginEx);
        throw beginEx;
      }
    }
  // 如果是PROPAGATION_NESTED,那麼就是嵌套事務。就要使用復原點
    if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
      if (!isNestedTransactionAllowed()) {
        throw new NestedTransactionNotSupportedException(
            "Transaction manager does not allow nested transactions by default - " +
            "specify 'nestedTransactionAllowed' property with value 'true'");
      }
      if (debugEnabled) {
        logger.debug("Creating nested transaction with name [" + definition.getName() + "]");
      }
      if (useSavepointForNestedTransaction()) {
        // Create savepoint within existing Spring-managed transaction,
        // through the SavepointManager API implemented by TransactionStatus.
        // Usually uses JDBC 3.0 savepoints. Never activates Spring synchronization.
        // 同樣的還是建立一個DefaultTransactionStatus對象
        DefaultTransactionStatus status =
            prepareTransactionStatus(definition, transaction, false, false, debugEnabled, null);
        // 建立復原點後傳回
        status.createAndHoldSavepoint();
        return status;
      }
      else {
        // Nested transaction through nested begin and commit/rollback calls.
        // Usually only for JTA: Spring synchronization might get activated here
        // in case of a pre-existing JTA transaction.
        // 針對JTA事務,建立一個新的事務來處理
        return startTransaction(definition, transaction, debugEnabled, null);
      }
    }

    // Assumably PROPAGATION_SUPPORTS or PROPAGATION_REQUIRED.
    if (debugEnabled) {
      logger.debug("Participating in existing transaction");
    }
    if (isValidateExistingTransaction()) {
      if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT) {
        Integer currentIsolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();
        if (currentIsolationLevel == null || currentIsolationLevel != definition.getIsolationLevel()) {
          Constants isoConstants = DefaultTransactionDefinition.constants;
          throw new IllegalTransactionStateException("Participating transaction with definition [" +
              definition + "] specifies isolation level which is incompatible with existing transaction: " +
              (currentIsolationLevel != null ?
                  isoConstants.toCode(currentIsolationLevel, DefaultTransactionDefinition.PREFIX_ISOLATION) :
                  "(unknown)"));
        }
      }
      if (!definition.isReadOnly()) {
        if (TransactionSynchronizationManager.isCurrentTransactionReadOnly()) {
          throw new IllegalTransactionStateException("Participating transaction with definition [" +
              definition + "] is not marked as read-only but existing transaction is");
        }
      }
    }
  // 剩下的傳播機制,還是目前事務
    boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
    return prepareTransactionStatus(definition, transaction, false, newSynchronization, debugEnabled, null);
  }
      

總結一下:

1.createTransactionIfNecessary():當代理對象方法被調用時,擷取TransactionAttribute、TransactionManager以及目标方法名joinpointIdentification,利用這三個參數來建立事務TransactionInfo

  • 1.1.tm.getTransaction():擷取事務
  • 1.1.1:doGetTransaction():建立新的DataSourceTransactionObject,并從目前線程中查找connectionHolder,不管有沒有找到,都設定進DataSourceTransactionObject對象中。
  • 1.1.2:isExistingTransaction():判斷是否已經存在事務,其實就是判斷DataSourceTransactionObject中是否有connectionHolder,并且事務是否是激活的。
  • 1.1.3:handleExistingTransaction():如果已經存在事務。-----------這是邏輯分支------------
  • 1.1.3.1:如果存在事務,當傳播機制是PROPAGATION_NEVER,那麼直接抛異常。
  • 1.1.3.2:如果存在事務,當傳播機制是PROPAGATION_NOT_SUPPORTED,那麼直接挂起目前事務。建立DefaultTransactionStatus對象,并且更新目前線程事務資訊。
  • 1.1.3.3:如果存在事務,當傳播機制是PROPAGATION_REQUIRES_NEW,那麼先挂起目前事務,并且建立新的連接配接設定進新事務中,并通過新建立的transaction封裝一個DefaultTransactionStatus對象,同時更新目前線程中事務資訊。
  • 1.1.3.4:如果存在事務,當傳播機制是PROPAGATION_NESTED,那麼直接封裝一個DefaultTransactionStatus對象,更新目前線程事務資訊,并設定儲存點後傳回。
  • 1.1.3.5:如果存在事務,當傳播機制是PROPAGATION_SUPPORTS、PROPAGATION_MANDATORY、PROPAGATION_REQUIRED,那麼直接建立一個DefaultTransactionStatus對象,更新目前線程的事務資訊後傳回。
  • 1.1.4:如果不存在事務,傳播機制是PROPAGATION_MANDATORY,就抛異常。該傳播機制的要求是目前一定要存在事務才能繼續執行,否則就抛異常。-----------這是邏輯分支------------
  • 1.1.5:如果不存在事務,傳播機制是PROPAGATION_REQUIRED、PROPAGATION_REQUIRES_NEW、PROPAGATION_NESTED,那麼意味着就需要自己建立事務。-----------這是邏輯分支------------
  • 1.1.5.1:suspend(null):挂起空事務。就是把目前線程中的事務相關資訊都取出來封裝到一個SuspendedResourcesHolder對象中并傳回。并且重置目前線程中事務相關資訊,通過TransactionSynchronizationManager來設定。
  • 1.1.5.2:startTransaction():開啟新事務
  • 1.1.5.2.1:doBegin():建立新的ConnectionHolder,并設定進transaction中,開啟手動送出,最後綁定到目前線程中
  • 1.1.5.2.2:prepareSynchronization():建立DefaultTransactionStatus對象,更新目前線程中的資訊。因為此時已經建立了新的事務了。
  • 1.1.6:如果不存在事務,傳播機制是PROPAGATION_SUPPORTS、PROPAGATION_NOT_SUPPORTED、PROPAGATION_NEVER,那麼就直接建立一個DefaultTransactionStatus對象傳回。同時更新一下目前線程中的資訊。-----------這是邏輯分支------------
  • 1.2.prepareTransactionInfo():把TransactionAttribute、TransactionManager以及目标方法名joinpointIdentification打包成TransactionInfo對象,并且把TransactionStatus設定進去,最後把oldTransactionInfo解綁并取出來儲存,把最新的TransactionInfo和目前線程綁定
protected static final class TransactionInfo {
  private void bindToThread() {
      // 從目前線程中取出來,然後把新的TransactionInfo和目前線程綁定
      this.oldTransactionInfo = transactionInfoHolder.get();
      transactionInfoHolder.set(this);
    }
}
      

其實總結一下上面的邏輯,createTransactionIfNecessary()最終建立了一個TransactionInfo對象,但是整個過程重點在于通過getTransaction()方法擷取TransactionStatus對象,建立這個對象重點取決于建立Transaction,然後這個對象裡面其實是包含了最關鍵的connectionHolder。

protected void completeTransactionAfterThrowing(@Nullable TransactionInfo txInfo, Throwable ex) {
    if (txInfo != null && txInfo.getTransactionStatus() != null) {
      if (logger.isTraceEnabled()) {
        logger.trace("Completing transaction for [" + txInfo.getJoinpointIdentification() +
            "] after exception: " + ex);
      }
      if (txInfo.transactionAttribute != null && txInfo.transactionAttribute.rollbackOn(ex)) {
        try {
          // 如果這個抛出的異常正好比對上Transaction注解上指定的異常,那麼執行復原操作
          txInfo.getTransactionManager().rollback(txInfo.getTransactionStatus());
        }
        catch (TransactionSystemException ex2) {
          logger.error("Application exception overridden by rollback exception", ex);
          ex2.initApplicationException(ex);
          throw ex2;
        }
        catch (RuntimeException | Error ex2) {
          logger.error("Application exception overridden by rollback exception", ex);
          throw ex2;
        }
      }
      else {
        // We don't roll back on this exception.
        // Will still roll back if TransactionStatus.isRollbackOnly() is true.
        try {
          // 如果異常比對不上,那麼還是依然執行送出。如果TransactionStatus裡面的狀态被标記為需要復原的話,那麼還是依然復原,否則就送出
          txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());
        }
        catch (TransactionSystemException ex2) {
          logger.error("Application exception overridden by commit exception", ex);
          ex2.initApplicationException(ex);
          throw ex2;
        }
        catch (RuntimeException | Error ex2) {
          logger.error("Application exception overridden by commit exception", ex);
          throw ex2;
        }
      }
    }
  }

private void processRollback(DefaultTransactionStatus status, boolean unexpected) {
    try {
      boolean unexpectedRollback = unexpected;

      try {
        triggerBeforeCompletion(status);
        // 檢查是否有儲存點,如果有儲存點,就重置到儲存點
        if (status.hasSavepoint()) {
          if (status.isDebug()) {
            logger.debug("Rolling back transaction to savepoint");
          }
          // 重置到儲存點
          status.rollbackToHeldSavepoint();
        }
        // 如果是新的事務,那麼就直接復原
        else if (status.isNewTransaction()) {
          if (status.isDebug()) {
            logger.debug("Initiating transaction rollback");
          }
          // 其實是拿出裡面的ConnectionHolder來調用rollback
          doRollback(status);
        }
        else {
          // Participating in larger transaction
          // 如果不是新的事務,那麼為了避免多次復原,隻需要将目前的rollback标記設定成true,這樣的話,當上層代理catch這個抛出去的異常時,會檢查rollback标記,最後才會復原。
          if (status.hasTransaction()) {
            if (status.isLocalRollbackOnly() || isGlobalRollbackOnParticipationFailure()) {
              if (status.isDebug()) {
                logger.debug("Participating transaction failed - marking existing transaction as rollback-only");
              }
              doSetRollbackOnly(status);
            }
            else {
              if (status.isDebug()) {
                logger.debug("Participating transaction failed - letting transaction originator decide on rollback");
              }
            }
          }
          else {
            logger.debug("Should roll back transaction but cannot - no transaction available");
          }
          // Unexpected rollback only matters here if we're asked to fail early
          if (!isFailEarlyOnGlobalRollbackOnly()) {
            unexpectedRollback = false;
          }
        }
      }
      catch (RuntimeException | Error ex) {
        triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
        throw ex;
      }

      triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);

      // Raise UnexpectedRollbackException if we had a global rollback-only marker
      if (unexpectedRollback) {
        throw new UnexpectedRollbackException(
            "Transaction rolled back because it has been marked as rollback-only");
      }
    }
    finally {
      // 解綁connectionHolder,重置connectionHolder,并且放回連結池。把挂起的事務恢複。
      cleanupAfterCompletion(status);
    }
  }
      
protected void commitTransactionAfterReturning(@Nullable TransactionInfo txInfo) {
    if (txInfo != null && txInfo.getTransactionStatus() != null) {
      if (logger.isTraceEnabled()) {
        logger.trace("Completing transaction for [" + txInfo.getJoinpointIdentification() + "]");
      }
      // 其實在這裡面還是會判斷TransactionStatus裡面是否有復原狀态,如果有的話,依然會復原,否則就送出
      txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());
    }
  }
      
  • 如果捕捉到異常,那麼直接進入到completeTransactionAfterThrowing()方法中。從completeTransactionAfterThrowing()方法出來,把異常抛出去之前,還要執行finally中的cleanupTransactionInfo()方法
  • completeTransactionAfterThrowing()方法:
  • 1.判斷目前異常是否比對注解設定的異常,是的話,進入
txInfo.getTransactionManager().rollback(txInfo.getTransactionStatus());
      
  • 1.1:判斷是否有儲存點,關聯到傳播機制為PROPAGATION_NESTED,如果有儲存點,那麼直接復原到儲存點
  • 1.2:判斷是否是新事務,關聯到傳播機制為PROPAGATION_REQUIRES_NEW,那麼直接執行復原操作
  • 1.3:判斷不是新事務,那麼直接設定rollback标記,避免多次復原。因為異常會一直往外抛,這個時候隻需要設定復原标記即可。
  • 1.4:最後都會cleanupAfterCompletion():修改目前線程中的事務資訊,恢複挂起的事務。解綁并重置connectionHolder,放回連結池。
  • 2.如果沒有比對到這個異常的話,那麼進入
txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());
      
  • 2.1:判斷事務是否被标記為復原,那麼說明内層事務之前已經設定了復原标記,此時目前事務是新事務的話,就直接復原。修改目前線程中的事務資訊,恢複挂起的事務。解綁并重置connectionHolder,放回連結池。
  • 2.2:如果目前事務被标記復原,并且不是新事務的話,那麼繼續标記復原,并且抛出異常。修改目前線程中的事務資訊,恢複挂起的事務。解綁并重置connectionHolder,放回連結池。
  • 2.3:如果不滿足以上條件,那麼直接送出。如果有儲存點,那麼直接釋放儲存點。如果是新事務,直接送出。如果送出失敗,那麼復原。最後進行清理工作。修改目前線程中的事務資訊,恢複挂起的事務。解綁并重置connectionHolder,放回連結池。