天天看點

Spring事務執行前後程式設計實作自定義同步操作

PlatformTransactionManager

PlatformTransactionManager接口提供commit送出事務功能
public interface PlatformTransactionManager extends TransactionManager {
	
	TransactionStatus getTransaction(@Nullable TransactionDefinition definition)
			throws TransactionException;


	void commit(TransactionStatus status) throws TransactionException;

	
	void rollback(TransactionStatus status) throws TransactionException;

}
           

AbstractPlatformTransactionManager

AbstractPlatformTransactionManager繼承PlatformTransactionManager接口實作commit功能;

commit中調用processCommit方法;

processCommit方法提供事務執行前置(triggerBeforeCommit)、後置(triggerAfterCommit)、結束(triggerAfterCompletion)等操作,在這些操作中調用TransactionSynchronizationUtils的前置(triggerBeforeCommit)、後置(triggerAfterCommit)、結束(triggerAfterCompletion)等操作;

public abstract class AbstractPlatformTransactionManager implements PlatformTransactionManager, Serializable {
 
	@Override
	public final void commit(TransactionStatus status) throws TransactionException {
		// 省略部分代碼

		processCommit(defStatus);
	}

	
	private void processCommit(DefaultTransactionStatus status) throws TransactionException {
		try {
			boolean beforeCompletionInvoked = false;

			try {
				boolean unexpectedRollback = false;
				prepareForCommit(status);
				triggerBeforeCommit(status);
				triggerBeforeCompletion(status);
				beforeCompletionInvoked = true;

				// 省略部分代碼
			}
			catch (UnexpectedRollbackException ex) {
				// can only be caused by doCommit
				triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);
				throw ex;
			}
			catch (TransactionException ex) {
				// can only be caused by doCommit
				if (isRollbackOnCommitFailure()) {
					doRollbackOnCommitException(status, ex);
				}
				else {
					triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
				}
				throw ex;
			}
			catch (RuntimeException | Error ex) {
				if (!beforeCompletionInvoked) {
					triggerBeforeCompletion(status);
				}
				doRollbackOnCommitException(status, ex);
				throw ex;
			}

			
			try {
				triggerAfterCommit(status);
			}
			finally {
				triggerAfterCompletion(status, TransactionSynchronization.STATUS_COMMITTED);
			}

		}
		finally {
			cleanupAfterCompletion(status);
		}
	}

    
	protected final void triggerBeforeCommit(DefaultTransactionStatus status) {
		if (status.isNewSynchronization()) {
			if (status.isDebug()) {
				logger.trace("Triggering beforeCommit synchronization");
			}
			TransactionSynchronizationUtils.triggerBeforeCommit(status.isReadOnly());
		}
	}

	
	protected final void triggerBeforeCompletion(DefaultTransactionStatus status) {
		if (status.isNewSynchronization()) {
			if (status.isDebug()) {
				logger.trace("Triggering beforeCompletion synchronization");
			}
			TransactionSynchronizationUtils.triggerBeforeCompletion();
		}
	}

	
	private void triggerAfterCommit(DefaultTransactionStatus status) {
		if (status.isNewSynchronization()) {
			if (status.isDebug()) {
				logger.trace("Triggering afterCommit synchronization");
			}
			TransactionSynchronizationUtils.triggerAfterCommit();
		}
	}

	
	private void triggerAfterCompletion(DefaultTransactionStatus status, int completionStatus) {
		if (status.isNewSynchronization()) {
			List<TransactionSynchronization> synchronizations = TransactionSynchronizationManager.getSynchronizations();
			TransactionSynchronizationManager.clearSynchronization();
			if (!status.hasTransaction() || status.isNewTransaction()) {
				if (status.isDebug()) {
					logger.trace("Triggering afterCompletion synchronization");
				}			
				invokeAfterCompletion(synchronizations, completionStatus);
			}
			else if (!synchronizations.isEmpty()) {				
				registerAfterCompletionWithExistingTransaction(status.getTransaction(), synchronizations);
			}
		}
	}

}
           

TransactionSynchronizationUtils

TransactionSynchronizationUtils提供triggerBeforeCommit、triggerAfterCommit、triggerAfterCompletion等方法,在這些方法中周遊事務同步管理器TransactionSynchronizationManager.getSynchronizations()去執行前置或者後置操作
public abstract class TransactionSynchronizationUtils {
	
	public static void triggerFlush() {
		for (TransactionSynchronization synchronization : TransactionSynchronizationManager.getSynchronizations()) {
			synchronization.flush();
		}
	}

	
	public static void triggerBeforeCommit(boolean readOnly) {
		for (TransactionSynchronization synchronization : TransactionSynchronizationManager.getSynchronizations()) {
			synchronization.beforeCommit(readOnly);
		}
	}

	
	public static void triggerBeforeCompletion() {
		for (TransactionSynchronization synchronization : TransactionSynchronizationManager.getSynchronizations()) {
			try {
				synchronization.beforeCompletion();
			}
			catch (Throwable ex) {
				logger.debug("TransactionSynchronization.beforeCompletion threw exception", ex);
			}
		}
	}

	
	public static void triggerAfterCommit() {
		invokeAfterCommit(TransactionSynchronizationManager.getSynchronizations());
	}

	
	public static void invokeAfterCommit(@Nullable List<TransactionSynchronization> synchronizations) {
		if (synchronizations != null) {
			for (TransactionSynchronization synchronization : synchronizations) {
				synchronization.afterCommit();
			}
		}
	}

	
	public static void triggerAfterCompletion(int completionStatus) {
		List<TransactionSynchronization> synchronizations = TransactionSynchronizationManager.getSynchronizations();
		invokeAfterCompletion(synchronizations, completionStatus);
	}
	
	public static void invokeAfterCompletion(@Nullable List<TransactionSynchronization> synchronizations,
			int completionStatus) {

		if (synchronizations != null) {
			for (TransactionSynchronization synchronization : synchronizations) {
				try {
					synchronization.afterCompletion(completionStatus);
				}
				catch (Throwable ex) {
					logger.debug("TransactionSynchronization.afterCompletion threw exception", ex);
				}
			}
		}
	}	

}
           

TransactionSynchronizationManager

TransactionSynchronizationManager提供注冊同步操作(registerSynchronization)和查詢同步操作(getSynchronizations)方法;
public abstract class TransactionSynchronizationManager {

	private static final ThreadLocal<Set<TransactionSynchronization>> synchronizations =
			new NamedThreadLocal<>("Transaction synchronizations");	

	
	public static void registerSynchronization(TransactionSynchronization synchronization)
			throws IllegalStateException {

		Assert.notNull(synchronization, "TransactionSynchronization must not be null");
		Set<TransactionSynchronization> synchs = synchronizations.get();
		if (synchs == null) {
			throw new IllegalStateException("Transaction synchronization is not active");
		}
		synchs.add(synchronization);
	}

	
	public static List<TransactionSynchronization> getSynchronizations() throws IllegalStateException {
		Set<TransactionSynchronization> synchs = synchronizations.get();
		if (synchs == null) {
			throw new IllegalStateException("Transaction synchronization is not active");
		}		
		if (synchs.isEmpty()) {
			return Collections.emptyList();
		}
		else {			
			List<TransactionSynchronization> sortedSynchs = new ArrayList<>(synchs);
			OrderComparator.sort(sortedSynchs);
			return Collections.unmodifiableList(sortedSynchs);
		}
	}	
}
           

注冊事務同步操作例子

TransactionSynchronizationManager.registerSynchronization 方法注冊事務同步操作

程式設計式事務

@Service
public class CarOrderReplaceSubmitServices implements CarOrderSubmitServices {
    private final static Logger LOGGER = LoggerFactory.getLogger(CarOrderReplaceSubmitServices.class);

    @Autowired
    private TransactionTemplate transactionTemplate;

    @Autowired
    private CarOrderDao carOrderDao;

    @Override
    public Integer submit(CarOrderSubmitReqDto reqDto) {
        CarOrderEntity carOrderEntity = new CarOrderEntity();
        carOrderEntity.setOrderNo(reqDto.getOrderNo());
        carOrderEntity.setCarNo(reqDto.getCarNo());      

        /**
         * 程式設計式事務帶傳回值
         * 異常會自動復原
         */
        Integer id = transactionTemplate.execute(transactionStatus -> {
            // 注冊執行事務送出後置操作
            TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronization() {
                @Override
                public void beforeCommit(boolean readOnly) {
                    LOGGER.info("事務送出前......");
                }

                @Override
                public void afterCommit() {
                    LOGGER.info("事務送出後......");
                }

                /**
                 * 不管事務成功還是失敗,當事務完成後就會執行
                 * @param status
                 */
                @Override
                public void afterCompletion(int status) {
                    LOGGER.info("事務執行完成......");
                }
            });
            // 執行資料庫事務操作
            carOrderDao.add(carOrderEntity);
            Integer tempId = carOrderEntity.getId();
            // 執行資料庫更新操作
            this.updateStock(carOrderEntity, 1);
            return tempId;
        });

        return id;
    }

    @Override
    public void updateStock(CarOrderEntity carOrderEntity, Integer stockCount) {        
        carOrderDao.updateStockByCarNo(carOrderEntity.getCarNo(), stockCount);
    }
   
}
           

聲明式事務

@Service
public class CarOrderSelfSubmitServices implements CarOrderSubmitServices {
    private final static Logger LOGGER = LoggerFactory.getLogger(CarOrderSelfSubmitServices.class);

    @Autowired
    private CarOrderDao carOrderDao;

    @Override
    @Transactional(propagation = Propagation.REQUIRED,rollbackFor = {Exception.class,RuntimeException.class})
    public Integer submit(CarOrderSubmitReqDto reqDto) {
        CarOrderEntity carOrderEntity = new CarOrderEntity();
        carOrderEntity.setOrderNo(reqDto.getOrderNo());
        carOrderEntity.setCarNo(reqDto.getCarNo());

        // 執行事務送出後置操作
        TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronization() {
            @Override
            public void beforeCommit(boolean readOnly) {
                LOGGER.info("事務送出前......");
            }

            @Override
            public void afterCommit() {
                LOGGER.info("事務送出後......");
            }

            /**
             * 不管事務成功還是失敗,當事務完成後就會執行
             * @param status
             */
            @Override
            public void afterCompletion(int status) {
                LOGGER.info("事務執行完成......");
            }
        });

        carOrderDao.add(carOrderEntity);

        Integer id = carOrderEntity.getId();

        this.updateStock(carOrderEntity, 1);

        return id;
    }

    @Override
    public void updateStock(CarOrderEntity carOrderEntity, Integer stockCount) {
        carOrderDao.updateStockById(carOrderEntity.getId(),stockCount);
    }   
}
           
Spring事務執行前後程式設計實作自定義同步操作

 注冊事務同步操作對程式設計式事務和聲明式事務同樣有效

繼續閱讀