Spring事務和Atomikos源碼分析
Spring-tx
spring-tx子產品是Spring實作事務管理功能的核心所在,以注解+AOP切面的方式将事務注入到業務代碼中。
實際應用中隻要加上
@Transactional
即可,spring架構下Atomikos分布式事務控制也依托于此子產品實作。
-
TransactionInterceptor
所有聲明式事務控制的入口
public class TransactionInterceptor extends TransactionAspectSupport implements MethodInterceptor, Serializable {
//省略...
@Override
@Nullable
public Object invoke(MethodInvocation invocation) throws Throwable {
// 擷取被調用方法所屬的類
Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);
// 調用父類TransactionAspectSupport中方法具體處理
return invokeWithinTransaction(invocation.getMethod(), targetClass, invocation::proceed);
}
//省略...
}
public abstract class TransactionAspectSupport implements BeanFactoryAware, InitializingBean {
// NOTE: 不能序列化,因為它是AspectJ方面的基類(不允許實作Serializable)
// 省略...
/**
* 将方法的業務代碼包圍在事務處理的邏輯中
*
* @param method 被調用的方法
* @param targetClass 我們調用方法所屬的目标類
* @param invocation 用于繼續目标調用的回調
* @return 傳回值
* @throws Throwable propagated from the target invocation
*/
@Nullable
protected Object invokeWithinTransaction(Method method, @Nullable Class<?> targetClass,
final InvocationCallback invocation) throws Throwable {
// 事務屬性為空,則該方法是非事務控制的
TransactionAttributeSource tas = getTransactionAttributeSource();
// 根據方法和目标類擷取事務屬性,
final TransactionAttribute txAttr = (tas != null ? tas.getTransactionAttribute(method, targetClass) : null);
// 擷取事務管理器,沒有定義則傳回預設的,預設不存在就從BeanFactory中取。
// 我使用的是Atomikos,故傳回的是JtaTransactionManager
final PlatformTransactionManager tm = determineTransactionManager(txAttr);
// 傳回方法的全路徑名
final String joinpointIdentification = methodIdentification(method, targetClass, txAttr);
if (txAttr == null || !(tm instanceof CallbackPreferringPlatformTransactionManager)) {
// 為方法建立一個TransactionInfo對象,存儲事務相關資訊
// 包含事務管理器、事務狀态、事務相關屬性、方法全路徑名以及方法開始前的事務資訊oldTransactionInfo
TransactionInfo txInfo = createTransactionIfNecessary(tm, txAttr, joinpointIdentification);
Object retVal;
try {
// 執行方法
retVal = invocation.proceedWithInvocation();
}
catch (Throwable ex) {
// 方法抛異常後完結事務,并執行復原操作
// 如果非事務性的方法,不復原,除非TransactionStatus.isRollbackOnly() is true
completeTransactionAfterThrowing(txInfo, ex);
throw ex;
}
finally {
// 恢複方法開始之前的事務資訊現場,線程級别
// transactionInfoHolder.set(this.oldTransactionInfo)
cleanupTransactionInfo(txInfo);
}
// 送出事務
commitTransactionAfterReturning(txInfo);
return retVal;
}
else {
//...
}
}
//...
/**
* 建立一個事務
* @param txAttr 事務屬性
* @param joinpointIdentification 方法全路徑名
* @return a TransactionInfo object
* @see #getTransactionAttributeSource()
*/
@SuppressWarnings("serial")
protected TransactionInfo createTransactionIfNecessary(@Nullable PlatformTransactionManager tm,
@Nullable TransactionAttribute txAttr, final String joinpointIdentification) {
// 事務名稱沒有就設定為方法的全路徑名
if (txAttr != null && txAttr.getName() == null) {
txAttr = new DelegatingTransactionAttribute(txAttr) {
@Override
public String getName() {
return joinpointIdentification;
}
};
}
TransactionStatus status = null;
if (txAttr != null) {
if (tm != null) {
//擷取事務目前的狀态
status = tm.getTransaction(txAttr);
}
else {
if (logger.isDebugEnabled()) {
logger.debug("Skipping transactional joinpoint [" + joinpointIdentification +
"] because no transaction manager has been configured");
}
}
}
// 組裝一個TransactionInfo對象傳回
return prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);
}
protected TransactionInfo prepareTransactionInfo(@Nullable PlatformTransactionManager tm,
@Nullable TransactionAttribute txAttr, String joinpointIdentification,
@Nullable TransactionStatus status) {
TransactionInfo txInfo = new TransactionInfo(tm, txAttr, joinpointIdentification);
if (txAttr != null) {
// We need a transaction for this method...
if (logger.isTraceEnabled()) {
logger.trace("Getting transaction for [" + txInfo.getJoinpointIdentification() + "]");
}
// 如果已經存在不相容的tx,事務管理器将标記錯誤。
txInfo.newTransactionStatus(status);
}
else {
// 非事務性的方法不不需要建立事務.
if (logger.isTraceEnabled()) {
logger.trace("No need to create transaction for [" + joinpointIdentification +
"]: This method is not transactional.");
}
}
// 将TransactionInfo放入ThreadLocal,即使最後沒有建立事務,也能確定TransactionInfo棧不出問題
txInfo.bindToThread();
return txInfo;
}
private void bindToThread() {
// Expose current TransactionStatus, preserving any existing TransactionStatus
// for restoration after this transaction is complete.
this.oldTransactionInfo = transactionInfoHolder.get();
transactionInfoHolder.set(this);
}
//省略...
}
- PlatformTransactionManager
-
Spring提供的事務管理器接口,提供了事務的兩個接口:commit(TransactionStatus)和rollback(TransactionStatus)
具體實作包括以下這些:
Spring事務和Atomikos源碼分析Spring事務和Atomikos源碼分析 - 源碼分析
public abstract class AbstractPlatformTransactionManager implements PlatformTransactionManager, Serializable { // 省略... /** * 處理事務隔離級别以及開始事務 * * @see #doGetTransaction * @see #isExistingTransaction * @see #doBegin */ @Override public final TransactionStatus getTransaction(@Nullable TransactionDefinition definition) throws TransactionException { Object transaction = doGetTransaction(); // ... if (definition == null) { // Use defaults if no transaction definition given. definition = new DefaultTransactionDefinition(); } // 看是否已經存在事務,存在就看狀态事務為 STATUS_NO_TRANSACTION, // 1. 事務存在且狀态不是STATUS_NO_TRANSACTION就進入方法。 // 2. 反之則繼續往下走建立事務。 // 此處會根據目前線程從 Map<Thread, Stack<CompositeTransaction>> threadtotxmap_ (CompositeTransactionManagerImp.class) // 取棧頂的組合事務CompositeTransaction,線程級别 // 再根據CompositeTransaction的tid (事務id)從jtaTransactionToCoreTransactionMap (TransactionManagerImp.class)擷取事務 if (isExistingTransaction(transaction)) { // 其實就是當方法中調用了别的服務的方法時,看被調用的方法是否設定了事務, // 沒有就直接繼承原有方法的事務,如果存在再看事務設定的傳播屬性。 // 如果設定的是:PROPAGATION_REQUIRES_NEW ,那麼就會在調用的方法中建立新的事務, // 其他的根據方法設定的事務傳播屬性具體分析 return handleExistingTransaction(definition, transaction, debugEnabled); } // 逾時抛異常 if (definition.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) { throw new InvalidTimeoutException("Invalid transaction timeout", definition.getTimeout()); } // PROPAGATION_MANDATORY意思是目前方法被單獨調用時,會抛出異常。 // 必須在别的存在活動的事務的方法中被調用 if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) { throw new IllegalTransactionStateException( "No existing transaction found for transaction marked with propagation 'mandatory'"); } // 判斷事務的傳播屬性,滿足就走下列方法。@Transactional預設是Propagation.REQUIRED else if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED || definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW || definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) { SuspendedResourcesHolder suspendedResources = suspend(null); // ... try { boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER); DefaultTransactionStatus status = newTransactionStatus( definition, transaction, true, newSynchronization, debugEnabled, suspendedResources); // 開始事務,具體代碼由JtaTransactionManager實作(我搭建項目用的是Atomikos,其他元件另說) doBegin(transaction, definition); // 事務同步管理器 // 我們在事務執行前後可能需要做一些額外的操作這個時候我們就需要用到TransactionSynchronizationManager // 去注入一個TransactionSynchronization事務同步器,然後重寫TransactionSynchronization或者 // 其子類的beforeCommit()或者afterCommit()方法,寫入我們需要執行的業務代碼。 prepareSynchronization(status, definition); return status; } catch (RuntimeException | Error ex) { resume(null, suspendedResources); throw ex; } } else { // Create "empty" transaction: no actual transaction, but potentially synchronization. if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT && logger.isWarnEnabled()) { logger.warn("Custom isolation level specified but no actual transaction initiated; " + "isolation level will effectively be ignored: " + definition); } boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS); return prepareTransactionStatus(definition, null, true, newSynchronization, debugEnabled, null); } } // 省略... }
-
Atomikos
接下來的部分就是提供商的具體實作,我這裡采用的時Atomikos元件實作的分布式事務
-
JtaTransactionManager
繼承了
,重寫了AbstractPlatformTransactionManager
方法doBegin(Object transaction, TransactionDefinition definition)
public class JtaTransactionManager extends AbstractPlatformTransactionManager
implements TransactionFactory, InitializingBean, Serializable {
/**
* 根據definition開始一個新事務.不需要關心事務隔離級别,
* 已經在AbstractPlatformTransactionManager中做了處理.
* <p>當事務管理器決定開始一個新事務時調用該方法.要麼之前沒有事務,要麼事務已被暫停.
* @param transaction transaction object returned by {@code doGetTransaction}
* @param definition a TransactionDefinition instance, describing propagation
* behavior, isolation level, read-only flag, timeout, and transaction name
* @throws TransactionException in case of creation or system errors
* @throws org.springframework.transaction.NestedTransactionNotSupportedException
* if the underlying transaction does not support nesting
*/
@Override
protected void doBegin(Object transaction, TransactionDefinition definition) {
JtaTransactionObject txObject = (JtaTransactionObject) transaction;
try {
// 開始jta事務
doJtaBegin(txObject, definition);
}
catch (NotSupportedException | UnsupportedOperationException ex) {
throw new NestedTransactionNotSupportedException(
"JTA implementation does not support nested transactions", ex);
}
catch (SystemException ex) {
throw new CannotCreateTransactionException("JTA failure on begin", ex);
}
}
protected void doJtaBegin(JtaTransactionObject txObject, TransactionDefinition definition)
throws NotSupportedException, SystemException {
// 校驗事務隔離界别
applyIsolationLevel(txObject, definition.getIsolationLevel());
int timeout = determineTimeout(definition);
// 校驗并設定事務的逾時時間
applyTimeout(txObject, timeout);
// 開始事務
txObject.getUserTransaction().begin();
}
}
-
TransactionManagerImp
實作了JTA提供的TransactionManager和UserTransaction接口,既可充當事務管理器,也可自行調用實作事務控制
- 示例代碼
@Override public int saveUser(UserVO userVO) { // UserTransaction tx = new UserTransactionImp(); UserTransaction tx = new UserTransactionManager(); try { tx.setTransactionTimeout(60); tx.begin(); UserLog userLog = buildUserLog(userVO); userLogService.save(userLog); DynamicDataSourceContextHolder.setDataSourceRouterKey("local127_becychen"); userLogService.save(userLog); DynamicDataSourceContextHolder.setDataSourceRouterKey("local_itoyoung"); //會抛出空指針異常 StringUtil.md5(permissions.toString()); userLogService.save(userLog); tx.commit(); } catch (Exception e) { try { tx.rollback(); } catch (SystemException ex) { ex.printStackTrace(); } throw new BaseRuntimeException("異常; " + e.getMessage()); } return userVO.getUid(); }
- 源碼分析,下面因為分析需要,稍微打亂方法順序
public class TransactionManagerImp implements TransactionManager, SubTxAwareParticipant, Referenceable, UserTransaction { // 事務集合,key是tid,value是事務 private Map<String, TransactionImp> jtaTransactionToCoreTransactionMap; // 組合事務管理器:管理組合事務的建立、綁定以及擷取等 private CompositeTransactionManager compositeTransactionManager; // 省略... /** * 建立一個新事務并且關聯本地線程。如果目前線程已經存在事務,則建立一個本地子事務 * */ public void begin () throws NotSupportedException, SystemException { begin ( getTransactionTimeout() ); } /** * 根據逾時時間timeout開始事務 */ public void begin ( int timeout ) throws NotSupportedException, SystemException { CompositeTransaction ct = null; ResumePreviousTransactionSubTxAwareParticipant resumeParticipant = null; // 根據目前線程擷取組合事務并判斷是否是JTA事務 ct = compositeTransactionManager.getCompositeTransaction(); if ( ct != null && ct.getProperty ( JTA_PROPERTY_NAME ) == null ) { // 暫時暫停不相容的事務:即目前事務不是JTA事務(com.atomikos.icatch.jta.transaction), // 在JTA事務結束後恢複 LOGGER.logWarning ( "JTA: temporarily suspending incompatible transaction: " + ct.getTid() + " (will be resumed after JTA transaction ends)" ); ct = compositeTransactionManager.suspend(); resumeParticipant = new ResumePreviousTransactionSubTxAwareParticipant ( ct ); } try { // 建立一個組合事務,如果目前線程存在事務則建立子事務,然後推入到目前線程綁定的組合事務棧中 // 之後判斷是否存在事務和狀态:isExistingTransaction(transaction) 就是從這裡擷取的 ct = compositeTransactionManager.createCompositeTransaction ( ( ( long ) timeout ) * 1000 ); if ( resumeParticipant != null ) ct.addSubTxAwareParticipant ( resumeParticipant ); if ( ct.isRoot () && getDefaultSerial () ) ct.setSerial (); // 設定目前組合事務為JTA事務 ct.setProperty ( JTA_PROPERTY_NAME , "true" ); } catch ( SysException se ) { String msg = "Error in begin()"; LOGGER.logError( msg , se ); throw new ExtendedSystemException ( msg , se ); } recreateCompositeTransactionAsJtaTransaction(ct); } // 根據組合事務建立一個TransactionImp事務 private TransactionImp recreateCompositeTransactionAsJtaTransaction( CompositeTransaction ct) { TransactionImp ret = null; if (ct.getState ().equals ( TxState.ACTIVE )) { // setRollbackOnly may have been called! ret = new TransactionImp ( ct, enableAutomatRegistrationOfUnknownXAResources ); addToMap ( ct.getTid (), ret ); ct.addSubTxAwareParticipant ( this ); } return ret; } // 将目前組合事務的tid和目前新建立的事務放入jtaTransactionToCoreTransactionMap中, // 之後可以通過目前線程擷取組合事務棧 // 通過棧頂組合事務的tid來擷取目前事務,即下面的方法 getTransaction () private void addToMap ( String tid , TransactionImp tx ) { synchronized ( jtaTransactionToCoreTransactionMap ) { jtaTransactionToCoreTransactionMap.put ( tid , tx ); } } // 擷取目前事務的狀态,之前AbstractPlatformTransactionManager中 // 判斷是否存在事務就是調用這裡和下面的方法 public int getStatus() throws SystemException { int ret = Status.STATUS_NO_TRANSACTION; Transaction tx = getTransaction(); if ( tx == null ) { ret = Status.STATUS_NO_TRANSACTION; } else { ret = tx.getStatus (); } return ret; } public Transaction getTransaction () throws SystemException { TransactionImp ret = null; // 擷取目前線程的組合事務 CompositeTransaction ct = getCompositeTransaction(); // 根據組合事務的tid擷取目前事務 if ( ct != null) ret = getJtaTransactionWithId ( ct.getTid () ); return ret; } private CompositeTransaction getCompositeTransaction() throws ExtendedSystemException { CompositeTransaction ct = null; try { ct = compositeTransactionManager.getCompositeTransaction (); } catch ( SysException se ) { String msg = "Error while retrieving the transaction for the calling thread"; LOGGER.logError( msg , se); throw new ExtendedSystemException ( msg , se ); } // establishJtaTransactionContextIfNecessary(ct); return ct; } // 判斷事務是否建立成功,若建立失敗,則根據組合事務再次建立 private void establishJtaTransactionContextIfNecessary( CompositeTransaction ct) { if ( isJtaTransaction(ct) ) { TransactionImp jtaTransaction = getJtaTransactionWithId ( ct.getTid () ); if ( jtaTransaction == null ) { recreateCompositeTransactionAsJtaTransaction(ct); } } } // 省略... }
-
CompositeTransactionManagerImpl
組合事務管理器,之前擷取和綁定組合事務的代碼都在這個類中實作。這裡為了分析會稍微把方法順序打亂
public class CompositeTransactionManagerImp implements CompositeTransactionManager,
SubTxAwareParticipant
{
// 這兩個map就是存儲和擷取目前線程組合事務的關鍵
private Map<Thread, Stack<CompositeTransaction>> threadtotxmap_ = null;
private Map<CompositeTransaction, Thread> txtothreadmap_ = null;
public CompositeTransactionManagerImp ()
{
threadtotxmap_ = new HashMap<Thread, Stack<CompositeTransaction>> ();
txtothreadmap_ = new HashMap<CompositeTransaction, Thread> ();
}
// 省略...
// 建立一個組合事務,如果目前存在就建立一個子事務,并和目前線程綁定以及存入 threadtotxmap_和 txtothreadmap_
public CompositeTransaction createCompositeTransaction ( long timeout ) throws SysException
{
CompositeTransaction ct = null , ret = null;
ct = getCurrentTx ();
if ( ct == null ) {
ret = getTransactionService().createCompositeTransaction ( timeout );
if(LOGGER.isDebugEnabled()){
LOGGER.logDebug("createCompositeTransaction ( " + timeout + " ): "
+ "created new ROOT transaction with id " + ret.getTid ());
}
} else {
if(LOGGER.isDebugEnabled()) LOGGER.logDebug("createCompositeTransaction ( " + timeout + " )");
ret = ct.createSubTransaction ();
}
Thread thread = Thread.currentThread ();
setThreadMappings ( ret, thread );
return ret;
}
// 這個方法和下面的方法就是擷取目前組合事務
private CompositeTransaction getCurrentTx ()
{
Thread thread = Thread.currentThread ();
synchronized ( threadtotxmap_ ) {
Stack<CompositeTransaction> txs = threadtotxmap_.get ( thread );
if ( txs == null )
return null;
else
return txs.peek ();
}
}
public CompositeTransaction getCompositeTransaction () throws SysException
{
CompositeTransaction ct = null;
ct = getCurrentTx ();
if ( ct != null ) {
if(LOGGER.isTraceEnabled()){
LOGGER.logTrace("getCompositeTransaction() returning instance with id "
+ ct.getTid ());
}
} else{
if(LOGGER.isTraceEnabled()){
LOGGER.logTrace("getCompositeTransaction() returning NULL!");
}
}
return ct;
}
// 将組合事務和目前線程綁定
private void setThreadMappings ( CompositeTransaction ct , Thread thread )
throws IllegalStateException, SysException
{
//case 21806: callbacks to ct to be made outside synchronized block
ct.addSubTxAwareParticipant ( this ); //step 1
synchronized ( threadtotxmap_ ) {
// 從step 1 到這兒,中間可能發生復原/逾時;
// 這裡的判斷會確定我們不會添加一個永遠不會被移除的線程映射
if ( TxState.ACTIVE.equals ( ct.getState() )) {
Stack<CompositeTransaction> txs = threadtotxmap_.get ( thread );
if ( txs == null )
txs = new Stack<CompositeTransaction>();
txs.push ( ct );
threadtotxmap_.put ( thread, txs );
txtothreadmap_.put ( ct, thread );
}
}
}
// 省略...
}