天天看点

Seata事务处理流程(主AT)Seata事务处理流程

文章目录

  • Seata事务处理流程
    • 前言
    • 一、组件装配
    • 二、核心组件源码分析
      • 1、GlobalTransactionScanner
        • 1.1、职能详解
          • 1.1.1、AbstractAutoProxyCreator
            • postProcessBeforeInitialization
            • wrapIfNecessary
          • 1.1.2、InitializingBean
      • 2、GlobalTransactionalInterceptor
        • 2.1、职能详解
          • 2.1.1、ConfigurationChangeListener
          • 2.1.2、MethodInterceptor
      • 3、全局事务控制依赖的核心组件一览
        • TMClient
        • RMClient
        • TransactionalTemplate
        • GlobalTransaction
        • TransactionManager
        • ResourceManager
        • DataSourceProxy/ConnectionProxy
    • 三、拦截处理流程
    • 1、一阶段
        • transactionalTemplate#execute编排整体事务
        • 获取或创建事务
          • GlobalTransactionContext#getCurrentOrCreate
        • 开启事务
          • TransactionalTemplate#beginTransaction
          • DefaultGlobalTransaction#begin
        • 执行本地业务
          • 获取本地连接
          • 本地事务执行
          • 本地事务提交
          • 本地事务回滚
      • 2、二阶段
        • 全局事务处理
          • 入口与调用链
    • 四、多服务调用AT模式示例
      • 1、服务与身份
      • 2、必要条件
      • 3、第一阶段
        • 发起者控制主流程
          • M1注册全局事务
          • M1执行本地事务
          • M1调用远程服务M2
      • 4、第二阶段
        • 发起者收尾流程

Seata事务处理流程

前言

最近在攻克分布式相关的技术栈,刚好需要一个case辅助理解分布式事务和共识,就用seata看了下工程实践中的实现模型。

本文主要记录分析客户端的行为。

参考 https://blog.csdn.net/hosaos/article/details/89403552

参考官方讲解 http://seata.io/zh-cn/docs/overview/what-is-seata.html

版本0.9.0。注意距离新版本较远,可能在细节实现上有所不同。

<dependency>
    <groupId>com.alibaba.cloud</groupId>
    <artifactId>spring-cloud-alibaba-seata</artifactId>
    <version>2.1.1.RELEASE</version>
</dependency>
<!-- 内部依赖 -->
<dependency>
    <groupId>io.seata</groupId>
    <artifactId>seata-all</artifactId>
    <version>0.9.0</version>
</dependency>
           

一、组件装配

GlobalTransactionAutoConfiguration

依赖spring.factories机制自动装配了

GlobalTransactionScanner

组件

@Configuration
@EnableConfigurationProperties(SeataProperties.class)
public class GlobalTransactionAutoConfiguration {
...
 @Bean
 public GlobalTransactionScanner globalTransactionScanner() {

  String applicationName = applicationContext.getEnvironment()
    .getProperty("spring.application.name");

  String txServiceGroup = seataProperties.getTxServiceGroup();

  if (StringUtils.isEmpty(txServiceGroup)) {
   txServiceGroup = applicationName + "-seata-service-group";
   seataProperties.setTxServiceGroup(txServiceGroup);
  }
  return new GlobalTransactionScanner(applicationName, txServiceGroup);
 }
}
           

二、核心组件源码分析

1、GlobalTransactionScanner

1.1、职能详解

初始化必要组件;扫描代理数据源,织入全局事务的处理逻辑
1.1.1、AbstractAutoProxyCreator

postProcessBeforeInitialization

目的:代理JDBC数据源,拦截DataSource所有方法的执行,以织入全局事务的处理逻辑。

@Override
public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
    if (bean instanceof DataSource && !(bean instanceof DataSourceProxy) && ConfigurationFactory.getInstance().getBoolean(DATASOURCE_AUTOPROXY, false)) {
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("Auto proxy of  [" + beanName + "]");
        }
        DataSourceProxy dataSourceProxy = DataSourceProxyHolder.get().putDataSource((DataSource) bean);
        return Enhancer.create(bean.getClass(), (org.springframework.cglib.proxy.MethodInterceptor) (o, method, args, methodProxy) -> {
            Method m = BeanUtils.findDeclaredMethod(DataSourceProxy.class, method.getName(), method.getParameterTypes());
            if (null != m) {
                return m.invoke(dataSourceProxy, args);
            } else {
                boolean oldAccessible = method.isAccessible();
                try {
                    method.setAccessible(true);
                    return method.invoke(bean, args);
                } finally {
                    //recover the original accessible for security reason
                    method.setAccessible(oldAccessible);
                }
            }
        });
    }
    return bean;
}
           

wrapIfNecessary

为指定的bean生成代理对象(加入拦截器),以应对TCC和全局事务的处理。

protected Object wrapIfNecessary(Object bean, String beanName, Object cacheKey) {
    if (disableGlobalTransaction) {
        return bean;
    }
    synchronized (PROXYED_SET) {
        // 代理完毕加入集合,确保只会代理一次
        if (PROXYED_SET.contains(beanName)) {
            return bean;
        }
        interceptor = null;
        //check TCC proxy
        // 根据bean定义信息,决定是否用TCC代理模式
        if (TCCBeanParserUtils.isTccAutoProxy(bean, beanName, applicationContext)) {
            //TCC interceptor, proxy bean of sofa:reference/dubbo:reference, and LocalTCC
            interceptor = new TccActionInterceptor(TCCBeanParserUtils.getRemotingDesc(beanName));
        } else {
            Class<?> serviceInterface = SpringProxyUtils.findTargetClass(bean);
            Class<?>[] interfacesIfJdk = SpringProxyUtils.findInterfaces(bean);
            if (!existsAnnotation(new Class[]{serviceInterface})
                && !existsAnnotation(interfacesIfJdk)) {
                return bean;
            }
            if (interceptor == null) {
                // 不是TCC,就是用这个拦截器,处理@GlobalTransactional和@GlobalLock
                interceptor = new GlobalTransactionalInterceptor(failureHandlerHook);
            }
        }            
        if (!AopUtils.isAopProxy(bean)) {
            bean = super.wrapIfNecessary(bean, beanName, cacheKey);
        } else {
            AdvisedSupport advised = SpringProxyUtils.getAdvisedSupport(bean);
            Advisor[] advisor = buildAdvisors(beanName, getAdvicesAndAdvisorsForBean(null, null, null));
            for (Advisor avr : advisor) {
                advised.addAdvisor(0, avr);
            }
        }
        PROXYED_SET.add(beanName);
        return bean;
    }
}
           
1.1.2、InitializingBean
  1. 根据disableGlobalTransaction配置决定是否初始化组件,由seata配置文件给出
  2. 如果disableGlobalTransaction == false,初始化TMClient和RMClient
private final boolean disableGlobalTransaction =
        ConfigurationFactory.getInstance().getBoolean("service.disableGlobalTransaction", false);

@Override
public void afterPropertiesSet() {
    // 控制事务生效的开关
    if (disableGlobalTransaction) {
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("Global transaction is disabled.");
        }
        return;
    }
    initClient();
}
           
private void initClient() {
    if (StringUtils.isNullOrEmpty(applicationId) || StringUtils.isNullOrEmpty(txServiceGroup)) {
        throw new IllegalArgumentException(
            "applicationId: " + applicationId + ", txServiceGroup: " + txServiceGroup);
    }
    //init TM
    TMClient.init(applicationId, txServiceGroup);
    //init RM
    RMClient.init(applicationId, txServiceGroup);
    registerSpringShutdownHook();
}
           

2、GlobalTransactionalInterceptor

2.1、职能详解

2.1.1、ConfigurationChangeListener

允许运行中调整seata全局事务的开启和关闭

2.1.2、MethodInterceptor

处理@GlobalTransactional和@GlobalLock,控制全局事务处理流程。

@Override
public Object invoke(final MethodInvocation methodInvocation) throws Throwable {
    Class<?> targetClass = (methodInvocation.getThis() != null ? AopUtils.getTargetClass(methodInvocation.getThis()) : null);
    Method specificMethod = ClassUtils.getMostSpecificMethod(methodInvocation.getMethod(), targetClass);
    final Method method = BridgeMethodResolver.findBridgedMethod(specificMethod);
    // 拿到注解
    final GlobalTransactional globalTransactionalAnnotation = getAnnotation(method, GlobalTransactional.class);
    final GlobalLock globalLockAnnotation = getAnnotation(method, GlobalLock.class);
    if (globalTransactionalAnnotation != null) {
        // 处理标注了@GlobalTransactional的方法
        return handleGlobalTransaction(methodInvocation, globalTransactionalAnnotation);
    } else if (globalLockAnnotation != null) {
        return handleGlobalLock(methodInvocation);
    } else {
        return methodInvocation.proceed();
    }
}
           

3、全局事务控制依赖的核心组件一览

TMClient

仅有一个init方法,初始化TmRpcClient,用于和TC交互

public static void init(String applicationId, String transactionServiceGroup) {
    TmRpcClient tmRpcClient = TmRpcClient.getInstance(applicationId, transactionServiceGroup);
    // 内部初始化其他组件,执行一些定时任务
    tmRpcClient.init();
}
           

RMClient

仅有一个init方法

  1. 初始化RmRpcClient,用于和TC通讯;
  2. 设置RM,用于注册分支事务,上报事务执行状态,处理二阶段事务。
    1. 这里给出的DefaultResourceManager组合并持有了SPI找到的RM(resourceManagers字段),用策略模式委托给具体的ResourceManager处理对应的事务。
  3. 设置RmMessageListener,处理事件
    • 处理器使用了DefaultRMHandler,请求委托给其处理
      • DefaultRMHandler
public static void init(String applicationId, String transactionServiceGroup) {
    RmRpcClient rmRpcClient = RmRpcClient.getInstance(applicationId, transactionServiceGroup);
    // 注册RM。内部使用了SPI机制加载各种模式的ResourceManager。AT在这里对应的是DataSourceManager
    rmRpcClient.setResourceManager(DefaultResourceManager.get());
    // 监听器用于处理RPC请求
    rmRpcClient.setClientMessageListener(new RmMessageListener(DefaultRMHandler.get()));
    rmRpcClient.init();
}
           

其内部注册了监听器RmMessageListener,用于处理TC发回的二阶段RPC,一般需要委托给RM处理

TransactionalTemplate

  • TransactionalTemplate#execute是GlobalTransactionalInterceptor拦截业务的入口
  • 拥有成员
    • GlobalTransaction

GlobalTransaction

  • 默认实现DefaultGlobalTransaction
  • 全局事务抽象,装饰/代理了TransactionManager,所以拥有对全局事务的控制api,如开始、提交、回滚,获取状态,获取xid,报告执行状态等;
  • 拥有成员
    • TransactionManager

TransactionManager

  • 分布式事务中TM角色,用于定义一个全局事务并控制(通知TC)提交还是回滚。
  • 默认实现DefaultTransactionManager
  • 面向全局事务进行操作,与本地事务关联不大。
    • 但是本地事务的正常/异常,会导致在二阶段触发提交和回滚
      • 一般在本地事务中嵌套调用了rpc远程事务,如果需要回滚,就需要在发起者处能够感知到异常,从而触发回滚逻辑
    • TM需要感知远程事务的异常决定发往TC是提交还是回滚
      • TC的提交在AT模式下是异步的,交给定时任务去做。
  • 成员
    • TmRpcClient
      • 用于和TC通讯,以操作在服务端建立的全局事务会话

ResourceManager

  • 分布式事务中本地事务管理器RM角色,用于控制分支事务(对本地事务的操作)。
    • 提供注册/取消分支、提交/回滚分支事务、分支事务状态报告、获取全局锁等功能
  • 在不同的事务模式下,有不同的实现
    • AT对应DataSourceManager
    • SagaResourceManager
    • TCCResourceManager
    • 策略组合实现DefaultResourceManager
      • 利用SPI机制加载了各个模式的实现,并根据模式匹配到不同实现

DataSourceProxy/ConnectionProxy

DataSourceProxy实现并包装了现有DataSource,用ConnectionProxy代理了获取到的连接Connection,以在开启、提交和回滚事务的处理中织入全局事务的逻辑。

依托于spring平台,业务事务控制逻辑总入口TransactionAspectSupport#invokeWithinTransaction

三、拦截处理流程

1、一阶段

委托DefaultGlobalTransaction实现了全局事务的流程

GlobalTransactionalInterceptor#handleGlobalTransaction调用transactionalTemplate#execute方法,开始处理全局事务。

transactionalTemplate#execute编排整体事务

public Object execute(TransactionalExecutor business) throws Throwable {
    // 1. 根据上下文中的XID是否存在判定是否需要创建事务,这里并不会通信。
    // 如果创建了事务,对应的角色是全局事务发起者GlobalTransactionRole.Launcher,且xid仍留空;
    // 如果有XID,对应角色为参与者
    GlobalTransaction tx = GlobalTransactionContext.getCurrentOrCreate();

    // 1.1 get transactionInfo
    // 来资源调用时传入的匿名实现,当前场景来自于注解 @GlobalTransactional
    TransactionInfo txInfo = business.getTransactionInfo();
    if (txInfo == null) {
        throw new ShouldNeverHappenException("transactionInfo does not exist");
    }
    try {

        // 2. begin transaction
        beginTransaction(txInfo, tx);

        Object rs = null;
        try {

            // Do Your Business
            // 执行业务。注意,业务中操作了被seata代理的datasource,将会走代理逻辑。
            rs = business.execute();

        } catch (Throwable ex) {

            // 3.the needed business exception to rollback.
            completeTransactionAfterThrowing(txInfo,tx,ex);
            throw ex;
        }

        // 4. everything is fine, commit.
        commitTransaction(tx);

        return rs;
    } finally {
        //5. clear
        triggerAfterCompletion();
        cleanUp();
    }
}
           

获取或创建事务

GlobalTransactionContext#getCurrentOrCreate
// 不存在就创建
public static GlobalTransaction getCurrentOrCreate() {
    GlobalTransaction tx = getCurrent();
    if (tx == null) {
        return createNew();
    }
    return tx;
}

// 如果当前上下文xid不存在,返回null,外部会进行创建逻辑;
// 如果xid存在,那么当前组件角色是参与者,且状态直接指定为Begin;
private static GlobalTransaction getCurrent() {
    String xid = RootContext.getXID();
    if (xid == null) {
        return null;
    }
    return new DefaultGlobalTransaction(xid, GlobalStatus.Begin, GlobalTransactionRole.Participant);
}

// 创建新的事务。注意,新建的事务状态是未知的,而非开始。在开启事务时才会调整事务状态。
private static GlobalTransaction createNew() {
    GlobalTransaction tx = new DefaultGlobalTransaction();
    return tx;
}
DefaultGlobalTransaction() {
    this(null, GlobalStatus.UnKnown, GlobalTransactionRole.Launcher);
}
           

开启事务

TransactionalTemplate#beginTransaction
private void beginTransaction(TransactionInfo txInfo, GlobalTransaction tx) throws TransactionalExecutor.ExecutionException {
    try {
        // 给出回调。这里默认没有注入任何hook
        triggerBeforeBegin();
        // 调用DefaultGlobalTransaction#begin开启事务。将处理发起者和参与者各自的逻辑,区分点就是当前上下文有无XID
        tx.begin(txInfo.getTimeOut(), txInfo.getName());
        // 给出回调。这里默认没有注入任何hook
        triggerAfterBegin();
    } catch (TransactionException txe) {
        throw new TransactionalExecutor.ExecutionException(tx, txe,
            TransactionalExecutor.Code.BeginFailure);
    }
}
           
DefaultGlobalTransaction#begin

发起者

  • 调用TM,向TC发送请求注册全局事务,拿到返回的XID

参与者

  • 检查XID必须存在
@Override
public void begin(int timeout, String name) throws TransactionException {
    // 如果是事务参与者,只需要检查必要状态。实际要求XID必须存在
    if (role != GlobalTransactionRole.Launcher) {
        check();
        return;
    }
    // 如果是发起者(Launcher),xid在前面创建时是为null的,所以这里肯定仍是不存在,否则抛错。  
    if (xid != null) {
        throw new IllegalStateException();
    }
    if (RootContext.getXID() != null) {
        throw new IllegalStateException();
    }
    // 调用DefaultTransactionManager#begin拿到xid。这里对应2PC中发起全局事务的流程。
    // 配置事务状态,绑定xid到上下文
    xid = transactionManager.begin(null, null, name, timeout);
    status = GlobalStatus.Begin;
    RootContext.bind(xid);
    if (LOGGER.isInfoEnabled()) {
        LOGGER.info("Begin new global transaction [" + xid + "]");
    }

}

// DefaultTransactionManager#begin
@Override
public String begin(String applicationId, String transactionServiceGroup, String name, int timeout)
    throws TransactionException {
    // 封装请求
    GlobalBeginRequest request = new GlobalBeginRequest();
    request.setTransactionName(name);
    request.setTimeout(timeout);
    // 内部请求发起时,最终会用到 SeataProperties#txServiceGroup 到file.conf/nacos配置中找对应的server ip和端口(配置key为service.vgroup_mapping.{$txServiceGroup})
    GlobalBeginResponse response = (GlobalBeginResponse)syncCall(request);
    if (response.getResultCode() == ResultCode.Failed) {
        throw new TmTransactionException(TransactionExceptionCode.BeginFailed, response.getMsg());
    }
    return response.getXid();
}
           

执行本地业务

获取本地连接

使用了ConnectionProxy代理原有Connection

@Override
public ConnectionProxy getConnection(String username, String password) throws SQLException {
    Connection targetConnection = targetDataSource.getConnection(username, password);
    return new ConnectionProxy(this, targetConnection);
}
           
本地事务执行

执行过程一般会使用到DataSourceTransactionManager控制业务层事务流程,过程中使用的DataSource和Connection均是被Seata代理过的。

DataSourceTransactionManager#doBegin

本地事务提交

调用链

DataSourceTransactionManager#doCommit

​ -> ConnectionProxy#commit

​ =>ConnectionProxy#processGlobalTransactionCommit

​ =>ConnectionProxy#register()

​ =>targetConnection.commit

// DataSourceTransactionManager#doCommit,由于用到getConnection,所以返回的是ConnectionProxy
@Override
protected void doCommit(DefaultTransactionStatus status) {
    DataSourceTransactionObject txObject = (DataSourceTransactionObject) status.getTransaction();
    Connection con = txObject.getConnectionHolder().getConnection();
    try {
        // ConnectionProxy#commit
        con.commit();
    }
    catch (SQLException ex) {
        throw new TransactionSystemException("Could not commit JDBC transaction", ex);
    }
}

// ConnectionProxy#commit 
private void doCommit() throws SQLException {
    if (context.inGlobalTransaction()) {
        // 如果在全局事务中
        processGlobalTransactionCommit();
    } else if (context.isGlobalLockRequire()) {
        // 如果不在全局事务中,只是需要用到全局锁做读隔离。内部获取到全局锁才可以提交
        processLocalCommitWithGlobalLocks();
    } else {
        // 非全局事务的业务,直接按本地逻辑提交
        targetConnection.commit();
    }
}

private void processGlobalTransactionCommit() throws SQLException {
    try {
        // 注册分支事务,内部会向server发送请求,并拿到分支id
        register();
    } catch (TransactionException e) {
        recognizeLockKeyConflictException(e, context.buildLockKeys());
    }
    try {
        if (context.hasUndoLog()) {
            // 如果写入了undoLog,这里调用insertUndoLog对undolog入库
            UndoLogManagerFactory.getUndoLogManager(this.getDbType()).flushUndoLogs(this);
        }
        // 提交本地事务
        targetConnection.commit();
    } catch (Throwable ex) {
        LOGGER.error("process connectionProxy commit error: {}", ex.getMessage(), ex);
        report(false);
        throw new SQLException(ex);
    }
    
    // 向server汇报一阶段完成
    report(true);
    context.reset();
}

private void register() throws TransactionException {
    Long branchId = DefaultResourceManager.get().branchRegister(BranchType.AT, getDataSourceProxy().getResourceId(),
        null, context.getXid(), null, context.buildLockKeys());
    context.setBranchId(branchId);
}
           
本地事务回滚

DataSourceTransactionManager#doRollback -> ConnectionProxy#rollback

@Override
public void rollback() throws SQLException {
    // 回滚本地事务
    targetConnection.rollback();
    if (context.inGlobalTransaction()) {
        if (context.isBranchRegistered()) {
            // 如果是全局事务,需要向server汇报执行结果
            report(false);
        }
    }
    context.reset();
}
           

2、二阶段

RMInboundHandler回调方法处理了二阶段事务(提交、回滚消息),抽象实现AbstractRMHandler,具体实现RMHandlerTCC、RMHandlerSaga、DefaultRMHandler、RMHandlerAT

public interface RMInboundHandler {
    // 处理提交
    BranchCommitResponse handle(BranchCommitRequest request);
    // 处理回滚
    BranchRollbackResponse handle(BranchRollbackRequest request);
    // 处理undolog
    void handle(UndoLogDeleteRequest request);
}
           

全局事务处理

入口与调用链

由RMClient#init时设置的

rmRpcClient.setClientMessageListener(new RmMessageListener(DefaultRMHandler.get()))

处理。

调用链

RmMessageListener#onMessage

=>RmMessageListener#handleBranchCommit

​ =>(DefaultRMHandler)AbstractRMHandler#onRequest(AbstractMessage request, RpcContext context)

​ =>DefaultRMHandler#handle(BranchCommitRequest)

​ =>(RMHandlerAT)AbstractRMHandler#handle(BranchCommitRequest)

​ =>(RMHandlerAT)AbstractRMHandler#doBranchCommit

​ =>DataSourceManager#branchCommit

// RmMessageListener#onMessage
@Override
public void onMessage(RpcMessage request, String serverAddress, ClientMessageSender sender) {
    Object msg = request.getBody();
    if (msg instanceof BranchCommitRequest) {
        // 分支提交逻辑
        handleBranchCommit(request, serverAddress, (BranchCommitRequest)msg, sender);
    } else if (msg instanceof BranchRollbackRequest) {
        // 分支回滚逻辑
        handleBranchRollback(request, serverAddress, (BranchRollbackRequest)msg, sender);
    }else if (msg instanceof UndoLogDeleteRequest) {
        // 全局事务提交后清除undolog的逻辑
        handleUndoLogDelete((UndoLogDeleteRequest) msg);
    }
}

// RmMessageListener#handleBranchCommit
private void handleBranchCommit(RpcMessage request, String serverAddress,
                                BranchCommitRequest branchCommitRequest,
                                ClientMessageSender sender) {

    BranchCommitResponse resultMessage = null;
    try {
        // 调用AbstractRMHandler#onRequest
        resultMessage = (BranchCommitResponse)handler.onRequest(branchCommitRequest, null);
        sender.sendResponse(request, serverAddress, resultMessage);
    } catch (Exception e) {
        if (resultMessage == null) {
            resultMessage = new BranchCommitResponse();
        }
        resultMessage.setResultCode(ResultCode.Failed);
        resultMessage.setMsg(e.getMessage());
        sender.sendResponse(request, serverAddress, resultMessage);
    }
}


// (DefaultRMHandler)AbstractRMHandler#onRequest
@Override
public AbstractResultMessage onRequest(AbstractMessage request, RpcContext context) {
    if (!(request instanceof AbstractTransactionRequestToRM)) {
        throw new IllegalArgumentException();
    }
    AbstractTransactionRequestToRM transactionRequest = (AbstractTransactionRequestToRM)request;
    // 设置消息的处理器。这里就是当前实例。由于RmMessageListener构建时使用的是DefaultRMHandler,所以这里就是DefaultRMHandler
    transactionRequest.setRMInboundMessageHandler(this);

    // request本身内置了handle处理。这里可以看到AbstractMessage的实现对应了各种消息
    return transactionRequest.handle(context);
}

// 提交请求
public class BranchCommitRequest extends AbstractBranchEndRequest {

    @Override
    public short getTypeCode() {
        return MessageType.TYPE_BRANCH_COMMIT;
    }
    @Override
    public AbstractTransactionResponse handle(RpcContext rpcContext) {
        // 实际调用DefaultRMHandler
        return handler.handle(this);
    }
}

// DefaultRMHandler#handle(BranchCommitRequest)
@Override
public BranchCommitResponse handle(BranchCommitRequest request) {
    // 使用策略模式拿到对应处理器处理。之类跟进RMHandlerAT
    return getRMHandler(request.getBranchType()).handle(request);
}

// 由于RMHandlerAT没有实现handle,所以使用了的父类的实现AbstractRMHandler#handle(BranchCommitRequest)
@Override
public BranchCommitResponse handle(BranchCommitRequest request) {
    BranchCommitResponse response = new BranchCommitResponse();
    // 处理提交请求。AbstractCallback是个回调,exceptionHandleTemplate直接调用了其实现的方法对请求进行处理
    exceptionHandleTemplate(new AbstractCallback<BranchCommitRequest, BranchCommitResponse>() {
        @Override
        public void execute(BranchCommitRequest request, BranchCommitResponse response)
            throws TransactionException {
            doBranchCommit(request, response);
        }
    }, request, response);
    return response;
}

// AbstractRMHandler#doBranchCommit
protected void doBranchCommit(BranchCommitRequest request, BranchCommitResponse response)
    throws TransactionException {
    String xid = request.getXid();
    long branchId = request.getBranchId();
    String resourceId = request.getResourceId();
    String applicationData = request.getApplicationData();
    // 最终的最终,还是委托RM处理。AT对应的RM是DataSourceManager
    BranchStatus status = getResourceManager().branchCommit(request.getBranchType(), xid, branchId, resourceId,
                                                            applicationData);
    response.setXid(xid);
    response.setBranchId(branchId);
    response.setBranchStatus(status);
}

// DataSourceManager#branchCommit
@Override
public BranchStatus branchCommit(BranchType branchType, String xid, long branchId, String resourceId,
                                 String applicationData) throws TransactionException {
    // DataSourceManager#init方法中给出的asyncWorker实现是AsyncWorker
    return asyncWorker.branchCommit(branchType, xid, branchId, resourceId, applicationData);
}
// DataSourceManager#init
@Override
public void init() {
    AsyncWorker asyncWorker = new AsyncWorker();
    asyncWorker.init();
    initAsyncWorker(asyncWorker);
}

// AsyncWorker#branchCommit
@Override
public BranchStatus branchCommit(BranchType branchType, String xid, long branchId, String resourceId, String applicationData) throws TransactionException {
    // 入缓存队列。可以看到即使入队失败,也会提交掉
    if (!ASYNC_COMMIT_BUFFER.offer(new Phase2Context(branchType, xid, branchId, resourceId, applicationData))) {
        LOGGER.warn("Async commit buffer is FULL. Rejected branch [" + branchId + "/" + xid + "] will be handled by housekeeping later.");
    }
    return BranchStatus.PhaseTwo_Committed;
}

/**
 * AsyncWorker#Init.从队列中取任务执行二阶段提交。
 */
public synchronized void init() {
    LOGGER.info("Async Commit Buffer Limit: " + ASYNC_COMMIT_BUFFER_LIMIT);
    timerExecutor = new ScheduledThreadPoolExecutor(1,
                                                    new NamedThreadFactory("AsyncWorker", 1, true));
    timerExecutor.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            try {

                doBranchCommits();

            } catch (Throwable e) {
                LOGGER.info("Failed at async committing ... " + e.getMessage());

            }
        }
    }, 10, 1000 * 1, TimeUnit.MILLISECONDS);
}
           

四、多服务调用AT模式示例

1、服务与身份

MicroServiceFirst: 发起者,简写M1;M1即是TM又是RM

MicroServiceSecond: 参与者,简写M2

M1执行本地事务中调用M2,M2执行了分支事务

2、必要条件

  1. M1和M2启用了seata;
  2. M1方法标注了@GlobalTransactional,以被GlobalTransactionScanner生成代理对象,添加GlobalTransactionalInterceptor拦截器;
    • M2方法可以不标注@GlobalTransactional,但是需要代理数据源,并在rpc中拿到XID放到上下文中
  3. M1到M2的远程调用可以支持Seata上下文处理,并且具备必要SDK以和TC通讯。

3、第一阶段

发起者控制主流程

M1注册全局事务
  1. M1开启全局事务时向TC请求注册全局事务,拿到XID;
M1执行本地事务

获取连接时会拿到ConnectionProxy;

开启本地事务没什么特殊的地方;

执行本地事务,但未提交本地事务;

M1调用远程服务M2

一般在本地事务内部会调用RPC,并在RPC中附带XID

  • M2收到RPC,进行处理
    • RPC方式支持Seata,一般都是引入Interceptor预处理XID到上下文;
    • M2执行本地事务
      • 由于数据源被代理,并且Interceptor注入了XID,在提交本地事务时,会先进行分支事务的注册拿到branId
        • 注册前需要拿到全局锁
      • 注册完毕后,会处理插入undolog,然后和本地事务一起提交
  • M2向TC报告本地事务执行状态为成功,如果遇到异常回滚了,会报告异常。假设这里成功。
    本地commit成功时,后进行report到TC,如果本地异常了,会先report异常到TC,而后抛出异常传递到上层,触发上层的本地回滚逻辑
    • 本地事务提交完毕后,M2就很自然地释放本地事务占用的锁资源
    • 注意,M2此时并没有释放全局锁
  • M1继续处理自己的业务
  • M1提交本地事务
    • M1提交前需要尝试获取全局锁,因为和M2同属一个全局事务,所以可以拿到全局锁
    • 流程同M2

4、第二阶段

发起者收尾流程

第一阶段处理完毕后,实质上所有的分支事务都已经提交完毕了,二阶段M1根据参与者执行状态向TC报告是否提交全局事务(注意:一般的理论模型是由TC决定)。

无论处理如何,正常情况均会删去undolog。

  • M1报告事务处理状态到TC
  • M1的本地事务处理完毕后,开始汇报全局事务的结果。假设都成功,这里决定提交全局事务
    • 因此这里rpc调用的远程事务如果无法让M1感知到异常,可能M1最终的决策是提交。
  • TC收到提交请求后,修改全局事务状态为异步提交,然后返回提交成功给M1。
  • 流程实际结束,等待TC的提交决定,删除undolog。

继续阅读