PlatformTransactionManager
PlatformTransactionManager 接口定義了取得事務(getTransaction)、提交事務(commit)和回滾事務(rollback)三個方法
/**
 * Transaction manager contract exposing three operations: obtain a
 * transaction, commit it, and roll it back.
 */
public interface PlatformTransactionManager extends TransactionManager {

    /**
     * Obtains a transaction status for the given definition.
     *
     * @param definition the transaction definition; may be {@code null}
     * @return the status object describing the transaction
     * @throws TransactionException as declared by the signature
     */
    TransactionStatus getTransaction(@Nullable TransactionDefinition definition) throws TransactionException;

    /**
     * Commits the transaction identified by the given status.
     *
     * @param status the status object previously obtained for the transaction
     * @throws TransactionException as declared by the signature
     */
    void commit(TransactionStatus status) throws TransactionException;

    /**
     * Rolls back the transaction identified by the given status.
     *
     * @param status the status object previously obtained for the transaction
     * @throws TransactionException as declared by the signature
     */
    void rollback(TransactionStatus status) throws TransactionException;
}
AbstractPlatformTransactionManager
AbstractPlatformTransactionManager 實作 PlatformTransactionManager 接口,並提供 commit 方法的實作;
commit 中調用 processCommit 方法;
processCommit 方法依序觸發提交前(triggerBeforeCommit)、完成前(triggerBeforeCompletion)、提交後(triggerAfterCommit)、完成後(triggerAfterCompletion)等回調;其中 triggerBeforeCommit、triggerBeforeCompletion、triggerAfterCommit 會調用 TransactionSynchronizationUtils 的同名方法,triggerAfterCompletion 則先取出已註冊的同步操作並清除同步狀態,再執行完成後的回調;
/**
 * Abridged listing of the abstract transaction manager, showing how commit
 * processing fires the transaction synchronization callbacks.
 * Sections left out of the listing are marked inline.
 */
public abstract class AbstractPlatformTransactionManager implements PlatformTransactionManager, Serializable {
/**
 * Commits by handing a DefaultTransactionStatus to processCommit.
 * NOTE(review): the code that produces defStatus from status is omitted from this listing.
 *
 * @param status the transaction status supplied by the caller
 * @throws TransactionException as declared by the signature
 */
@Override
public final void commit(TransactionStatus status) throws TransactionException {
// (part of the code omitted)
processCommit(defStatus);
}
/**
 * Runs the commit sequence and fires the synchronization callbacks around it:
 * prepareForCommit, triggerBeforeCommit, triggerBeforeCompletion, an omitted
 * section (the catch comments below indicate it contains doCommit), then
 * triggerAfterCommit and triggerAfterCompletion. cleanupAfterCompletion is
 * called from the outer finally, so it runs on every exit path.
 *
 * @param status the status of the transaction being committed
 * @throws TransactionException rethrown after the matching completion/rollback handling
 */
private void processCommit(DefaultTransactionStatus status) throws TransactionException {
try {
boolean beforeCompletionInvoked = false;
try {
boolean unexpectedRollback = false;
prepareForCommit(status);
triggerBeforeCommit(status);
triggerBeforeCompletion(status);
// Record that beforeCompletion already ran so the RuntimeException/Error handler does not fire it again.
beforeCompletionInvoked = true;
// (part of the code omitted)
}
catch (UnexpectedRollbackException ex) {
// can only be caused by doCommit
// Completion callbacks are told the transaction was rolled back, then the exception propagates.
triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);
throw ex;
}
catch (TransactionException ex) {
// can only be caused by doCommit
// Either attempt a rollback, or report an unknown outcome to the completion callbacks.
if (isRollbackOnCommitFailure()) {
doRollbackOnCommitException(status, ex);
}
else {
triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
}
throw ex;
}
catch (RuntimeException | Error ex) {
// The failure may have happened before beforeCompletion ran; make sure it is fired before the rollback handling.
if (!beforeCompletionInvoked) {
triggerBeforeCompletion(status);
}
doRollbackOnCommitException(status, ex);
throw ex;
}
// Reached only when none of the catch blocks above rethrew.
try {
triggerAfterCommit(status);
}
finally {
// Runs even if triggerAfterCommit throws; completion is reported as committed.
triggerAfterCompletion(status, TransactionSynchronization.STATUS_COMMITTED);
}
}
finally {
cleanupAfterCompletion(status);
}
}
/**
 * Fires the beforeCommit callbacks through TransactionSynchronizationUtils,
 * passing the status's read-only flag. Does nothing unless
 * status.isNewSynchronization() is true.
 *
 * @param status the status of the transaction being committed
 */
protected final void triggerBeforeCommit(DefaultTransactionStatus status) {
if (status.isNewSynchronization()) {
if (status.isDebug()) {
logger.trace("Triggering beforeCommit synchronization");
}
TransactionSynchronizationUtils.triggerBeforeCommit(status.isReadOnly());
}
}
/**
 * Fires the beforeCompletion callbacks through TransactionSynchronizationUtils.
 * Does nothing unless status.isNewSynchronization() is true.
 *
 * @param status the status of the transaction being completed
 */
protected final void triggerBeforeCompletion(DefaultTransactionStatus status) {
if (status.isNewSynchronization()) {
if (status.isDebug()) {
logger.trace("Triggering beforeCompletion synchronization");
}
TransactionSynchronizationUtils.triggerBeforeCompletion();
}
}
/**
 * Fires the afterCommit callbacks through TransactionSynchronizationUtils.
 * Does nothing unless status.isNewSynchronization() is true.
 *
 * @param status the status of the transaction that was committed
 */
private void triggerAfterCommit(DefaultTransactionStatus status) {
if (status.isNewSynchronization()) {
if (status.isDebug()) {
logger.trace("Triggering afterCommit synchronization");
}
TransactionSynchronizationUtils.triggerAfterCommit();
}
}
/**
 * Handles the afterCompletion phase. Does nothing unless
 * status.isNewSynchronization() is true.
 *
 * @param status the status of the transaction being completed
 * @param completionStatus the completion code passed on to the callbacks
 */
private void triggerAfterCompletion(DefaultTransactionStatus status, int completionStatus) {
if (status.isNewSynchronization()) {
// Fetch the registered synchronizations first, because the next call clears the synchronization state.
List<TransactionSynchronization> synchronizations = TransactionSynchronizationManager.getSynchronizations();
TransactionSynchronizationManager.clearSynchronization();
if (!status.hasTransaction() || status.isNewTransaction()) {
if (status.isDebug()) {
logger.trace("Triggering afterCompletion synchronization");
}
// No transaction, or a new transaction: invoke the callbacks right away.
invokeAfterCompletion(synchronizations, completionStatus);
}
else if (!synchronizations.isEmpty()) {
// Otherwise hand the callbacks over, together with the existing transaction object.
registerAfterCompletionWithExistingTransaction(status.getTransaction(), synchronizations);
}
}
}
}
TransactionSynchronizationUtils
TransactionSynchronizationUtils 提供 triggerBeforeCommit、triggerBeforeCompletion、triggerAfterCommit、triggerAfterCompletion 等方法,這些方法會遍歷事務同步管理器 TransactionSynchronizationManager.getSynchronizations() 傳回的同步操作,逐一執行對應的前置或後置回調
/**
 * Static helpers that dispatch a transaction lifecycle event to every
 * synchronization returned by
 * {@code TransactionSynchronizationManager.getSynchronizations()}.
 */
public abstract class TransactionSynchronizationUtils {

    /** Calls {@code flush()} on each registered synchronization. */
    public static void triggerFlush() {
        TransactionSynchronizationManager.getSynchronizations()
                .forEach(TransactionSynchronization::flush);
    }

    /**
     * Calls {@code beforeCommit} on each registered synchronization.
     * An exception thrown by a callback is not caught here.
     *
     * @param readOnly flag forwarded unchanged to every callback
     */
    public static void triggerBeforeCommit(boolean readOnly) {
        TransactionSynchronizationManager.getSynchronizations()
                .forEach(callback -> callback.beforeCommit(readOnly));
    }

    /**
     * Calls {@code beforeCompletion} on each registered synchronization.
     * Anything thrown by a callback is logged at debug level and the loop
     * continues with the next callback.
     */
    public static void triggerBeforeCompletion() {
        List<TransactionSynchronization> registered = TransactionSynchronizationManager.getSynchronizations();
        for (TransactionSynchronization callback : registered) {
            try {
                callback.beforeCompletion();
            }
            catch (Throwable failure) {
                logger.debug("TransactionSynchronization.beforeCompletion threw exception", failure);
            }
        }
    }

    /** Calls {@code afterCommit} on each registered synchronization. */
    public static void triggerAfterCommit() {
        List<TransactionSynchronization> registered = TransactionSynchronizationManager.getSynchronizations();
        invokeAfterCommit(registered);
    }

    /**
     * Calls {@code afterCommit} on each element of the given list.
     * An exception thrown by a callback is not caught here.
     *
     * @param synchronizations the callbacks to invoke; {@code null} is a no-op
     */
    public static void invokeAfterCommit(@Nullable List<TransactionSynchronization> synchronizations) {
        if (synchronizations == null) {
            return;
        }
        for (TransactionSynchronization callback : synchronizations) {
            callback.afterCommit();
        }
    }

    /**
     * Calls {@code afterCompletion} on each registered synchronization.
     *
     * @param completionStatus completion code forwarded to every callback
     */
    public static void triggerAfterCompletion(int completionStatus) {
        invokeAfterCompletion(TransactionSynchronizationManager.getSynchronizations(), completionStatus);
    }

    /**
     * Calls {@code afterCompletion} on each element of the given list.
     * Anything thrown by a callback is logged at debug level and the loop
     * continues with the next callback.
     *
     * @param synchronizations the callbacks to invoke; {@code null} is a no-op
     * @param completionStatus completion code forwarded to every callback
     */
    public static void invokeAfterCompletion(@Nullable List<TransactionSynchronization> synchronizations,
            int completionStatus) {
        if (synchronizations == null) {
            return;
        }
        for (TransactionSynchronization callback : synchronizations) {
            try {
                callback.afterCompletion(completionStatus);
            }
            catch (Throwable failure) {
                logger.debug("TransactionSynchronization.afterCompletion threw exception", failure);
            }
        }
    }
}
TransactionSynchronizationManager
TransactionSynchronizationManager 提供註冊同步操作(registerSynchronization)和查詢同步操作(getSynchronizations)的方法;
/**
 * Holds the set of registered transaction synchronizations in a
 * {@code ThreadLocal} and offers registration and lookup.
 */
public abstract class TransactionSynchronizationManager {

    private static final ThreadLocal<Set<TransactionSynchronization>> synchronizations =
            new NamedThreadLocal<>("Transaction synchronizations");

    /**
     * Adds a synchronization to the set held in the thread-local.
     *
     * @param synchronization the callback object to register; must not be {@code null}
     * @throws IllegalStateException if the thread-local holds no set
     */
    public static void registerSynchronization(TransactionSynchronization synchronization)
            throws IllegalStateException {
        Assert.notNull(synchronization, "TransactionSynchronization must not be null");
        requireActiveSynchronizations().add(synchronization);
    }

    /**
     * Returns the registered synchronizations as an unmodifiable list: an
     * empty list when nothing is registered, otherwise a copy sorted with
     * {@code OrderComparator}.
     *
     * @return the registered synchronizations
     * @throws IllegalStateException if the thread-local holds no set
     */
    public static List<TransactionSynchronization> getSynchronizations() throws IllegalStateException {
        Set<TransactionSynchronization> registered = requireActiveSynchronizations();
        if (registered.isEmpty()) {
            return Collections.emptyList();
        }
        // Sort a copy so the registered set itself is left untouched.
        List<TransactionSynchronization> ordered = new ArrayList<>(registered);
        OrderComparator.sort(ordered);
        return Collections.unmodifiableList(ordered);
    }

    /** Returns the set held in the thread-local, or throws when there is none. */
    private static Set<TransactionSynchronization> requireActiveSynchronizations() {
        Set<TransactionSynchronization> current = synchronizations.get();
        if (current == null) {
            throw new IllegalStateException("Transaction synchronization is not active");
        }
        return current;
    }
}
註冊事務同步操作範例
使用 TransactionSynchronizationManager.registerSynchronization 方法註冊事務同步操作
編程式事務
/**
 * Order submission service that performs its database work inside a
 * programmatic transaction ({@code TransactionTemplate}) and registers
 * transaction synchronization callbacks that log around commit and completion.
 */
@Service
public class CarOrderReplaceSubmitServices implements CarOrderSubmitServices {

    private static final Logger LOGGER = LoggerFactory.getLogger(CarOrderReplaceSubmitServices.class);

    @Autowired
    private TransactionTemplate transactionTemplate;

    @Autowired
    private CarOrderDao carOrderDao;

    /**
     * Builds an order entity from the request, then inserts it and updates
     * stock inside one transaction callback.
     *
     * @param reqDto request carrying the order number and car number
     * @return the id read from the entity after the insert
     */
    @Override
    public Integer submit(CarOrderSubmitReqDto reqDto) {
        CarOrderEntity order = new CarOrderEntity();
        order.setOrderNo(reqDto.getOrderNo());
        order.setCarNo(reqDto.getCarNo());

        // Programmatic transaction with a return value; per the original
        // author's note, an exception rolls the transaction back automatically.
        return transactionTemplate.execute(txStatus -> {
            // Register the callbacks first, before any database work.
            TransactionSynchronizationManager.registerSynchronization(newLoggingSynchronization());

            // Insert the order, then read its id.
            carOrderDao.add(order);
            Integer generatedId = order.getId();

            // Update the stock.
            this.updateStock(order, 1);
            return generatedId;
        });
    }

    /**
     * Passes the entity's car number and the given count to the DAO's stock update.
     *
     * @param carOrderEntity the order whose car number selects the stock row
     * @param stockCount the count passed to the DAO
     */
    @Override
    public void updateStock(CarOrderEntity carOrderEntity, Integer stockCount) {
        carOrderDao.updateStockByCarNo(carOrderEntity.getCarNo(), stockCount);
    }

    /** Creates a synchronization that only logs each lifecycle callback. */
    private TransactionSynchronization newLoggingSynchronization() {
        return new TransactionSynchronization() {
            @Override
            public void beforeCommit(boolean readOnly) {
                LOGGER.info("事務送出前......");
            }

            @Override
            public void afterCommit() {
                LOGGER.info("事務送出後......");
            }

            // Per the original author's note: runs once the transaction has
            // completed, whether it succeeded or failed.
            @Override
            public void afterCompletion(int status) {
                LOGGER.info("事務執行完成......");
            }
        };
    }
}
聲明式事務
/**
 * Order submission service that runs inside a declarative transaction
 * ({@code @Transactional}) and registers transaction synchronization
 * callbacks that log around commit and completion.
 */
@Service
public class CarOrderSelfSubmitServices implements CarOrderSubmitServices {

    private static final Logger LOGGER = LoggerFactory.getLogger(CarOrderSelfSubmitServices.class);

    @Autowired
    private CarOrderDao carOrderDao;

    /**
     * Builds an order entity from the request, registers the logging
     * callbacks, then inserts the order and updates stock.
     *
     * @param reqDto request carrying the order number and car number
     * @return the id read from the entity after the insert
     */
    @Override
    // Exception.class already covers RuntimeException (its subclass), so a single rule is enough.
    @Transactional(propagation = Propagation.REQUIRED, rollbackFor = Exception.class)
    public Integer submit(CarOrderSubmitReqDto reqDto) {
        CarOrderEntity carOrderEntity = new CarOrderEntity();
        carOrderEntity.setOrderNo(reqDto.getOrderNo());
        carOrderEntity.setCarNo(reqDto.getCarNo());

        // Register the callbacks before doing any database work.
        TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronization() {
            @Override
            public void beforeCommit(boolean readOnly) {
                LOGGER.info("事務送出前......");
            }

            @Override
            public void afterCommit() {
                LOGGER.info("事務送出後......");
            }

            // Per the original author's note: runs once the transaction has
            // completed, whether it succeeded or failed.
            @Override
            public void afterCompletion(int status) {
                LOGGER.info("事務執行完成......");
            }
        });

        carOrderDao.add(carOrderEntity);
        Integer id = carOrderEntity.getId();
        this.updateStock(carOrderEntity, 1);
        return id;
    }

    /**
     * Passes the entity's id and the given count to the DAO's stock update.
     * NOTE(review): the sibling implementation CarOrderReplaceSubmitServices
     * updates stock by car number instead of by order id; confirm that keying
     * on the order id is intended here.
     *
     * @param carOrderEntity the order whose id is passed to the DAO
     * @param stockCount the count passed to the DAO
     */
    @Override
    public void updateStock(CarOrderEntity carOrderEntity, Integer stockCount) {
        carOrderDao.updateStockById(carOrderEntity.getId(), stockCount);
    }
}
註冊事務同步操作對編程式事務和聲明式事務同樣有效