天天看點

Seata AT模式TransactionHook竟然會被莫名删除!

前言

兄弟們,剛剛又給seata社群修了一個​

​BUG​

​,有使用者提了issue反應TransactionHook在某些情況下不會被調用:

Seata AT模式TransactionHook竟然會被莫名删除!

該使用者在issue中已經指出了相關問題所在:

Seata AT模式TransactionHook竟然會被莫名删除!

下面我們來看一下到底是什麼原因導緻了上述​

​BUG​

​的産生。

問題定位

根據使用者的回報,我們找到目标源碼​

​io.seata.tm.api.TransactionalTemplate#execute()​

​:

try {
    // 開啟分布式事務,擷取XID         
    beginTransaction(txInfo, tx);

    Object rs;
    try {
        // 執行業務代碼
        rs = business.execute();
    } catch (Throwable ex) {
        // 3\. 處理異常,準備復原.
        completeTransactionAfterThrowing(txInfo, tx, ex);
        throw ex;
    }
    // 4\. 送出事務.
    commitTransaction(tx, txInfo);

    return rs;
} finally {
    //5\. 回收現場
    resumeGlobalLockConfig(previousConfig);
    triggerAfterCompletion();
    cleanUp();
}
複制代碼      

問題代碼就出在​

​cleanUp()​

​中,我們來看一下裡面做了什麼操作,最終我們定位到:

public final class TransactionHookManager {

  private static final ThreadLocal<List<TransactionHook>> LOCAL_HOOKS = new ThreadLocal<>();

  // 注冊TransactionHook
  public static void registerHook(TransactionHook transactionHook) {
      if (transactionHook == null) {
            throw new NullPointerException("transactionHook must not be null");
        }
        List<TransactionHook> transactionHooks = LOCAL_HOOKS.get();
        if (transactionHooks == null) {
            LOCAL_HOOKS.set(new ArrayList<>());
        }
        LOCAL_HOOKS.get().add(transactionHook);
    }

  // 移除目前線程上所有TransactionHook
  public static void clear() {
      LOCAL_HOOKS.remove();
  }
}
複制代碼      

由上面的源碼可知,​

​cleanUp()​

​操作時把目前線程中的所有​

​TransactionHook​

​都清除掉了。也就是說,假如事務A和事務B共用同一個線程,當事務B處理完畢後,調用了​

​cleanUp()​

​回收現場時,把該線程當中存儲的所有​

​TransactionHook​

​全部清除掉了,導緻事務A的生命周期中找不到該事務對應的​

​TransactionHook​

​,進而産生了​

​BUG​

​。

如何解決

通過與seata社群的大佬不斷地溝通,最終敲定以下方案:

1.改造​

​TransactionHookManager.LOCAL_HOOKS​

​,把資料類型改成​

​ThreadLocal<Map<String, List<TransactionHook>>>​

​,​

​Map​

​中的​

​key​

​對應分布式事務​

​XID​

​;

2.針對目前上下文中沒有XID,那麼​

​key​

​就為​

​null​

​,因為​

​HashMap​

​允許​

​key​

​為​

​null​

​;

3.當使用者查詢指定​

​XID​

​下的​

​hook​

​時,連同​

​key​

​為​

​null​

​對應的​

​hook​

​也一起傳回;
  • 第一步比較好了解,因為事務A和事務B對應的

    TransactionHook

    沒有被區分出來,是以造成了清理事務B的

    TransactionHook

    時連同僚務A的

    TransactionHook

    一起被清除,那麼我們修改資料結構來區分事務A和事務B的

    TransactionHook

    ,以便清理的時候不會造成誤删;
  • 第二步為什麼要針對沒有XID的時候也要能設定

    TransactionHook

    ,因為有這麼一段代碼:
private void beginTransaction(TransactionInfo txInfo, GlobalTransaction tx) throws TransactionalExecutor.ExecutionException {
        try {
            // 執行triggerBeforeBegin()
            triggerBeforeBegin();
            // 注冊分布式事務,生成XID
            tx.begin(txInfo.getTimeOut(), txInfo.getName());
            // 執行triggerAfterBegin()
            triggerAfterBegin();
        } catch (TransactionException txe) {
            throw new TransactionalExecutor.ExecutionException(tx, txe,
                    TransactionalExecutor.Code.BeginFailure);
        }
    }
複制代碼      

上面的代碼會産生一個問題,因為我們的​

​TransactionHook​

​依賴于​

​XID​

​,但是​

​triggerBeforeBegin()​

​執行的時候還沒有産生​

​XID​

​,是以為了能夠在沒有​

​XID​

​的時候也能夠讓​

​TransactionHook​

​生效,我們要有一個​

​虛值key​

​來臨時設定​

​TransactionHook​

​;

  • 第三步的設計時為了在第二步的基礎上,當事務開啟後擷取​

    ​XID​

    ​後,要保證​

    ​XID​

    ​擷取前注冊的​

    ​TransactionHook​

    ​也要生效,我們在通過​

    ​XID​

    ​查詢​

    ​TransactionHook​

    ​時要把​

    ​虛值key​

    ​對應的​

    ​TransactionHook​

    ​也一起傳回;

注意事項

try {
            // 調用triggerBeforeCommit()
            triggerBeforeCommit();
            // 送出事務,清除XID
            tx.commit();

            if (Arrays.asList(GlobalStatus.TimeoutRollbacking, GlobalStatus.TimeoutRollbacked).contains(tx.getLocalStatus())) {
                throw new TransactionalExecutor.ExecutionException(tx,
                        new TimeoutException(String.format("Global transaction[%s] is timeout and will be rollback[TC].", tx.getXid())),
                        TransactionalExecutor.Code.TimeoutRollback);
            }
            // 調用triggerAfterCommit()
            triggerAfterCommit();
        } catch (TransactionException txe) {
            // 4.1 Failed to commit
            throw new TransactionalExecutor.ExecutionException(tx, txe,
                    TransactionalExecutor.Code.CommitFailure);
        }
複制代碼      
try {
            // 調用triggerBeforeCommit()
            triggerBeforeCommit();
            // 送出事務,清除XID
            tx.commit();

            if (Arrays.asList(GlobalStatus.TimeoutRollbacking, GlobalStatus.TimeoutRollbacked).contains(tx.getLocalStatus())) {
                throw new TransactionalExecutor.ExecutionException(tx,
                        new TimeoutException(String.format("Global transaction[%s] is timeout and will be rollback[TC].", tx.getXid())),
                        TransactionalExecutor.Code.TimeoutRollback);
            }
            // 調用triggerAfterCommit()
            triggerAfterCommit(tx.getXid());
        } catch (TransactionException txe) {
            // 4.1 Failed to commit
            throw new TransactionalExecutor.ExecutionException(tx, txe,
                    TransactionalExecutor.Code.CommitFailure);
        }
複制代碼      

改造後的TransactionHookManager

public final class TransactionHookManager {

    private TransactionHookManager() {

    }

    private static final ThreadLocal<Map<String, List<TransactionHook>>> LOCAL_HOOKS = new ThreadLocal<>();

    /**
     * get the current hooks
     *
     * @return TransactionHook list
     */
    public static List<TransactionHook> getHooks() {
        String xid = RootContext.getXID();
        return getHooks(xid);
    }

    /**
     * get hooks by xid
     * 
     * @param xid
     * @return TransactionHook list
     */
    public static List<TransactionHook> getHooks(String xid) {
        Map<String, List<TransactionHook>> hooksMap = LOCAL_HOOKS.get();
        if (hooksMap == null || hooksMap.isEmpty()) {
            return Collections.emptyList();
        }
        List<TransactionHook> hooks = new ArrayList<>();
        List<TransactionHook> localHooks = hooksMap.get(xid);
        if (StringUtils.isNotBlank(xid)) {
            List<TransactionHook> virtualHooks = hooksMap.get(null);
            if (virtualHooks != null && !virtualHooks.isEmpty()) {
                hooks.addAll(virtualHooks);
            }
        }
        if (localHooks != null && !localHooks.isEmpty()) {
            hooks.addAll(localHooks);
        }
        if (hooks.isEmpty()) {
            return Collections.emptyList();
        }
        return Collections.unmodifiableList(hooks);
    }

    /**
     * add new hook
     *
     * @param transactionHook transactionHook
     */
    public static void registerHook(TransactionHook transactionHook) {
        if (transactionHook == null) {
            throw new NullPointerException("transactionHook must not be null");
        }
        Map<String, List<TransactionHook>> hooksMap = LOCAL_HOOKS.get();
        if (hooksMap == null) {
            hooksMap = new HashMap<>();
            LOCAL_HOOKS.set(hooksMap);
        }
        String xid = RootContext.getXID();
        List<TransactionHook> hooks = hooksMap.get(xid);
        if (hooks == null) {
            hooks = new ArrayList<>();
            hooksMap.put(xid, hooks);
        }
        hooks.add(transactionHook);
    }

    /**
     * clear hooks by xid
     * 
     * @param xid
     */
    public static void clear(String xid) {
        Map<String, List<TransactionHook>> hooksMap = LOCAL_HOOKS.get();
        if (hooksMap == null || hooksMap.isEmpty()) {
            return;
        }
        hooksMap.remove(xid);
        if (StringUtils.isNotBlank(xid)) {
            hooksMap.remove(null);
        }
    }
}      

繼續閱讀