天天看點

Seata事務處理流程(主AT)Seata事務處理流程

文章目錄

  • Seata事務處理流程
    • 前言
    • 一、元件裝配
    • 二、核心元件源碼分析
      • 1、GlobalTransactionScanner
        • 1.1、職能詳解
          • 1.1.1、AbstractAutoProxyCreator
            • postProcessBeforeInitialization
            • wrapIfNecessary
          • 1.1.2、InitializingBean
      • 2、GlobalTransactionalInterceptor
        • 2.1、職能詳解
          • 2.1.1、ConfigurationChangeListener
          • 2.1.2、MethodInterceptor
      • 3、全局事務控制依賴的核心元件一覽
        • TMClient
        • RMClient
        • TransactionalTemplate
        • GlobalTransaction
        • TransactionManager
        • ResourceManager
        • DataSourceProxy/ConnectionProxy
    • 三、攔截處理流程
    • 1、一階段
        • transactionalTemplate#execute編排整體事務
        • 擷取或建立事務
          • GlobalTransactionContext#getCurrentOrCreate
        • 開啟事務
          • TransactionalTemplate#beginTransaction
          • DefaultGlobalTransaction#begin
        • 執行本地業務
          • 擷取本地連接配接
          • 本地事務執行
          • 本地事務送出
          • 本地事務復原
      • 2、二階段
        • 全局事務處理
          • 入口與調用鍊
    • 四、多服務調用AT模式示例
      • 1、服務與身份
      • 2、必要條件
      • 3、第一階段
        • 發起者控制主流程
          • M1注冊全局事務
          • M1執行本地事務
          • M1調用遠端服務M2
      • 4、第二階段
        • 發起者收尾流程

Seata事務處理流程

前言

最近在攻克分布式相關的技術棧,剛好需要一個case輔助了解分布式事務和共識,就用seata看了下工程實踐中的實作模型。

本文主要記錄分析用戶端的行為。

參考 https://blog.csdn.net/hosaos/article/details/89403552

參考官方講解 http://seata.io/zh-cn/docs/overview/what-is-seata.html

版本0.9.0。注意距離新版本較遠,可能在細節實作上有所不同。

<dependency>
    <groupId>com.alibaba.cloud</groupId>
    <artifactId>spring-cloud-alibaba-seata</artifactId>
    <version>2.1.1.RELEASE</version>
</dependency>
<!-- 内部依賴 -->
<dependency>
    <groupId>io.seata</groupId>
    <artifactId>seata-all</artifactId>
    <version>0.9.0</version>
</dependency>
           

一、元件裝配

GlobalTransactionAutoConfiguration

依賴spring.factories機制自動裝配了

GlobalTransactionScanner

元件

@Configuration
@EnableConfigurationProperties(SeataProperties.class)
public class GlobalTransactionAutoConfiguration {
...
 @Bean
 public GlobalTransactionScanner globalTransactionScanner() {

  String applicationName = applicationContext.getEnvironment()
    .getProperty("spring.application.name");

  String txServiceGroup = seataProperties.getTxServiceGroup();

  if (StringUtils.isEmpty(txServiceGroup)) {
   txServiceGroup = applicationName + "-seata-service-group";
   seataProperties.setTxServiceGroup(txServiceGroup);
  }
  return new GlobalTransactionScanner(applicationName, txServiceGroup);
 }
}
           

二、核心元件源碼分析

1、GlobalTransactionScanner

1.1、職能詳解

初始化必要元件;掃描代理資料源,織入全局事務的處理邏輯
1.1.1、AbstractAutoProxyCreator

postProcessBeforeInitialization

目的:代理JDBC資料源,攔截DataSource所有方法的執行,以織入全局事務的處理邏輯。

@Override
public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
    if (bean instanceof DataSource && !(bean instanceof DataSourceProxy) && ConfigurationFactory.getInstance().getBoolean(DATASOURCE_AUTOPROXY, false)) {
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("Auto proxy of  [" + beanName + "]");
        }
        DataSourceProxy dataSourceProxy = DataSourceProxyHolder.get().putDataSource((DataSource) bean);
        return Enhancer.create(bean.getClass(), (org.springframework.cglib.proxy.MethodInterceptor) (o, method, args, methodProxy) -> {
            Method m = BeanUtils.findDeclaredMethod(DataSourceProxy.class, method.getName(), method.getParameterTypes());
            if (null != m) {
                return m.invoke(dataSourceProxy, args);
            } else {
                boolean oldAccessible = method.isAccessible();
                try {
                    method.setAccessible(true);
                    return method.invoke(bean, args);
                } finally {
                    //recover the original accessible for security reason
                    method.setAccessible(oldAccessible);
                }
            }
        });
    }
    return bean;
}
           

wrapIfNecessary

為指定的bean生成代理對象(加入攔截器),以應對TCC和全局事務的處理。

protected Object wrapIfNecessary(Object bean, String beanName, Object cacheKey) {
    if (disableGlobalTransaction) {
        return bean;
    }
    synchronized (PROXYED_SET) {
        // 代理完畢加入集合,確定隻會代理一次
        if (PROXYED_SET.contains(beanName)) {
            return bean;
        }
        interceptor = null;
        //check TCC proxy
        // 根據bean定義資訊,決定是否用TCC代理模式
        if (TCCBeanParserUtils.isTccAutoProxy(bean, beanName, applicationContext)) {
            //TCC interceptor, proxy bean of sofa:reference/dubbo:reference, and LocalTCC
            interceptor = new TccActionInterceptor(TCCBeanParserUtils.getRemotingDesc(beanName));
        } else {
            Class<?> serviceInterface = SpringProxyUtils.findTargetClass(bean);
            Class<?>[] interfacesIfJdk = SpringProxyUtils.findInterfaces(bean);
            if (!existsAnnotation(new Class[]{serviceInterface})
                && !existsAnnotation(interfacesIfJdk)) {
                return bean;
            }
            if (interceptor == null) {
                // 不是TCC,就是用這個攔截器,處理@GlobalTransactional和@GlobalLock
                interceptor = new GlobalTransactionalInterceptor(failureHandlerHook);
            }
        }            
        if (!AopUtils.isAopProxy(bean)) {
            bean = super.wrapIfNecessary(bean, beanName, cacheKey);
        } else {
            AdvisedSupport advised = SpringProxyUtils.getAdvisedSupport(bean);
            Advisor[] advisor = buildAdvisors(beanName, getAdvicesAndAdvisorsForBean(null, null, null));
            for (Advisor avr : advisor) {
                advised.addAdvisor(0, avr);
            }
        }
        PROXYED_SET.add(beanName);
        return bean;
    }
}
           
1.1.2、InitializingBean
  1. 根據disableGlobalTransaction配置決定是否初始化元件,由seata配置檔案給出
  2. 如果disableGlobalTransaction == false,初始化TMClient和RMClient
private final boolean disableGlobalTransaction =
        ConfigurationFactory.getInstance().getBoolean("service.disableGlobalTransaction", false);

@Override
public void afterPropertiesSet() {
    // 控制事務生效的開關
    if (disableGlobalTransaction) {
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("Global transaction is disabled.");
        }
        return;
    }
    initClient();
}
           
private void initClient() {
    if (StringUtils.isNullOrEmpty(applicationId) || StringUtils.isNullOrEmpty(txServiceGroup)) {
        throw new IllegalArgumentException(
            "applicationId: " + applicationId + ", txServiceGroup: " + txServiceGroup);
    }
    //init TM
    TMClient.init(applicationId, txServiceGroup);
    //init RM
    RMClient.init(applicationId, txServiceGroup);
    registerSpringShutdownHook();
}
           

2、GlobalTransactionalInterceptor

2.1、職能詳解

2.1.1、ConfigurationChangeListener

允許運作中調整seata全局事務的開啟和關閉

2.1.2、MethodInterceptor

處理@GlobalTransactional和@GlobalLock,控制全局事務處理流程。

@Override
public Object invoke(final MethodInvocation methodInvocation) throws Throwable {
    Class<?> targetClass = (methodInvocation.getThis() != null ? AopUtils.getTargetClass(methodInvocation.getThis()) : null);
    Method specificMethod = ClassUtils.getMostSpecificMethod(methodInvocation.getMethod(), targetClass);
    final Method method = BridgeMethodResolver.findBridgedMethod(specificMethod);
    // 拿到注解
    final GlobalTransactional globalTransactionalAnnotation = getAnnotation(method, GlobalTransactional.class);
    final GlobalLock globalLockAnnotation = getAnnotation(method, GlobalLock.class);
    if (globalTransactionalAnnotation != null) {
        // 處理标注了@GlobalTransactional的方法
        return handleGlobalTransaction(methodInvocation, globalTransactionalAnnotation);
    } else if (globalLockAnnotation != null) {
        return handleGlobalLock(methodInvocation);
    } else {
        return methodInvocation.proceed();
    }
}
           

3、全局事務控制依賴的核心元件一覽

TMClient

僅有一個init方法,初始化TmRpcClient,用于和TC互動

public static void init(String applicationId, String transactionServiceGroup) {
    TmRpcClient tmRpcClient = TmRpcClient.getInstance(applicationId, transactionServiceGroup);
    // 内部初始化其他元件,執行一些定時任務
    tmRpcClient.init();
}
           

RMClient

僅有一個init方法

  1. 初始化RmRpcClient,用于和TC通訊;
  2. 設定RM,用于注冊分支事務,上報事務執行狀态,處理二階段事務。
    1. 這裡給出的DefaultResourceManager組合并持有了SPI找到的RM(resourceManagers字段),用政策模式委托給具體的ResourceManager處理對應的事務。
  3. 設定RmMessageListener,處理事件
    • 處理器使用了DefaultRMHandler,請求委托給其處理
      • DefaultRMHandler
public static void init(String applicationId, String transactionServiceGroup) {
    RmRpcClient rmRpcClient = RmRpcClient.getInstance(applicationId, transactionServiceGroup);
    // 注冊RM。内部使用了SPI機制加載各種模式的ResourceManager。AT在這裡對應的是DataSourceManager
    rmRpcClient.setResourceManager(DefaultResourceManager.get());
    // 監聽器用于處理RPC請求
    rmRpcClient.setClientMessageListener(new RmMessageListener(DefaultRMHandler.get()));
    rmRpcClient.init();
}
           

其内部注冊了監聽器RmMessageListener,用于處理TC發回的二階段RPC,一般需要委托給RM處理

TransactionalTemplate

  • TransactionalTemplate#execute是GlobalTransactionalInterceptor攔截業務的入口
  • 擁有成員
    • GlobalTransaction

GlobalTransaction

  • 預設實作DefaultGlobalTransaction
  • 全局事務抽象,裝飾/代理了TransactionManager,是以擁有對全局事務的控制api,如開始、送出、復原,擷取狀态,擷取xid,報告執行狀态等;
  • 擁有成員
    • TransactionManager

TransactionManager

  • 分布式事務中TM角色,用于定義一個全局事務并控制(通知TC)送出還是復原。
  • 預設實作DefaultTransactionManager
  • 面向全局事務進行操作,與本地事務關聯不大。
    • 但是本地事務的正常/異常,會導緻在二階段觸發送出和復原
      • 一般在本地事務中嵌套調用了rpc遠端事務,如果需要復原,就需要在發起者處能夠感覺到異常,進而觸發復原邏輯
    • TM需要感覺遠端事務的異常決定發往TC是送出還是復原
      • TC的送出在AT模式下是異步的,交給定時任務去做。
  • 成員
    • TmRpcClient
      • 用于和TC通訊,以操作在服務端建立的全局事務會話

ResourceManager

  • 分布式事務中本地事務管理器RM角色,用于控制分支事務(對本地事務的操作)。
    • 提供注冊/取消分支、送出/復原分支事務、分支事務狀态報告、擷取全局鎖等功能
  • 在不同的事務模式下,有不同的實作
    • AT對應DataSourceManager
    • SagaResourceManager
    • TCCResourceManager
    • 政策組合實作DefaultResourceManager
      • 利用SPI機制加載了各個模式的實作,并根據模式比對到不同實作

DataSourceProxy/ConnectionProxy

DataSourceProxy實作并包裝了現有DataSource,用ConnectionProxy代理了擷取到的連接配接Connection,以在開啟、送出和復原事務的進行中織入全局事務的邏輯。

依托于spring平台,業務事務控制邏輯總入口TransactionAspectSupport#invokeWithinTransaction

三、攔截處理流程

1、一階段

委托DefaultGlobalTransaction實作了全局事務的流程

GlobalTransactionalInterceptor#handleGlobalTransaction調用transactionalTemplate#execute方法,開始處理全局事務。

transactionalTemplate#execute編排整體事務

public Object execute(TransactionalExecutor business) throws Throwable {
    // 1. 根據上下文中的XID是否存在判定是否需要建立事務,這裡并不會通信。
    // 如果建立了事務,對應的角色是全局事務發起者GlobalTransactionRole.Launcher,且xid仍留白;
    // 如果有XID,對應角色為參與者
    GlobalTransaction tx = GlobalTransactionContext.getCurrentOrCreate();

    // 1.1 get transactionInfo
    // 來資源調用時傳入的匿名實作,目前場景來自于注解 @GlobalTransactional
    TransactionInfo txInfo = business.getTransactionInfo();
    if (txInfo == null) {
        throw new ShouldNeverHappenException("transactionInfo does not exist");
    }
    try {

        // 2. begin transaction
        beginTransaction(txInfo, tx);

        Object rs = null;
        try {

            // Do Your Business
            // 執行業務。注意,業務中操作了被seata代理的datasource,将會走代理邏輯。
            rs = business.execute();

        } catch (Throwable ex) {

            // 3.the needed business exception to rollback.
            completeTransactionAfterThrowing(txInfo,tx,ex);
            throw ex;
        }

        // 4. everything is fine, commit.
        commitTransaction(tx);

        return rs;
    } finally {
        //5. clear
        triggerAfterCompletion();
        cleanUp();
    }
}
           

擷取或建立事務

GlobalTransactionContext#getCurrentOrCreate
// 不存在就建立
public static GlobalTransaction getCurrentOrCreate() {
    GlobalTransaction tx = getCurrent();
    if (tx == null) {
        return createNew();
    }
    return tx;
}

// 如果目前上下文xid不存在,傳回null,外部會進行建立邏輯;
// 如果xid存在,那麼目前元件角色是參與者,且狀态直接指定為Begin;
private static GlobalTransaction getCurrent() {
    String xid = RootContext.getXID();
    if (xid == null) {
        return null;
    }
    return new DefaultGlobalTransaction(xid, GlobalStatus.Begin, GlobalTransactionRole.Participant);
}

// 建立新的事務。注意,建立的事務狀态是未知的,而非開始。在開啟事務時才會調整事務狀态。
private static GlobalTransaction createNew() {
    GlobalTransaction tx = new DefaultGlobalTransaction();
    return tx;
}
DefaultGlobalTransaction() {
    this(null, GlobalStatus.UnKnown, GlobalTransactionRole.Launcher);
}
           

開啟事務

TransactionalTemplate#beginTransaction
private void beginTransaction(TransactionInfo txInfo, GlobalTransaction tx) throws TransactionalExecutor.ExecutionException {
    try {
        // 給出回調。這裡預設沒有注入任何hook
        triggerBeforeBegin();
        // 調用DefaultGlobalTransaction#begin開啟事務。将處理發起者和參與者各自的邏輯,區分點就是目前上下文有無XID
        tx.begin(txInfo.getTimeOut(), txInfo.getName());
        // 給出回調。這裡預設沒有注入任何hook
        triggerAfterBegin();
    } catch (TransactionException txe) {
        throw new TransactionalExecutor.ExecutionException(tx, txe,
            TransactionalExecutor.Code.BeginFailure);
    }
}
           
DefaultGlobalTransaction#begin

發起者

  • 調用TM,向TC發送請求注冊全局事務,拿到傳回的XID

參與者

  • 檢查XID必須存在
@Override
public void begin(int timeout, String name) throws TransactionException {
    // 如果是事務參與者,隻需要檢查必要狀态。實際要求XID必須存在
    if (role != GlobalTransactionRole.Launcher) {
        check();
        return;
    }
    // 如果是發起者(Launcher),xid在前面建立時是為null的,是以這裡肯定仍是不存在,否則抛錯。  
    if (xid != null) {
        throw new IllegalStateException();
    }
    if (RootContext.getXID() != null) {
        throw new IllegalStateException();
    }
    // 調用DefaultTransactionManager#begin拿到xid。這裡對應2PC中發起全局事務的流程。
    // 配置事務狀态,綁定xid到上下文
    xid = transactionManager.begin(null, null, name, timeout);
    status = GlobalStatus.Begin;
    RootContext.bind(xid);
    if (LOGGER.isInfoEnabled()) {
        LOGGER.info("Begin new global transaction [" + xid + "]");
    }

}

// DefaultTransactionManager#begin
@Override
public String begin(String applicationId, String transactionServiceGroup, String name, int timeout)
    throws TransactionException {
    // 封裝請求
    GlobalBeginRequest request = new GlobalBeginRequest();
    request.setTransactionName(name);
    request.setTimeout(timeout);
    // 内部請求發起時,最終會用到 SeataProperties#txServiceGroup 到file.conf/nacos配置中找對應的server ip和端口(配置key為service.vgroup_mapping.{$txServiceGroup})
    GlobalBeginResponse response = (GlobalBeginResponse)syncCall(request);
    if (response.getResultCode() == ResultCode.Failed) {
        throw new TmTransactionException(TransactionExceptionCode.BeginFailed, response.getMsg());
    }
    return response.getXid();
}
           

執行本地業務

擷取本地連接配接

使用了ConnectionProxy代理原有Connection

@Override
public ConnectionProxy getConnection(String username, String password) throws SQLException {
    Connection targetConnection = targetDataSource.getConnection(username, password);
    return new ConnectionProxy(this, targetConnection);
}
           
本地事務執行

執行過程一般會使用到DataSourceTransactionManager控制業務層事務流程,過程中使用的DataSource和Connection均是被Seata代理過的。

DataSourceTransactionManager#doBegin

本地事務送出

調用鍊

DataSourceTransactionManager#doCommit

​ -> ConnectionProxy#commit

​ =>ConnectionProxy#processGlobalTransactionCommit

​ =>ConnectionProxy#register()

​ =>targetConnection.commit

// DataSourceTransactionManager#doCommit,由于用到getConnection,是以傳回的是ConnectionProxy
@Override
protected void doCommit(DefaultTransactionStatus status) {
    DataSourceTransactionObject txObject = (DataSourceTransactionObject) status.getTransaction();
    Connection con = txObject.getConnectionHolder().getConnection();
    try {
        // ConnectionProxy#commit
        con.commit();
    }
    catch (SQLException ex) {
        throw new TransactionSystemException("Could not commit JDBC transaction", ex);
    }
}

// ConnectionProxy#commit 
private void doCommit() throws SQLException {
    if (context.inGlobalTransaction()) {
        // 如果在全局事務中
        processGlobalTransactionCommit();
    } else if (context.isGlobalLockRequire()) {
        // 如果不在全局事務中,隻是需要用到全局鎖做讀隔離。内部擷取到全局鎖才可以送出
        processLocalCommitWithGlobalLocks();
    } else {
        // 非全局事務的業務,直接按本地邏輯送出
        targetConnection.commit();
    }
}

private void processGlobalTransactionCommit() throws SQLException {
    try {
        // 注冊分支事務,内部會向server發送請求,并拿到分支id
        register();
    } catch (TransactionException e) {
        recognizeLockKeyConflictException(e, context.buildLockKeys());
    }
    try {
        if (context.hasUndoLog()) {
            // 如果寫入了undoLog,這裡調用insertUndoLog對undolog入庫
            UndoLogManagerFactory.getUndoLogManager(this.getDbType()).flushUndoLogs(this);
        }
        // 送出本地事務
        targetConnection.commit();
    } catch (Throwable ex) {
        LOGGER.error("process connectionProxy commit error: {}", ex.getMessage(), ex);
        report(false);
        throw new SQLException(ex);
    }
    
    // 向server彙報一階段完成
    report(true);
    context.reset();
}

private void register() throws TransactionException {
    Long branchId = DefaultResourceManager.get().branchRegister(BranchType.AT, getDataSourceProxy().getResourceId(),
        null, context.getXid(), null, context.buildLockKeys());
    context.setBranchId(branchId);
}
           
本地事務復原

DataSourceTransactionManager#doRollback -> ConnectionProxy#rollback

@Override
public void rollback() throws SQLException {
    // 復原本地事務
    targetConnection.rollback();
    if (context.inGlobalTransaction()) {
        if (context.isBranchRegistered()) {
            // 如果是全局事務,需要向server彙報執行結果
            report(false);
        }
    }
    context.reset();
}
           

2、二階段

RMInboundHandler回調方法處理了二階段事務(送出、復原消息),抽象實作AbstractRMHandler,具體實作RMHandlerTCC、RMHandlerSaga、DefaultRMHandler、RMHandlerAT

public interface RMInboundHandler {
    // 處理送出
    BranchCommitResponse handle(BranchCommitRequest request);
    // 處理復原
    BranchRollbackResponse handle(BranchRollbackRequest request);
    // 處理undolog
    void handle(UndoLogDeleteRequest request);
}
           

全局事務處理

入口與調用鍊

由RMClient#init時設定的

rmRpcClient.setClientMessageListener(new RmMessageListener(DefaultRMHandler.get()))

處理。

調用鍊

RmMessageListener#onMessage

=>RmMessageListener#handleBranchCommit

​ =>(DefaultRMHandler)AbstractRMHandler#onRequest(AbstractMessage request, RpcContext context)

​ =>DefaultRMHandler#handle(BranchCommitRequest)

​ =>(RMHandlerAT)AbstractRMHandler#handle(BranchCommitRequest)

​ =>(RMHandlerAT)AbstractRMHandler#doBranchCommit

​ =>DataSourceManager#branchCommit

// RmMessageListener#onMessage
@Override
public void onMessage(RpcMessage request, String serverAddress, ClientMessageSender sender) {
    Object msg = request.getBody();
    if (msg instanceof BranchCommitRequest) {
        // 分支送出邏輯
        handleBranchCommit(request, serverAddress, (BranchCommitRequest)msg, sender);
    } else if (msg instanceof BranchRollbackRequest) {
        // 分支復原邏輯
        handleBranchRollback(request, serverAddress, (BranchRollbackRequest)msg, sender);
    }else if (msg instanceof UndoLogDeleteRequest) {
        // 全局事務送出後清除undolog的邏輯
        handleUndoLogDelete((UndoLogDeleteRequest) msg);
    }
}

// RmMessageListener#handleBranchCommit
private void handleBranchCommit(RpcMessage request, String serverAddress,
                                BranchCommitRequest branchCommitRequest,
                                ClientMessageSender sender) {

    BranchCommitResponse resultMessage = null;
    try {
        // 調用AbstractRMHandler#onRequest
        resultMessage = (BranchCommitResponse)handler.onRequest(branchCommitRequest, null);
        sender.sendResponse(request, serverAddress, resultMessage);
    } catch (Exception e) {
        if (resultMessage == null) {
            resultMessage = new BranchCommitResponse();
        }
        resultMessage.setResultCode(ResultCode.Failed);
        resultMessage.setMsg(e.getMessage());
        sender.sendResponse(request, serverAddress, resultMessage);
    }
}


// (DefaultRMHandler)AbstractRMHandler#onRequest
@Override
public AbstractResultMessage onRequest(AbstractMessage request, RpcContext context) {
    if (!(request instanceof AbstractTransactionRequestToRM)) {
        throw new IllegalArgumentException();
    }
    AbstractTransactionRequestToRM transactionRequest = (AbstractTransactionRequestToRM)request;
    // 設定消息的處理器。這裡就是目前執行個體。由于RmMessageListener建構時使用的是DefaultRMHandler,是以這裡就是DefaultRMHandler
    transactionRequest.setRMInboundMessageHandler(this);

    // request本身内置了handle處理。這裡可以看到AbstractMessage的實作對應了各種消息
    return transactionRequest.handle(context);
}

// 送出請求
public class BranchCommitRequest extends AbstractBranchEndRequest {

    @Override
    public short getTypeCode() {
        return MessageType.TYPE_BRANCH_COMMIT;
    }
    @Override
    public AbstractTransactionResponse handle(RpcContext rpcContext) {
        // 實際調用DefaultRMHandler
        return handler.handle(this);
    }
}

// DefaultRMHandler#handle(BranchCommitRequest)
@Override
public BranchCommitResponse handle(BranchCommitRequest request) {
    // 使用政策模式拿到對應處理器處理。之類跟進RMHandlerAT
    return getRMHandler(request.getBranchType()).handle(request);
}

// 由于RMHandlerAT沒有實作handle,是以使用了的父類的實作AbstractRMHandler#handle(BranchCommitRequest)
@Override
public BranchCommitResponse handle(BranchCommitRequest request) {
    BranchCommitResponse response = new BranchCommitResponse();
    // 處理送出請求。AbstractCallback是個回調,exceptionHandleTemplate直接調用了其實作的方法對請求進行處理
    exceptionHandleTemplate(new AbstractCallback<BranchCommitRequest, BranchCommitResponse>() {
        @Override
        public void execute(BranchCommitRequest request, BranchCommitResponse response)
            throws TransactionException {
            doBranchCommit(request, response);
        }
    }, request, response);
    return response;
}

// AbstractRMHandler#doBranchCommit
protected void doBranchCommit(BranchCommitRequest request, BranchCommitResponse response)
    throws TransactionException {
    String xid = request.getXid();
    long branchId = request.getBranchId();
    String resourceId = request.getResourceId();
    String applicationData = request.getApplicationData();
    // 最終的最終,還是委托RM處理。AT對應的RM是DataSourceManager
    BranchStatus status = getResourceManager().branchCommit(request.getBranchType(), xid, branchId, resourceId,
                                                            applicationData);
    response.setXid(xid);
    response.setBranchId(branchId);
    response.setBranchStatus(status);
}

// DataSourceManager#branchCommit
@Override
public BranchStatus branchCommit(BranchType branchType, String xid, long branchId, String resourceId,
                                 String applicationData) throws TransactionException {
    // DataSourceManager#init方法中給出的asyncWorker實作是AsyncWorker
    return asyncWorker.branchCommit(branchType, xid, branchId, resourceId, applicationData);
}
// DataSourceManager#init
@Override
public void init() {
    AsyncWorker asyncWorker = new AsyncWorker();
    asyncWorker.init();
    initAsyncWorker(asyncWorker);
}

// AsyncWorker#branchCommit
@Override
public BranchStatus branchCommit(BranchType branchType, String xid, long branchId, String resourceId, String applicationData) throws TransactionException {
    // 入緩存隊列。可以看到即使入隊失敗,也會送出掉
    if (!ASYNC_COMMIT_BUFFER.offer(new Phase2Context(branchType, xid, branchId, resourceId, applicationData))) {
        LOGGER.warn("Async commit buffer is FULL. Rejected branch [" + branchId + "/" + xid + "] will be handled by housekeeping later.");
    }
    return BranchStatus.PhaseTwo_Committed;
}

/**
 * AsyncWorker#Init.從隊列中取任務執行二階段送出。
 */
public synchronized void init() {
    LOGGER.info("Async Commit Buffer Limit: " + ASYNC_COMMIT_BUFFER_LIMIT);
    timerExecutor = new ScheduledThreadPoolExecutor(1,
                                                    new NamedThreadFactory("AsyncWorker", 1, true));
    timerExecutor.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            try {

                doBranchCommits();

            } catch (Throwable e) {
                LOGGER.info("Failed at async committing ... " + e.getMessage());

            }
        }
    }, 10, 1000 * 1, TimeUnit.MILLISECONDS);
}
           

四、多服務調用AT模式示例

1、服務與身份

MicroServiceFirst: 發起者,簡寫M1;M1即是TM又是RM

MicroServiceSecond: 參與者,簡寫M2

M1執行本地事務中調用M2,M2執行了分支事務

2、必要條件

  1. M1和M2啟用了seata;
  2. M1方法标注了@GlobalTransactional,以被GlobalTransactionScanner生成代理對象,添加GlobalTransactionalInterceptor攔截器;
    • M2方法可以不标注@GlobalTransactional,但是需要代理資料源,并在rpc中拿到XID放到上下文中
  3. M1到M2的遠端調用可以支援Seata上下文處理,并且具備必要SDK以和TC通訊。

3、第一階段

發起者控制主流程

M1注冊全局事務
  1. M1開啟全局事務時向TC請求注冊全局事務,拿到XID;
M1執行本地事務

擷取連接配接時會拿到ConnectionProxy;

開啟本地事務沒什麼特殊的地方;

執行本地事務,但未送出本地事務;

M1調用遠端服務M2

一般在本地事務内部會調用RPC,并在RPC中附帶XID

  • M2收到RPC,進行處理
    • RPC方式支援Seata,一般都是引入Interceptor預處理XID到上下文;
    • M2執行本地事務
      • 由于資料源被代理,并且Interceptor注入了XID,在送出本地事務時,會先進行分支事務的注冊拿到branId
        • 注冊前需要拿到全局鎖
      • 注冊完畢後,會處理插入undolog,然後和本地事務一起送出
  • M2向TC報告本地事務執行狀态為成功,如果遇到異常復原了,會報告異常。假設這裡成功。
    本地commit成功時,後進行report到TC,如果本地異常了,會先report異常到TC,而後抛出異常傳遞到上層,觸發上層的本地復原邏輯
    • 本地事務送出完畢後,M2就很自然地釋放本地事務占用的鎖資源
    • 注意,M2此時并沒有釋放全局鎖
  • M1繼續處理自己的業務
  • M1送出本地事務
    • M1送出前需要嘗試擷取全局鎖,因為和M2同屬一個全局事務,是以可以拿到全局鎖
    • 流程同M2

4、第二階段

發起者收尾流程

第一階段處理完畢後,實質上所有的分支事務都已經送出完畢了,二階段M1根據參與者執行狀态向TC報告是否送出全局事務(注意:一般的理論模型是由TC決定)。

無論處理如何,正常情況均會删去undolog。

  • M1報告事務處理狀态到TC
  • M1的本地事務處理完畢後,開始彙報全局事務的結果。假設都成功,這裡決定送出全局事務
    • 是以這裡rpc調用的遠端事務如果無法讓M1感覺到異常,可能M1最終的決策是送出。
  • TC收到送出請求後,修改全局事務狀态為異步送出,然後傳回送出成功給M1。
  • 流程實際結束,等待TC的送出決定,删除undolog。

繼續閱讀