
Seata Source Code: TCC Mode

Author: 儒雅程式員阿鑫

Preface

This chapter analyzes how the TM, RM and TC roles work in Seata (1.5.0) TCC mode.

I. Example

1. business-service

business-service acts as both TM and RM.

In the TM role, @GlobalTransactional manages the global transaction.

@DubboReference
private StorageTccAction storageTccAction;
@Autowired
private LocalTccAction localTccAction;

@GlobalTransactional(timeoutMills = 300000, name = "tcc-demo-commit")
@GetMapping(value = "/seata/tcc/commit", produces = "application/json")
public String tccCommit() {
  // local tcc
  localTccAction.prepare(null, COMMODITY_CODE, ORDER_COUNT);
  // rpc(storage-service) tcc
  storageTccAction.freeze(null, COMMODITY_CODE, ORDER_COUNT);
  return SUCCESS;
}

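In the RM role, LocalTccAction is annotated with @LocalTCC + @TwoPhaseBusinessAction. It is registered with the TC as a TCC resource and manages the registration, commit and rollback of its branch transaction. (The LocalTccActionImpl implementation is omitted.)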

@LocalTCC
public interface LocalTccAction {
    @TwoPhaseBusinessAction(name = "local-tcc-action", // resource name
            commitMethod = "commit", // phase-two commit method
            rollbackMethod = "rollback", // phase-two rollback method
            commitArgsClasses = {BusinessActionContext.class, String.class, Integer.class}, // commit method parameter types
            rollbackArgsClasses = {BusinessActionContext.class, String.class, Integer.class}, // rollback method parameter types
            useTCCFence = true // enable TCCFence so the Seata TCC framework handles the three TCC problems (idempotency, suspension, empty rollback)
    )
    void prepare(BusinessActionContext actionContext,
                @BusinessActionContextParameter("commodityCode") String commodityCode,
                @BusinessActionContextParameter("count") Integer count);


    void commit(BusinessActionContext actionContext,
                @BusinessActionContextParameter("commodityCode") String commodityCode,
                @BusinessActionContextParameter("count") Integer count);

    void rollback(BusinessActionContext actionContext,
                  @BusinessActionContextParameter("commodityCode") String commodityCode,
                  @BusinessActionContextParameter("count") Integer count);
}

StorageTccAction is a Dubbo client-side ReferenceBean. Its freeze method is annotated with @TwoPhaseBusinessAction, so a branch transaction is registered before the RPC request is sent.

public interface StorageTccAction {
    @TwoPhaseBusinessAction(name = "storage-tcc-action", // resource name
            commitMethod = "deduct", // phase-two commit method
            rollbackMethod = "unFreeze", // phase-two rollback method
            commitArgsClasses = {BusinessActionContext.class, String.class, Integer.class}, // commit method parameter types
            rollbackArgsClasses = {BusinessActionContext.class, String.class, Integer.class} // rollback method parameter types
    )
    void freeze(BusinessActionContext actionContext,
                @BusinessActionContextParameter("commodityCode") String commodityCode,
                @BusinessActionContextParameter("count") Integer count);


    void deduct(BusinessActionContext actionContext,
                @BusinessActionContextParameter("commodityCode") String commodityCode,
                @BusinessActionContextParameter("count") Integer count);

    void unFreeze(BusinessActionContext actionContext,
                  @BusinessActionContextParameter("commodityCode") String commodityCode,
                  @BusinessActionContextParameter("count") Integer count);

}

2. storage-service

storage-service is a pure RM, responsible for committing and rolling back branch transactions.

Its StorageTccAction is identical to the one on the business-service client side.

public interface StorageTccAction {
    @TwoPhaseBusinessAction(name = "storage-tcc-action", // resource name
            commitMethod = "deduct", // phase-two commit method
            rollbackMethod = "unFreeze", // phase-two rollback method
            commitArgsClasses = {BusinessActionContext.class, String.class, Integer.class}, // commit method parameter types
            rollbackArgsClasses = {BusinessActionContext.class, String.class, Integer.class} // rollback method parameter types
    )
    void freeze(BusinessActionContext actionContext,
                @BusinessActionContextParameter("commodityCode") String commodityCode,
                @BusinessActionContextParameter("count") Integer count);


    void deduct(BusinessActionContext actionContext,
                @BusinessActionContextParameter("commodityCode") String commodityCode,
                @BusinessActionContextParameter("count") Integer count);

    void unFreeze(BusinessActionContext actionContext,
                  @BusinessActionContextParameter("commodityCode") String commodityCode,
                  @BusinessActionContextParameter("count") Integer count);

}

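StorageTccActionImpl is exposed as a remote service through a Dubbo ServiceBean.

At startup, because its interface methods carry @TwoPhaseBusinessAction, it is identified as a TCC resource and registered with the TC; it is responsible for committing and rolling back the branch transaction.

Note that for RPC the branch transaction is registered by the client: when business-service calls storageTccAction.freeze, it is business-service that registers the branch.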

@DubboService(protocol = "dubbo")
@Component
public class StorageTccActionImpl implements StorageTccAction {
// ...
}
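The body of StorageTccActionImpl is elided above. As a rough sketch of what its three methods could do, assuming an in-memory stock table instead of a database: Try (freeze) moves units from available to frozen, Confirm (deduct) consumes the frozen units, and Cancel (unFreeze) returns them. InMemoryStorage is hypothetical and uses no Seata API.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory stand-in for the storage table; not part of Seata or the demo project.
class InMemoryStorage {
    private final Map<String, Integer> available = new HashMap<>();
    private final Map<String, Integer> frozen = new HashMap<>();

    InMemoryStorage(String commodityCode, int stock) {
        available.put(commodityCode, stock);
        frozen.put(commodityCode, 0);
    }

    // Try: reserve `count` units by moving them from available to frozen.
    synchronized void freeze(String commodityCode, int count) {
        int left = available.getOrDefault(commodityCode, 0);
        if (left < count) {
            throw new IllegalStateException("insufficient stock for " + commodityCode);
        }
        available.put(commodityCode, left - count);
        frozen.merge(commodityCode, count, Integer::sum);
    }

    // Confirm: the reserved units are consumed for good.
    synchronized void deduct(String commodityCode, int count) {
        frozen.merge(commodityCode, -count, Integer::sum);
    }

    // Cancel: the reserved units go back to the available pool.
    synchronized void unFreeze(String commodityCode, int count) {
        frozen.merge(commodityCode, -count, Integer::sum);
        available.merge(commodityCode, count, Integer::sum);
    }

    synchronized int availableOf(String commodityCode) {
        return available.getOrDefault(commodityCode, 0);
    }

    synchronized int frozenOf(String commodityCode) {
        return frozen.getOrDefault(commodityCode, 0);
    }
}
```

The sketch is deliberately not idempotent: calling unFreeze twice would release the stock twice. Guarding against repeated phase-two calls, suspension and empty rollback is what useTCCFence is meant to take over; StorageTccAction above does not enable it, so its implementation would have to handle these cases itself.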

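II. Startup

At application startup, TCC mode also relies on GlobalTransactionScanner to create proxies (it extends AbstractAutoProxyCreator and creates them in the postProcessAfterInitialization phase); the proxy logic lives in TccActionInterceptor.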

// GlobalTransactionScanner extends AbstractAutoProxyCreator
// postProcessAfterInitialization phase
protected Object wrapIfNecessary(Object bean, String beanName, Object cacheKey) {
    // ...
    // register the RM and decide whether a proxy is needed
    if (TCCBeanParserUtils.isTccAutoProxy(bean, beanName, applicationContext)) { 
        // tcc_fence_log cleanup task
        TCCBeanParserUtils.initTccFenceCleanTask(TCCBeanParserUtils.getRemotingDesc(beanName), applicationContext);
        // proxy logic: TccActionInterceptor
        interceptor = new TccActionInterceptor(TCCBeanParserUtils.getRemotingDesc(beanName)); 
    }
    // ...
}

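TCC mode involves three special kinds of Spring bean:

  • Beans of interfaces annotated with @LocalTCC, such as LocalTccAction in the example;
  • RPC provider ServiceBeans, such as Dubbo service implementations annotated with @DubboService, like StorageTccActionImpl in the example;
  • RPC consumer ReferenceBeans, such as beans injected through Dubbo's @DubboReference, like StorageTccAction in the example.

For these three kinds of bean, TCCBeanParserUtils.isTccAutoProxy does a good deal of work, such as registering the RM and deciding whether the bean must be intercepted by TccActionInterceptor.

A LocalTCC bean takes branch 1; a ServiceBean takes branch 2 and returns false; a ReferenceBean, being a FactoryBean, takes branch 3. A return value of true means the bean is proxied by TccActionInterceptor.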

// TCCBeanParserUtils
public static boolean isTccAutoProxy(Object bean, String beanName, ApplicationContext applicationContext) {
    // dubbo:service and LocalTCC are registered as RMs
    boolean isRemotingBean = parserRemotingServiceInfo(bean, beanName);
    RemotingDesc remotingDesc = DefaultRemotingParser.get().getRemotingBeanDesc(beanName);
    if (isRemotingBean) {
        if (remotingDesc != null && remotingDesc.getProtocol() == Protocols.IN_JVM) {
            // LocalTCC must be proxied by TccActionInterceptor
            return isTccProxyTargetBean(remotingDesc); // 1
        } else {
            // dubbo:service (ServiceBean) is not proxied
            return false; // 2
        }
    } else {
        if (remotingDesc == null) {
            if (isRemotingFactoryBean(bean, beanName, applicationContext)) {
                // dubbo:reference (Dubbo ReferenceBean) must be proxied by TccActionInterceptor
                remotingDesc = DefaultRemotingParser.get().getRemotingBeanDesc(beanName);
                return isTccProxyTargetBean(remotingDesc); // 3
            } else {
                return false;
            }
        } else {
            return isTccProxyTargetBean(remotingDesc);
        }
    }
}

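isTccProxyTargetBean decides whether a LocalTCC bean or a ReferenceBean is actually proxied: it returns true, and the bean is intercepted by TccActionInterceptor, only if the interface has at least one method annotated with @TwoPhaseBusinessAction.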

// TCCBeanParserUtils
public static boolean isTccProxyTargetBean(RemotingDesc remotingDesc) {
    if (remotingDesc == null) {
        return false;
    }
    boolean isTccClazz = false;
    Class<?> tccInterfaceClazz = remotingDesc.getInterfaceClass();
    Method[] methods = tccInterfaceClazz.getMethods();
    TwoPhaseBusinessAction twoPhaseBusinessAction;
    for (Method method : methods) {
        twoPhaseBusinessAction = method.getAnnotation(TwoPhaseBusinessAction.class);
        if (twoPhaseBusinessAction != null) {
            isTccClazz = true;
            break;
        }
    }
    if (!isTccClazz) {
        return false;
    }
    short protocols = remotingDesc.getProtocol();
    if (Protocols.IN_JVM == protocols) {
        return true; // local tcc
    }
    return remotingDesc.isReference(); // dubbo:reference
}
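The annotation check in isTccProxyTargetBean is plain reflection over the interface's public methods. A minimal sketch of the same scan follows. TwoPhase is a hypothetical stand-in for @TwoPhaseBusinessAction (it needs RUNTIME retention, otherwise getAnnotation returns null), and the protocol/isReference checks are reduced to two booleans.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;

// Hypothetical stand-in for @TwoPhaseBusinessAction; RUNTIME retention is required for reflection.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface TwoPhase {
    String name();
}

interface AnnotatedAction {
    @TwoPhase(name = "demo-action")
    void prepare(String code);

    void commit(String code);
}

interface PlainAction {
    void doSomething();
}

final class TccProxyCheck {
    private TccProxyCheck() {}

    // Same shape as isTccProxyTargetBean: one annotated public method makes the interface a TCC
    // candidate; after that, only in-JVM (LocalTCC) beans or references are proxied.
    static boolean isTccProxyTarget(Class<?> iface, boolean inJvm, boolean isReference) {
        boolean isTccClazz = false;
        for (Method m : iface.getMethods()) {
            if (m.getAnnotation(TwoPhase.class) != null) {
                isTccClazz = true;
                break;
            }
        }
        if (!isTccClazz) {
            return false;
        }
        return inJvm || isReference;
    }
}
```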

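In addition, parserRemotingServiceInfo finds every method annotated with @TwoPhaseBusinessAction in LocalTCC beans and ServiceBeans, and registers each such method with the TC as a TCCResource.

Note that TCCResource.actionName serves as the unique resource identifier within the current application (spring.application.name), so the name attribute of @TwoPhaseBusinessAction must not be repeated within one application.

This step is what makes phase two work: the TC routes the phase-two request to an RM by resourceId, and that RM finds the matching commit or rollback method in its locally registered TCCResource.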

// TCCBeanParserUtils
protected static boolean parserRemotingServiceInfo(Object bean, String beanName) {
    RemotingParser remotingParser = DefaultRemotingParser.get().isRemoting(bean, beanName);
    if (remotingParser != null) {
        return DefaultRemotingParser.get().parserRemotingServiceInfo(bean, beanName, remotingParser) != null;
    }
    return false;
}
// DefaultRemotingParser
public RemotingDesc parserRemotingServiceInfo(Object bean, String beanName, RemotingParser remotingParser) {
    RemotingDesc remotingBeanDesc = remotingParser.getServiceDesc(bean, beanName);
    if (remotingBeanDesc == null) {
        return null;
    }
    remotingServiceMap.put(beanName, remotingBeanDesc);

    Class<?> interfaceClass = remotingBeanDesc.getInterfaceClass();
    Method[] methods = interfaceClass.getMethods();
    if (remotingParser.isService(bean, beanName)) {
        // localTcc or ServiceBean
        try {
            Object targetBean = remotingBeanDesc.getTargetBean();
            for (Method m : methods) {
                TwoPhaseBusinessAction twoPhaseBusinessAction = m.getAnnotation(TwoPhaseBusinessAction.class);
                // every method annotated with TwoPhaseBusinessAction is registered as a Resource
                if (twoPhaseBusinessAction != null) {
                    TCCResource tccResource = new TCCResource();
                    tccResource.setActionName(twoPhaseBusinessAction.name());
                    tccResource.setTargetBean(targetBean);
                    tccResource.setPrepareMethod(m);
                    tccResource.setCommitMethodName(twoPhaseBusinessAction.commitMethod());
                    tccResource.setCommitMethod(interfaceClass.getMethod(twoPhaseBusinessAction.commitMethod(),
                            twoPhaseBusinessAction.commitArgsClasses()));
                    tccResource.setRollbackMethodName(twoPhaseBusinessAction.rollbackMethod());
                    tccResource.setRollbackMethod(interfaceClass.getMethod(twoPhaseBusinessAction.rollbackMethod(),
                            twoPhaseBusinessAction.rollbackArgsClasses()));
                    tccResource.setCommitArgsClasses(twoPhaseBusinessAction.commitArgsClasses());
                    tccResource.setRollbackArgsClasses(twoPhaseBusinessAction.rollbackArgsClasses());
                    tccResource.setPhaseTwoCommitKeys(this.getTwoPhaseArgs(tccResource.getCommitMethod(),
                            twoPhaseBusinessAction.commitArgsClasses()));
                    tccResource.setPhaseTwoRollbackKeys(this.getTwoPhaseArgs(tccResource.getRollbackMethod(),
                            twoPhaseBusinessAction.rollbackArgsClasses()));
                    DefaultResourceManager.get().registerResource(tccResource);
                }
            }
        } catch (Throwable t) {
            throw new FrameworkException(t, "parser remoting service error");
        }
    }
    if (remotingParser.isReference(bean, beanName)) {
        remotingBeanDesc.setReference(true);
    }
    return remotingBeanDesc;
}
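Registration essentially resolves the commit and rollback Method objects by name plus argument classes and stores them under actionName. The sketch below assumes a plain HashMap as the registry; ResourceRegistry and StockAction are hypothetical, with Object standing in for BusinessActionContext. The excerpt above shows no duplicate check, so rejecting a repeated actionName here is an illustrative addition that makes the uniqueness rule explicit.

```java
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Map;

// Hypothetical business interface; Object stands in for BusinessActionContext.
interface StockAction {
    void freeze(Object ctx, String commodityCode, Integer count);

    void deduct(Object ctx, String commodityCode, Integer count);

    void unFreeze(Object ctx, String commodityCode, Integer count);
}

// Hypothetical registry keyed by actionName, loosely modelled on the TCCResource fields above.
final class ResourceRegistry {
    static final class Resource {
        final String actionName;
        final Method commitMethod;
        final Method rollbackMethod;

        Resource(String actionName, Method commitMethod, Method rollbackMethod) {
            this.actionName = actionName;
            this.commitMethod = commitMethod;
            this.rollbackMethod = rollbackMethod;
        }
    }

    private final Map<String, Resource> resources = new HashMap<>();

    // Resolves phase-two methods like the excerpt: interfaceClass.getMethod(name, argClasses).
    // A wrong method name or argument list therefore fails at startup, not in phase two.
    Resource register(Class<?> iface, String actionName,
                      String commitName, Class<?>[] commitArgs,
                      String rollbackName, Class<?>[] rollbackArgs) {
        if (resources.containsKey(actionName)) {
            // illustrative addition: make the "actionName must be unique" rule fail loudly
            throw new IllegalStateException("duplicate actionName: " + actionName);
        }
        try {
            Resource r = new Resource(actionName,
                    iface.getMethod(commitName, commitArgs),
                    iface.getMethod(rollbackName, rollbackArgs));
            resources.put(actionName, r);
            return r;
        } catch (NoSuchMethodException e) {
            throw new IllegalArgumentException("cannot resolve phase-two method for " + actionName, e);
        }
    }

    Resource get(String actionName) {
        return resources.get(actionName);
    }
}
```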

In summary, for the three special bean types, provided some method is annotated with @TwoPhaseBusinessAction, proxying and RM registration work as follows.

Bean type         Proxied?   Registered as RM?
LocalTCC Bean     Yes        Yes
Service Bean      No         Yes
Reference Bean    Yes        No

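III. Phase One (Try)

In phase one the TM begins the global transaction; that logic was covered under AT mode and is not repeated here. The global transaction does not care which mode its branch transactions use, so AT and TCC modes can also be mixed.

1. RM

In its invoke method, TccActionInterceptor intercepts every method annotated with @TwoPhaseBusinessAction and binds the current branch type (RootContext.branchType) to TCC around the call. The actual phase-one Try logic is delegated to ActionInterceptorHandler.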

// TccActionInterceptor
private ActionInterceptorHandler actionInterceptorHandler = new ActionInterceptorHandler();
@Override
public Object invoke(final MethodInvocation invocation) throws Throwable {
    if (!RootContext.inGlobalTransaction() || disable || RootContext.inSagaBranch()) {
        return invocation.proceed();
    }
    Method method = getActionInterfaceMethod(invocation);
    TwoPhaseBusinessAction businessAction = method.getAnnotation(TwoPhaseBusinessAction.class);
    if (businessAction != null) {
        String xid = RootContext.getXID();
        BranchType previousBranchType = RootContext.getBranchType();
        if (BranchType.TCC != previousBranchType) {
            RootContext.bindBranchType(BranchType.TCC);
        }
        try {
            return actionInterceptorHandler.proceed(method, invocation.getArguments(), xid, businessAction,
                    invocation::proceed);
        } finally {
            if (BranchType.TCC != previousBranchType) {
                RootContext.unbindBranchType();
            }
            MDC.remove(RootContext.MDC_KEY_BRANCH_ID);
        }
    }

    return invocation.proceed();
}
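The bind/unbind pattern in invoke can be reproduced with a ThreadLocal. In this sketch BranchContext is a hypothetical stand-in for RootContext's branch-type slot, and branch types are plain strings rather than the BranchType enum.

```java
import java.util.function.Supplier;

// Hypothetical, simplified stand-in for the branch-type slot of Seata's RootContext.
final class BranchContext {
    private static final ThreadLocal<String> BRANCH_TYPE = new ThreadLocal<>();

    private BranchContext() {}

    static String get() {
        return BRANCH_TYPE.get();
    }

    static void bind(String type) {
        BRANCH_TYPE.set(type);
    }

    static void unbind() {
        BRANCH_TYPE.remove();
    }

    // Same pattern as TccActionInterceptor.invoke: bind "TCC" only when it is not already
    // bound, and in finally undo only a binding this call made, so a nested TCC call
    // leaves the outer binding in place.
    static <T> T runAsTcc(Supplier<T> body) {
        boolean bindHere = !"TCC".equals(get());
        if (bindHere) {
            bind("TCC");
        }
        try {
            return body.get();
        } finally {
            if (bindHere) {
                unbind();
            }
        }
    }
}
```

Like the excerpt, the sketch removes the binding in finally rather than restoring a different, non-TCC previous value.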

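ActionInterceptorHandler's logic has roughly three steps:

  1. Prepare the BusinessActionContext;
  2. Register the branch transaction with the TC (BranchRegisterRequest) and get back a branchId. This means that outside LocalTCC, the branch is registered on the service consumer side, while it is committed on the service provider side;
  3. Execute the business method: for LocalTCC this is the local try method (which may be wrapped by TCCFence, ignored for now); for a DubboReference it is the remote RPC call.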
// ActionInterceptorHandler
public Object proceed(Method method, Object[] arguments, String xid, TwoPhaseBusinessAction businessAction,
                                   Callback<Object> targetCallback) throws Throwable {
    // take the BusinessActionContext from the arguments, or create a new one
    BusinessActionContext actionContext = getOrCreateActionContextAndResetToArguments(method.getParameterTypes(), arguments);
    actionContext.setXid(xid); // global transaction id
    String actionName = businessAction.name();
    actionContext.setActionName(actionName);
    actionContext.setDelayReport(businessAction.isDelayReport());

    // fill in actionContext and register the branch transaction
    String branchId = doTccActionLogStore(method, arguments, businessAction, actionContext);
    actionContext.setBranchId(branchId);

    BusinessActionContext previousActionContext = BusinessActionContextUtil.getContext();
    try {
        BusinessActionContextUtil.setContext(actionContext);

        // TCCFence handling
        if (businessAction.useTCCFence()) {
            try {
                return TCCFenceHandler.prepareFence(xid, Long.valueOf(branchId), actionName, targetCallback);
            } catch (SkipCallbackWrapperException | UndeclaredThrowableException e) {
                Throwable originException = e.getCause();
                if (originException instanceof FrameworkException) {
                    LOGGER.error("[{}] prepare TCC fence error: {}", xid, originException.getMessage());
                }
                throw originException;
            }
        } else {
            // execute the business method, i.e. the try method
            return targetCallback.execute();
        }
    } finally {
        // (omitted) BusinessActionContextUtil restores the previous BusinessActionContext
    }
}
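The ordering inside proceed matters: the branch is registered before the try body runs, and the previous context is restored afterwards. A minimal sketch under these assumptions: BranchRegistrar is a hypothetical interface in place of the BranchRegisterRequest round trip, ActionContext is a trimmed-down BusinessActionContext, and TCCFence is left out.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical TC client; in Seata this step is a BranchRegisterRequest round trip.
interface BranchRegistrar {
    long register(String xid, String actionName, Map<String, Object> applicationData);
}

// Hypothetical, trimmed-down BusinessActionContext.
final class ActionContext {
    final String xid;
    final String actionName;
    final long branchId;

    ActionContext(String xid, String actionName, long branchId) {
        this.xid = xid;
        this.actionName = actionName;
        this.branchId = branchId;
    }
}

final class TryPhase {
    private static final ThreadLocal<ActionContext> CURRENT = new ThreadLocal<>();

    private TryPhase() {}

    static ActionContext current() {
        return CURRENT.get();
    }

    // 1) copy the context data, 2) register the branch and obtain its branchId,
    // 3) run the try body with the new context installed, then restore what was there before.
    static <T> T proceed(String xid, String actionName, Map<String, Object> contextData,
                         BranchRegistrar registrar, Supplier<T> tryBody) {
        Map<String, Object> data = new HashMap<>(contextData);
        long branchId = registrar.register(xid, actionName, data);
        ActionContext previous = CURRENT.get();
        CURRENT.set(new ActionContext(xid, actionName, branchId));
        try {
            return tryBody.get();
        } finally {
            if (previous != null) {
                CURRENT.set(previous);
            } else {
                CURRENT.remove();
            }
        }
    }
}
```

Because registration comes first, the TC can hold a branch whose try body never ran (for example, the RPC failed), and phase two may then ask to roll back work that was never done. That is the empty-rollback case TCCFence is designed to absorb.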

BusinessActionContext stores the context shared by both TCC phases; its actionName is the resource id (resourceId).

public class BusinessActionContext {
    // global transaction id
    private String xid;
    // branch transaction id
    private String branchId;
    // @TwoPhaseBusinessAction.name
    private String actionName;
    // whether to delay reporting the context, used to improve phase-two performance
    private Boolean isDelayReport;
    // whether the business code modified actionContext
    private Boolean isUpdated;
    // actionContext
    private Map<String, Object> actionContext;
}

BusinessActionContext.actionContext stores: the try method name (sys::prepare), the commit method name (sys::commit), the rollback method name (sys::rollback), actionName (@TwoPhaseBusinessAction.name), whether tccFence is enabled (@TwoPhaseBusinessAction.useTCCFence), and the parameter names and values.
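How such a map can be assembled is sketched below with reflection over parameter annotations. CtxParam is a hypothetical stand-in for @BusinessActionContextParameter; the sys::prepare, sys::commit and sys::rollback keys come from the text above, while the exact key strings actionName and useTCCFence are assumptions.

```java
import java.lang.annotation.Annotation;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for @BusinessActionContextParameter.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.PARAMETER)
@interface CtxParam {
    String value();
}

interface DemoAction {
    void prepare(Object ctx, @CtxParam("commodityCode") String commodityCode, @CtxParam("count") Integer count);
}

final class ActionContextBuilder {
    private ActionContextBuilder() {}

    static Map<String, Object> build(Method tryMethod, Object[] args, String actionName,
                                     String commitName, String rollbackName, boolean useFence) {
        Map<String, Object> ctx = new LinkedHashMap<>();
        ctx.put("sys::prepare", tryMethod.getName());
        ctx.put("sys::commit", commitName);
        ctx.put("sys::rollback", rollbackName);
        ctx.put("actionName", actionName);   // key spelling assumed
        ctx.put("useTCCFence", useFence);    // key spelling assumed
        // only parameters carrying the annotation are copied, under the annotation's name
        Annotation[][] paramAnnotations = tryMethod.getParameterAnnotations();
        for (int i = 0; i < paramAnnotations.length; i++) {
            for (Annotation a : paramAnnotations[i]) {
                if (a instanceof CtxParam) {
                    ctx.put(((CtxParam) a).value(), args[i]);
                }
            }
        }
        return ctx;
    }
}
```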


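2. TC

The TC receives the BranchRegisterRequest and hands it to TccCore. TccCore overrides none of AbstractCore's logic; it only implements getHandleBranchType to return BranchType.TCC.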

public class TccCore extends AbstractCore {

    public TccCore(RemotingServer remotingServer) {
        super(remotingServer);
    }

    @Override
    public BranchType getHandleBranchType() {
        return BranchType.TCC;
    }
}

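The TC logic is the same as in AT mode and lives in AbstractCore, except that TCC mode acquires no global lock: the TC only persists all of the branch's information.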

// AbstractCore
public Long branchRegister(BranchType branchType, String resourceId, String clientId, String xid,
                           String applicationData, String lockKeys) throws TransactionException {
    // Step1 look up global_table by xid to get the GlobalSession
    GlobalSession globalSession = assertGlobalSessionNotNull(xid, false);
    // with store mode = file, the GlobalSession lives in memory, so a lock must be held while executing
    // with store mode = db/redis, no lock is needed
    return SessionHolder.lockAndExecute(globalSession, () -> {
        // status check: must be Begin
        globalSessionStatusCheck(globalSession);
        globalSession.addSessionLifecycleListener(SessionHolder.getRootSessionManager());
        BranchSession branchSession = SessionHelper.newBranchByGlobal(globalSession, branchType, resourceId,
                applicationData, lockKeys, clientId);
        MDC.put(RootContext.MDC_KEY_BRANCH_ID, String.valueOf(branchSession.getBranchId()));
        // Step2 acquire global locks (only AT mode needs this)
        branchSessionLock(globalSession, branchSession);
        try {
            // Step3 persist the branch transaction
            globalSession.addBranch(branchSession);
        } catch (RuntimeException ex) {
            branchSessionUnlock(branchSession);
            throw new BranchTransactionException(FailedToAddBranch, String
                    .format("Failed to store branch xid = %s branchId = %s", globalSession.getXid(),
                            branchSession.getBranchId()), ex);
        }
        return branchSession.getBranchId();
    });
}
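The three steps of branchRegister, and the fact that only AT takes global locks, can be modelled in memory. Everything here is hypothetical: SketchCore stands in for AbstractCore, a map from lock key to owning xid stands in for lock_table, and the ";"-separated lock-key format is an assumption of the sketch.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

enum SketchBranchType { AT, TCC }

// Hypothetical in-memory global session; "status" mimics GlobalStatus by name only.
final class SketchGlobalSession {
    final String xid;
    String status = "Begin";
    final List<Long> branchIds = new ArrayList<>();

    SketchGlobalSession(String xid) {
        this.xid = xid;
    }
}

// Hypothetical TC core following the three steps of AbstractCore.branchRegister.
final class SketchCore {
    private final AtomicLong nextBranchId = new AtomicLong(1);
    final Map<String, String> globalLocks = new HashMap<>(); // lock key -> owning xid (lock_table stand-in)

    synchronized long branchRegister(SketchGlobalSession session, SketchBranchType type, String lockKeys) {
        // status check: new branches are only accepted while the global transaction is Begin
        if (!"Begin".equals(session.status)) {
            throw new IllegalStateException("global session " + session.xid + " is " + session.status);
        }
        // Step2 equivalent: only AT branches take global locks; TCC skips this entirely
        if (type == SketchBranchType.AT && lockKeys != null && !lockKeys.isEmpty()) {
            String[] keys = lockKeys.split(";");
            for (String key : keys) { // check every key before taking any of them
                String owner = globalLocks.get(key);
                if (owner != null && !owner.equals(session.xid)) {
                    throw new IllegalStateException("global lock on " + key + " held by " + owner);
                }
            }
            for (String key : keys) {
                globalLocks.put(key, session.xid);
            }
        }
        // Step3 equivalent: persist the branch
        long branchId = nextBranchId.getAndIncrement();
        session.branchIds.add(branchId);
        return branchId;
    }
}
```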

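IV. Phase Two (Commit)

1. TC

The TM sends a GlobalCommitRequest to the TC, and the TC commits each branch transaction.

DefaultCore.commit(xid) commits the global transaction. It differs from AT mode in two ways:

  1. globalSession.closeAndClean: if the global transaction contains AT branches, the global locks they acquired are deleted first; for TCC branches this step does nothing;
  2. globalSession.canBeCommittedAsync: if every branch is an AT branch, the commit can run asynchronously; otherwise it must run synchronously. In other words, whenever a TCC branch is present, the commit is synchronous.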
// DefaultCore
public GlobalStatus commit(String xid) throws TransactionException {
    GlobalSession globalSession = SessionHolder.findGlobalSession(xid);
    if (globalSession == null) {
        return GlobalStatus.Finished;
    }
    globalSession.addSessionLifecycleListener(SessionHolder.getRootSessionManager());
    boolean shouldCommit = SessionHolder.lockAndExecute(globalSession, () -> {
        if (globalSession.getStatus() == GlobalStatus.Begin) {
            // if any branch is AT, release its global locks first: delete from lock_table where xid = ?
            globalSession.closeAndClean();
            // if all branches are AT, the commit can be asynchronous
            if (globalSession.canBeCommittedAsync()) {
                // async commit: mark the global transaction AsyncCommitting (update global_table set status = AsyncCommitting where xid = ?)
                globalSession.asyncCommit();
                MetricsPublisher.postSessionDoneEvent(globalSession, GlobalStatus.Committed, false, false);
                return false;
            } else {
                // TCC
                globalSession.changeGlobalStatus(GlobalStatus.Committing);
                return true;
            }
        }
        return false;
    });

    if (shouldCommit) { // synchronous commit (TCC)
        boolean success = doGlobalCommit(globalSession, false);
        if (success && globalSession.hasBranch() && globalSession.canBeCommittedAsync()) {
            globalSession.asyncCommit();
            return GlobalStatus.Committed;
        } else {
            return globalSession.getStatus();
        }
    } else { // asynchronous commit (AT)
        return globalSession.getStatus() == GlobalStatus.AsyncCommitting ? GlobalStatus.Committed : globalSession.getStatus();
    }
}
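The choice between the synchronous and asynchronous path reduces to one predicate over the branches. Below is a simplification of globalSession.canBeCommittedAsync() based on the description above (all AT means async, any TCC means sync), with branch types as plain strings; CommitModeSketch is a hypothetical name.

```java
import java.util.List;

final class CommitModeSketch {
    enum Mode { ASYNC, SYNC }

    private CommitModeSketch() {}

    // Hypothetical simplification of canBeCommittedAsync(): only an all-AT global transaction
    // takes the asynchronous path; a single non-AT (e.g. TCC) branch forces SYNC.
    static Mode chooseMode(List<String> branchTypes) {
        for (String type : branchTypes) {
            if (!"AT".equals(type)) {
                return Mode.SYNC;
            }
        }
        return Mode.ASYNC;
    }
}
```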

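DefaultCore.doGlobalCommit holds the core commit logic; if a phase-two commit fails, it is retried until it succeeds.

If every branch is AT, this method is called asynchronously, because once the global locks are released in AT mode, the remaining work on the global and branch transactions is only cleanup, such as deleting rows from global_table, branch_table and undo_log.

If any branch is TCC, this method is called synchronously; if a branch fails to commit along the way, it is retried asynchronously until it succeeds.

When AT and TCC are mixed, the AT branches are committed later, when the async task calls doGlobalCommit again, while the TCC branches are still committed synchronously during the first doGlobalCommit call.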

// DefaultCore
public boolean doGlobalCommit(GlobalSession globalSession, boolean retrying) throws TransactionException {
    boolean success = true;
    if (globalSession.isSaga()) {
        success = getCore(BranchType.SAGA).doGlobalCommit(globalSession, retrying);
    } else {
        Boolean result = SessionHelper.forEach(globalSession.getSortedBranches(), branchSession -> {
            // when AT and TCC coexist, AT branches skip the synchronous commit; only TCC branches are committed here
            if (!retrying && branchSession.canBeCommittedAsync()) {
                return CONTINUE;
            }
            try {
                // Step1 send BranchCommitRequest to the RM: an AT RM deletes undo_log, a TCC RM runs its phase-two commit
                BranchStatus branchStatus = getCore(branchSession.getBranchType()).branchCommit(globalSession, branchSession);
                switch (branchStatus) {
                    case PhaseTwo_Committed:
                        // Step2 delete the branch record from branch_table
                        SessionHelper.removeBranch(globalSession, branchSession, !retrying);
                        return CONTINUE;
                    case PhaseTwo_CommitFailed_Unretryable: // not retryable (returned by the XA implementation)
                        return false;

                    default:
                        if (!retrying) {
                            // mark the global transaction CommitRetrying; it is retried asynchronously until it succeeds
                            globalSession.queueToRetryCommit();
                            return false;
                        }
                        if (globalSession.canBeCommittedAsync()) {
                            return CONTINUE;
                        } else {
                            return false;
                        }
                }
            } catch (Exception ex) {
                if (!retrying) {

                    globalSession.queueToRetryCommit();
                    throw new TransactionException(ex);
                }
            }
            // a branch failed while retrying: carry on with the following branches
            return CONTINUE;
        });
        // synchronous commit: if a branch failed, forEach returned false, which is returned directly
        if (result != null) {
            return result;
        }
        if (globalSession.hasBranch() && !globalSession.canBeCommittedAsync()) {
            return false;
        }
        if (!retrying) {
            globalSession.setStatus(GlobalStatus.Committed);
        }
    }
    if (success && globalSession.getBranchSessions().isEmpty()) {
        // Step3 delete the global transaction: delete from global_table where xid = ?
        SessionHelper.endCommitted(globalSession, retrying);
    }
    return success;
}
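The first, non-retrying pass of doGlobalCommit for a mixed AT/TCC transaction can be sketched as a loop that skips AT branches, commits TCC branches one by one, and stops at the first failure after queueing a retry. SyncCommitSketch is hypothetical; commitOnRm stands in for sending BranchCommitRequest, and the exception path is folded into the same failure branch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

final class SyncCommitSketch {
    // A branch is reduced to an id and whether it is AT (true) or TCC (false).
    static final class Branch {
        final long id;
        final boolean at;

        Branch(long id, boolean at) {
            this.id = id;
            this.at = at;
        }
    }

    private SyncCommitSketch() {}

    // Hypothetical first (retrying == false) pass of doGlobalCommit:
    // AT branches are skipped and left for the async committer, each TCC branch is committed
    // through commitOnRm and removed on success, and the first failure queues a retry and
    // stops the pass. Returns the branches that are still pending.
    static List<Branch> firstPass(List<Branch> branches, Predicate<Branch> commitOnRm, List<String> events) {
        List<Branch> pending = new ArrayList<>(branches);
        for (Branch b : branches) {
            if (b.at) {
                continue;
            }
            if (commitOnRm.test(b)) {
                pending.remove(b);
            } else {
                events.add("queueToRetryCommit");
                break;
            }
        }
        return pending;
    }
}
```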

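One more question: if the TCC branch is a @LocalTCC resource, the BranchSession is registered and committed within the same service; if it is a Dubbo RPC-style resource, the BranchSession is registered by the ReferenceBean client but committed by the ServiceBean provider.

The LocalTCC case is easy to follow: the same service instance registers and commits the BranchSession, and from registration the BranchSession already carries that instance's details, such as applicationId, ip and port. But how does Dubbo work? The provider can only be located through the resourceId (actionName) it registered as a resource, and it is not the application that registered the BranchSession.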

// AbstractCore
protected BranchStatus branchCommitSend(BranchCommitRequest request, GlobalSession globalSession,
                                        BranchSession branchSession) throws IOException, TimeoutException {
    BranchCommitResponse response = (BranchCommitResponse) remotingServer.sendSyncRequest(
            branchSession.getResourceId(), branchSession.getClientId(), request);
    return response.getBranchStatus();
}

// AbstractNettyRemotingServer
public Object sendSyncRequest(String resourceId, String clientId, Object msg) throws TimeoutException {
    // locate the client Channel
    Channel channel = ChannelManager.getChannel(resourceId, clientId);
    if (channel == null) {
        throw new RuntimeException("rm client is not connected. dbkey:" + resourceId + ",clientId:" + clientId);
    }
    RpcMessage rpcMessage = buildRequestMessage(msg, ProtocolConstants.MSGTYPE_RESQUEST_SYNC);
    return super.sendSync(channel, rpcMessage, NettyServerConfig.getRpcRequestTimeout());
}

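Look at ChannelManager.getChannel: the resourceId argument is the TCC resource's actionName, and clientId is a string concatenating applicationId, ip and port, supplied by the RM when it registered the BranchSession.

ChannelManager keeps a map from resource + application + ip + port to an RpcContext, and each RpcContext holds the Channel the RM client registered with.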

public class ChannelManager {

    /**
     * resourceId -> applicationId -> ip -> port -> RpcContext
     */
    private static final ConcurrentMap<String, ConcurrentMap<String, ConcurrentMap<String,
        ConcurrentMap<Integer, RpcContext>>>> RM_CHANNELS = new ConcurrentHashMap<>();
}

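For LocalTCC or AT mode, the same service instance registers and commits the branch, so resourceId+applicationId+ip+port usually locates the phase-two peer directly. That instance may have crashed, or reconnected after a crash, so the lookup falls back to another port on the same IP, or to another ip:port under the same applicationId.

For the TCC case where phase two must reach the ServiceBean provider, the lookup goes straight to Step2-fallback and picks an RM registered under the same resourceId by another applicationId; this is how storage-service is found for the phase-two commit. It is also why resourceId (actionName) should be globally unique.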

// ChannelManager
public static Channel getChannel(String resourceId, String clientId) {
    Channel resultChannel = null;
    String[] clientIdInfo = readClientId(clientId);
    String targetApplicationId = clientIdInfo[0];
    String targetIP = clientIdInfo[1];
    int targetPort = Integer.parseInt(clientIdInfo[2]);

    ConcurrentMap<String, ConcurrentMap<String, ConcurrentMap<Integer, RpcContext>>> applicationIdMap = RM_CHANNELS.get(resourceId);

    // Step1 find the applicationId -> ip -> port channels registered for this resourceId
    if (targetApplicationId == null || applicationIdMap == null ||  applicationIdMap.isEmpty()) {
        return null;
    }
    ConcurrentMap<String, ConcurrentMap<Integer, RpcContext>> ipMap = applicationIdMap.get(targetApplicationId);

    // Step2 the applicationId that registered the BranchSession
    if (ipMap != null && !ipMap.isEmpty()) {
        // Step3 the ip that registered the BranchSession
        ConcurrentMap<Integer, RpcContext> portMapOnTargetIP = ipMap.get(targetIP);
        if (portMapOnTargetIP != null && !portMapOnTargetIP.isEmpty()) {

            // Step4 the port that registered the BranchSession
            RpcContext exactRpcContext = portMapOnTargetIP.get(targetPort);
            if (exactRpcContext != null) {
                Channel channel = exactRpcContext.getChannel();
                if (channel.isActive()) {
                    resultChannel = channel;
                }
            }

            // Step4-fallback the original channel may be closed: try the other ports on the same ip (resourceId+applicationId+ip)
            if (resultChannel == null) {
                for (ConcurrentMap.Entry<Integer, RpcContext> portMapOnTargetIPEntry : portMapOnTargetIP
                    .entrySet()) {
                    Channel channel = portMapOnTargetIPEntry.getValue().getChannel();

                    if (channel.isActive()) {
                        resultChannel = channel;
                        break;
                    } 
                }
            }
        }

        // Step3-fallback no usable channel on the registering ip: try the other ips under resourceId+applicationId
        if (resultChannel == null) {
            for (ConcurrentMap.Entry<String, ConcurrentMap<Integer, RpcContext>> ipMapEntry : ipMap
                .entrySet()) {
                if (ipMapEntry.getKey().equals(targetIP)) { continue; }

                ConcurrentMap<Integer, RpcContext> portMapOnOtherIP = ipMapEntry.getValue();
                if (portMapOnOtherIP == null || portMapOnOtherIP.isEmpty()) {
                    continue;
                }

                for (ConcurrentMap.Entry<Integer, RpcContext> portMapOnOtherIPEntry : portMapOnOtherIP.entrySet()) {
                    Channel channel = portMapOnOtherIPEntry.getValue().getChannel();

                    if (channel.isActive()) {
                        resultChannel = channel;
                        break;
                    } 
                }
                if (resultChannel != null) { break; }
            }
        }
    }

    // Step2-fallback no usable channel under the registering applicationId: pick a channel of another application under resourceId
    if (resultChannel == null) {
        resultChannel = tryOtherApp(applicationIdMap, targetApplicationId);
    }

    return resultChannel;

}
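The fallback order in getChannel can be exercised without Netty. This sketch keeps the same four-level map but replaces RpcContext/Channel with a hypothetical Chan holding a label and an active flag; ChannelLookupSketch is likewise hypothetical.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

final class ChannelLookupSketch {
    // Hypothetical stand-in for RpcContext + Channel: a label and an isActive() flag.
    static final class Chan {
        final String label;
        final boolean active;

        Chan(String label, boolean active) {
            this.label = label;
            this.active = active;
        }
    }

    // resourceId -> applicationId -> ip -> port -> channel, same shape as RM_CHANNELS
    final ConcurrentMap<String, ConcurrentMap<String, ConcurrentMap<String, ConcurrentMap<Integer, Chan>>>> rm =
            new ConcurrentHashMap<>();

    void register(String resourceId, String app, String ip, int port, Chan ch) {
        rm.computeIfAbsent(resourceId, k -> new ConcurrentHashMap<>())
          .computeIfAbsent(app, k -> new ConcurrentHashMap<>())
          .computeIfAbsent(ip, k -> new ConcurrentHashMap<>())
          .put(port, ch);
    }

    // Fallback order of getChannel: exact port, other port on the same ip,
    // other ip of the same application, then any other application holding the resource.
    Chan find(String resourceId, String app, String ip, int port) {
        Map<String, ConcurrentMap<String, ConcurrentMap<Integer, Chan>>> apps = rm.get(resourceId);
        if (apps == null || apps.isEmpty()) {
            return null;
        }
        Map<String, ConcurrentMap<Integer, Chan>> ips = apps.get(app);
        if (ips != null && !ips.isEmpty()) {
            Map<Integer, Chan> portsOnIp = ips.get(ip);
            if (portsOnIp != null) {
                Chan exact = portsOnIp.get(port);                 // Step4
                if (exact != null && exact.active) {
                    return exact;
                }
                Chan otherPort = firstActive(portsOnIp);          // Step4-fallback
                if (otherPort != null) {
                    return otherPort;
                }
            }
            for (Map.Entry<String, ConcurrentMap<Integer, Chan>> ipEntry : ips.entrySet()) { // Step3-fallback
                if (ipEntry.getKey().equals(ip)) {
                    continue;
                }
                Chan otherIp = firstActive(ipEntry.getValue());
                if (otherIp != null) {
                    return otherIp;
                }
            }
        }
        for (Map.Entry<String, ConcurrentMap<String, ConcurrentMap<Integer, Chan>>> appEntry : apps.entrySet()) { // Step2-fallback
            if (appEntry.getKey().equals(app)) {
                continue;
            }
            for (ConcurrentMap<Integer, Chan> portsOfOtherApp : appEntry.getValue().values()) {
                Chan otherApp = firstActive(portsOfOtherApp);
                if (otherApp != null) {
                    return otherApp;
                }
            }
        }
        return null;
    }

    private static Chan firstActive(Map<Integer, Chan> ports) {
        for (Chan c : ports.values()) {
            if (c.active) {
                return c;
            }
        }
        return null;
    }
}
```

With only storage-service registered under storage-tcc-action, a lookup for clientId business-service falls through every same-application step and returns storage-service's channel, which is the Step2-fallback path described above.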

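2. RM

TCCResourceManager.branchCommit combines the data in the BranchCommitRequest with the locally cached TCCResource and invokes the phase-two commit method through reflection.

The TCCResource mainly supplies the commit method, and the BranchCommitRequest mainly supplies the commit method's arguments (the RM passes them to the TC in phase one, and the TC passes them back to the RM in phase two).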

// TCCResourceManager
public BranchStatus branchCommit(BranchType branchType, String xid, long branchId, String resourceId,
                                 String applicationData) throws TransactionException {
    // Step1 locate the resource's local commit method in the local tccResourceCache
    TCCResource tccResource = (TCCResource)tccResourceCache.get(resourceId);
    Object targetTCCBean = tccResource.getTargetBean();
    Method commitMethod = tccResource.getCommitMethod();
    try {
        // Step2 deserialize the BusinessActionContext
        BusinessActionContext businessActionContext = getBusinessActionContext(xid, branchId, resourceId,
            applicationData);
        // Step3 build the commit method's argument list
        Object[] args = this.getTwoPhaseCommitArgs(tccResource, businessActionContext);
        Object ret;
        boolean result;
        // Step4 run the commit method
        if (Boolean.TRUE.equals(businessActionContext.getActionContext(Constants.USE_TCC_FENCE))) {
            // Step4-1 useTCCFence enabled
            try {
                result = TCCFenceHandler.commitFence(commitMethod, targetTCCBean, xid, branchId, args);
            } catch (SkipCallbackWrapperException | UndeclaredThrowableException e) {
                throw e.getCause();
            }
        } else {
            // Step4-2 useTCCFence disabled
            ret = commitMethod.invoke(targetTCCBean, args);
            if (ret != null) {
                if (ret instanceof TwoPhaseResult) {
                    result = ((TwoPhaseResult)ret).isSuccess();
                } else {
                    result = (boolean)ret;
                }
            } else {
                result = true;
            }
        }
        return result ? BranchStatus.PhaseTwo_Committed : BranchStatus.PhaseTwo_CommitFailed_Retryable;
    } catch (Throwable t) {
        return BranchStatus.PhaseTwo_CommitFailed_Retryable;
    }
}
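The result handling in the non-fence branch (null counts as success, a TwoPhaseResult or boolean is unwrapped, any exception becomes a retryable failure) can be checked in isolation. PhaseResult is a hypothetical stand-in for TwoPhaseResult, DemoCommitTarget is a made-up bean, and the Status names only echo BranchStatus.

```java
import java.lang.reflect.Method;

// Hypothetical stand-in for Seata's TwoPhaseResult.
final class PhaseResult {
    private final boolean success;

    PhaseResult(boolean success) {
        this.success = success;
    }

    boolean isSuccess() {
        return success;
    }
}

// Hypothetical phase-two bean with one commit method per return style.
class DemoCommitTarget {
    public void voidCommit() {}
    public boolean trueCommit() { return true; }
    public boolean falseCommit() { return false; }
    public PhaseResult resultCommit() { return new PhaseResult(false); }
    public void throwingCommit() { throw new IllegalStateException("commit failed"); }
}

final class CommitResultSketch {
    enum Status { PHASE_TWO_COMMITTED, PHASE_TWO_COMMIT_FAILED_RETRYABLE }

    private CommitResultSketch() {}

    static Method method(Class<?> type, String name) {
        try {
            return type.getMethod(name);
        } catch (NoSuchMethodException e) {
            throw new IllegalArgumentException(e);
        }
    }

    // Mirrors the non-fence path of branchCommit: invoke reflectively and map the outcome.
    static Status invokeCommit(Method commit, Object target, Object... args) {
        try {
            Object ret = commit.invoke(target, args);
            boolean ok;
            if (ret == null) {
                ok = true;                                  // void or null result counts as success
            } else if (ret instanceof PhaseResult) {
                ok = ((PhaseResult) ret).isSuccess();
            } else {
                ok = (Boolean) ret;                         // other types throw ClassCastException
            }
            return ok ? Status.PHASE_TWO_COMMITTED : Status.PHASE_TWO_COMMIT_FAILED_RETRYABLE;
        } catch (ReflectiveOperationException | RuntimeException e) {
            // InvocationTargetException wraps anything thrown by the commit method itself
            return Status.PHASE_TWO_COMMIT_FAILED_RETRYABLE;
        }
    }
}
```

The excerpt catches Throwable; the sketch narrows this to ReflectiveOperationException and RuntimeException so that Errors still propagate, which is a choice of the sketch only.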

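V. Phase Two (Rollback)

1. TC

The TC's phase-two rollback is essentially the same as in pure AT mode. One detail: at Step2-1, a TCC branch has no global lock (lock_table) to release. If a TCC branch fails to roll back, it is retried until it succeeds.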

// DefaultCore
public GlobalStatus rollback(String xid) throws TransactionException {
    GlobalSession globalSession = SessionHolder.findGlobalSession(xid);
    if (globalSession == null) {
        return GlobalStatus.Finished;
    }
    globalSession.addSessionLifecycleListener(SessionHolder.getRootSessionManager());
    boolean shouldRollBack = SessionHolder.lockAndExecute(globalSession, () -> {
        globalSession.close();
        if (globalSession.getStatus() == GlobalStatus.Begin) {
            // set the global lock (lock_table) status to Rollbacking
            // set the global transaction (global_table) status to Rollbacking
            globalSession.changeGlobalStatus(GlobalStatus.Rollbacking);
            return true;
        }
        return false;
    });
    if (!shouldRollBack) {
        return globalSession.getStatus();
    }
    // run the global rollback
    boolean rollbackSuccess = doGlobalRollback(globalSession, false);
    return rollbackSuccess ? GlobalStatus.Rollbacked : globalSession.getStatus();
}

// DefaultCore
public boolean doGlobalRollback(GlobalSession globalSession, boolean retrying) throws TransactionException {
    boolean success = true;

    Boolean result = SessionHelper.forEach(globalSession.getReverseSortedBranches(), branchSession -> {
        BranchStatus currentBranchStatus = branchSession.getStatus();
        if (currentBranchStatus == BranchStatus.PhaseOne_Failed) {
            SessionHelper.removeBranch(globalSession, branchSession, !retrying);
            return CONTINUE;
        }
        try {
            // Step1 send BranchRollbackRequest
            BranchStatus branchStatus = branchRollback(globalSession, branchSession);
            switch (branchStatus) {
                case PhaseTwo_Rollbacked:
                    // Step2-1 release global locks and delete the branch
                    SessionHelper.removeBranch(globalSession, branchSession, !retrying);
                    return CONTINUE;
                case PhaseTwo_RollbackFailed_Unretryable: // rollback failed and retrying cannot succeed
                    SessionHelper.endRollbackFailed(globalSession, retrying);
                    return false;
                default:
                    // Step2-2 the RM failed to roll back: the global status becomes RollbackRetrying and waits for a retry
                    if (!retrying) {
                        globalSession.queueToRetryRollback();
                    }
                    return false;
            }
        } catch (Exception ex) {
            if (!retrying) {
                // If Step 1 or Step 2 throws, set the global status to RollbackRetrying and wait for a retry
                globalSession.queueToRetryRollback();
            }
            throw new TransactionException(ex);
        }
    });
    // If any branch transaction failed to roll back, return false
    if (result != null) {
        return result;
    }

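    // Step 3
    // In file mode, delete the global transaction directly.
    // In db/redis mode, do nothing here: doGlobalRollback is executed again asynchronously.
    //  This prevents residual rows from staying in lock_table and branch_table when,
    //  because of network jitter, a branch registration still succeeds,
    //  which would otherwise keep the global lock occupied and never released.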
    if (success) {
        SessionHelper.endRollbacked(globalSession, retrying);
    }
    return success;
}

// SessionHelper
public static void endRollbacked(GlobalSession globalSession, boolean retryGlobal) throws TransactionException {
    // Only when retrying, or in file mode
    if (retryGlobal || !DELAY_HANDLE_SESSION) {
        long beginTime = System.currentTimeMillis();
        GlobalStatus currentStatus = globalSession.getStatus();
        boolean retryBranch =
            currentStatus == GlobalStatus.TimeoutRollbackRetrying || currentStatus == GlobalStatus.RollbackRetrying;
        if (isTimeoutGlobalStatus(currentStatus)) {
            globalSession.changeGlobalStatus(GlobalStatus.TimeoutRollbacked);
        } else {
            globalSession.changeGlobalStatus(GlobalStatus.Rollbacked);
        }
        // Delete the global transaction from global_table
        globalSession.end();
    }
}
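To make the control flow of doGlobalRollback easier to follow, here is a minimal, self-contained sketch that models only the branch loop: branches are visited newest-first, PhaseOne_Failed branches are simply dropped, a successful rollback removes the branch, an unretryable failure ends the rollback, and any other result stops the pass so a later retry can pick it up. All names here (BranchState, Branch, RollbackSketch) are hypothetical and are not Seata APIs; locking, persistence and the retry scheduler are left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical branch states; loosely modeled on Seata's BranchStatus, but not that enum
enum BranchState { PHASE_ONE_FAILED, REGISTERED, ROLLBACKED, ROLLBACK_FAILED_RETRYABLE, ROLLBACK_FAILED_UNRETRYABLE }

// Hypothetical, minimal branch record: just an id and a status
class Branch {
    final long branchId;
    final BranchState state;

    Branch(long branchId, BranchState state) {
        this.branchId = branchId;
        this.state = state;
    }
}

class RollbackSketch {
    enum Outcome { ROLLBACKED, ROLLBACK_RETRYING, ROLLBACK_FAILED }

    // Branch ids removed from the session, in removal order
    final List<Long> removed = new ArrayList<>();

    Outcome rollback(List<Branch> branchesInRegistrationOrder, Function<Branch, BranchState> rmRollback) {
        // Visit branches newest-first, like getReverseSortedBranches()
        for (int i = branchesInRegistrationOrder.size() - 1; i >= 0; i--) {
            Branch b = branchesInRegistrationOrder.get(i);
            if (b.state == BranchState.PHASE_ONE_FAILED) {
                removed.add(b.branchId);               // nothing to undo, just drop it
                continue;
            }
            BranchState result = rmRollback.apply(b);  // stands in for BranchRollbackRequest
            if (result == BranchState.ROLLBACKED) {
                removed.add(b.branchId);               // release locks, delete the branch
            } else if (result == BranchState.ROLLBACK_FAILED_UNRETRYABLE) {
                return Outcome.ROLLBACK_FAILED;        // like endRollbackFailed
            } else {
                return Outcome.ROLLBACK_RETRYING;      // like queueToRetryRollback; stop this pass
            }
        }
        return Outcome.ROLLBACKED;                     // every branch gone: endRollbacked
    }
}
```

Stopping at the first failure mirrors the `return false` inside the forEach callback above: the remaining branches are left for the next retry pass instead of being rolled back out of order.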

2. RM

The RM side handles the BranchRollbackRequest.

TCCResourceManager.branchRollback performs the RM's phase-two rollback; the flow is similar to phase-two commit.

// TCCResourceManager
public BranchStatus branchRollback(BranchType branchType, String xid, long branchId, String resourceId,
                                   String applicationData) throws TransactionException {
    // Step 1: look up the resource's local rollback method in the local cache tccResourceCache
    TCCResource tccResource = (TCCResource)tccResourceCache.get(resourceId);
    Object targetTCCBean = tccResource.getTargetBean();
    Method rollbackMethod = tccResource.getRollbackMethod();
    try {
        // Step 2: deserialize the BusinessActionContext
        BusinessActionContext businessActionContext = getBusinessActionContext(xid, branchId, resourceId,
            applicationData);
        // Step 3: resolve the rollback method's argument list
        Object[] args = this.getTwoPhaseRollbackArgs(tccResource, businessActionContext);
        Object ret;
        boolean result;
        // Step 4: execute the rollback method (through TCCFenceHandler when useTCCFence is enabled)
        if (Boolean.TRUE.equals(businessActionContext.getActionContext(Constants.USE_TCC_FENCE))) {
            try {
                result = TCCFenceHandler.rollbackFence(rollbackMethod, targetTCCBean, xid, branchId,
                        args, tccResource.getActionName());
            } catch (SkipCallbackWrapperException | UndeclaredThrowableException e) {
                throw e.getCause();
            }
        } else {
            ret = rollbackMethod.invoke(targetTCCBean, args);
            if (ret != null) {
                if (ret instanceof TwoPhaseResult) {
                    result = ((TwoPhaseResult)ret).isSuccess();
                } else {
                    result = (boolean)ret;
                }
            } else {
                result = true;
            }
        }
        return result ? BranchStatus.PhaseTwo_Rollbacked : BranchStatus.PhaseTwo_RollbackFailed_Retryable;
    } catch (Throwable t) {
        return BranchStatus.PhaseTwo_RollbackFailed_Retryable;
    }
}
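The non-fence path of branchRollback boils down to a reflective call plus a return-value mapping: `null` (a void method) counts as success, a TwoPhaseResult is unwrapped with isSuccess(), a boolean is used as-is, and any exception becomes PhaseTwo_RollbackFailed_Retryable. The sketch below reproduces only that mapping with plain java.lang.reflect; SketchTwoPhaseResult, RollbackInvoker and DemoAction are hypothetical stand-ins, not Seata classes.

```java
import java.lang.reflect.Method;

// Hypothetical stand-in for Seata's TwoPhaseResult; only isSuccess() is modeled
class SketchTwoPhaseResult {
    private final boolean success;

    SketchTwoPhaseResult(boolean success) { this.success = success; }

    boolean isSuccess() { return success; }
}

class RollbackInvoker {
    enum Status { PHASE_TWO_ROLLBACKED, PHASE_TWO_ROLLBACK_FAILED_RETRYABLE }

    // Finds a public method by name (sketch helper; assumes the name is not overloaded)
    static Method find(Class<?> type, String name) {
        for (Method m : type.getMethods()) {
            if (m.getName().equals(name)) {
                return m;
            }
        }
        throw new IllegalArgumentException("no public method named " + name);
    }

    // Invokes the rollback method reflectively and maps its return value to a status
    static Status invoke(Method rollbackMethod, Object target, Object... args) {
        try {
            Object ret = rollbackMethod.invoke(target, args);
            boolean ok;
            if (ret == null) {
                ok = true;                                    // void method: success
            } else if (ret instanceof SketchTwoPhaseResult) {
                ok = ((SketchTwoPhaseResult) ret).isSuccess();
            } else {
                ok = (Boolean) ret;                           // boolean return value
            }
            return ok ? Status.PHASE_TWO_ROLLBACKED : Status.PHASE_TWO_ROLLBACK_FAILED_RETRYABLE;
        } catch (ReflectiveOperationException | RuntimeException e) {
            // Reflection errors, exceptions from the method, or odd return types: let the TC retry
            return Status.PHASE_TWO_ROLLBACK_FAILED_RETRYABLE;
        }
    }
}

// Hypothetical TCC bean with three rollback variants
class DemoAction {
    public void rollbackVoid(String commodityCode) { }

    public boolean rollbackBool(String commodityCode) { return false; }

    public void rollbackThrows(String commodityCode) {
        throw new IllegalStateException("storage unavailable");
    }
}
```

Mapping every failure to a retryable status keeps the RM simple: the TC owns the retry policy, and the rollback method only needs to be safe to call again.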

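VI. useTCCFence

In LocalTCC mode, you can set useTCCFence=true so that the TCC branch-transaction status table built into the Seata framework solves TCC's three classic problems:

  • Idempotency: because of network jitter, the TC does not receive the RM's phase-two response within the timeout and retries, so the RM receives the same phase-two rollback or commit request more than once;
  • Resource suspension (hanging): because of network jitter, the RM receives the try request only after it has already received the phase-two rollback request;
  • Empty rollback: the RM did not successfully execute try for some reason, the TM rolls back the global transaction, and the RM is asked to roll back without ever having executed try.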
@LocalTCC
public interface LocalTccAction {
    @TwoPhaseBusinessAction(name = "local-tcc-action", // resource name
            commitMethod = "commit", // phase-two commit method
            rollbackMethod = "rollback", // phase-two rollback method
            commitArgsClasses = {BusinessActionContext.class, String.class, Integer.class}, // commit method parameter types
            rollbackArgsClasses = {BusinessActionContext.class, String.class, Integer.class}, // rollback method parameter types
            useTCCFence = true // enable TCCFence so the Seata TCC framework handles the three TCC problems (idempotency, suspension, empty rollback)
    )
    void prepare(BusinessActionContext actionContext,
                @BusinessActionContextParameter("commodityCode") String commodityCode,
                @BusinessActionContextParameter("count") Integer count);
}
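The three problems listed above are easiest to see on a participant that has no fence at all. The sketch below uses a hypothetical inventory participant (NaiveStorageParticipant is an illustrative name, not part of the demo project) whose cancel trusts every request: a retried cancel restores stock twice, a cancel without a try restores stock that was never frozen, and a try arriving after the cancel leaves stock frozen with no phase two left to release it.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical TCC participant WITHOUT any fence: it trusts every request it receives
class NaiveStorageParticipant {
    int available;                                            // stock that can still be sold
    final Map<String, Integer> frozenByXid = new HashMap<>(); // stock reserved per global transaction

    NaiveStorageParticipant(int available) { this.available = available; }

    // Phase one (try): reserve stock for this global transaction
    void tryFreeze(String xid, int count) {
        available -= count;
        frozenByXid.merge(xid, count, Integer::sum);
    }

    // Phase two (cancel): give the stock back, without checking what was actually frozen
    void cancel(String xid, int count) {
        available += count;
        frozenByXid.remove(xid);
    }
}
```

Starting from 10 units: a try followed by two cancels (the second one a TC retry) ends with 12 available; a lone cancel also ends with 12; and a cancel followed by a late try leaves that xid's reservation in frozenByXid even though its global transaction has already finished.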

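All of the handling logic lives in the TCCFenceHandler class: before the prepare, commit and rollback business methods run, it uses the tcc_fence_log status table to intercept invalid requests.

In the try phase, TCCFenceHandler first opens a transaction so that the fence record is committed in the same transaction as the business operation, then inserts a tcc_fence_log row with status STATUS_TRIED, and finally executes the business try method.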

// TCCFenceHandler
public static Object prepareFence(String xid, Long branchId, String actionName, Callback<Object> targetCallback) {
    // Open a transaction
    return transactionTemplate.execute(status -> {
        try {
            Connection conn = DataSourceUtils.getConnection(dataSource);
            // Insert a tcc_fence_log row with status = STATUS_TRIED
            boolean result = insertTCCFenceLog(conn, xid, branchId, actionName, TCCFenceConstant.STATUS_TRIED);
            LOGGER.info("TCC fence prepare result: {}. xid: {}, branchId: {}", result, xid, branchId);
            if (result) {
                // Run the business try method
                return targetCallback.execute();
            } else {
                throw new TCCFenceException(String.format("Insert tcc fence record error, prepare fence failed. xid= %s, branchId= %s", xid, branchId),
                        FrameworkErrorCode.InsertRecordError);
            }
        } catch (TCCFenceException e) {
            if (e.getErrcode() == FrameworkErrorCode.DuplicateKeyException) {
                LOGGER.error("Branch transaction has already rollbacked before,prepare fence failed. xid= {},branchId = {}", xid, branchId);
                addToLogCleanQueue(xid, branchId);
            }
            status.setRollbackOnly();
            throw new SkipCallbackWrapperException(e);
        } catch (Throwable t) {
            status.setRollbackOnly();
            throw new SkipCallbackWrapperException(t);
        }
    });
}
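The essence of prepareFence is "insert first, then run try", with the unique key on (xid, branchId) doing the rejecting. The in-memory sketch below models that under stated assumptions: a ConcurrentHashMap stands in for tcc_fence_log, putIfAbsent stands in for an INSERT that can violate the unique key, and removing the row when try throws stands in for the shared database transaction rolling back. FenceStatus and PrepareFenceSketch are hypothetical names; the real status constants live in TCCFenceConstant.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

// Hypothetical status values mirroring the STATUS_* constants used in the article
enum FenceStatus { TRIED, COMMITTED, ROLLBACKED, SUSPENDED }

// Hypothetical in-memory model of prepareFence; the map stands in for tcc_fence_log
class PrepareFenceSketch {
    final Map<String, FenceStatus> fenceLog = new ConcurrentHashMap<>();

    // Composite key standing in for the unique constraint on (xid, branch_id)
    static String key(String xid, long branchId) { return xid + ":" + branchId; }

    <T> T prepare(String xid, long branchId, Supplier<T> tryMethod) {
        String k = key(xid, branchId);
        // putIfAbsent plays the role of an INSERT that may hit the unique key
        if (fenceLog.putIfAbsent(k, FenceStatus.TRIED) != null) {
            // A row already exists (e.g. SUSPENDED from an empty rollback): never run try
            throw new IllegalStateException("fence record exists, try skipped: " + k);
        }
        try {
            return tryMethod.get();              // business try method
        } catch (RuntimeException e) {
            fenceLog.remove(k);                  // emulate the shared transaction rolling back
            throw e;
        }
    }
}
```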

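Before the phase-two commit method runs, TCCFenceHandler queries tcc_fence_log (SELECT ... FOR UPDATE) for an existing record. If there is none, it throws an exception; if there is one, it performs an idempotency status check and executes the phase-two business commit method only when the status is STATUS_TRIED.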

// TCCFenceHandler
public static boolean commitFence(Method commitMethod, Object targetTCCBean,
                                  String xid, Long branchId, Object[] args) {
    // Open a transaction
    return transactionTemplate.execute(status -> {
        try {
            Connection conn = DataSourceUtils.getConnection(dataSource);
            // select for update
            TCCFenceDO tccFenceDO = TCC_FENCE_DAO.queryTCCFenceDO(conn, xid, branchId);
            if (tccFenceDO == null) {
                throw new TCCFenceException(String.format("TCC fence record not exists, commit fence method failed. xid= %s, branchId= %s", xid, branchId),
                        FrameworkErrorCode.RecordAlreadyExists);
            }
            // (1) Idempotency: already committed, nothing to do
            if (TCCFenceConstant.STATUS_COMMITTED == tccFenceDO.getStatus()) {
                return true;
            }
            // A phase-two rollback request was already received earlier
            if (TCCFenceConstant.STATUS_ROLLBACKED == tccFenceDO.getStatus() || TCCFenceConstant.STATUS_SUSPENDED == tccFenceDO.getStatus()) {
                return false;
            }
            // After the idempotency check passes, update the status to STATUS_COMMITTED and invoke the target method
            return updateStatusAndInvokeTargetMethod(conn, commitMethod, targetTCCBean, xid, branchId, TCCFenceConstant.STATUS_COMMITTED, status, args);
        } catch (Throwable t) {
            status.setRollbackOnly();
            throw new SkipCallbackWrapperException(t);
        }
    });
}
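The checks in commitFence amount to a small decision table. In this hypothetical in-memory version (CommitFenceSketch is not a Seata class, and a plain map lookup stands in for SELECT ... FOR UPDATE), a missing record is an error, COMMITTED returns true without re-running the business method, ROLLBACKED or SUSPENDED returns false, and only TRIED leads to the business commit. If that commit fails, the status change is reverted to emulate the transaction being marked rollback-only.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BooleanSupplier;

// Hypothetical status values mirroring the STATUS_* constants used in the article
enum FenceStatus { TRIED, COMMITTED, ROLLBACKED, SUSPENDED }

// Hypothetical in-memory model of the checks commitFence performs
class CommitFenceSketch {
    final Map<String, FenceStatus> fenceLog = new ConcurrentHashMap<>();

    static String key(String xid, long branchId) { return xid + ":" + branchId; }

    boolean commit(String xid, long branchId, BooleanSupplier commitMethod) {
        String k = key(xid, branchId);
        FenceStatus status = fenceLog.get(k);        // real code: SELECT ... FOR UPDATE
        if (status == null) {
            // No TRIED row: try never ran here, so commit is treated as an error
            throw new IllegalStateException("TCC fence record not exists: " + k);
        }
        if (status == FenceStatus.COMMITTED) {
            return true;                             // (1) idempotent retry: do nothing
        }
        if (status == FenceStatus.ROLLBACKED || status == FenceStatus.SUSPENDED) {
            return false;                            // already rolled back: refuse to commit
        }
        // TRIED: flip the status, then run the business commit "in the same transaction"
        fenceLog.put(k, FenceStatus.COMMITTED);
        boolean ok;
        try {
            ok = commitMethod.getAsBoolean();
        } catch (RuntimeException e) {
            fenceLog.put(k, FenceStatus.TRIED);      // emulate the transaction rolling back
            throw e;
        }
        if (!ok) {
            fenceLog.put(k, FenceStatus.TRIED);      // emulate setRollbackOnly()
        }
        return ok;
    }
}
```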

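TCCFenceHandler's phase-two rollback logic is more involved. After opening a transaction, it queries tcc_fence_log (SELECT ... FOR UPDATE) for an existing record.

If there is no record, this is an empty rollback, so it tries to insert a tcc_fence_log record with status=STATUS_SUSPENDED:

  1. If the insert succeeds, the phase-two rollback is considered complete and it returns immediately. If a try request arrives later, prepareFence throws an exception (unique constraint on xid+branchId) and does not execute the phase-one try business method, which prevents resource suspension;
  2. If the insert fails (unique constraint on xid+branchId), a phase-one try request was probably received while the rollback was in progress; an exception is thrown and the TC retries the phase-two rollback later, which resolves resource suspension.

If a record exists, it performs the idempotency check and then executes the business rollback method.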

// TCCFenceHandler
public static boolean rollbackFence(Method rollbackMethod, Object targetTCCBean,
                                    String xid, Long branchId, Object[] args, String actionName) {
    // Open a transaction
    return transactionTemplate.execute(status -> {
        try {
            Connection conn = DataSourceUtils.getConnection(dataSource);
            // select for update
            TCCFenceDO tccFenceDO = TCC_FENCE_DAO.queryTCCFenceDO(conn, xid, branchId);
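            // (2) Resource suspension: insert a tcc_fence_log row with status=STATUS_SUSPENDED so that a phase-one try request arriving after the rollback is blocked
            // (3) Empty rollback: no tcc_fence_log row yet means this is an empty rollback, so the phase-two rollback method is not executed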
            if (tccFenceDO == null) {
                boolean result = insertTCCFenceLog(conn, xid, branchId, actionName, TCCFenceConstant.STATUS_SUSPENDED);
                if (!result) {
                    throw new TCCFenceException(String.format("Insert tcc fence record error, rollback fence method failed. xid= %s, branchId= %s", xid, branchId),
                            FrameworkErrorCode.InsertRecordError);
                }
                return true;
            } else {
                // (1) Idempotency: already rolled back (or suspended)
                if (TCCFenceConstant.STATUS_ROLLBACKED == tccFenceDO.getStatus() || TCCFenceConstant.STATUS_SUSPENDED == tccFenceDO.getStatus()) {
                    return true;
                }
                if (TCCFenceConstant.STATUS_COMMITTED == tccFenceDO.getStatus()) {
                    return false;
                }
            }
            // After the idempotency and empty-rollback checks pass, update the status and invoke the target method
            return updateStatusAndInvokeTargetMethod(conn, rollbackMethod, targetTCCBean, xid, branchId, TCCFenceConstant.STATUS_ROLLBACKED, status, args);
        } catch (Throwable t) {
            status.setRollbackOnly();
            throw new SkipCallbackWrapperException(t);
        }
    });
}
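rollbackFence can be modeled with the same kind of in-memory table. In this hypothetical sketch (RollbackFenceSketch is not a Seata class; a ConcurrentHashMap stands in for tcc_fence_log and putIfAbsent for an INSERT against the unique key), a missing row means an empty rollback: a SUSPENDED row is inserted and the business rollback is skipped. ROLLBACKED or SUSPENDED returns true, COMMITTED returns false, and only TRIED runs the business rollback.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BooleanSupplier;

// Hypothetical status values mirroring the STATUS_* constants used in the article
enum FenceStatus { TRIED, COMMITTED, ROLLBACKED, SUSPENDED }

// Hypothetical in-memory model of the checks rollbackFence performs
class RollbackFenceSketch {
    final Map<String, FenceStatus> fenceLog = new ConcurrentHashMap<>();

    static String key(String xid, long branchId) { return xid + ":" + branchId; }

    boolean rollback(String xid, long branchId, BooleanSupplier rollbackMethod) {
        String k = key(xid, branchId);
        FenceStatus status = fenceLog.get(k);            // real code: SELECT ... FOR UPDATE
        if (status == null) {
            // (3) Empty rollback: record SUSPENDED so a late try is rejected (2); skip the business rollback
            if (fenceLog.putIfAbsent(k, FenceStatus.SUSPENDED) != null) {
                // Unique-key clash: a try inserted its row concurrently; fail so the TC retries
                throw new IllegalStateException("insert fence record failed: " + k);
            }
            return true;
        }
        if (status == FenceStatus.ROLLBACKED || status == FenceStatus.SUSPENDED) {
            return true;                                 // (1) idempotent retry
        }
        if (status == FenceStatus.COMMITTED) {
            return false;                                // already committed: refuse to roll back
        }
        // TRIED: mark ROLLBACKED, then run the business rollback "in the same transaction"
        fenceLog.put(k, FenceStatus.ROLLBACKED);
        boolean ok;
        try {
            ok = rollbackMethod.getAsBoolean();
        } catch (RuntimeException e) {
            fenceLog.put(k, FenceStatus.TRIED);          // emulate the transaction rolling back
            throw e;
        }
        if (!ok) {
            fenceLog.put(k, FenceStatus.TRIED);          // emulate setRollbackOnly()
        }
        return ok;
    }
}
```

Because an empty rollback leaves a SUSPENDED row behind, a try that later arrives for the same xid and branchId collides with it on the unique key, which is how prepareFence refuses to run the late try.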

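Summary

1. Three special kinds of bean

1) A LocalTCC bean annotated with @LocalTCC + @TwoPhaseBusinessAction: it is registered as an RM at startup; when a TCC transaction runs, Seata's proxy logic registers a branch transaction for it, and it is also responsible for committing/rolling back that branch transaction.

2) A ReferenceBean injected via @DubboReference whose method carries @TwoPhaseBusinessAction: it acts only at execution time, when Seata's proxy logic registers a branch transaction for it during the TCC transaction.

3) A ServiceBean annotated with @DubboService + @TwoPhaseBusinessAction: it is registered as an RM at startup and only commits/rolls back the branch transaction.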

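2. Try phase

1) The RM call is intercepted by TccActionInterceptor, which packs the @TwoPhaseBusinessAction-annotated method and its arguments into a BranchRegisterRequest and sends it to the TC to register a branch transaction.

2) The TC receives the BranchRegisterRequest, persists the branch transaction data and returns the branch transaction id. Unlike AT, which must acquire the global lock before persisting the branch transaction, TCC skips that step.

3) The RM executes the phase-one try business logic: for LocalTCC this is a local transaction; for a DubboReference it is a remote RPC call.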
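The interception in the try phase can be illustrated with a plain JDK dynamic proxy: every call on the proxy first "registers a branch" and only then runs the real try method. This is an analogy under stated assumptions, not Seata's implementation: StockAction, FakeTc and TccProxySketch are hypothetical, while the real TccActionInterceptor is installed by Seata's own proxy machinery, sends a BranchRegisterRequest over the network, and also prepares the BusinessActionContext for phase two.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical TCC interface; in the article this role is played by an @TwoPhaseBusinessAction method
interface StockAction {
    boolean prepare(String commodityCode, int count);
}

// Hypothetical stand-in for the TC: hands out branch ids and logs each registration
class FakeTc {
    private final AtomicLong nextBranchId = new AtomicLong(100);
    final List<String> registrations = new ArrayList<>();

    long registerBranch(String xid, String resourceId, Object[] args) {
        long branchId = nextBranchId.incrementAndGet();
        registrations.add(xid + "/" + resourceId + "/" + branchId + "/" + Arrays.toString(args));
        return branchId;
    }
}

class TccProxySketch {
    // Wraps target so that each interface call registers a branch before running try
    @SuppressWarnings("unchecked")
    static <T> T wrap(Class<T> iface, T target, FakeTc tc, String xid, String resourceId) {
        InvocationHandler handler = (proxy, method, args) -> {
            if (method.getDeclaringClass() == Object.class) {
                return method.invoke(target, args);   // toString/equals/hashCode: no branch
            }
            tc.registerBranch(xid, resourceId, args);  // steps 1-2: register the branch first
            try {
                return method.invoke(target, args);   // step 3: then run the phase-one try
            } catch (InvocationTargetException e) {
                throw e.getCause();                   // surface the try method's own exception
            }
        };
        return (T) Proxy.newProxyInstance(iface.getClassLoader(), new Class<?>[] {iface}, handler);
    }
}
```

Registering before invoking is what lets the TC know about the branch even if the try itself fails halfway, so a later global rollback can still reach it.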

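3. Commit phase

1) The TM commits the global transaction by sending a GlobalCommitRequest to the TC.

2) The TC receives the GlobalCommitRequest:

  • If the global transaction contains AT branches, the AT global locks are released first;
  • TCC branches are committed synchronously: a BranchCommitRequest is sent to the RM; if the RM reports failure, the commit is retried asynchronously until it succeeds; on success, the branch transaction is deleted;
  • AT branches are committed asynchronously: a BranchCommitRequest is sent to the RM, which deletes its undo_log asynchronously.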

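3) The RM receives the BranchCommitRequest

Whether LocalTCC or Dubbo, the BranchCommitRequest is received by the service instance that registered the TCC resource at startup.

The RM parses the data sent by the TC (the commit method name and the phase-one try arguments) and invokes the TCC commit method via reflection.

For LocalTCC, useTCCFence can be enabled so that Seata handles the three TCC problems.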
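The ordering the TC follows in step 2) of the commit phase can be summarized in a short sketch. It is a hypothetical simplification of the description above, not Seata's doGlobalCommit: AT locks are released first, each TCC branch is committed synchronously with failures pushed to a retry queue, and AT branches are queued for asynchronous commit. BranchType, BranchRef and CommitDispatchSketch are illustrative names.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.function.Predicate;

enum BranchType { AT, TCC }

// Hypothetical branch reference: id plus branch type
class BranchRef {
    final long branchId;
    final BranchType type;

    BranchRef(long branchId, BranchType type) {
        this.branchId = branchId;
        this.type = type;
    }
}

// Hypothetical simplification of the TC's global commit ordering
class CommitDispatchSketch {
    final List<Long> releasedAtLocks = new ArrayList<>();
    final List<Long> committedTcc = new ArrayList<>();
    final Queue<Long> tccRetryQueue = new ArrayDeque<>();  // TCC commits to retry asynchronously
    final Queue<Long> asyncAtQueue = new ArrayDeque<>();   // AT commits (undo_log cleanup) done later

    void commit(List<BranchRef> branches, Predicate<BranchRef> rmCommit) {
        // Release AT global locks before anything else
        for (BranchRef b : branches) {
            if (b.type == BranchType.AT) {
                releasedAtLocks.add(b.branchId);
            }
        }
        for (BranchRef b : branches) {
            if (b.type == BranchType.TCC) {
                // TCC: synchronous BranchCommitRequest; failures are retried later
                if (rmCommit.test(b)) {
                    committedTcc.add(b.branchId);          // success: branch record deleted
                } else {
                    tccRetryQueue.add(b.branchId);
                }
            } else {
                // AT: handed off for asynchronous commit
                asyncAtQueue.add(b.branchId);
            }
        }
    }
}
```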

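4. Rollback phase

Similar to commit:

1) The TM rolls back the global transaction by sending a request to the TC.

2) The TC handles the global rollback:

  • Update the global transaction in global_table to Rollbacking; if there are AT branches, also update lock_table to Rollbacking
  • If there are AT branches, release the global locks in lock_table
  • Send a BranchRollbackRequest to the RM; on failure, retry asynchronously until it succeeds
  • On success, delete the branch transaction from branch_table synchronously; in db/redis mode, the global transaction in global_table is deleted asynchronously

3) The RM receives the BranchRollbackRequest

As with commit, the RM invokes the rollback method via reflection; for LocalTCC, useTCCFence can be enabled so that Seata handles the three TCC problems.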

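5. useTCCFence

In LocalTCC mode, you can set useTCCFence=true so that the TCC branch-transaction status table built into the Seata framework solves TCC's three classic problems:

Idempotency: in the try phase, the RM inserts a branch status record with STATUS_TRIED. When a rollback or commit request arrives, the RM queries that record with SELECT ... FOR UPDATE and executes the phase-two method only if the status is STATUS_TRIED.

Empty rollback: if, during rollback, the RM's SELECT ... FOR UPDATE finds no branch status record, an empty rollback has occurred, and the RM tries to insert a record with STATUS_SUSPENDED. A unique-constraint conflict means the try method was executing concurrently, so the RM reports failure to the TC, which retries; if there is no conflict, it reports success. (During commit, a missing record is treated as an error instead.)

Resource suspension: because handling an empty rollback inserts a STATUS_SUSPENDED record, a try that reaches the RM after the rollback hits a unique-constraint conflict when inserting its STATUS_TRIED record; the try call fails without running the business try method, so resource suspension is avoided.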

Link: https://juejin.cn/post/7162425024732332040