
A Source-Level Look at How Spring Proxies @Transactional

This article is based on tests in a Spring Boot project that depends on Spring 5.2.1. The main test code is as follows:

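import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.transaction.annotation.EnableTransactionManagement;

@EnableTransactionManagement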
//@EnableAspectJAutoProxy(exposeProxy = true)
@SpringBootApplication
public class DemoApplication {

    public static void main(String[] args) {
        SpringApplication.run(DemoApplication.class, args);
    }

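}

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;

// Student, StudentDao and StudentService are project classes that the article does not show.
@Service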
@Slf4j
@Transactional(propagation = Propagation.REQUIRED, rollbackFor = Exception.class)
public class StudentServiceImpl implements StudentService {
    @Autowired
    private StudentDao studentDao;

    @Override
    public void insert() {
        Student student = new Student();
        student.setName("test");
        student.setAge(23);
        student.setNo("1002");
        studentDao.insert(student);
    }
}      

An earlier article on Spring IoC noted that org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory#doCreateBean calls org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory#populateBean. Right after that call, doCreateBean invokes initializeBean, which is where AOP proxying happens. The relevant code:

// Initialize the bean instance.
  Object exposedObject = bean;
  try {
    // Populate the bean's properties
    populateBean(beanName, mbd, instanceWrapper);
    // Run the initialization callbacks and bean post-processors; AOP proxying happens here
    exposedObject = initializeBean(beanName, exposedObject, mbd);
  }
  catch (Throwable ex) {
    if (ex instanceof BeanCreationException && beanName.equals(((BeanCreationException) ex).getBeanName())) {
      throw (BeanCreationException) ex;
    }
    else {
      throw new BeanCreationException(
          mbd.getResourceDescription(), beanName, "Initialization of bean failed", ex);
    }
  }      

Inside initializeBean, the bean post-processors are applied through applyBeanPostProcessorsAfterInitialization(), and this is the step that produces the AOP proxy. The relevant code:

@Override
public Object applyBeanPostProcessorsAfterInitialization(Object existingBean, String beanName)
    throws BeansException {

  Object result = existingBean;
  for (BeanPostProcessor processor : getBeanPostProcessors()) {
    Object current = processor.postProcessAfterInitialization(result, beanName);
    if (current == null) {
      return result;
    }
    result = current;
  }
  return result;
}      
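
The loop above is easy to misread, so here is a minimal JDK-only sketch of the same control flow. PostProcessor is a hypothetical stand-in for Spring's BeanPostProcessor, and the "proxy" is just a string; the point is only that each processor receives the previous result and that a null return ends the chain early.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for Spring's BeanPostProcessor; not the real interface.
interface PostProcessor {
    Object afterInit(Object bean, String beanName);
}

final class PostProcessorChainSketch {

    // Same shape as the loop above: each processor sees the previous result,
    // and a null return ends the chain with the last non-null result.
    static Object applyAfterInit(List<PostProcessor> processors, Object bean, String beanName) {
        Object result = bean;
        for (PostProcessor processor : processors) {
            Object current = processor.afterInit(result, beanName);
            if (current == null) {
                return result;
            }
            result = current;
        }
        return result;
    }

    public static void main(String[] args) {
        List<PostProcessor> processors = new ArrayList<>();
        processors.add((bean, name) -> bean);                   // leaves the bean untouched
        processors.add((bean, name) -> "proxy(" + bean + ")");  // plays the role of the auto-proxy creator
        // prints "proxy(studentServiceImpl)"
        System.out.println(applyAfterInit(processors, "studentServiceImpl", "studentServiceImpl"));
    }
}
```

Whatever the last processor returns is what the container exposes, which is how a proxy can silently replace the original bean.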

When execution reaches this method, a debugger shows that getBeanPostProcessors() contains an AnnotationAwareAspectJAutoProxyCreator.


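An auto-proxy creator is present because of the @EnableTransactionManagement annotation, which imports TransactionManagementConfigurationSelector. The selector's selectImports method is invoked during startup and decides which configuration classes get imported.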
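
The original screenshot of the selector is not available, so the following is a rough reconstruction of its decision logic, written from memory of Spring 5.2 rather than copied from the source. To keep it compilable without Spring on the classpath, AdviceMode is a local hypothetical enum and the imported classes are plain strings; the ASPECTJ branch is deliberately left empty.

```java
final class SelectorSketch {

    // Hypothetical local copy of Spring's AdviceMode so the sketch compiles without Spring.
    enum AdviceMode { PROXY, ASPECTJ }

    // Returns the names of the configuration classes to import for the given mode.
    static String[] selectImports(AdviceMode mode) {
        switch (mode) {
            case PROXY:
                return new String[] {
                    "org.springframework.context.annotation.AutoProxyRegistrar",
                    "org.springframework.transaction.annotation.ProxyTransactionManagementConfiguration"
                };
            case ASPECTJ:
                // The real selector returns an AspectJ-based configuration class here;
                // this sketch omits it.
                return new String[0];
            default:
                return null;
        }
    }

    public static void main(String[] args) {
        for (String name : selectImports(AdviceMode.PROXY)) {
            System.out.println(name);
        }
    }
}
```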


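In the default PROXY mode, TransactionManagementConfigurationSelector returns AutoProxyRegistrar.class and ProxyTransactionManagementConfiguration.class (the latter is used later in this article). During initialization Spring calls org.springframework.context.annotation.AutoProxyRegistrar#registerBeanDefinitions, which in turn reaches org.springframework.aop.config.AopConfigUtils#registerOrEscalateApcAsRequired. That method registers the requested auto-proxy creator under a fixed bean name or, if one is already registered, keeps whichever has the higher priority. AutoProxyRegistrar itself requests InfrastructureAdvisorAutoProxyCreator; the AnnotationAwareAspectJAutoProxyCreator seen here has a higher priority and is typically requested by @EnableAspectJAutoProxy, which Spring Boot's AOP auto-configuration applies when AspectJ is on the classpath.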
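
The register-or-escalate behaviour can be sketched in a few lines. This is a simplified model, not the Spring code: the registry is a hypothetical map from bean name to simple class name, and the priority list follows the order that AopConfigUtils uses, with later entries winning.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class ApcEscalationSketch {

    static final String APC_BEAN_NAME = "org.springframework.aop.config.internalAutoProxyCreator";

    // Later entries have higher priority.
    static final List<String> PRIORITY = Arrays.asList(
            "InfrastructureAdvisorAutoProxyCreator",
            "AspectJAwareAdvisorAutoProxyCreator",
            "AnnotationAwareAspectJAutoProxyCreator");

    // Hypothetical registry: bean name -> simple class name of the registered creator.
    final Map<String, String> registry = new HashMap<>();

    // Registers the creator if none exists, or replaces a lower-priority one.
    void registerOrEscalate(String creator) {
        String current = registry.get(APC_BEAN_NAME);
        if (current == null || PRIORITY.indexOf(current) < PRIORITY.indexOf(creator)) {
            registry.put(APC_BEAN_NAME, creator);
        }
    }

    public static void main(String[] args) {
        ApcEscalationSketch sketch = new ApcEscalationSketch();
        sketch.registerOrEscalate("InfrastructureAdvisorAutoProxyCreator");   // requested for transactions
        sketch.registerOrEscalate("AnnotationAwareAspectJAutoProxyCreator");  // requested for AspectJ auto-proxying
        // prints "AnnotationAwareAspectJAutoProxyCreator"
        System.out.println(sketch.registry.get(APC_BEAN_NAME));
    }
}
```

Because there is only one bean name, the application ends up with a single auto-proxy creator no matter how many features ask for one, and the order in which they ask does not matter.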


Later, org.springframework.beans.factory.support.AbstractBeanFactory#getMergedLocalBeanDefinition retrieves the BeanDefinition of AnnotationAwareAspectJAutoProxyCreator, and the bean created from it is then added to the factory's list of BeanPostProcessors.


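AnnotationAwareAspectJAutoProxyCreator extends or implements a large number of types: its superclass chain runs through AspectJAwareAdvisorAutoProxyCreator, AbstractAdvisorAutoProxyCreator and AbstractAutoProxyCreator, and it implements SmartInstantiationAwareBeanPostProcessor and BeanFactoryAware, among others.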


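That breadth of types hints at how much this class does; the details are out of scope here. What matters is that the container calls this post-processor's postProcessAfterInitialization method, which leads to the following chain of calls: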

org.springframework.aop.framework.autoproxy.AbstractAutoProxyCreator#postProcessAfterInitialization
org.springframework.aop.framework.autoproxy.AbstractAutoProxyCreator#wrapIfNecessary
org.springframework.aop.framework.autoproxy.AbstractAdvisorAutoProxyCreator#getAdvicesAndAdvisorsForBean
org.springframework.aop.framework.autoproxy.AbstractAdvisorAutoProxyCreator#findEligibleAdvisors
org.springframework.aop.framework.autoproxy.AbstractAdvisorAutoProxyCreator#findAdvisorsThatCanApply
org.springframework.aop.support.AopUtils#findAdvisorsThatCanApply
org.springframework.aop.support.AopUtils#canApply(org.springframework.aop.Pointcut, java.lang.Class<?>, boolean)
org.springframework.transaction.interceptor.TransactionAttributeSourcePointcut#matches
org.springframework.transaction.interceptor.AbstractFallbackTransactionAttributeSource#getTransactionAttribute
org.springframework.transaction.interceptor.AbstractFallbackTransactionAttributeSource#computeTransactionAttribute      

Let's look closely at org.springframework.transaction.interceptor.AbstractFallbackTransactionAttributeSource#computeTransactionAttribute:

@Nullable
  protected TransactionAttribute computeTransactionAttribute(Method method, @Nullable Class<?> targetClass) {
    // Only public methods pass this check, which is why @Transactional by default takes effect only on public methods
    // Don't allow no-public methods as required.
    if (allowPublicMethodsOnly() && !Modifier.isPublic(method.getModifiers())) {
      return null;
    }

    // The method may be on an interface, but we need attributes from the target class.
    // If the target class is null, the method will be unchanged.
    Method specificMethod = AopUtils.getMostSpecificMethod(method, targetClass);

    // 1. Look for @Transactional on the method first. A match returns immediately, so a method-level annotation takes precedence over a class-level one
    // First try is the method in the target class.
    TransactionAttribute txAttr = findTransactionAttribute(specificMethod);
    if (txAttr != null) {
      return txAttr;
    }

    // 2. Then look for @Transactional on the class. If it is not found on the class (StudentServiceImpl), the interface (StudentService) is searched as well. A match returns immediately
    // Second try is the transaction attribute on the target class.
    txAttr = findTransactionAttribute(specificMethod.getDeclaringClass());
    if (txAttr != null && ClassUtils.isUserLevelMethod(method)) {
      return txAttr;
    }

    if (specificMethod != method) {
      // Fallback is to look at the original method.
      txAttr = findTransactionAttribute(method);
      if (txAttr != null) {
        return txAttr;
      }
      // Last fallback is the class of the original method.
      txAttr = findTransactionAttribute(method.getDeclaringClass());
      if (txAttr != null && ClassUtils.isUserLevelMethod(method)) {
        return txAttr;
      }
    }

    return null;
  }      
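
To make the lookup order concrete, here is a small reflection-based sketch using only the JDK. The @Tx annotation and SampleService are hypothetical stand-ins for @Transactional and a real service, and the sketch covers just the first two steps (public check, then method before class); it leaves out the interface fallback and Spring's merged-annotation handling.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;

// Hypothetical marker annotation standing in for @Transactional.
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.METHOD, ElementType.TYPE})
@interface Tx {
    String value();
}

// Hypothetical service used only to exercise the lookup.
@Tx("class-level")
class SampleService {
    public void plain() { }

    @Tx("method-level")
    public void annotated() { }

    @Tx("never-used")
    void hidden() { }
}

final class TxLookupSketch {

    // Returns the attribute that applies to the method, or null when none does.
    static String resolve(Method method) {
        if (!Modifier.isPublic(method.getModifiers())) {
            return null;                       // non-public methods are rejected up front
        }
        Tx onMethod = method.getAnnotation(Tx.class);
        if (onMethod != null) {
            return onMethod.value();           // 1. the method itself wins
        }
        Tx onClass = method.getDeclaringClass().getAnnotation(Tx.class);
        return onClass != null ? onClass.value() : null;   // 2. then the declaring class
    }

    // Convenience wrapper so callers do not have to handle NoSuchMethodException.
    static String resolveByName(String methodName) {
        try {
            return resolve(SampleService.class.getDeclaredMethod(methodName));
        } catch (NoSuchMethodException ex) {
            throw new IllegalArgumentException(ex);
        }
    }

    public static void main(String[] args) {
        System.out.println(resolveByName("plain"));      // class-level
        System.out.println(resolveByName("annotated"));  // method-level
        System.out.println(resolveByName("hidden"));     // null
    }
}
```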

The example puts the annotation on the class, so the second branch applies. Stepping through in the debugger leads to org.springframework.transaction.annotation.SpringTransactionAnnotationParser#parseTransactionAnnotation(java.lang.reflect.AnnotatedElement).


That method then delegates to the parseTransactionAnnotation(AnnotationAttributes) overload, shown below:

protected TransactionAttribute parseTransactionAnnotation(AnnotationAttributes attributes) {
    RuleBasedTransactionAttribute rbta = new RuleBasedTransactionAttribute();

    Propagation propagation = attributes.getEnum("propagation");
    rbta.setPropagationBehavior(propagation.value());
    Isolation isolation = attributes.getEnum("isolation");
    rbta.setIsolationLevel(isolation.value());
    rbta.setTimeout(attributes.getNumber("timeout").intValue());
    rbta.setReadOnly(attributes.getBoolean("readOnly"));
    rbta.setQualifier(attributes.getString("value"));

    List<RollbackRuleAttribute> rollbackRules = new ArrayList<>();
    for (Class<?> rbRule : attributes.getClassArray("rollbackFor")) {
      rollbackRules.add(new RollbackRuleAttribute(rbRule));
    }
    for (String rbRule : attributes.getStringArray("rollbackForClassName")) {
      rollbackRules.add(new RollbackRuleAttribute(rbRule));
    }
    for (Class<?> rbRule : attributes.getClassArray("noRollbackFor")) {
      rollbackRules.add(new NoRollbackRuleAttribute(rbRule));
    }
    for (String rbRule : attributes.getStringArray("noRollbackForClassName")) {
      rollbackRules.add(new NoRollbackRuleAttribute(rbRule));
    }
    rbta.setRollbackRules(rollbackRules);

    return rbta;
  }      
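
The rollback rules collected above only matter when an exception is thrown, so it may help to see how such a list could be evaluated. The sketch below is a simplified model under stated assumptions: Rule is a hypothetical class, matching is done on Class objects (Spring's own rules match on class names), the closest matching rule wins, and with no matching rule the default is to roll back on RuntimeException and Error only.

```java
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

final class RollbackRuleSketch {

    // Hypothetical simplified rule: an exception type plus whether it means "roll back".
    static final class Rule {
        final Class<? extends Throwable> type;
        final boolean rollback;

        Rule(Class<? extends Throwable> type, boolean rollback) {
            this.type = type;
            this.rollback = rollback;
        }

        // Superclass steps from the thrown type up to the rule type, or -1 if unrelated.
        int depth(Class<?> thrown) {
            int depth = 0;
            for (Class<?> c = thrown; c != null; c = c.getSuperclass(), depth++) {
                if (c == type) {
                    return depth;
                }
            }
            return -1;
        }
    }

    // The closest matching rule decides; without a match the default applies.
    static boolean rollbackOn(List<Rule> rules, Throwable ex) {
        Rule winner = null;
        int best = Integer.MAX_VALUE;
        for (Rule rule : rules) {
            int depth = rule.depth(ex.getClass());
            if (depth >= 0 && depth < best) {
                best = depth;
                winner = rule;
            }
        }
        if (winner == null) {
            return ex instanceof RuntimeException || ex instanceof Error;
        }
        return winner.rollback;
    }

    public static void main(String[] args) {
        List<Rule> none = Collections.emptyList();
        List<Rule> rollbackForException = Arrays.asList(new Rule(Exception.class, true));
        System.out.println(rollbackOn(none, new IOException("io")));                  // false
        System.out.println(rollbackOn(rollbackForException, new IOException("io")));  // true
        System.out.println(rollbackOn(none, new IllegalStateException("bug")));       // true
    }
}
```

This is why the example service sets rollbackFor = Exception.class: without it, a checked exception such as IOException would not trigger a rollback.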

Control then returns to org.springframework.transaction.interceptor.AbstractFallbackTransactionAttributeSource#getTransactionAttribute, where the returned TransactionAttribute matches the values we configured (or the defaults).


The attribute is then put into a cache, so later lookups for the same method are served directly from the cache.
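
A minimal sketch of this caching step, assuming a string key and a string attribute in place of Spring's method/class key and TransactionAttribute. One detail worth showing is the sentinel: a ConcurrentHashMap cannot hold null values, so "this method has no transaction attribute" has to be cached as a marker object, otherwise non-transactional methods would be re-resolved on every call.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

final class AttributeCacheSketch {

    // Marker for "resolved, but nothing found"; ConcurrentHashMap rejects null values.
    private static final Object NONE = new Object();

    private final Map<String, Object> cache = new ConcurrentHashMap<>();
    private final Function<String, String> resolver;   // hypothetical expensive lookup
    int resolverCalls;

    AttributeCacheSketch(Function<String, String> resolver) {
        this.resolver = resolver;
    }

    // Returns the cached attribute, resolving and caching it on first use.
    String getAttribute(String methodKey) {
        Object cached = cache.get(methodKey);
        if (cached != null) {
            return cached == NONE ? null : (String) cached;
        }
        resolverCalls++;
        String resolved = resolver.apply(methodKey);
        cache.put(methodKey, resolved == null ? NONE : resolved);
        return resolved;
    }

    public static void main(String[] args) {
        AttributeCacheSketch cache = new AttributeCacheSketch(
                key -> key.startsWith("StudentServiceImpl") ? "REQUIRED" : null);
        cache.getAttribute("StudentServiceImpl#insert");
        cache.getAttribute("StudentServiceImpl#insert");
        cache.getAttribute("Object#toString");
        cache.getAttribute("Object#toString");
        System.out.println(cache.resolverCalls);   // 2
    }
}
```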


Execution continues back up to org.springframework.aop.framework.autoproxy.AbstractAutoProxyCreator#wrapIfNecessary, which creates and returns the proxy object:

Object proxy = createProxy(
          bean.getClass(), beanName, specificInterceptors, new SingletonTargetSource(bean));      

At this point the proxying of @Transactional is essentially complete.
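
For readers who have not built a proxy by hand, the following JDK-only sketch shows the general idea behind createProxy: the caller receives an object that implements the same interface but routes every call through a handler. Greeter and the logging handler are hypothetical; Spring's own proxy factory can also produce CGLIB subclass proxies depending on configuration, which this sketch does not cover.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.List;

// Hypothetical business interface.
interface Greeter {
    String greet(String name);
}

final class ProxySketch {

    static final List<String> LOG = new ArrayList<>();

    // Wraps the target in a JDK dynamic proxy that records calls before and after delegating.
    @SuppressWarnings("unchecked")
    static <T> T wrap(T target, Class<T> iface) {
        InvocationHandler handler = (proxy, method, args) -> {
            LOG.add("before " + method.getName());
            try {
                return method.invoke(target, args);
            } finally {
                LOG.add("after " + method.getName());
            }
        };
        return (T) Proxy.newProxyInstance(iface.getClassLoader(), new Class<?>[] {iface}, handler);
    }

    public static void main(String[] args) {
        Greeter target = name -> "hello " + name;
        Greeter proxied = wrap(target, Greeter.class);
        System.out.println(proxied.greet("spring"));                  // hello spring
        System.out.println(Proxy.isProxyClass(proxied.getClass()));   // true
        System.out.println(LOG);                                      // [before greet, after greet]
    }
}
```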

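As mentioned above, @EnableTransactionManagement also imports ProxyTransactionManagementConfiguration.class. During initialization Spring processes the beans this configuration class defines: the transaction advisor, the TransactionAttributeSource and a TransactionInterceptor.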


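The key bean defined by that configuration is org.springframework.transaction.interceptor.TransactionInterceptor. When the service method is called to perform the insert, this interceptor intercepts the call and invokes org.springframework.transaction.interceptor.TransactionAspectSupport#invokeWithinTransaction, shown below: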

@Nullable
  protected Object invokeWithinTransaction(Method method, @Nullable Class<?> targetClass,
      final InvocationCallback invocation) throws Throwable {

    // If the transaction attribute is null, the method is non-transactional.
    TransactionAttributeSource tas = getTransactionAttributeSource();
    final TransactionAttribute txAttr = (tas != null ? tas.getTransactionAttribute(method, targetClass) : null);
    final TransactionManager tm = determineTransactionManager(txAttr);

    if (this.reactiveAdapterRegistry != null && tm instanceof ReactiveTransactionManager) {
      ReactiveTransactionSupport txSupport = this.transactionSupportCache.computeIfAbsent(method, key -> {
        if (KotlinDetector.isKotlinType(method.getDeclaringClass()) && KotlinDelegate.isSuspend(method)) {
          throw new TransactionUsageException(
              "Unsupported annotated transaction on suspending function detected: " + method +
              ". Use TransactionalOperator.transactional extensions instead.");
        }
        ReactiveAdapter adapter = this.reactiveAdapterRegistry.getAdapter(method.getReturnType());
        if (adapter == null) {
          throw new IllegalStateException("Cannot apply reactive transaction to non-reactive return type: " +
              method.getReturnType());
        }
        return new ReactiveTransactionSupport(adapter);
      });
      return txSupport.invokeWithinTransaction(
          method, targetClass, invocation, txAttr, (ReactiveTransactionManager) tm);
    }

    PlatformTransactionManager ptm = asPlatformTransactionManager(tm);
    final String joinpointIdentification = methodIdentification(method, targetClass, txAttr);

    // Standard path: used when the transaction manager is not callback-preferring
    if (txAttr == null || !(ptm instanceof CallbackPreferringPlatformTransactionManager)) {
      // Standard transaction demarcation with getTransaction and commit/rollback calls.
      // Create a transaction if necessary
      TransactionInfo txInfo = createTransactionIfNecessary(ptm, txAttr, joinpointIdentification);

      Object retVal;
      try {
        // This is an around advice: Invoke the next interceptor in the chain.
        // This will normally result in a target object being invoked.
        // Invoke the target method
        retVal = invocation.proceedWithInvocation();
      }
      catch (Throwable ex) {
        // target invocation exception
        // Handle the exception: roll back or commit according to the rollback rules
        completeTransactionAfterThrowing(txInfo, ex);
        throw ex;
      }
      finally {
        // Clear the transaction info
        cleanupTransactionInfo(txInfo);
      }

      if (vavrPresent && VavrDelegate.isVavrTry(retVal)) {
        // Set rollback-only in case of Vavr failure matching our rollback rules...
        TransactionStatus status = txInfo.getTransactionStatus();
        if (status != null && txAttr != null) {
          retVal = VavrDelegate.evaluateTryFailure(retVal, txAttr, status);
        }
      }

      // Commit the transaction
      commitTransactionAfterReturning(txInfo);
      return retVal;
    }

    // Callback path: used for a CallbackPreferringPlatformTransactionManager
    else {
      final ThrowableHolder throwableHolder = new ThrowableHolder();

      // It's a CallbackPreferringPlatformTransactionManager: pass a TransactionCallback in.
      try {
        Object result = ((CallbackPreferringPlatformTransactionManager) ptm).execute(txAttr, status -> {
          TransactionInfo txInfo = prepareTransactionInfo(ptm, txAttr, joinpointIdentification, status);
          try {
            Object retVal = invocation.proceedWithInvocation();
            if (vavrPresent && VavrDelegate.isVavrTry(retVal)) {
              // Set rollback-only in case of Vavr failure matching our rollback rules...
              retVal = VavrDelegate.evaluateTryFailure(retVal, txAttr, status);
            }
            return retVal;
          }
          catch (Throwable ex) {
            if (txAttr.rollbackOn(ex)) {
              // A RuntimeException: will lead to a rollback.
              if (ex instanceof RuntimeException) {
                throw (RuntimeException) ex;
              }
              else {
                throw new ThrowableHolderException(ex);
              }
            }
            else {
              // A normal return value: will lead to a commit.
              throwableHolder.throwable = ex;
              return null;
            }
          }
          finally {
            cleanupTransactionInfo(txInfo);
          }
        });

        // Check result state: It might indicate a Throwable to rethrow.
        if (throwableHolder.throwable != null) {
          throw throwableHolder.throwable;
        }
        return result;
      }
      catch (ThrowableHolderException ex) {
        throw ex.getCause();
      }
      catch (TransactionSystemException ex2) {
        if (throwableHolder.throwable != null) {
          logger.error("Application exception overridden by commit exception", throwableHolder.throwable);
          ex2.initApplicationException(throwableHolder.throwable);
        }
        throw ex2;
      }
      catch (Throwable ex2) {
        if (throwableHolder.throwable != null) {
          logger.error("Application exception overridden by commit exception", throwableHolder.throwable);
        }
        throw ex2;
      }
    }
  }      
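
Stripped of the reactive, Vavr and callback-preferring branches, the standard path is a plain around advice. The sketch below records the order of the steps instead of talking to a database; it is an illustration only, and unlike the real completeTransactionAfterThrowing it always rolls back on an exception rather than consulting the rollback rules.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;

final class TxAroundSketch {

    final List<String> events = new ArrayList<>();

    // Same shape as the standard branch above: begin, proceed, then rollback or commit.
    <T> T invokeWithinTransaction(Callable<T> target) throws Exception {
        events.add("begin");
        T result;
        try {
            result = target.call();
        } catch (Exception ex) {
            events.add("rollback");
            throw ex;
        } finally {
            events.add("cleanup");
        }
        events.add("commit");
        return result;
    }

    // Runs one invocation and returns the recorded steps.
    static List<String> trace(boolean fail) {
        TxAroundSketch tx = new TxAroundSketch();
        try {
            tx.invokeWithinTransaction(() -> {
                if (fail) {
                    throw new IllegalStateException("insert failed");
                }
                return "ok";
            });
        } catch (Exception expected) {
            // The caller still sees the original exception; it is ignored here.
        }
        return tx.events;
    }

    public static void main(String[] args) {
        System.out.println(trace(false));   // [begin, cleanup, commit]
        System.out.println(trace(true));    // [begin, rollback, cleanup]
    }
}
```

Note that the cleanup step runs in both cases and that the commit happens only after the try block has completed normally.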

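An ordinary PlatformTransactionManager (DataSourceTransactionManager, for example) is not a CallbackPreferringPlatformTransactionManager, so the standard path is taken and createTransactionIfNecessary is called. Its code is as follows: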

protected TransactionInfo createTransactionIfNecessary(@Nullable PlatformTransactionManager tm,
      @Nullable TransactionAttribute txAttr, final String joinpointIdentification) {

    // If no name specified, apply method identification as transaction name.
    if (txAttr != null && txAttr.getName() == null) {
      txAttr = new DelegatingTransactionAttribute(txAttr) {
        @Override
        public String getName() {
          return joinpointIdentification;
        }
      };
    }

    TransactionStatus status = null;
    if (txAttr != null) {
      if (tm != null) {
        // Obtain a TransactionStatus; the manager applies the configured propagation behavior and returns a suitable status
        status = tm.getTransaction(txAttr);
      }
      else {
        if (logger.isDebugEnabled()) {
          logger.debug("Skipping transactional joinpoint [" + joinpointIdentification +
              "] because no transaction manager has been configured");
        }
      }
    }
    return prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);
  }
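
The comment on getTransaction mentions propagation behavior, so here is a deliberately small model of what "a suitable status" can mean. Everything in it is hypothetical: the class keeps a stack of open transaction ids in place of Spring's thread-bound resources, supports only three propagation values, and models suspension simply by leaving the outer id on the stack.

```java
import java.util.ArrayDeque;
import java.util.Deque;

final class PropagationSketch {

    enum Propagation { REQUIRED, REQUIRES_NEW, MANDATORY }

    // Hypothetical counterpart of TransactionStatus: which transaction, and whether we started it.
    static final class Status {
        final int id;
        final boolean newTransaction;

        Status(int id, boolean newTransaction) {
            this.id = id;
            this.newTransaction = newTransaction;
        }
    }

    private final Deque<Integer> open = new ArrayDeque<>();
    private int nextId = 1;

    Status getTransaction(Propagation propagation) {
        boolean exists = !open.isEmpty();
        switch (propagation) {
            case MANDATORY:
                if (!exists) {
                    throw new IllegalStateException("No existing transaction found");
                }
                return new Status(open.peek(), false);   // join the current transaction
            case REQUIRED:
                return exists ? new Status(open.peek(), false) : begin();
            case REQUIRES_NEW:
                return begin();   // an outer transaction stays on the stack, i.e. is suspended
            default:
                throw new IllegalArgumentException("Unsupported: " + propagation);
        }
    }

    private Status begin() {
        open.push(nextId);
        return new Status(nextId++, true);
    }

    // Only the participant that started a transaction ends it.
    void complete(Status status) {
        if (status.newTransaction) {
            open.pop();
        }
    }

    public static void main(String[] args) {
        PropagationSketch manager = new PropagationSketch();
        Status outer = manager.getTransaction(Propagation.REQUIRED);
        Status joined = manager.getTransaction(Propagation.REQUIRED);
        Status inner = manager.getTransaction(Propagation.REQUIRES_NEW);
        System.out.println(outer.id + " " + joined.id + " " + inner.id);   // 1 1 2
        manager.complete(inner);
        System.out.println(manager.getTransaction(Propagation.MANDATORY).id);   // 1
    }
}
```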