天天看点

Seata AT模式TransactionHook竟然会被莫名删除!

前言

兄弟们,刚刚又给seata社区修了一个​

​BUG​

​,有用户提了issue反应TransactionHook在某些情况下不会被调用:

Seata AT模式TransactionHook竟然会被莫名删除!

该用户在issue中已经指出了相关问题所在:

Seata AT模式TransactionHook竟然会被莫名删除!

下面我们来看一下到底是什么原因导致了上述​

​BUG​

​的产生。

问题定位

根据用户的反馈,我们找到目标源码​

​io.seata.tm.api.TransactionalTemplate#execute()​

​:

try {
    // 开启分布式事务,获取XID         
    beginTransaction(txInfo, tx);

    Object rs;
    try {
        // 执行业务代码
        rs = business.execute();
    } catch (Throwable ex) {
        // 3\. 处理异常,准备回滚.
        completeTransactionAfterThrowing(txInfo, tx, ex);
        throw ex;
    }
    // 4\. 提交事务.
    commitTransaction(tx, txInfo);

    return rs;
} finally {
    //5\. 回收现场
    resumeGlobalLockConfig(previousConfig);
    triggerAfterCompletion();
    cleanUp();
}
复制代码      

问题代码就出在​

​cleanUp()​

​中,我们来看一下里面做了什么操作,最终我们定位到:

public final class TransactionHookManager {

  private static final ThreadLocal<List<TransactionHook>> LOCAL_HOOKS = new ThreadLocal<>();

  // 注册TransactionHook
  public static void registerHook(TransactionHook transactionHook) {
      if (transactionHook == null) {
            throw new NullPointerException("transactionHook must not be null");
        }
        List<TransactionHook> transactionHooks = LOCAL_HOOKS.get();
        if (transactionHooks == null) {
            LOCAL_HOOKS.set(new ArrayList<>());
        }
        LOCAL_HOOKS.get().add(transactionHook);
    }

  // 移除当前线程上所有TransactionHook
  public static void clear() {
      LOCAL_HOOKS.remove();
  }
}
复制代码      

由上面的源码可知,​

​cleanUp()​

​操作时把当前线程中的所有​

​TransactionHook​

​都清除掉了。也就是说,假如事务A和事务B共用同一个线程,当事务B处理完毕后,调用了​

​cleanUp()​

​回收现场时,把该线程当中存储的所有​

​TransactionHook​

​全部清除掉了,导致事务A的生命周期中找不到该事务对应的​

​TransactionHook​

​,从而产生了​

​BUG​

​。

如何解决

通过与seata社区的大佬不断地沟通,最终敲定以下方案:

1.改造​

​TransactionHookManager.LOCAL_HOOKS​

​,把数据类型改成​

​ThreadLocal<Map<String, List<TransactionHook>>>​

​,​

​Map​

​中的​

​key​

​对应分布式事务​

​XID​

​;

2.针对当前上下文中没有XID,那么​

​key​

​就为​

​null​

​,因为​

​HashMap​

​允许​

​key​

​为​

​null​

​;

3.当用户查询指定​

​XID​

​下的​

​hook​

​时,连同​

​key​

​为​

​null​

​对应的​

​hook​

​也一起返回;
  • 第一步比较好理解,因为事务A和事务B对应的

    TransactionHook

    没有被区分出来,所以造成了清理事务B的

    TransactionHook

    时连同事务A的

    TransactionHook

    一起被清除,那么我们修改数据结构来区分事务A和事务B的

    TransactionHook

    ,以便清理的时候不会造成误删;
  • 第二步为什么要针对没有XID的时候也要能设置

    TransactionHook

    ,因为有这么一段代码:
private void beginTransaction(TransactionInfo txInfo, GlobalTransaction tx) throws TransactionalExecutor.ExecutionException {
        try {
            // 执行triggerBeforeBegin()
            triggerBeforeBegin();
            // 注册分布式事务,生成XID
            tx.begin(txInfo.getTimeOut(), txInfo.getName());
            // 执行triggerAfterBegin()
            triggerAfterBegin();
        } catch (TransactionException txe) {
            throw new TransactionalExecutor.ExecutionException(tx, txe,
                    TransactionalExecutor.Code.BeginFailure);
        }
    }
复制代码      

上面的代码会产生一个问题,因为我们的​

​TransactionHook​

​依赖于​

​XID​

​,但是​

​triggerBeforeBegin()​

​执行的时候还没有产生​

​XID​

​,所以为了能够在没有​

​XID​

​的时候也能够让​

​TransactionHook​

​生效,我们要有一个​

​虚值key​

​来临时设置​

​TransactionHook​

​;

  • 第三步的设计时为了在第二步的基础上,当事务开启后获取​

    ​XID​

    ​后,要保证​

    ​XID​

    ​获取前注册的​

    ​TransactionHook​

    ​也要生效,我们在通过​

    ​XID​

    ​查询​

    ​TransactionHook​

    ​时要把​

    ​虚值key​

    ​对应的​

    ​TransactionHook​

    ​也一起返回;

注意事项

try {
            // 调用triggerBeforeCommit()
            triggerBeforeCommit();
            // 提交事务,清除XID
            tx.commit();

            if (Arrays.asList(GlobalStatus.TimeoutRollbacking, GlobalStatus.TimeoutRollbacked).contains(tx.getLocalStatus())) {
                throw new TransactionalExecutor.ExecutionException(tx,
                        new TimeoutException(String.format("Global transaction[%s] is timeout and will be rollback[TC].", tx.getXid())),
                        TransactionalExecutor.Code.TimeoutRollback);
            }
            // 调用triggerAfterCommit()
            triggerAfterCommit();
        } catch (TransactionException txe) {
            // 4.1 Failed to commit
            throw new TransactionalExecutor.ExecutionException(tx, txe,
                    TransactionalExecutor.Code.CommitFailure);
        }
复制代码      
try {
            // 调用triggerBeforeCommit()
            triggerBeforeCommit();
            // 提交事务,清除XID
            tx.commit();

            if (Arrays.asList(GlobalStatus.TimeoutRollbacking, GlobalStatus.TimeoutRollbacked).contains(tx.getLocalStatus())) {
                throw new TransactionalExecutor.ExecutionException(tx,
                        new TimeoutException(String.format("Global transaction[%s] is timeout and will be rollback[TC].", tx.getXid())),
                        TransactionalExecutor.Code.TimeoutRollback);
            }
            // 调用triggerAfterCommit()
            triggerAfterCommit(tx.getXid());
        } catch (TransactionException txe) {
            // 4.1 Failed to commit
            throw new TransactionalExecutor.ExecutionException(tx, txe,
                    TransactionalExecutor.Code.CommitFailure);
        }
复制代码      

改造后的TransactionHookManager

public final class TransactionHookManager {

    private TransactionHookManager() {

    }

    private static final ThreadLocal<Map<String, List<TransactionHook>>> LOCAL_HOOKS = new ThreadLocal<>();

    /**
     * get the current hooks
     *
     * @return TransactionHook list
     */
    public static List<TransactionHook> getHooks() {
        String xid = RootContext.getXID();
        return getHooks(xid);
    }

    /**
     * get hooks by xid
     * 
     * @param xid
     * @return TransactionHook list
     */
    public static List<TransactionHook> getHooks(String xid) {
        Map<String, List<TransactionHook>> hooksMap = LOCAL_HOOKS.get();
        if (hooksMap == null || hooksMap.isEmpty()) {
            return Collections.emptyList();
        }
        List<TransactionHook> hooks = new ArrayList<>();
        List<TransactionHook> localHooks = hooksMap.get(xid);
        if (StringUtils.isNotBlank(xid)) {
            List<TransactionHook> virtualHooks = hooksMap.get(null);
            if (virtualHooks != null && !virtualHooks.isEmpty()) {
                hooks.addAll(virtualHooks);
            }
        }
        if (localHooks != null && !localHooks.isEmpty()) {
            hooks.addAll(localHooks);
        }
        if (hooks.isEmpty()) {
            return Collections.emptyList();
        }
        return Collections.unmodifiableList(hooks);
    }

    /**
     * add new hook
     *
     * @param transactionHook transactionHook
     */
    public static void registerHook(TransactionHook transactionHook) {
        if (transactionHook == null) {
            throw new NullPointerException("transactionHook must not be null");
        }
        Map<String, List<TransactionHook>> hooksMap = LOCAL_HOOKS.get();
        if (hooksMap == null) {
            hooksMap = new HashMap<>();
            LOCAL_HOOKS.set(hooksMap);
        }
        String xid = RootContext.getXID();
        List<TransactionHook> hooks = hooksMap.get(xid);
        if (hooks == null) {
            hooks = new ArrayList<>();
            hooksMap.put(xid, hooks);
        }
        hooks.add(transactionHook);
    }

    /**
     * clear hooks by xid
     * 
     * @param xid
     */
    public static void clear(String xid) {
        Map<String, List<TransactionHook>> hooksMap = LOCAL_HOOKS.get();
        if (hooksMap == null || hooksMap.isEmpty()) {
            return;
        }
        hooksMap.remove(xid);
        if (StringUtils.isNotBlank(xid)) {
            hooksMap.remove(null);
        }
    }
}      

继续阅读