天天看點

springboot多資料源的事務統一一、二、

開發中,發現資料始終隻復原一部分,一部分資料始終不復原,檢視了很久才發現該service方法的方法裡面存在兩個不同的資料源,想在對原項目不做大的改動下,實作事務的統一復原。在網上查找資料,找到了比較簡單的變通方法。

注:分布式事務在java的解決方案就是JTA(即Java Transaction API);springboot官方提供了 Atomikos or Bitronix的解決思路。

對于單源資料庫,隻要在需要進行事務控制的方法上添加@Transactional注解就可以,但是對于多源資料庫,@Transactionnal是無法管理多個資料源的。

當使用多資料源時,單一的事務會出現問題(當在service層同時操作兩個資料源時,當發生異常,隻會復原離抛出異常最近的資料源的資料)。

一、

如果想真正實作多源資料庫事務控制,肯定是需要分布式鎖的

多源資料庫事務控制的一種

變通方式

,為方法添加多個事務管理器

@Transactional注解支援指定事務管理器,假如可以為一個方法添加多個注解,是不是就可以了完成對兩個資料源的事務管理。但是,Spring是不支援為一個方法添加兩個@Transactional注解的,是以最直接的想法是可不可以通過代碼實作為一個方法添加兩個事務管理器,最終找到一種解決方案,通過自定義注解方式,實作為createUser()方法添加兩個@Transactional注解的效果,并開啟兩個事務管理器。核心代碼如下:

1.1 添加自定義注解MultiTransactional

@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface MultiTransactional {
 
    String[] value() default {};
}
           

1.2 添加主從資料庫事務管理器名稱常量

public class DbTxConstants {
 
    public static final String DB1_TX = "masterTransactionManager";
 
    public static final String DB2_TX = "slaveTransactionManager";
}
           

其實這一步也可以省略,隻是代碼中要多次使用主從事務管理器名,是以這裡定義成常量。

1.3 添加自定義攔截器

@Aspect
@Component
public class MultiTransactionAop {
 
    private final ComboTransaction comboTransaction;
 
    @Autowired
    public MultiTransactionAop(ComboTransaction comboTransaction) {
        this.comboTransaction = comboTransaction;
    }
 
    @Pointcut("@annotation(com.zhuoli.service.springboot.mybatis.transaction.repository.aop.MultiTransactional)")
    public void pointCut() {
    }
 
    @Around("pointCut() && @annotation(multiTransactional)")
    public Object inMultiTransactions(ProceedingJoinPoint pjp, MultiTransactional multiTransactional) {
        return comboTransaction.inCombinedTx(() -> {
            try {
                return pjp.proceed();
            } catch (Throwable throwable) {
                if (throwable instanceof RuntimeException) {
                    throw (RuntimeException) throwable;
                }
                throw new RuntimeException(throwable);
            }
        }, multiTransactional.value());
    }
}
           

功能為收集被攔截方法MultiTransactional注解,并将Callable對象 () -> createUser()作為參數傳給comboTransaction.inCombinedTx方法。

1.4 ComboTransaction類

@Component
public class ComboTransaction {
 
    @Autowired
    private Db1TxBroker db1TxBroker;
 
    @Autowired
    private Db2TxBroker db2TxBroker;
 
    public <V> V inCombinedTx(Callable<V> callable, String[] transactions) {
        if (callable == null) {
            return null;
        }
 
        Callable<V> combined = Stream.of(transactions)
                .filter(ele -> !StringUtils.isEmpty(ele))
                .distinct()
                .reduce(callable, (r, tx) -> {
                    switch (tx) {
                        case DbTxConstants.DB1_TX:
                            return () -> db1TxBroker.inTransaction(r);
                        case DbTxConstants.DB2_TX:
                            return () -> db2TxBroker.inTransaction(r);
                        default:
                            return null;
                    }
                }, (r1, r2) -> r2);
 
        try {
            return combined.call();
        } catch (RuntimeException e) {
            throw e;
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}
           

簡單講一下方法inCombinedTx的作用,其實就是将自定義注解@MultiTransactional的參數通過Java8 Stream的Reduce操作,轉化為Callable對象,将最終的Callable對象調用call方法執行,得到最終結果。

1.5 Db1TxBroker & Db2TxBroker

@Component
public class Db1TxBroker {
 
    @Transactional(DbTxConstants.DB1_TX)
    public <V> V inTransaction(Callable<V> callable) {
        try {
            return callable.call();
        } catch (RuntimeException e) {
            throw e;
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}
 
@Component
public class Db2TxBroker {
 
    @Transactional(DbTxConstants.DB2_TX)
    public <V> V inTransaction(Callable<V> callable) {
        try {
            return callable.call();
        } catch (RuntimeException e) {
            throw e;
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}
           

1.6 自定義注解使用

@Override
@MultiTransactional(value = {DbTxConstants.DB1_TX, DbTxConstants.DB2_TX})
public void createUser(String userName, String description) {
    MasterUser masterUser = new MasterUser();
 
    /*主資料庫插入*/
    masterUser.setUserName(userName);
    masterUser.setDescription(description);
    masterUser.setIsDeleted(DataStatusEnum.EXIST.getCode());
    masterUserMapper.insertSelective(masterUser);
 
    /*從資料庫插入*/
    SlaveUser slaveUser = new SlaveUser();
    slaveUser.setUserName(userName);
    slaveUser.setDescription(description);
    slaveUser.setIsDeleted(DataStatusEnum.EXIST.getCode());
    slaveUserMapper.insertSelective(slaveUser);
 
    if (true){
        throw new RuntimeException("Exception");
    }
           

使用自定義注解這種方式,假如在從庫插入後,還有其他業務邏輯,并且報了異常,這時候主從資料庫都是可以復原的。當然這種事務控制方式也存在不完美的地方,比如當送出時資料庫崩潰這種情況,依然是無法解決的,但是這種情況可能性是相對比較小的,是以在不使用分布式鎖的情況下,這種事務多中繼資料庫事務管理方式是一種有效的方案。

二、

2.1、自定義事務注解

@Target({ElementType.METHOD,ElementType.PARAMETER})
@Retention(RetentionPolicy.RUNTIME)
public @interface CustomTransaction {
    String[] name() default {"firstDataSourceTransactionManager"};
}
           

2.2、建立aop切面進行事務控制

@Component
@Aspect
public class TransactionAop {
    @Pointcut(value = "@annotation(com.example.demo.annon.CustomTransaction)")
   public void pointCut(){}


    @Around(value = "pointCut()&&@annotation(annotation)")
    public Object twiceAsOld(ProceedingJoinPoint point, CustomTransaction annotation) throws Throwable {
        Stack<DataSourceTransactionManager> dataSourceTransactionManagerStack = new Stack<DataSourceTransactionManager>();
        Stack<TransactionStatus> transactionStatuStack = new Stack<TransactionStatus>();
        try {
            if (!openTransaction(dataSourceTransactionManagerStack, transactionStatuStack, annotation)) {
                return null;
            }
            Object ret = point.proceed();
            commit(dataSourceTransactionManagerStack, transactionStatuStack);
            return ret;
        } catch (Throwable e) {
            rollback(dataSourceTransactionManagerStack, transactionStatuStack);
            throw e;
        }
    }
    /**
     * 開啟事務處理方法
     *
     * @param dataSourceTransactionManagerStack
     * @param transactionStatuStack
     * @param multiTransactional
     * @return
     */
    private boolean openTransaction(Stack<DataSourceTransactionManager> dataSourceTransactionManagerStack,
                                    Stack<TransactionStatus> transactionStatuStack, CustomTransaction multiTransactional) {

        String[] transactionMangerNames = multiTransactional.name();
        if (ArrayUtils.isEmpty(multiTransactional.name())) {
            return false;
        }

        for (String beanName : transactionMangerNames) {
            //根據事務名稱擷取具體的事務
            DataSourceTransactionManager dataSourceTransactionManager = (DataSourceTransactionManager) SpringContextUtil
                    .getBean(beanName);
            TransactionStatus transactionStatus = dataSourceTransactionManager
                    .getTransaction(new DefaultTransactionDefinition());
            transactionStatuStack.push(transactionStatus);
            dataSourceTransactionManagerStack.push(dataSourceTransactionManager);
        }
        return true;
    }

    /**
     * 送出處理方法
     *
     * @param dataSourceTransactionManagerStack
     * @param transactionStatuStack
     */
    private void commit(Stack<DataSourceTransactionManager> dataSourceTransactionManagerStack,
                        Stack<TransactionStatus> transactionStatuStack) {
        while (!dataSourceTransactionManagerStack.isEmpty()) {
            dataSourceTransactionManagerStack.pop().commit(transactionStatuStack.pop());
        }
    }
    /**
     * 復原處理方法
     *
     * @param dataSourceTransactionManagerStack
     * @param transactionStatuStack
     */
    private void rollback(Stack<DataSourceTransactionManager> dataSourceTransactionManagerStack,
                          Stack<TransactionStatus> transactionStatuStack) {
        while (!dataSourceTransactionManagerStack.isEmpty()) {
            dataSourceTransactionManagerStack.pop().rollback(transactionStatuStack.pop());
        }
    }
}
           

2.3、輔助類:SpringContextUtil

@Component
public class SpringContextUtil implements ApplicationContextAware{
    private static ApplicationContext applicationContext;
    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        SpringContextUtil.applicationContext = applicationContext;
    }

    /**
     * @Description: 擷取spring容器中的bean,通過bean名稱擷取
     * @param beanName bean名稱
     * @return: Object 傳回Object,需要做強制類型轉換
     * @author: zongf
     * @time: 2018-12-26 10:45:07
     */
    public static Object getBean(String beanName){
        return applicationContext.getBean(beanName);
    }

    /**
     * @Description: 擷取spring容器中的bean, 通過bean類型擷取
     * @param beanClass bean 類型
     * @return: T 傳回指定類型的bean執行個體
     * @author: zongf
     * @time: 2018-12-26 10:46:31
     */
    public static <T> T getBean(Class<T> beanClass) {
        return applicationContext.getBean(beanClass);
    }

    /**
     * @Description: 擷取spring容器中的bean, 通過bean名稱和bean類型精确擷取
     * @param beanName bean 名稱
     * @param beanClass bean 類型
     * @return: T 傳回指定類型的bean執行個體
     * @author: zongf
     * @time: 2018-12-26 10:47:45
     */
    public static <T> T getBean(String beanName, Class<T> beanClass){
        return applicationContext.getBean(beanName,beanClass);
    }
           

2.4、在service層指定使用哪個事務

//注意事務的命名規則 
	@CustomTransaction(name = {"firstDataSourceTransactionManager","secondDataSourceTransactionManager"})
    public void setSepUserMapper(){


        //操作資料源2
        Map<String,Object>mm = new HashMap<>(2);

        mm.put("dataitemId",1L);
        mm.put("name","測試");
        diDataItemMapper.insert(mm);

        //操作資料源1
        Map<String,Object>map = new HashMap<>(3);
        map.put("userId",1L);
        map.put("userName","張三");
        map.put("name","平台管理者");
        sepUserMapper.insert(map);

        throw new RuntimeException("sfsa");
    }
           

資料:

https://www.cnblogs.com/cq-yangzhou/p/10945779.html

https://blog.csdn.net/weixin_41835612/article/details/83713862