天天看點

SPRING技術内幕-筆記(十一)spring事物處理的設計與實作

11.1事物的建立

TransactionInterceptor的invoke的回調過程中會使用createTransactionIfNecessary,這個方法在其基類TransactionAspectSupport中實作,期間會使用AbstractPlatformTransactionManager調用getTransaction(txAttr),這個過程要對不同情況進行處理,得到TransactionStatus,然後塞進TransactionInfo中,最後将TransactionInfo與目前線程綁定,TransactionInfo穿插在整個事物處理過程中。

createTransactionIfNecessary的調用時序圖:

SPRING技術内幕-筆記(十一)spring事物處理的設計與實作
protected TransactionInfo createTransactionIfNecessary(
            PlatformTransactionManager tm, TransactionAttribute txAttr, final String joinpointIdentification) {

        // If no name specified, apply method identification as transaction name.
        //如果沒有配置事物屬性,就用方法名作為事物的名字
        if (txAttr != null && txAttr.getName() == null) {
            txAttr = new DelegatingTransactionAttribute(txAttr) {
                @Override
                public String getName() {
                    return joinpointIdentification;
                }
            };
        }
        //TransactionStatus 封裝了事物執行的狀态資訊
        TransactionStatus status = null;
        if (txAttr != null) {
            if (tm != null) {
            //使用定義好的事物屬性建立事物,傳回事物的狀态和 事物本身,由事物處理器去完成,事物處理器重寫getTransaction
             //方法.
                status = tm.getTransaction(txAttr);
            }
            else {
                if (logger.isDebugEnabled()) {
                    logger.debug("Skipping transactional joinpoint [" + joinpointIdentification +
                            "] because no transaction manager has been configured");
                }
            }
        }
        //準備TransactionInfo,TransactionInfo封裝了事物的配置資訊以及TransactionStatus 
        return prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);
    }
           

prepareTransactionInfo代碼清單:

protected TransactionInfo prepareTransactionInfo(PlatformTransactionManager tm,
            TransactionAttribute txAttr, String joinpointIdentification, TransactionStatus status) {
        //建立新的TransactionInfo 
        TransactionInfo txInfo = new TransactionInfo(tm, txAttr, joinpointIdentification);
        if (txAttr != null) {
            // We need a transaction for this method
            if (logger.isTraceEnabled()) {
                logger.trace("Getting transaction for [" + txInfo.getJoinpointIdentification() + "]");
            }
            // The transaction manager will flag an error if an incompatible tx already exists
            //為TransactionInfo 設定TransactionStatus,這個TransactionStatus比較重要,内含了事物建立過程的資訊,
            //比如Transaction是由TransactionStatus持有的
            txInfo.newTransactionStatus(status);
        }
        else {
            // The TransactionInfo.hasTransaction() method will return
            // false. We created it only to preserve the integrity of
            // the ThreadLocal stack maintained in this class.
            if (logger.isTraceEnabled())
                logger.trace("Don't need to create transaction for [" + joinpointIdentification +
                        "]: This method isn't transactional.");
        }

        // We always bind the TransactionInfo to the thread, even if we didn't create
        // a new transaction here. This guarantees that the TransactionInfo stack
        //将目前的TransactionInfo 與目前線程綁定,TransactionInfo 内部用一個含量持有以前的TransactionInfo ,這樣
        //就會有一系列的TransactionInfo ,雖然不會建立新的事物,但是總會履歷新的TransactionInfo 。
        txInfo.bindToThread();
        return txInfo;
    }
           

接下來就是AbstractPlatformTransactionManager得getTransaction,此方法提供了事物處理流程的一個模闆。

public final TransactionStatus getTransaction(TransactionDefinition definition) throws TransactionException {
    //doGetTransaction由事物處理器完成,比如datasourcetransactionmanager
        Object transaction = doGetTransaction();

        // Cache debug flag to avoid repeated checks.
        //緩存标志,防止重複檢查
        boolean debugEnabled = logger.isDebugEnabled();

        if (definition == null) {
            // Use defaults if no transaction definition given.
            //使用預設事物處理屬性,預設的配置如下:
            //  private int propagationBehavior = PROPAGATION_REQUIRED;
            //  private int isolationLevel = ISOLATION_DEFAULT;
            //  private int timeout = TIMEOUT_DEFAULT;
            //  private boolean readOnly = false;
            definition = new DefaultTransactionDefinition();
        }
        //判斷目前事物是否已經存在,如果已經存在事物,那麼需要根據事物屬性中定義的事物傳播性配置來處理事物的
        //産生.
        if (isExistingTransaction(transaction)) {
            // Existing transaction found -> check propagation behavior to find out how to behave.
            //如果目前線程存,那麼在檢查事物處理行為,将已經存在的事物進行處理,結果封裝在
            //TransactionState中。
            return handleExistingTransaction(definition, transaction, debugEnabled);
        }

        // Check definition settings for new transaction.
        //檢查事物中timeout的設定是否合理
        if (definition.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) {
            throw new InvalidTimeoutException("Invalid transaction timeout", definition.getTimeout());
        }

        // No existing transaction found -> check propagation behavior to find out how to proceed.

        if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
            throw new IllegalTransactionStateException(
                    "No existing transaction found for transaction marked with propagation 'mandatory'");
        }
    //目前沒有存在的事物,那麼根據配置的事物屬性的事物傳播行為處理,比如PROPAGATION_REQUIRED,
        //PROPAGATION_REQUIRES_NEW,PROPAGATION_NESTED
        else if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
                definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
            definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
            SuspendedResourcesHolder suspendedResources = suspend(null);
            if (debugEnabled) {
                logger.debug("Creating new transaction with name [" + definition.getName() + "]: " + definition);
            }
            try {
                boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
            //要傳回的TransactionStatus,封裝事物處理情況,getTransactionSynchronization()預設為
            //SYNCHRONIZATION_ALWAYS,是以newSynchronization為true
                DefaultTransactionStatus status = newTransactionStatus(
                        definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
                //建立事物的調用,由具體的事物處理器來完成。
                doBegin(transaction, definition);
                prepareSynchronization(status, definition);
                return status;
            }
            catch (RuntimeException ex) {
                resume(null, suspendedResources);
                throw ex;
            }
            catch (Error err) {
                resume(null, suspendedResources);
                throw err;
            }
        }
        else {
            // Create "empty" transaction: no actual transaction, but potentially synchronization.
            boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
            //設定事物為空,建立新的事物狀态對象,沒有transaction。
            return prepareTransactionStatus(definition, null, true, newSynchronization, debugEnabled, null);
        }
    }
           

AbstractPlatformTransactionManager建立事物的過程可以看到TransactionStatus的建立過程。

protected DefaultTransactionStatus newTransactionStatus(
            TransactionDefinition definition, Object transaction, boolean newTransaction,
            boolean newSynchronization, boolean debug, Object suspendedResources) {
        //判斷是不是新的事物,如果是新的事物,需要放到目前線程中去,
        //AbstractPlatformTransactionManager維護了一系列的threadLocal變量來保持事物的屬性,比如并
        //事物并發事物隔離級别,是否有活躍的事物。
        boolean actualNewSynchronization = newSynchronization &&
                !TransactionSynchronizationManager.isSynchronizationActive();
                //使用DefaultTransactionStatus傳回
        return new DefaultTransactionStatus(
                transaction, newTransaction, actualNewSynchronization,
                definition.isReadOnly(), debug, suspendedResources);
    }
           

所謂建立,首先是把建立工作較給具體的事物處理器來完成,比如dataSourceTransactionManager,把建立的事物對象哎TransactionStatus中儲存,然後把其他事物的屬性和線程ThreadLocal綁定。

對于線程中已經存在事物,會涉及事物傳播屬性的具體處理。處理邏輯在AbstractPlatformTransactionManager的handleExistingTransaction方法中:

private TransactionStatus handleExistingTransaction(
            TransactionDefinition definition, Object transaction, boolean debugEnabled)
            throws TransactionException {
//如果目前線程已有事物存在且目前事物的傳播屬性是PROPAGATION_NEVER,那麼抛出異常,這種無法處理
        if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NEVER) {
            throw new IllegalTransactionStateException(
                    "Existing transaction found for transaction marked with propagation 'never'");
        }
         //如果目前線程存在事物并且目前事物傳播屬性是PROPAGATION_NOT_SUPPORTED,那麼将事物挂起
        if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NOT_SUPPORTED) {
            if (debugEnabled) {
                logger.debug("Suspending current transaction");
            }
            Object suspendedResources = suspend(transaction);
            boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
            //此時Transaction為null意味着事物方法不需要在事物環境中執行,同僚挂起的事物記錄到TransactionStatus當中
            //包含ThreadLocal對事物資訊的記錄
            return prepareTransactionStatus(
                    definition, null, false, newSynchronization, debugEnabled, suspendedResources);
        }
//如果目前事物的傳播屬性是PROPAGATION_REQUIRES_NEW,那麼将目前線程中已有的事物挂起,與建立事物不同的是,此處需要将已有
//事物挂起,而建立不需要這步操作
        if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW) {
            if (debugEnabled) {
                logger.debug("Suspending current transaction, creating new transaction with name [" +
                        definition.getName() + "]");
            }
            SuspendedResourcesHolder suspendedResources = suspend(transaction);
            try {
                boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
                DefaultTransactionStatus status = newTransactionStatus(
                        definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
                doBegin(transaction, definition);
                prepareSynchronization(status, definition);
                return status;
            }
            catch (RuntimeException beginEx) {
                resumeAfterBeginException(transaction, suspendedResources, beginEx);
                throw beginEx;
            }
            catch (Error beginErr) {
                resumeAfterBeginException(transaction, suspendedResources, beginErr);
                throw beginErr;
            }
        }
    //嵌套事物的建立
        if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
            if (!isNestedTransactionAllowed()) {
                throw new NestedTransactionNotSupportedException(
                        "Transaction manager does not allow nested transactions by default - " +
                        "specify 'nestedTransactionAllowed' property with value 'true'");
            }
            if (debugEnabled) {
                logger.debug("Creating nested transaction with name [" + definition.getName() + "]");
            }
            if (useSavepointForNestedTransaction()) {
                // Create savepoint within existing Spring-managed transaction,
                // through the SavepointManager API implemented by TransactionStatus.
                // Usually uses JDBC 3.0 savepoints. Never activates Spring synchronization.
                //在spring事物進行中,建立事物儲存點
                DefaultTransactionStatus status =
                        prepareTransactionStatus(definition, transaction, false, false, debugEnabled, null);
                status.createAndHoldSavepoint();
                return status;
            }
            else {
                // Nested transaction through nested begin and commit/rollback calls.
                // Usually only for JTA: Spring synchronization might get activated here
                // in case of a pre-existing JTA transaction.
                boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
                DefaultTransactionStatus status = newTransactionStatus(
                        definition, transaction, true, newSynchronization, debugEnabled, null);
                doBegin(transaction, definition);
                prepareSynchronization(status, definition);
                return status;
            }
        }

        // Assumably PROPAGATION_SUPPORTS or PROPAGATION_REQUIRED.
        if (debugEnabled) {
            logger.debug("Participating in existing transaction");
        }
        //判斷目前事物方法中的事物配置與已有事物的屬性配置是否一緻,如果不一緻則不執行事物的方法并且抛出異常
        if (isValidateExistingTransaction()) {
            if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT) {
                Integer currentIsolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();
                if (currentIsolationLevel == null || currentIsolationLevel != definition.getIsolationLevel()) {
                    Constants isoConstants = DefaultTransactionDefinition.constants;
                    throw new IllegalTransactionStateException("Participating transaction with definition [" +
                            definition + "] specifies isolation level which is incompatible with existing transaction: " +
                            (currentIsolationLevel != null ?
                                    isoConstants.toCode(currentIsolationLevel, DefaultTransactionDefinition.PREFIX_ISOLATION) :
                                    "(unknown)"));
                }
            }
            if (!definition.isReadOnly()) {
                if (TransactionSynchronizationManager.isCurrentTransactionReadOnly()) {
                    throw new IllegalTransactionStateException("Participating transaction with definition [" +
                            definition + "] is not marked as read-only but existing transaction is");
                }
            }
        }
        //傳回TransactionStatus,參數newTransaction為false,表示不是使用新的事物
        boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
        return prepareTransactionStatus(definition, transaction, false, newSynchronization, debugEnabled, null);
    }
           

11.2事物的挂起

事物的挂起牽扯到目前線程與事物處理資訊的儲存:

protected final SuspendedResourcesHolder suspend(Object transaction) throws TransactionException {
        if (TransactionSynchronizationManager.isSynchronizationActive()) {
            List<TransactionSynchronization> suspendedSynchronizations = doSuspendSynchronization();
            try {
                Object suspendedResources = null;
                if (transaction != null) {
                //把挂起的事物交給具體的事物處理器去完成,如果事物處理器不支援事物挂起,就會抛出異常
                    suspendedResources = doSuspend(transaction);
                }
                //設定事物處理狀态資訊,放到線程中去,并且重置線程中的相關的threadlocal變量
                String name = TransactionSynchronizationManager.getCurrentTransactionName();
                TransactionSynchronizationManager.setCurrentTransactionName(null);
                boolean readOnly = TransactionSynchronizationManager.isCurrentTransactionReadOnly();
                TransactionSynchronizationManager.setCurrentTransactionReadOnly(false);
                Integer isolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();
                TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(null);
                boolean wasActive = TransactionSynchronizationManager.isActualTransactionActive();
                TransactionSynchronizationManager.setActualTransactionActive(false);
                return new SuspendedResourcesHolder(
                        suspendedResources, suspendedSynchronizations, name, readOnly, isolationLevel, wasActive);
            }
            catch (RuntimeException ex) {
                // doSuspend failed - original transaction is still active...
                doResumeSynchronization(suspendedSynchronizations);
                throw ex;
            }
            catch (Error err) {
                // doSuspend failed - original transaction is still active...
                //doSuspend方法失敗,但是原先的事物依然存在
                doResumeSynchronization(suspendedSynchronizations);
                throw err;
            }
        }
        else if (transaction != null) {
            // Transaction active but no synchronization active.
            Object suspendedResources = doSuspend(transaction);
            return new SuspendedResourcesHolder(suspendedResources);
        }
        else {
            // Neither transaction nor synchronization active.
            return null;
        }
    }
           

11.3 事務的送出

commitTransactionAfterReturning方法通過直接調用事物處理器來完成事物的送出

protected void commitTransactionAfterReturning(TransactionInfo txInfo) {
        if (txInfo != null && txInfo.hasTransaction()) {
            if (logger.isTraceEnabled()) {
                logger.trace("Completing transaction for [" + txInfo.getJoinpointIdentification() + "]");
            }
            //txInfo是TransactionInfo ,調用事物處理器的commit方法
            txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());
        }
    }
           

AbstractPlatformTransactionManager的commit方法:

public final void commit(TransactionStatus status) throws TransactionException {
    //status.isCompleted()表示事務已經結束
        if (status.isCompleted()) {
            throw new IllegalTransactionStateException(
                    "Transaction is already completed - do not call commit or rollback more than once per transaction");
        }
//事務處理過程中出現異常,則進行復原
        DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status;
        if (defStatus.isLocalRollbackOnly()) {
            if (defStatus.isDebug()) {
                logger.debug("Transactional code has requested rollback");
            }
            processRollback(defStatus);
            return;
        }
        if (!shouldCommitOnGlobalRollbackOnly() && defStatus.isGlobalRollbackOnly()) {
            if (defStatus.isDebug()) {
                logger.debug("Global transaction is marked as rollback-only but transactional code requested commit");
            }
        //處理復原
            processRollback(defStatus);
            // Throw UnexpectedRollbackException only at outermost transaction boundary
            // or if explicitly asked to.
            if (status.isNewTransaction() || isFailEarlyOnGlobalRollbackOnly()) {
                throw new UnexpectedRollbackException(
                        "Transaction rolled back because it has been marked as rollback-only");
            }
            return;
        }
//最後調事務
        processCommit(defStatus);
    }
           

處理事務的送出:

private void processCommit(DefaultTransactionStatus status) throws TransactionException {
        try {
            boolean beforeCompletionInvoked = false;
            try {
            // 事務的送出準備操作由具體的事物處理器完成
                prepareForCommit(status);
                triggerBeforeCommit(status);
                triggerBeforeCompletion(status);
                beforeCompletionInvoked = true;
                boolean globalRollbackOnly = false;
                if (status.isNewTransaction() || isFailEarlyOnGlobalRollbackOnly()) {
                    globalRollbackOnly = status.isGlobalRollbackOnly();
                }
                //嵌套事物的處理
                if (status.hasSavepoint()) {
                    if (status.isDebug()) {
                        logger.debug("Releasing transaction savepoint");
                    }
                    status.releaseHeldSavepoint();
                }
                 //如果目前持有的事務是建立立的事物那麼由具體的事務處理器完成送出,否則由已存在的事務完成送出
                else if (status.isNewTransaction()) {
                    if (status.isDebug()) {
                        logger.debug("Initiating transaction commit");
                    }
                     //由具體的事務處理器完成送出
                    doCommit(status);
                }
                // Throw UnexpectedRollbackException if we have a global rollback-only
                // marker but still didn't get a corresponding exception from commit.
                if (globalRollbackOnly) {
                    throw new UnexpectedRollbackException(
                            "Transaction silently rolled back because it has been marked as rollback-only");
                }
            }
            catch (UnexpectedRollbackException ex) {
                // can only be caused by doCommit
                triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);
                throw ex;
            }
            catch (TransactionException ex) {
                // can only be caused by doCommit
                if (isRollbackOnCommitFailure()) {
                    doRollbackOnCommitException(status, ex);
                }
                else {
                    triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
                }
                throw ex;
            }
            catch (RuntimeException ex) {
                if (!beforeCompletionInvoked) {
                    triggerBeforeCompletion(status);
                }
                doRollbackOnCommitException(status, ex);
                throw ex;
            }
            catch (Error err) {
                if (!beforeCompletionInvoked) {
                    triggerBeforeCompletion(status);
                }
                doRollbackOnCommitException(status, err);
                throw err;
            }

            // Trigger afterCommit callbacks, with an exception thrown there
            // propagated to callers but the transaction still considered as committed.
            try {
            //觸發triggerAfterCommit復原
                triggerAfterCommit(status);
            }
            finally {
                triggerAfterCompletion(status, TransactionSynchronization.STATUS_COMMITTED);
            }

        }
        finally {
            cleanupAfterCompletion(status);
        }
    }
           

11.4事務的復原

復原和送出比較類似:

private void processRollback(DefaultTransactionStatus status) {
        try {
            try {
                triggerBeforeCompletion(status);
                //嵌套事務的復原
                if (status.hasSavepoint()) {
                    if (status.isDebug()) {
                        logger.debug("Rolling back transaction to savepoint");
                    }
                    status.rollbackToHeldSavepoint();
                }
                 //建立事務的復原
                else if (status.isNewTransaction()) {
                    if (status.isDebug()) {
                        logger.debug("Initiating transaction rollback");
                    }
                    doRollback(status);
                }
                //如果目前事務調用方法中,沒有建立事務的復原方法
                else if (status.hasTransaction()) {
                    if (status.isLocalRollbackOnly() || isGlobalRollbackOnParticipationFailure()) {
                        if (status.isDebug()) {
                            logger.debug("Participating transaction failed - marking existing transaction as rollback-only");
                        }
                        doSetRollbackOnly(status);
                    }//由線程的前一個事物來處理,目前不做任何操作
                    else {
                        if (status.isDebug()) {
                            logger.debug("Participating transaction failed - letting transaction originator decide on rollback");
                        }
                    }
                }
                else {
                    logger.debug("Should roll back transaction but cannot - no transaction available");
                }
            }
            catch (RuntimeException ex) {
                triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
                throw ex;
            }
            catch (Error err) {
                triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
                throw err;
            }
            triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);
        }
        finally {
            cleanupAfterCompletion(status);
        }
    }
           

到此事務的處理建立,挂起,送出、復原完成,下一步說一下具體的事物處理器對事物的處理過程。