文章目錄
- Seata事務處理流程
-
- 前言
- 一、元件裝配
- 二、核心元件源碼分析
-
- 1、GlobalTransactionScanner
-
- 1.1、職能詳解
-
- 1.1.1、AbstractAutoProxyCreator
-
- postProcessBeforeInitialization
- wrapIfNecessary
- 1.1.2、InitializingBean
- 2、GlobalTransactionalInterceptor
-
- 2.1、職能詳解
-
- 2.1.1、ConfigurationChangeListener
- 2.1.2、MethodInterceptor
- 3、全局事務控制依賴的核心元件一覽
-
- TMClient
- RMClient
- TransactionalTemplate
- GlobalTransaction
- TransactionManager
- ResourceManager
- DataSourceProxy/ConnectionProxy
- 三、攔截處理流程
- 1、一階段
-
-
- transactionalTemplate#execute編排整體事務
- 擷取或建立事務
-
- GlobalTransactionContext#getCurrentOrCreate
- 開啟事務
-
- TransactionalTemplate#beginTransaction
- DefaultGlobalTransaction#begin
- 執行本地業務
-
- 擷取本地連接配接
- 本地事務執行
- 本地事務送出
- 本地事務復原
- 2、二階段
-
- 全局事務處理
-
- 入口與調用鍊
-
- 四、多服務調用AT模式示例
-
- 1、服務與身份
- 2、必要條件
- 3、第一階段
-
- 發起者控制主流程
-
- M1注冊全局事務
- M1執行本地事務
- M1調用遠端服務M2
- 4、第二階段
-
- 發起者收尾流程
Seata事務處理流程
前言
最近在攻克分布式相關的技術棧,剛好需要一個case輔助了解分布式事務和共識,就用seata看了下工程實踐中的實作模型。
本文主要記錄分析用戶端的行為。
參考 https://blog.csdn.net/hosaos/article/details/89403552
參考官方講解 http://seata.io/zh-cn/docs/overview/what-is-seata.html
版本0.9.0。注意距離新版本較遠,可能在細節實作上有所不同。
<dependency> <groupId>com.alibaba.cloud</groupId> <artifactId>spring-cloud-alibaba-seata</artifactId> <version>2.1.1.RELEASE</version> </dependency> <!-- 内部依賴 --> <dependency> <groupId>io.seata</groupId> <artifactId>seata-all</artifactId> <version>0.9.0</version> </dependency>
一、元件裝配
GlobalTransactionAutoConfiguration
依賴spring.factories機制自動裝配了
GlobalTransactionScanner
元件
@Configuration
@EnableConfigurationProperties(SeataProperties.class)
public class GlobalTransactionAutoConfiguration {
...
@Bean
public GlobalTransactionScanner globalTransactionScanner() {
String applicationName = applicationContext.getEnvironment()
.getProperty("spring.application.name");
String txServiceGroup = seataProperties.getTxServiceGroup();
if (StringUtils.isEmpty(txServiceGroup)) {
txServiceGroup = applicationName + "-seata-service-group";
seataProperties.setTxServiceGroup(txServiceGroup);
}
return new GlobalTransactionScanner(applicationName, txServiceGroup);
}
}
二、核心元件源碼分析
1、GlobalTransactionScanner
1.1、職能詳解
初始化必要元件;掃描代理資料源,織入全局事務的處理邏輯
1.1.1、AbstractAutoProxyCreator
postProcessBeforeInitialization
目的:代理JDBC資料源,攔截DataSource所有方法的執行,以織入全局事務的處理邏輯。
@Override
public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
if (bean instanceof DataSource && !(bean instanceof DataSourceProxy) && ConfigurationFactory.getInstance().getBoolean(DATASOURCE_AUTOPROXY, false)) {
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Auto proxy of [" + beanName + "]");
}
DataSourceProxy dataSourceProxy = DataSourceProxyHolder.get().putDataSource((DataSource) bean);
return Enhancer.create(bean.getClass(), (org.springframework.cglib.proxy.MethodInterceptor) (o, method, args, methodProxy) -> {
Method m = BeanUtils.findDeclaredMethod(DataSourceProxy.class, method.getName(), method.getParameterTypes());
if (null != m) {
return m.invoke(dataSourceProxy, args);
} else {
boolean oldAccessible = method.isAccessible();
try {
method.setAccessible(true);
return method.invoke(bean, args);
} finally {
//recover the original accessible for security reason
method.setAccessible(oldAccessible);
}
}
});
}
return bean;
}
wrapIfNecessary
為指定的bean生成代理對象(加入攔截器),以應對TCC和全局事務的處理。
protected Object wrapIfNecessary(Object bean, String beanName, Object cacheKey) {
if (disableGlobalTransaction) {
return bean;
}
synchronized (PROXYED_SET) {
// 代理完畢加入集合,確定隻會代理一次
if (PROXYED_SET.contains(beanName)) {
return bean;
}
interceptor = null;
//check TCC proxy
// 根據bean定義資訊,決定是否用TCC代理模式
if (TCCBeanParserUtils.isTccAutoProxy(bean, beanName, applicationContext)) {
//TCC interceptor, proxy bean of sofa:reference/dubbo:reference, and LocalTCC
interceptor = new TccActionInterceptor(TCCBeanParserUtils.getRemotingDesc(beanName));
} else {
Class<?> serviceInterface = SpringProxyUtils.findTargetClass(bean);
Class<?>[] interfacesIfJdk = SpringProxyUtils.findInterfaces(bean);
if (!existsAnnotation(new Class[]{serviceInterface})
&& !existsAnnotation(interfacesIfJdk)) {
return bean;
}
if (interceptor == null) {
// 不是TCC,就是用這個攔截器,處理@GlobalTransactional和@GlobalLock
interceptor = new GlobalTransactionalInterceptor(failureHandlerHook);
}
}
if (!AopUtils.isAopProxy(bean)) {
bean = super.wrapIfNecessary(bean, beanName, cacheKey);
} else {
AdvisedSupport advised = SpringProxyUtils.getAdvisedSupport(bean);
Advisor[] advisor = buildAdvisors(beanName, getAdvicesAndAdvisorsForBean(null, null, null));
for (Advisor avr : advisor) {
advised.addAdvisor(0, avr);
}
}
PROXYED_SET.add(beanName);
return bean;
}
}
1.1.2、InitializingBean
- 根據disableGlobalTransaction配置決定是否初始化元件,由seata配置檔案給出
- 如果disableGlobalTransaction == false,初始化TMClient和RMClient
private final boolean disableGlobalTransaction =
ConfigurationFactory.getInstance().getBoolean("service.disableGlobalTransaction", false);
@Override
public void afterPropertiesSet() {
// 控制事務生效的開關
if (disableGlobalTransaction) {
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Global transaction is disabled.");
}
return;
}
initClient();
}
private void initClient() {
if (StringUtils.isNullOrEmpty(applicationId) || StringUtils.isNullOrEmpty(txServiceGroup)) {
throw new IllegalArgumentException(
"applicationId: " + applicationId + ", txServiceGroup: " + txServiceGroup);
}
//init TM
TMClient.init(applicationId, txServiceGroup);
//init RM
RMClient.init(applicationId, txServiceGroup);
registerSpringShutdownHook();
}
2、GlobalTransactionalInterceptor
2.1、職能詳解
2.1.1、ConfigurationChangeListener
允許運作中調整seata全局事務的開啟和關閉
2.1.2、MethodInterceptor
處理@GlobalTransactional和@GlobalLock,控制全局事務處理流程。
@Override
public Object invoke(final MethodInvocation methodInvocation) throws Throwable {
Class<?> targetClass = (methodInvocation.getThis() != null ? AopUtils.getTargetClass(methodInvocation.getThis()) : null);
Method specificMethod = ClassUtils.getMostSpecificMethod(methodInvocation.getMethod(), targetClass);
final Method method = BridgeMethodResolver.findBridgedMethod(specificMethod);
// 拿到注解
final GlobalTransactional globalTransactionalAnnotation = getAnnotation(method, GlobalTransactional.class);
final GlobalLock globalLockAnnotation = getAnnotation(method, GlobalLock.class);
if (globalTransactionalAnnotation != null) {
// 處理标注了@GlobalTransactional的方法
return handleGlobalTransaction(methodInvocation, globalTransactionalAnnotation);
} else if (globalLockAnnotation != null) {
return handleGlobalLock(methodInvocation);
} else {
return methodInvocation.proceed();
}
}
3、全局事務控制依賴的核心元件一覽
TMClient
僅有一個init方法,初始化TmRpcClient,用于和TC互動
public static void init(String applicationId, String transactionServiceGroup) {
TmRpcClient tmRpcClient = TmRpcClient.getInstance(applicationId, transactionServiceGroup);
// 内部初始化其他元件,執行一些定時任務
tmRpcClient.init();
}
RMClient
僅有一個init方法
- 初始化RmRpcClient,用于和TC通訊;
- 設定RM,用于注冊分支事務,上報事務執行狀态,處理二階段事務。
- 這裡給出的DefaultResourceManager組合并持有了SPI找到的RM(resourceManagers字段),用政策模式委托給具體的ResourceManager處理對應的事務。
- 設定RmMessageListener,處理事件
- 處理器使用了DefaultRMHandler,請求委托給其處理
- DefaultRMHandler
- 處理器使用了DefaultRMHandler,請求委托給其處理
public static void init(String applicationId, String transactionServiceGroup) {
RmRpcClient rmRpcClient = RmRpcClient.getInstance(applicationId, transactionServiceGroup);
// 注冊RM。内部使用了SPI機制加載各種模式的ResourceManager。AT在這裡對應的是DataSourceManager
rmRpcClient.setResourceManager(DefaultResourceManager.get());
// 監聽器用于處理RPC請求
rmRpcClient.setClientMessageListener(new RmMessageListener(DefaultRMHandler.get()));
rmRpcClient.init();
}
其内部注冊了監聽器RmMessageListener,用于處理TC發回的二階段RPC,一般需要委托給RM處理
TransactionalTemplate
- TransactionalTemplate#execute是GlobalTransactionalInterceptor攔截業務的入口
- 擁有成員
- GlobalTransaction
GlobalTransaction
- 預設實作DefaultGlobalTransaction
- 全局事務抽象,裝飾/代理了TransactionManager,是以擁有對全局事務的控制api,如開始、送出、復原,擷取狀态,擷取xid,報告執行狀态等;
- 擁有成員
- TransactionManager
TransactionManager
- 分布式事務中TM角色,用于定義一個全局事務并控制(通知TC)送出還是復原。
- 預設實作DefaultTransactionManager
- 面向全局事務進行操作,與本地事務關聯不大。
- 但是本地事務的正常/異常,會導緻在二階段觸發送出和復原
- 一般在本地事務中嵌套調用了rpc遠端事務,如果需要復原,就需要在發起者處能夠感覺到異常,進而觸發復原邏輯
- TM需要感覺遠端事務的異常決定發往TC是送出還是復原
- TC的送出在AT模式下是異步的,交給定時任務去做。
- 但是本地事務的正常/異常,會導緻在二階段觸發送出和復原
- 成員
- TmRpcClient
- 用于和TC通訊,以操作在服務端建立的全局事務會話
- TmRpcClient
ResourceManager
- 分布式事務中本地事務管理器RM角色,用于控制分支事務(對本地事務的操作)。
- 提供注冊/取消分支、送出/復原分支事務、分支事務狀态報告、擷取全局鎖等功能
- 在不同的事務模式下,有不同的實作
- AT對應DataSourceManager
- SagaResourceManager
- TCCResourceManager
- 政策組合實作DefaultResourceManager
- 利用SPI機制加載了各個模式的實作,并根據模式比對到不同實作
DataSourceProxy/ConnectionProxy
DataSourceProxy實作并包裝了現有DataSource,用ConnectionProxy代理了擷取到的連接配接Connection,以在開啟、送出和復原事務的進行中織入全局事務的邏輯。
依托于spring平台,業務事務控制邏輯總入口TransactionAspectSupport#invokeWithinTransaction
三、攔截處理流程
1、一階段
委托DefaultGlobalTransaction實作了全局事務的流程
GlobalTransactionalInterceptor#handleGlobalTransaction調用transactionalTemplate#execute方法,開始處理全局事務。
transactionalTemplate#execute編排整體事務
public Object execute(TransactionalExecutor business) throws Throwable {
// 1. 根據上下文中的XID是否存在判定是否需要建立事務,這裡并不會通信。
// 如果建立了事務,對應的角色是全局事務發起者GlobalTransactionRole.Launcher,且xid仍留白;
// 如果有XID,對應角色為參與者
GlobalTransaction tx = GlobalTransactionContext.getCurrentOrCreate();
// 1.1 get transactionInfo
// 來資源調用時傳入的匿名實作,目前場景來自于注解 @GlobalTransactional
TransactionInfo txInfo = business.getTransactionInfo();
if (txInfo == null) {
throw new ShouldNeverHappenException("transactionInfo does not exist");
}
try {
// 2. begin transaction
beginTransaction(txInfo, tx);
Object rs = null;
try {
// Do Your Business
// 執行業務。注意,業務中操作了被seata代理的datasource,将會走代理邏輯。
rs = business.execute();
} catch (Throwable ex) {
// 3.the needed business exception to rollback.
completeTransactionAfterThrowing(txInfo,tx,ex);
throw ex;
}
// 4. everything is fine, commit.
commitTransaction(tx);
return rs;
} finally {
//5. clear
triggerAfterCompletion();
cleanUp();
}
}
擷取或建立事務
GlobalTransactionContext#getCurrentOrCreate
// 不存在就建立
public static GlobalTransaction getCurrentOrCreate() {
GlobalTransaction tx = getCurrent();
if (tx == null) {
return createNew();
}
return tx;
}
// 如果目前上下文xid不存在,傳回null,外部會進行建立邏輯;
// 如果xid存在,那麼目前元件角色是參與者,且狀态直接指定為Begin;
private static GlobalTransaction getCurrent() {
String xid = RootContext.getXID();
if (xid == null) {
return null;
}
return new DefaultGlobalTransaction(xid, GlobalStatus.Begin, GlobalTransactionRole.Participant);
}
// 建立新的事務。注意,建立的事務狀态是未知的,而非開始。在開啟事務時才會調整事務狀态。
private static GlobalTransaction createNew() {
GlobalTransaction tx = new DefaultGlobalTransaction();
return tx;
}
DefaultGlobalTransaction() {
this(null, GlobalStatus.UnKnown, GlobalTransactionRole.Launcher);
}
開啟事務
TransactionalTemplate#beginTransaction
private void beginTransaction(TransactionInfo txInfo, GlobalTransaction tx) throws TransactionalExecutor.ExecutionException {
try {
// 給出回調。這裡預設沒有注入任何hook
triggerBeforeBegin();
// 調用DefaultGlobalTransaction#begin開啟事務。将處理發起者和參與者各自的邏輯,區分點就是目前上下文有無XID
tx.begin(txInfo.getTimeOut(), txInfo.getName());
// 給出回調。這裡預設沒有注入任何hook
triggerAfterBegin();
} catch (TransactionException txe) {
throw new TransactionalExecutor.ExecutionException(tx, txe,
TransactionalExecutor.Code.BeginFailure);
}
}
DefaultGlobalTransaction#begin
發起者
- 調用TM,向TC發送請求注冊全局事務,拿到傳回的XID
參與者
- 檢查XID必須存在
@Override
public void begin(int timeout, String name) throws TransactionException {
// 如果是事務參與者,隻需要檢查必要狀态。實際要求XID必須存在
if (role != GlobalTransactionRole.Launcher) {
check();
return;
}
// 如果是發起者(Launcher),xid在前面建立時是為null的,是以這裡肯定仍是不存在,否則抛錯。
if (xid != null) {
throw new IllegalStateException();
}
if (RootContext.getXID() != null) {
throw new IllegalStateException();
}
// 調用DefaultTransactionManager#begin拿到xid。這裡對應2PC中發起全局事務的流程。
// 配置事務狀态,綁定xid到上下文
xid = transactionManager.begin(null, null, name, timeout);
status = GlobalStatus.Begin;
RootContext.bind(xid);
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Begin new global transaction [" + xid + "]");
}
}
// DefaultTransactionManager#begin
@Override
public String begin(String applicationId, String transactionServiceGroup, String name, int timeout)
throws TransactionException {
// 封裝請求
GlobalBeginRequest request = new GlobalBeginRequest();
request.setTransactionName(name);
request.setTimeout(timeout);
// 内部請求發起時,最終會用到 SeataProperties#txServiceGroup 到file.conf/nacos配置中找對應的server ip和端口(配置key為service.vgroup_mapping.{$txServiceGroup})
GlobalBeginResponse response = (GlobalBeginResponse)syncCall(request);
if (response.getResultCode() == ResultCode.Failed) {
throw new TmTransactionException(TransactionExceptionCode.BeginFailed, response.getMsg());
}
return response.getXid();
}
執行本地業務
擷取本地連接配接
使用了ConnectionProxy代理原有Connection
@Override
public ConnectionProxy getConnection(String username, String password) throws SQLException {
Connection targetConnection = targetDataSource.getConnection(username, password);
return new ConnectionProxy(this, targetConnection);
}
本地事務執行
執行過程一般會使用到DataSourceTransactionManager控制業務層事務流程,過程中使用的DataSource和Connection均是被Seata代理過的。
DataSourceTransactionManager#doBegin
本地事務送出
調用鍊
DataSourceTransactionManager#doCommit
-> ConnectionProxy#commit
=>ConnectionProxy#processGlobalTransactionCommit
=>ConnectionProxy#register()
=>targetConnection.commit
// DataSourceTransactionManager#doCommit,由于用到getConnection,是以傳回的是ConnectionProxy
@Override
protected void doCommit(DefaultTransactionStatus status) {
DataSourceTransactionObject txObject = (DataSourceTransactionObject) status.getTransaction();
Connection con = txObject.getConnectionHolder().getConnection();
try {
// ConnectionProxy#commit
con.commit();
}
catch (SQLException ex) {
throw new TransactionSystemException("Could not commit JDBC transaction", ex);
}
}
// ConnectionProxy#commit
private void doCommit() throws SQLException {
if (context.inGlobalTransaction()) {
// 如果在全局事務中
processGlobalTransactionCommit();
} else if (context.isGlobalLockRequire()) {
// 如果不在全局事務中,隻是需要用到全局鎖做讀隔離。内部擷取到全局鎖才可以送出
processLocalCommitWithGlobalLocks();
} else {
// 非全局事務的業務,直接按本地邏輯送出
targetConnection.commit();
}
}
private void processGlobalTransactionCommit() throws SQLException {
try {
// 注冊分支事務,内部會向server發送請求,并拿到分支id
register();
} catch (TransactionException e) {
recognizeLockKeyConflictException(e, context.buildLockKeys());
}
try {
if (context.hasUndoLog()) {
// 如果寫入了undoLog,這裡調用insertUndoLog對undolog入庫
UndoLogManagerFactory.getUndoLogManager(this.getDbType()).flushUndoLogs(this);
}
// 送出本地事務
targetConnection.commit();
} catch (Throwable ex) {
LOGGER.error("process connectionProxy commit error: {}", ex.getMessage(), ex);
report(false);
throw new SQLException(ex);
}
// 向server彙報一階段完成
report(true);
context.reset();
}
private void register() throws TransactionException {
Long branchId = DefaultResourceManager.get().branchRegister(BranchType.AT, getDataSourceProxy().getResourceId(),
null, context.getXid(), null, context.buildLockKeys());
context.setBranchId(branchId);
}
本地事務復原
DataSourceTransactionManager#doRollback -> ConnectionProxy#rollback
@Override
public void rollback() throws SQLException {
// 復原本地事務
targetConnection.rollback();
if (context.inGlobalTransaction()) {
if (context.isBranchRegistered()) {
// 如果是全局事務,需要向server彙報執行結果
report(false);
}
}
context.reset();
}
2、二階段
RMInboundHandler回調方法處理了二階段事務(送出、復原消息),抽象實作AbstractRMHandler,具體實作RMHandlerTCC、RMHandlerSaga、DefaultRMHandler、RMHandlerAT
public interface RMInboundHandler {
// 處理送出
BranchCommitResponse handle(BranchCommitRequest request);
// 處理復原
BranchRollbackResponse handle(BranchRollbackRequest request);
// 處理undolog
void handle(UndoLogDeleteRequest request);
}
全局事務處理
入口與調用鍊
由RMClient#init時設定的
rmRpcClient.setClientMessageListener(new RmMessageListener(DefaultRMHandler.get()))
處理。
調用鍊
RmMessageListener#onMessage
=>RmMessageListener#handleBranchCommit
=>(DefaultRMHandler)AbstractRMHandler#onRequest(AbstractMessage request, RpcContext context)
=>DefaultRMHandler#handle(BranchCommitRequest)
=>(RMHandlerAT)AbstractRMHandler#handle(BranchCommitRequest)
=>(RMHandlerAT)AbstractRMHandler#doBranchCommit
=>DataSourceManager#branchCommit
// RmMessageListener#onMessage
@Override
public void onMessage(RpcMessage request, String serverAddress, ClientMessageSender sender) {
Object msg = request.getBody();
if (msg instanceof BranchCommitRequest) {
// 分支送出邏輯
handleBranchCommit(request, serverAddress, (BranchCommitRequest)msg, sender);
} else if (msg instanceof BranchRollbackRequest) {
// 分支復原邏輯
handleBranchRollback(request, serverAddress, (BranchRollbackRequest)msg, sender);
}else if (msg instanceof UndoLogDeleteRequest) {
// 全局事務送出後清除undolog的邏輯
handleUndoLogDelete((UndoLogDeleteRequest) msg);
}
}
// RmMessageListener#handleBranchCommit
private void handleBranchCommit(RpcMessage request, String serverAddress,
BranchCommitRequest branchCommitRequest,
ClientMessageSender sender) {
BranchCommitResponse resultMessage = null;
try {
// 調用AbstractRMHandler#onRequest
resultMessage = (BranchCommitResponse)handler.onRequest(branchCommitRequest, null);
sender.sendResponse(request, serverAddress, resultMessage);
} catch (Exception e) {
if (resultMessage == null) {
resultMessage = new BranchCommitResponse();
}
resultMessage.setResultCode(ResultCode.Failed);
resultMessage.setMsg(e.getMessage());
sender.sendResponse(request, serverAddress, resultMessage);
}
}
// (DefaultRMHandler)AbstractRMHandler#onRequest
@Override
public AbstractResultMessage onRequest(AbstractMessage request, RpcContext context) {
if (!(request instanceof AbstractTransactionRequestToRM)) {
throw new IllegalArgumentException();
}
AbstractTransactionRequestToRM transactionRequest = (AbstractTransactionRequestToRM)request;
// 設定消息的處理器。這裡就是目前執行個體。由于RmMessageListener建構時使用的是DefaultRMHandler,是以這裡就是DefaultRMHandler
transactionRequest.setRMInboundMessageHandler(this);
// request本身内置了handle處理。這裡可以看到AbstractMessage的實作對應了各種消息
return transactionRequest.handle(context);
}
// 送出請求
public class BranchCommitRequest extends AbstractBranchEndRequest {
@Override
public short getTypeCode() {
return MessageType.TYPE_BRANCH_COMMIT;
}
@Override
public AbstractTransactionResponse handle(RpcContext rpcContext) {
// 實際調用DefaultRMHandler
return handler.handle(this);
}
}
// DefaultRMHandler#handle(BranchCommitRequest)
@Override
public BranchCommitResponse handle(BranchCommitRequest request) {
// 使用政策模式拿到對應處理器處理。之類跟進RMHandlerAT
return getRMHandler(request.getBranchType()).handle(request);
}
// 由于RMHandlerAT沒有實作handle,是以使用了的父類的實作AbstractRMHandler#handle(BranchCommitRequest)
@Override
public BranchCommitResponse handle(BranchCommitRequest request) {
BranchCommitResponse response = new BranchCommitResponse();
// 處理送出請求。AbstractCallback是個回調,exceptionHandleTemplate直接調用了其實作的方法對請求進行處理
exceptionHandleTemplate(new AbstractCallback<BranchCommitRequest, BranchCommitResponse>() {
@Override
public void execute(BranchCommitRequest request, BranchCommitResponse response)
throws TransactionException {
doBranchCommit(request, response);
}
}, request, response);
return response;
}
// AbstractRMHandler#doBranchCommit
protected void doBranchCommit(BranchCommitRequest request, BranchCommitResponse response)
throws TransactionException {
String xid = request.getXid();
long branchId = request.getBranchId();
String resourceId = request.getResourceId();
String applicationData = request.getApplicationData();
// 最終的最終,還是委托RM處理。AT對應的RM是DataSourceManager
BranchStatus status = getResourceManager().branchCommit(request.getBranchType(), xid, branchId, resourceId,
applicationData);
response.setXid(xid);
response.setBranchId(branchId);
response.setBranchStatus(status);
}
// DataSourceManager#branchCommit
@Override
public BranchStatus branchCommit(BranchType branchType, String xid, long branchId, String resourceId,
String applicationData) throws TransactionException {
// DataSourceManager#init方法中給出的asyncWorker實作是AsyncWorker
return asyncWorker.branchCommit(branchType, xid, branchId, resourceId, applicationData);
}
// DataSourceManager#init
@Override
public void init() {
AsyncWorker asyncWorker = new AsyncWorker();
asyncWorker.init();
initAsyncWorker(asyncWorker);
}
// AsyncWorker#branchCommit
@Override
public BranchStatus branchCommit(BranchType branchType, String xid, long branchId, String resourceId, String applicationData) throws TransactionException {
// 入緩存隊列。可以看到即使入隊失敗,也會送出掉
if (!ASYNC_COMMIT_BUFFER.offer(new Phase2Context(branchType, xid, branchId, resourceId, applicationData))) {
LOGGER.warn("Async commit buffer is FULL. Rejected branch [" + branchId + "/" + xid + "] will be handled by housekeeping later.");
}
return BranchStatus.PhaseTwo_Committed;
}
/**
* AsyncWorker#Init.從隊列中取任務執行二階段送出。
*/
public synchronized void init() {
LOGGER.info("Async Commit Buffer Limit: " + ASYNC_COMMIT_BUFFER_LIMIT);
timerExecutor = new ScheduledThreadPoolExecutor(1,
new NamedThreadFactory("AsyncWorker", 1, true));
timerExecutor.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
doBranchCommits();
} catch (Throwable e) {
LOGGER.info("Failed at async committing ... " + e.getMessage());
}
}
}, 10, 1000 * 1, TimeUnit.MILLISECONDS);
}
四、多服務調用AT模式示例
1、服務與身份
MicroServiceFirst: 發起者,簡寫M1;M1即是TM又是RM
MicroServiceSecond: 參與者,簡寫M2
M1執行本地事務中調用M2,M2執行了分支事務
2、必要條件
- M1和M2啟用了seata;
- M1方法标注了@GlobalTransactional,以被GlobalTransactionScanner生成代理對象,添加GlobalTransactionalInterceptor攔截器;
- M2方法可以不标注@GlobalTransactional,但是需要代理資料源,并在rpc中拿到XID放到上下文中
- M1到M2的遠端調用可以支援Seata上下文處理,并且具備必要SDK以和TC通訊。
3、第一階段
發起者控制主流程
M1注冊全局事務
- M1開啟全局事務時向TC請求注冊全局事務,拿到XID;
M1執行本地事務
擷取連接配接時會拿到ConnectionProxy;
開啟本地事務沒什麼特殊的地方;
執行本地事務,但未送出本地事務;
M1調用遠端服務M2
一般在本地事務内部會調用RPC,并在RPC中附帶XID
- M2收到RPC,進行處理
- RPC方式支援Seata,一般都是引入Interceptor預處理XID到上下文;
- M2執行本地事務
- 由于資料源被代理,并且Interceptor注入了XID,在送出本地事務時,會先進行分支事務的注冊拿到branId
- 注冊前需要拿到全局鎖
- 注冊完畢後,會處理插入undolog,然後和本地事務一起送出
- 由于資料源被代理,并且Interceptor注入了XID,在送出本地事務時,會先進行分支事務的注冊拿到branId
- M2向TC報告本地事務執行狀态為成功,如果遇到異常復原了,會報告異常。假設這裡成功。
本地commit成功時,後進行report到TC,如果本地異常了,會先report異常到TC,而後抛出異常傳遞到上層,觸發上層的本地復原邏輯
- 本地事務送出完畢後,M2就很自然地釋放本地事務占用的鎖資源
- 注意,M2此時并沒有釋放全局鎖
- M1繼續處理自己的業務
- M1送出本地事務
- M1送出前需要嘗試擷取全局鎖,因為和M2同屬一個全局事務,是以可以拿到全局鎖
- 流程同M2
4、第二階段
發起者收尾流程
第一階段處理完畢後,實質上所有的分支事務都已經送出完畢了,二階段M1根據參與者執行狀态向TC報告是否送出全局事務(注意:一般的理論模型是由TC決定)。
無論處理如何,正常情況均會删去undolog。
- M1報告事務處理狀态到TC
- M1的本地事務處理完畢後,開始彙報全局事務的結果。假設都成功,這裡決定送出全局事務
- 是以這裡rpc調用的遠端事務如果無法讓M1感覺到異常,可能M1最終的決策是送出。
- TC收到送出請求後,修改全局事務狀态為異步送出,然後傳回送出成功給M1。
- 流程實際結束,等待TC的送出決定,删除undolog。