天天看點

[深度] Seata TCC 分布式事務源碼分析

Seata 是什麼

Seata

是阿裡近期開源的分布式事務架構,位址:https://github.com/seata/seata。架構包括了集團的

TXC

(雲版本叫

GTS

)和螞蟻金服的

TCC

兩種模式,短短數月

Github

上的

star

數已經接近一萬,算是目前唯一有大廠背書的分布式事務解決方案。

TXC

Seata

中又叫

AT

模式,意為補償方法是架構自動生成的,對使用者完全屏蔽,使用者可以向使用本地事務那樣使用分布式事務,缺點是僅支援關系型資料庫(目前支援

MySQL

),引入

Seata AT

的服務需要本地建表存儲

rollback_info

,隔離級别預設

RU

适用場景有限。

TCC

不算是新概念,很早就有了,使用者通過定義

try/confirm/cancel

三個方法在應用層面模拟兩階段送出,差別在于 TCC 中

try

方法也需要操作資料庫進行資源鎖定,後續兩個補償方法由架構自動調用,分别進行資源送出和復原,這點同單純的存儲層

2PC

不太一樣。螞蟻金服向

Seata

貢獻了自己的

TCC

實作,據說已經演化了十多年,大量應用在在金融、交易、倉儲等領域。

分布式事務的誕生背景

早期應用都是單一架構,例如支付服務涉及到的賬戶、金額、訂單系統等都由單一應用負責,底層通路同一個資料庫執行個體,自然事務操作也是本地事務,借助

Spring

可以輕松實作;但是由于量級越來越大,單一服務需要進行職責拆分變為三個獨立的服務,通過

RPC

進行調用,資料也存在不同的資料庫執行個體中,由于這時一次業務操作涉及對多個資料庫資料的修改,無法再依靠本地事務,隻能通過分布式事務架構來解決。

[深度] Seata TCC 分布式事務源碼分析

TCC 就是分布式事務的一種解決方案,屬于柔性補償型,優點在于了解簡單、僅

try

階段加鎖并發性能較好,缺點在于代碼改造成本。

什麼是 TCC 本文就不再贅述了,TCC 的概念本身并不複雜

Seata TCC 使用方法

在分析源碼之前,我們先簡要提及下

Seata TCC

模式的使用方法,有助于後續了解整個

TCC

流程。

Seata TCC 參與方

Seata

中的

TCC

模式要求

TCC

服務的參與方在接口上增加

@TwoPhaseBusinessAction

注解,注明

TCC

接口的名稱(全局唯一),

TCC

接口的

confirm

cancel

方法的名稱,用于後續架構反射調用,下面是一個

TCC

接口的案例:

public interface TccAction {
    @TwoPhaseBusinessAction(name = "yourTccActionName", commitMethod = "confirm", rollbackMethod = "cancel")
    public boolean try(BusinessActionContext businessActionContext, int a, int b);
    public boolean confirm(BusinessActionContext businessActionContext);
    public boolean cancel(BusinessActionContext businessActionContext);
}
           

緊接着定義實作類

Impl

實作這個接口,為三個方法提供具體實作。最後将參與方服務進行釋出,注冊到遠端,主要為了後續能讓

Seata

架構調用到參與方的

confirm

或者

cancel

方法閉環整個

TCC

事務。

Seata TCC 發起方

Seata TCC

的發起方類似于我們上圖中的

payment service

,參與方需要在業務方法上增加

@GlobalTransactional

注解,用于開啟切面注冊全局事務,業務方法中調用

TCC

參與方的若幹

try

方法,一旦業務方法調用成功,

Seata

架構會通知

TC

回調這些參與方的

confirm

cancel

方法。

源碼分析

Seata

TCC

模式的源碼并不複雜,主要集中于:

module class 功能
seata-spring GlobalTransactionalInterceptor.class 全局事務切面邏輯,包括注冊全局事務,拿到 xid
seata-spring TccActionInterceptor.class TCC 參與方切面邏輯
seata-tcc TCCResourceManager.class 解析 TCC Bean,儲存 TCC Resources,便于後續回調
seata-tcc ActionInterceptorHandler.class TCC 分支事務注冊實作
seata-server DefaultCoordinator.class、FileTransactionStoreManager.class 主要是 TC 的實作、事務存儲等實作

注冊 TCC Resources

Seata

中一個

TCC

接口被稱作一個

TCC Resources

,其結構如下:

public class TCCResource implements Resource {

    private String resourceGroupId = "DEFAULT";

    private String appName;

    private String actionName; // TCC 接口名稱     

    private Object targetBean; // TCC Bean

    private Method prepareMethod; // try 方法

    private String commitMethodName;

    private Method commitMethod; // confirm 方法

    private String rollbackMethodName;

    private Method rollbackMethod; // cancel 方法

    // …… 省略
}
           

Seata

解析到應用中存在

TCC Bean

,則通過

parserRemotingServiceInfo

方法生成一個

TCCResource

對象,進而調用

TCCResourceManager

類的

registerResource

方法,将

TCCResource

對象儲存到本地的

tccResourceCache

中,它是一個

ConcurrentHashMap

結構,同時通過

RmRpcClient

将該

TCCResource

resourceId

address

等資訊注冊到服務端,便于後續

TC

通過

RPC

回調到正确的位址。

// 解析 TCCResource 的部分代碼
Class<?> interfaceClass = remotingBeanDesc.getInterfaceClass();
Method[] methods = interfaceClass.getMethods();
if(isService(bean, beanName)){
    try {
        // 如果是 TCC service Bean,解析并注冊該 resource
        Object targetBean = remotingBeanDesc.getTargetBean();
        for(Method m : methods){
            TwoPhaseBusinessAction twoPhaseBusinessAction = m.getAnnotation(TwoPhaseBusinessAction.class);
            if(twoPhaseBusinessAction != null){
                // 如果有 TCC 參與方注解,定義一個 TCCResource,
                TCCResource tccResource = new TCCResource();
                tccResource.setActionName(twoPhaseBusinessAction.name());
                // TCC Bean
                tccResource.setTargetBean(targetBean); 
                // try 方法
                tccResource.setPrepareMethod(m); 
                // confirm 方法名稱
                tccResource.setCommitMethodName(twoPhaseBusinessAction.commitMethod());
                // confirm 方法對象
                tccResource.setCommitMethod(ReflectionUtil.getMethod(interfaceClass, twoPhaseBusinessAction.commitMethod(), new Class[]{BusinessActionContext.class}));
                // cancel 方法名稱
                tccResource.setRollbackMethodName(twoPhaseBusinessAction.rollbackMethod());
                // cancel 方法對象
                tccResource.setRollbackMethod(ReflectionUtil.getMethod(interfaceClass, twoPhaseBusinessAction.rollbackMethod(), new Class[]{BusinessActionContext.class}));
                // 調用到 TCCResourceManager 的 registerResource 方法
                DefaultResourceManager.get().registerResource(tccResource);
            }
        }
    }catch (Throwable t){
        throw new FrameworkException(t, "parser remting service error");
    }
}
           

我們看一下

TCCResourceManager

registerResource

方法的實作:

// 記憶體中儲存的 resourceId 和 TCCResource 的映射關系
private Map<String, Resource> tccResourceCache = new ConcurrentHashMap<String, Resource>();

@Override
public void registerResource(Resource resource) {
    TCCResource tccResource = (TCCResource) resource;
    tccResourceCache.put(tccResource.getResourceId(), tccResource);
    // 調用父類的方法通過 RPC 注冊到遠端
    super.registerResource(tccResource);
}
           

我們看下

TCCResource

是如何注冊到服務端的:

public void registerResource(Resource resource) {
    // 拿到 RmRpcClient 執行個體,調用其 registerResource 方法
    RmRpcClient.getInstance().registerResource(resource.getResourceGroupId(), resource.getResourceId());
}

public void registerResource(String resourceGroupId, String resourceId) {
    if (LOGGER.isInfoEnabled()) {
        LOGGER.info("register to RM resourceId:" + resourceId);
    }
    synchronized (channels) {
        for (Map.Entry<String, Channel> entry : channels.entrySet()) {
            String serverAddress = entry.getKey();
            Channel rmChannel = entry.getValue();
            if (LOGGER.isInfoEnabled()) {
                LOGGER.info("register resource, resourceId:" + resourceId);
            }
            // 注冊 resourceId,遠端将其解析為一個 RpcContext 儲存在記憶體中
            sendRegisterMessage(serverAddress, rmChannel, resourceId);
        }
    }
}
           

GlobalTransaction 注冊全局事務

GlobalTransaction

注解是全局事務的入口,其切面邏輯實作在

GlobalTransactionalInterceptor

類中。如果判斷進入

@GlobalTransaction

修飾的方法,會調用

handleGlobalTransaction

方法進入切面邏輯,其中關鍵方法是

transactionalTemplate

execute

方法。

public Object execute(TransactionalExecutor business) throws Throwable {
    
    // 如果上遊已經有 xid 傳過來說明自己是下遊,直接參與到這個全局事務中就可以,不必新開一個,角色是 Participant
    // 如果上遊沒有 xid 傳遞過來,說明自己是發起方,新開啟一個全局事務,角色是 Launcher
    GlobalTransaction tx = GlobalTransactionContext.getCurrentOrCreate();

    // …… …… 省略 

    try {

        // 開啟全局事務
        beginTransaction(txInfo, tx);

        Object rs = null;
        try {

            // 調用業務方法
            rs = business.execute();

        } catch (Throwable ex) {

            // 如果抛異常,通知 TC 復原全局事務
            completeTransactionAfterThrowing(txInfo,tx,ex);
            throw ex;
        }

        // 如果不抛異常,通知 TC 送出全局事務
        commitTransaction(tx);

        return rs;
    } 

    // …… …… 省略
}
           

beginTransaction

方法調用了

transactionManager

begin

方法:

// 用戶端
@Override
public String begin(String applicationId, String transactionServiceGroup, String name, int timeout)
    throws TransactionException {
    GlobalBeginRequest request = new GlobalBeginRequest();
    request.setTransactionName(name);
    request.setTimeout(timeout);
    // 發送 RPC,擷取 TC 下發的 xid
    GlobalBeginResponse response = (GlobalBeginResponse)syncCall(request);
    return response.getXid();
}

// 服務端
@Override
public String begin(String applicationId, String transactionServiceGroup, String name, int timeout)
    throws TransactionException {
    // 全局事務用 GlobalSession 來表示
    GlobalSession session = GlobalSession.createGlobalSession(
        applicationId, transactionServiceGroup, name, timeout);
    session.addSessionLifecycleListener(SessionHolder.getRootSessionManager());
    // 将 GlobalSession 寫入檔案存儲
    session.begin();
    // 傳回 UUID 作為全局事務 ID
    return XID.generateXID(session.getTransactionId());
}
           

TwoPhaseBusinessAction 注冊分支事務

全局事務調用業務方法時,會進入

TCC

參與方的切面邏輯,主要實作在

TccActionInterceptor

類中,關鍵方法是

actionInterceptorHandler

proceed

方法。

public Map<String, Object> proceed(Method method, Object[] arguments, TwoPhaseBusinessAction businessAction, Callback<Object> targetCallback) throws Throwable {
    
    // …… …… 省略

    // 建立分支事務
    String branchId = doTccActionLogStore(method, arguments, businessAction, actionContext);
    actionContext.setBranchId(branchId);
    
    // 記錄方法參數
    Class<?>[] types = method.getParameterTypes();
    int argIndex = 0;
    for (Class<?> cls : types) {
        if (cls.getName().equals(BusinessActionContext.class.getName())) {
            arguments[argIndex] = actionContext;
            break;
        }
        argIndex++;
    }
    
    // …… …… 省略
}
           

doTccActionLogStore

方法負責注冊分支事務:

// 用戶端
protected String doTccActionLogStore(Method method, Object[] arguments, TwoPhaseBusinessAction businessAction, BusinessActionContext actionContext) {
    String actionName = actionContext.getActionName();
    // 拿到全局事務 ID
    String xid = actionContext.getXid();
    
    // …… …… 省略

    try {
        // resourceManager 通過 RPC 向 TC 注冊分支事務
        Long branchId = DefaultResourceManager.get().branchRegister(BranchType.TCC, actionName, null, xid, applicationContextStr, null);
        // 拿到 TC 傳回的分支事務 ID
        return String.valueOf(branchId);
    }

    // …… …… 省略
}

// 服務端
@Override
public Long branchRegister(BranchType branchType, String resourceId, String clientId, String xid,
                            String applicationData, String lockKeys) throws TransactionException {
    GlobalSession globalSession = assertGlobalSession(XID.getTransactionId(xid), GlobalStatus.Begin);
    // 分支事務用 BranchSession 表示,建立一個 BranchSession
    BranchSession branchSession = SessionHelper.newBranchByGlobal(globalSession, branchType, resourceId,
        applicationData, lockKeys, clientId);

    if (!branchSession.lock()) {
        throw new TransactionException(LockKeyConflict);
    }
    try {
        // 将分支事務加入全局事務中,也會寫檔案
        globalSession.addBranch(branchSession);
    } catch (RuntimeException ex) {
        throw new TransactionException(FailedToAddBranch);
    }
    // 傳回分支事務 ID
    return branchSession.getBranchId();
}
           

TC 回調參與方補償方法

分支事務注冊完畢,業務方法調用成功則通知

TC

送出全局事務。

@Override
public void commit() throws TransactionException {
    // 如果是參與者,無需發起送出請求
    if (role == GlobalTransactionRole.Participant) {
        return;
    }
    // 由 TM 向 TC 發出送出全局事務的請求
    status = transactionManager.commit(xid);
}
           

TC

收到用戶端

TM

commit

請求後:

@Override
public GlobalStatus commit(String xid) throws TransactionException {
    // 根據 xid 找出 GlobalSession
    GlobalSession globalSession = SessionHolder.findGlobalSession(XID.getTransactionId(xid));
    if (globalSession == null) {
        return GlobalStatus.Finished;
    }
    GlobalStatus status = globalSession.getStatus();

    // 關閉這個 GlobalSession,不讓後續的分支事務再注冊上來
    globalSession.closeAndClean(); 

    if (status == GlobalStatus.Begin) {
        // 修改狀态為送出進行中
        globalSession.changeStatus(GlobalStatus.Committing);
        // 一旦分支事務中存在 TCC,做同步送出,其實 TCC 分支也可以異步送出,要求高性能時可以選擇異步
        if (globalSession.canBeCommittedAsync()) {
            asyncCommit(globalSession);
        } else {
            doGlobalCommit(globalSession, false);
        }
    }
    return globalSession.getStatus();
}
           

doGlobalCommit

是我們關注的關鍵方法,我們忽略其中的次要邏輯:

@Override
public void doGlobalCommit(GlobalSession globalSession, boolean retrying) throws TransactionException {
    for (BranchSession branchSession : globalSession.getSortedBranches()) {
        
        // …… …… 省略

        try {
            // 調用 DefaultCoordinator 的 branchCommit 方法做分支送出
            // 參數有分支事務 id,resourceId 用來尋找對應的 TCCResource 和補償方法參數資訊
            BranchStatus branchStatus = resourceManagerInbound.branchCommit(branchSession.getBranchType(),
                XID.generateXID(branchSession.getTransactionId()), branchSession.getBranchId(),
                branchSession.getResourceId(), branchSession.getApplicationData());
        }
    }

    // …… …… 省略
}
           

服務端的

DefaultCoordinator

類中的

branchCommit

方法發出

RPC

請求,調用對應

TCCResource

提供方:

@Override
public BranchStatus branchCommit(BranchType branchType, String xid, long branchId, String resourceId,
                                    String applicationData)
    throws TransactionException {
    
    // …… …… 省略
    // 擷取全局事務和分支事務
    GlobalSession globalSession = SessionHolder.findGlobalSession(XID.getTransactionId(xid));
        BranchSession branchSession = globalSession.getBranch(branchId);
    // 根據 resourceId 找到對應的 channel 和 RpcContext 
    BranchCommitResponse response = (BranchCommitResponse)messageSender.sendSyncRequest(resourceId,
        branchSession.getClientId(), request);
    // 傳回分支事務送出狀态
    return response.getBranchStatus();

    // …… …… 省略
}
           

用戶端自然是接收到分支送出的

RPC

請求,然後本地找出之前解析并保持下來的

TCCResource

進行補償方法的反射調用,下面我們截取其中的關鍵步驟進行分析。

@Override
public BranchStatus branchCommit(BranchType branchType, String xid, long branchId, String resourceId, String applicationData) throws TransactionException {
    // 根據 resourceId 找出記憶體中保留的 TCCResource 對象
    TCCResource tccResource = (TCCResource) tccResourceCache.get(resourceId);
    if(tccResource == null){
        throw new ShouldNeverHappenException("TCC resource is not exist, resourceId:" + resourceId);
    }
    // 擷取 targetBean 和相應的 method 對象
    Object targetTCCBean = tccResource.getTargetBean();
    Method commitMethod = tccResource.getCommitMethod();
    try {
        boolean result = false;
        // 取出補償方法參數資訊
        BusinessActionContext businessActionContext = getBusinessActionContext(xid, branchId, resourceId, applicationData);
        // 反射調用補償方法
        Object ret = commitMethod.invoke(targetTCCBean, businessActionContext);
        // 傳回狀态
        return result ? BranchStatus.PhaseTwo_Committed:BranchStatus.PhaseTwo_CommitFailed_Retryable;
    }
    // …… …… 省略
}
           

事務存儲

關于 Seata TC 子產品如何進行事務存儲,網上有的文章已經講得很詳細,例如 深度剖析一站式分布式事務方案 Seata-Server,是以這裡不再贅述。

需要提及的一點是,

TC

有可能成為整個分布式事務服務的性能瓶頸,是以如何做到

高性能

高可用

很重要,目前的存儲方式是

File

,代碼中也有關于

DB Store Mode

TODO

項,檔案相比于

DB

性能肯定好一些但是可用性會差一點,這塊怎麼保證要等到後續

HA Cluster

釋出之後再看。

總結

整個

Seata

架構中關于

TCC

部分的源碼并不複雜,本文隻選取了部分類中的關鍵代碼進行展示,忽略了一些判斷邏輯和異常處理,筆者認為

Seata TCC

中關于

TCC

異常的封裝和自定義處理、還有各種使用者擴充埋點的設計也值得一看。

螞蟻

SOFA Channel

之前做過一個關于

Seata TCC

Seata TCC 分享 的講解裡也提到,

TCC

架構的難點不在于本身,而在于如何寫好一個

TCC

接口,如果對這部分内容感興趣,可以點選連結進行詳細了解。

寫在最後

這是一個不定時更新的、披着程式員外衣的文青小号,歡迎關注。

[深度] Seata TCC 分布式事務源碼分析

繼續閱讀