Spring事务和Atomikos源码分析
Spring-tx
spring-tx模块是Spring实现事务管理功能的核心所在,以注解+AOP切面的方式将事务注入到业务代码中。
实际应用中只要加上
@Transactional
即可,spring框架下Atomikos分布式事务控制也依托于此模块实现。
-
TransactionInterceptor
所有声明式事务控制的入口
public class TransactionInterceptor extends TransactionAspectSupport implements MethodInterceptor, Serializable {
//省略...
@Override
@Nullable
public Object invoke(MethodInvocation invocation) throws Throwable {
// 获取被调用方法所属的类
Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);
// 调用父类TransactionAspectSupport中方法具体处理
return invokeWithinTransaction(invocation.getMethod(), targetClass, invocation::proceed);
}
//省略...
}
public abstract class TransactionAspectSupport implements BeanFactoryAware, InitializingBean {
// NOTE: 不能序列化,因为它是AspectJ方面的基类(不允许实现Serializable)
// 省略...
/**
* 将方法的业务代码包围在事务处理的逻辑中
*
* @param method 被调用的方法
* @param targetClass 我们调用方法所属的目标类
* @param invocation 用于继续目标调用的回调
* @return 返回值
* @throws Throwable propagated from the target invocation
*/
@Nullable
protected Object invokeWithinTransaction(Method method, @Nullable Class<?> targetClass,
final InvocationCallback invocation) throws Throwable {
// 事务属性为空,则该方法是非事务控制的
TransactionAttributeSource tas = getTransactionAttributeSource();
// 根据方法和目标类获取事务属性,
final TransactionAttribute txAttr = (tas != null ? tas.getTransactionAttribute(method, targetClass) : null);
// 获取事务管理器,没有定义则返回默认的,默认不存在就从BeanFactory中取。
// 我使用的是Atomikos,故返回的是JtaTransactionManager
final PlatformTransactionManager tm = determineTransactionManager(txAttr);
// 返回方法的全路径名
final String joinpointIdentification = methodIdentification(method, targetClass, txAttr);
if (txAttr == null || !(tm instanceof CallbackPreferringPlatformTransactionManager)) {
// 为方法创建一个TransactionInfo对象,存储事务相关信息
// 包含事务管理器、事务状态、事务相关属性、方法全路径名以及方法开始前的事务信息oldTransactionInfo
TransactionInfo txInfo = createTransactionIfNecessary(tm, txAttr, joinpointIdentification);
Object retVal;
try {
// 执行方法
retVal = invocation.proceedWithInvocation();
}
catch (Throwable ex) {
// 方法抛异常后完结事务,并执行回滚操作
// 如果非事务性的方法,不回滚,除非TransactionStatus.isRollbackOnly() is true
completeTransactionAfterThrowing(txInfo, ex);
throw ex;
}
finally {
// 恢复方法开始之前的事务信息现场,线程级别
// transactionInfoHolder.set(this.oldTransactionInfo)
cleanupTransactionInfo(txInfo);
}
// 提交事务
commitTransactionAfterReturning(txInfo);
return retVal;
}
else {
//...
}
}
//...
/**
* 创建一个事务
* @param txAttr 事务属性
* @param joinpointIdentification 方法全路径名
* @return a TransactionInfo object
* @see #getTransactionAttributeSource()
*/
@SuppressWarnings("serial")
protected TransactionInfo createTransactionIfNecessary(@Nullable PlatformTransactionManager tm,
@Nullable TransactionAttribute txAttr, final String joinpointIdentification) {
// 事务名称没有就设置为方法的全路径名
if (txAttr != null && txAttr.getName() == null) {
txAttr = new DelegatingTransactionAttribute(txAttr) {
@Override
public String getName() {
return joinpointIdentification;
}
};
}
TransactionStatus status = null;
if (txAttr != null) {
if (tm != null) {
//获取事务当前的状态
status = tm.getTransaction(txAttr);
}
else {
if (logger.isDebugEnabled()) {
logger.debug("Skipping transactional joinpoint [" + joinpointIdentification +
"] because no transaction manager has been configured");
}
}
}
// 组装一个TransactionInfo对象返回
return prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);
}
protected TransactionInfo prepareTransactionInfo(@Nullable PlatformTransactionManager tm,
@Nullable TransactionAttribute txAttr, String joinpointIdentification,
@Nullable TransactionStatus status) {
TransactionInfo txInfo = new TransactionInfo(tm, txAttr, joinpointIdentification);
if (txAttr != null) {
// We need a transaction for this method...
if (logger.isTraceEnabled()) {
logger.trace("Getting transaction for [" + txInfo.getJoinpointIdentification() + "]");
}
// 如果已经存在不兼容的tx,事务管理器将标记错误。
txInfo.newTransactionStatus(status);
}
else {
// 非事务性的方法不不需要创建事务.
if (logger.isTraceEnabled()) {
logger.trace("No need to create transaction for [" + joinpointIdentification +
"]: This method is not transactional.");
}
}
// 将TransactionInfo放入ThreadLocal,即使最后没有创建事务,也能确保TransactionInfo栈不出问题
txInfo.bindToThread();
return txInfo;
}
private void bindToThread() {
// Expose current TransactionStatus, preserving any existing TransactionStatus
// for restoration after this transaction is complete.
this.oldTransactionInfo = transactionInfoHolder.get();
transactionInfoHolder.set(this);
}
//省略...
}
- PlatformTransactionManager
-
Spring提供的事务管理器接口,提供了事务的两个接口:commit(TransactionStatus)和rollback(TransactionStatus)
具体实现包括以下这些:
Spring事务和Atomikos源码分析Spring事务和Atomikos源码分析 - 源码分析
public abstract class AbstractPlatformTransactionManager implements PlatformTransactionManager, Serializable { // 省略... /** * 处理事务隔离级别以及开始事务 * * @see #doGetTransaction * @see #isExistingTransaction * @see #doBegin */ @Override public final TransactionStatus getTransaction(@Nullable TransactionDefinition definition) throws TransactionException { Object transaction = doGetTransaction(); // ... if (definition == null) { // Use defaults if no transaction definition given. definition = new DefaultTransactionDefinition(); } // 看是否已经存在事务,存在就看状态事务为 STATUS_NO_TRANSACTION, // 1. 事务存在且状态不是STATUS_NO_TRANSACTION就进入方法。 // 2. 反之则继续往下走新建事务。 // 此处会根据当前线程从 Map<Thread, Stack<CompositeTransaction>> threadtotxmap_ (CompositeTransactionManagerImp.class) // 取栈顶的组合事务CompositeTransaction,线程级别 // 再根据CompositeTransaction的tid (事务id)从jtaTransactionToCoreTransactionMap (TransactionManagerImp.class)获取事务 if (isExistingTransaction(transaction)) { // 其实就是当方法中调用了别的服务的方法时,看被调用的方法是否设置了事务, // 没有就直接继承原有方法的事务,如果存在再看事务设置的传播属性。 // 如果设置的是:PROPAGATION_REQUIRES_NEW ,那么就会在调用的方法中创建新的事务, // 其他的根据方法设定的事务传播属性具体分析 return handleExistingTransaction(definition, transaction, debugEnabled); } // 超时抛异常 if (definition.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) { throw new InvalidTimeoutException("Invalid transaction timeout", definition.getTimeout()); } // PROPAGATION_MANDATORY意思是当前方法被单独调用时,会抛出异常。 // 必须在别的存在活动的事务的方法中被调用 if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) { throw new IllegalTransactionStateException( "No existing transaction found for transaction marked with propagation 'mandatory'"); } // 判断事务的传播属性,满足就走下列方法。@Transactional默认是Propagation.REQUIRED else if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED || definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW || definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) { SuspendedResourcesHolder suspendedResources = suspend(null); // ... try { boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER); DefaultTransactionStatus status = newTransactionStatus( definition, transaction, true, newSynchronization, debugEnabled, suspendedResources); // 开始事务,具体代码由JtaTransactionManager实现(我搭建项目用的是Atomikos,其他组件另说) doBegin(transaction, definition); // 事务同步管理器 // 我们在事务执行前后可能需要做一些额外的操作这个时候我们就需要用到TransactionSynchronizationManager // 去注入一个TransactionSynchronization事务同步器,然后重写TransactionSynchronization或者 // 其子类的beforeCommit()或者afterCommit()方法,写入我们需要执行的业务代码。 prepareSynchronization(status, definition); return status; } catch (RuntimeException | Error ex) { resume(null, suspendedResources); throw ex; } } else { // Create "empty" transaction: no actual transaction, but potentially synchronization. if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT && logger.isWarnEnabled()) { logger.warn("Custom isolation level specified but no actual transaction initiated; " + "isolation level will effectively be ignored: " + definition); } boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS); return prepareTransactionStatus(definition, null, true, newSynchronization, debugEnabled, null); } } // 省略... }
-
Atomikos
接下来的部分就是提供商的具体实现,我这里采用的时Atomikos组件实现的分布式事务
-
JtaTransactionManager
继承了
,重写了AbstractPlatformTransactionManager
方法doBegin(Object transaction, TransactionDefinition definition)
public class JtaTransactionManager extends AbstractPlatformTransactionManager
implements TransactionFactory, InitializingBean, Serializable {
/**
* 根据definition开始一个新事务.不需要关心事务隔离级别,
* 已经在AbstractPlatformTransactionManager中做了处理.
* <p>当事务管理器决定开始一个新事务时调用该方法.要么之前没有事务,要么事务已被暂停.
* @param transaction transaction object returned by {@code doGetTransaction}
* @param definition a TransactionDefinition instance, describing propagation
* behavior, isolation level, read-only flag, timeout, and transaction name
* @throws TransactionException in case of creation or system errors
* @throws org.springframework.transaction.NestedTransactionNotSupportedException
* if the underlying transaction does not support nesting
*/
@Override
protected void doBegin(Object transaction, TransactionDefinition definition) {
JtaTransactionObject txObject = (JtaTransactionObject) transaction;
try {
// 开始jta事务
doJtaBegin(txObject, definition);
}
catch (NotSupportedException | UnsupportedOperationException ex) {
throw new NestedTransactionNotSupportedException(
"JTA implementation does not support nested transactions", ex);
}
catch (SystemException ex) {
throw new CannotCreateTransactionException("JTA failure on begin", ex);
}
}
protected void doJtaBegin(JtaTransactionObject txObject, TransactionDefinition definition)
throws NotSupportedException, SystemException {
// 校验事务隔离界别
applyIsolationLevel(txObject, definition.getIsolationLevel());
int timeout = determineTimeout(definition);
// 校验并设置事务的超时时间
applyTimeout(txObject, timeout);
// 开始事务
txObject.getUserTransaction().begin();
}
}
-
TransactionManagerImp
实现了JTA提供的TransactionManager和UserTransaction接口,既可充当事务管理器,也可自行调用实现事务控制
- 示例代码
@Override public int saveUser(UserVO userVO) { // UserTransaction tx = new UserTransactionImp(); UserTransaction tx = new UserTransactionManager(); try { tx.setTransactionTimeout(60); tx.begin(); UserLog userLog = buildUserLog(userVO); userLogService.save(userLog); DynamicDataSourceContextHolder.setDataSourceRouterKey("local127_becychen"); userLogService.save(userLog); DynamicDataSourceContextHolder.setDataSourceRouterKey("local_itoyoung"); //会抛出空指针异常 StringUtil.md5(permissions.toString()); userLogService.save(userLog); tx.commit(); } catch (Exception e) { try { tx.rollback(); } catch (SystemException ex) { ex.printStackTrace(); } throw new BaseRuntimeException("异常; " + e.getMessage()); } return userVO.getUid(); }
- 源码分析,下面因为分析需要,稍微打乱方法顺序
public class TransactionManagerImp implements TransactionManager, SubTxAwareParticipant, Referenceable, UserTransaction { // 事务集合,key是tid,value是事务 private Map<String, TransactionImp> jtaTransactionToCoreTransactionMap; // 组合事务管理器:管理组合事务的创建、绑定以及获取等 private CompositeTransactionManager compositeTransactionManager; // 省略... /** * 创建一个新事务并且关联本地线程。如果当前线程已经存在事务,则创建一个本地子事务 * */ public void begin () throws NotSupportedException, SystemException { begin ( getTransactionTimeout() ); } /** * 根据超时时间timeout开始事务 */ public void begin ( int timeout ) throws NotSupportedException, SystemException { CompositeTransaction ct = null; ResumePreviousTransactionSubTxAwareParticipant resumeParticipant = null; // 根据当前线程获取组合事务并判断是否是JTA事务 ct = compositeTransactionManager.getCompositeTransaction(); if ( ct != null && ct.getProperty ( JTA_PROPERTY_NAME ) == null ) { // 暂时暂停不兼容的事务:即当前事务不是JTA事务(com.atomikos.icatch.jta.transaction), // 在JTA事务结束后恢复 LOGGER.logWarning ( "JTA: temporarily suspending incompatible transaction: " + ct.getTid() + " (will be resumed after JTA transaction ends)" ); ct = compositeTransactionManager.suspend(); resumeParticipant = new ResumePreviousTransactionSubTxAwareParticipant ( ct ); } try { // 创建一个组合事务,如果当前线程存在事务则创建子事务,然后推入到当前线程绑定的组合事务栈中 // 之后判断是否存在事务和状态:isExistingTransaction(transaction) 就是从这里获取的 ct = compositeTransactionManager.createCompositeTransaction ( ( ( long ) timeout ) * 1000 ); if ( resumeParticipant != null ) ct.addSubTxAwareParticipant ( resumeParticipant ); if ( ct.isRoot () && getDefaultSerial () ) ct.setSerial (); // 设定当前组合事务为JTA事务 ct.setProperty ( JTA_PROPERTY_NAME , "true" ); } catch ( SysException se ) { String msg = "Error in begin()"; LOGGER.logError( msg , se ); throw new ExtendedSystemException ( msg , se ); } recreateCompositeTransactionAsJtaTransaction(ct); } // 根据组合事务创建一个TransactionImp事务 private TransactionImp recreateCompositeTransactionAsJtaTransaction( CompositeTransaction ct) { TransactionImp ret = null; if (ct.getState ().equals ( TxState.ACTIVE )) { // setRollbackOnly may have been called! ret = new TransactionImp ( ct, enableAutomatRegistrationOfUnknownXAResources ); addToMap ( ct.getTid (), ret ); ct.addSubTxAwareParticipant ( this ); } return ret; } // 将当前组合事务的tid和当前新创建的事务放入jtaTransactionToCoreTransactionMap中, // 之后可以通过当前线程获取组合事务栈 // 通过栈顶组合事务的tid来获取当前事务,即下面的方法 getTransaction () private void addToMap ( String tid , TransactionImp tx ) { synchronized ( jtaTransactionToCoreTransactionMap ) { jtaTransactionToCoreTransactionMap.put ( tid , tx ); } } // 获取当前事务的状态,之前AbstractPlatformTransactionManager中 // 判断是否存在事务就是调用这里和下面的方法 public int getStatus() throws SystemException { int ret = Status.STATUS_NO_TRANSACTION; Transaction tx = getTransaction(); if ( tx == null ) { ret = Status.STATUS_NO_TRANSACTION; } else { ret = tx.getStatus (); } return ret; } public Transaction getTransaction () throws SystemException { TransactionImp ret = null; // 获取当前线程的组合事务 CompositeTransaction ct = getCompositeTransaction(); // 根据组合事务的tid获取当前事务 if ( ct != null) ret = getJtaTransactionWithId ( ct.getTid () ); return ret; } private CompositeTransaction getCompositeTransaction() throws ExtendedSystemException { CompositeTransaction ct = null; try { ct = compositeTransactionManager.getCompositeTransaction (); } catch ( SysException se ) { String msg = "Error while retrieving the transaction for the calling thread"; LOGGER.logError( msg , se); throw new ExtendedSystemException ( msg , se ); } // establishJtaTransactionContextIfNecessary(ct); return ct; } // 判断事务是否创建成功,若创建失败,则根据组合事务再次创建 private void establishJtaTransactionContextIfNecessary( CompositeTransaction ct) { if ( isJtaTransaction(ct) ) { TransactionImp jtaTransaction = getJtaTransactionWithId ( ct.getTid () ); if ( jtaTransaction == null ) { recreateCompositeTransactionAsJtaTransaction(ct); } } } // 省略... }
-
CompositeTransactionManagerImpl
组合事务管理器,之前获取和绑定组合事务的代码都在这个类中实现。这里为了分析会稍微把方法顺序打乱
public class CompositeTransactionManagerImp implements CompositeTransactionManager,
SubTxAwareParticipant
{
// 这两个map就是存储和获取当前线程组合事务的关键
private Map<Thread, Stack<CompositeTransaction>> threadtotxmap_ = null;
private Map<CompositeTransaction, Thread> txtothreadmap_ = null;
public CompositeTransactionManagerImp ()
{
threadtotxmap_ = new HashMap<Thread, Stack<CompositeTransaction>> ();
txtothreadmap_ = new HashMap<CompositeTransaction, Thread> ();
}
// 省略...
// 创建一个组合事务,如果当前存在就创建一个子事务,并和当前线程绑定以及存入 threadtotxmap_和 txtothreadmap_
public CompositeTransaction createCompositeTransaction ( long timeout ) throws SysException
{
CompositeTransaction ct = null , ret = null;
ct = getCurrentTx ();
if ( ct == null ) {
ret = getTransactionService().createCompositeTransaction ( timeout );
if(LOGGER.isDebugEnabled()){
LOGGER.logDebug("createCompositeTransaction ( " + timeout + " ): "
+ "created new ROOT transaction with id " + ret.getTid ());
}
} else {
if(LOGGER.isDebugEnabled()) LOGGER.logDebug("createCompositeTransaction ( " + timeout + " )");
ret = ct.createSubTransaction ();
}
Thread thread = Thread.currentThread ();
setThreadMappings ( ret, thread );
return ret;
}
// 这个方法和下面的方法就是获取当前组合事务
private CompositeTransaction getCurrentTx ()
{
Thread thread = Thread.currentThread ();
synchronized ( threadtotxmap_ ) {
Stack<CompositeTransaction> txs = threadtotxmap_.get ( thread );
if ( txs == null )
return null;
else
return txs.peek ();
}
}
public CompositeTransaction getCompositeTransaction () throws SysException
{
CompositeTransaction ct = null;
ct = getCurrentTx ();
if ( ct != null ) {
if(LOGGER.isTraceEnabled()){
LOGGER.logTrace("getCompositeTransaction() returning instance with id "
+ ct.getTid ());
}
} else{
if(LOGGER.isTraceEnabled()){
LOGGER.logTrace("getCompositeTransaction() returning NULL!");
}
}
return ct;
}
// 将组合事务和当前线程绑定
private void setThreadMappings ( CompositeTransaction ct , Thread thread )
throws IllegalStateException, SysException
{
//case 21806: callbacks to ct to be made outside synchronized block
ct.addSubTxAwareParticipant ( this ); //step 1
synchronized ( threadtotxmap_ ) {
// 从step 1 到这儿,中间可能发生回滚/超时;
// 这里的判断会确保我们不会添加一个永远不会被移除的线程映射
if ( TxState.ACTIVE.equals ( ct.getState() )) {
Stack<CompositeTransaction> txs = threadtotxmap_.get ( thread );
if ( txs == null )
txs = new Stack<CompositeTransaction>();
txs.push ( ct );
threadtotxmap_.put ( thread, txs );
txtothreadmap_.put ( ct, thread );
}
}
}
// 省略...
}