開篇
這篇文章是接着
Fescar example解析 - TM流程的下一步分析,主要是對TM的處理邏輯的進一步分析,理清楚TM(Transaction Manager )的處理步驟以及代碼調用鍊。
這篇文章的結論是TM執行事務操作包括begin/commit/rollback都是通過DefaultTransactionManager類來實作,實作形式是TM和TC進行網絡通信,在整個TM->TC的過程中TM擔當了Client端的角色,TC擔當了Server端的角色。
背景介紹
事務資料摘自
Fescar概覽。
與XA 的模型類似,我們定義 3 個元件來協定分布式事務的處理過程。

- Transaction Coordinator (TC): 事務協調器,維護全局事務的運作狀态,負責協調并驅動全局事務的送出或復原。
- Transaction Manager (TM): 控制全局事務的邊界,負責開啟一個全局事務,并最終發起全局送出或全局復原的決議。
- Resource Manager (RM): 控制分支事務,負責分支注冊、狀态彙報,并接收事務協調器的指令,驅動分支(本地)事務的送出和復原。
一個典型的分布式事務過程:
- TM 向 TC 申請開啟一個全局事務,全局事務建立成功并生成一個全局唯一的 XID。
- XID 在微服務調用鍊路的上下文中傳播。
- RM 向 TC 注冊分支事務,将其納入 XID 對應全局事務的管轄。
- TM 向 TC 發起針對 XID 的全局送出或復原決議。
- TC 排程 XID 下管轄的全部分支事務完成送出或復原請求。
執行過程
說明,整個執行流程如下:
- 1.TransactionalTemplate通過GlobalTransactionContext.getCurrentOrCreate()傳回GlobalTransaction對象。
- 2.GlobalTransactionContext的createNew()方法建立DefaultGlobalTransaction對象。
- 3.DefaultGlobalTransaction的構造方法當中建立DefaultTransactionManager對象。
- 4.TransactionalTemplate通過DefaultGlobalTransaction執行begin/commit/rollback等操作。
- 5.DefaultGlobalTransaction内部通過DefaultTransactionManager執行begin/commit/rollback等操作。
源碼解析
public class TransactionalTemplate {
public Object execute(TransactionalExecutor business) throws TransactionalExecutor.ExecutionException {
// 1. 建立一個GlobalTransaction對象
GlobalTransaction tx = GlobalTransactionContext.getCurrentOrCreate();
// 2. 通過GlobalTransaction開始執行事務
try {
tx.begin(business.timeout(), business.name());
} catch (TransactionException txe) {
throw new TransactionalExecutor.ExecutionException(tx, txe,
TransactionalExecutor.Code.BeginFailure);
}
}
說明:
- 建立tx對象,GlobalTransaction tx = GlobalTransactionContext.getCurrentOrCreate()。
- 執行全局事務,tx.beigin()及其他省略的一部分代碼。
public class GlobalTransactionContext {
private static final ThreadLocal<GlobalTransaction> THREAD_TRANSACTION_CONTEXT = new ThreadLocal<>();
private GlobalTransactionContext() {
}
// 建立GlobalTransaction對象
private static GlobalTransaction createNew() {
GlobalTransaction tx = new DefaultGlobalTransaction();
THREAD_TRANSACTION_CONTEXT.set(tx);
return THREAD_TRANSACTION_CONTEXT.get();
}
public static GlobalTransaction getCurrent() {
GlobalTransaction tx = THREAD_TRANSACTION_CONTEXT.get();
if (tx != null) {
return tx;
}
String xid = RootContext.getXID();
if (xid == null) {
return null;
}
tx = new DefaultGlobalTransaction(xid);
THREAD_TRANSACTION_CONTEXT.set(tx);
return THREAD_TRANSACTION_CONTEXT.get();
}
public static GlobalTransaction getCurrentOrCreate() {
GlobalTransaction tx = getCurrent();
if (tx == null) {
return createNew();
}
return tx;
}
}
- createNew()方法建立GlobalTransaction tx對象,類型是DefaultGlobalTransaction。
- 儲存tx到線程當中實作線程隔離,THREAD_TRANSACTION_CONTEXT.set(tx)。
- GlobalTransaction對象負責執行事務的begin()、commit()、rollback()等方法。
public class DefaultGlobalTransaction implements GlobalTransaction {
private static final int DEFAULT_GLOBAL_TX_TIMEOUT = 60000;
private static final String DEFAULT_GLOBAL_TX_NAME = "default";
private TransactionManager transactionManager;
private String xid;
private GlobalStatus status = GlobalStatus.UnKnown;
private GlobalTransactionRole role = GlobalTransactionRole.Launcher;
DefaultGlobalTransaction(String xid) {
this.transactionManager = DefaultTransactionManager.get();
this.xid = xid;
if (xid != null) {
status = GlobalStatus.Begin;
role = GlobalTransactionRole.Participant;
}
}
@Override
public void begin(int timeout, String name) throws TransactionException {
if (xid == null && role == GlobalTransactionRole.Launcher) {
xid = transactionManager.begin(null, null, name, timeout);
status = GlobalStatus.Begin;
RootContext.bind(xid);
} else {
if (xid == null) {
throw new ShouldNeverHappenException(role + " is NOT in a global transaction context.");
}
LOGGER.info(role + " is already in global transaction " + xid);
}
}
@Override
public void commit() throws TransactionException {
check();
RootContext.unbind();
if (role == GlobalTransactionRole.Participant) {
// Participant has no responsibility of committing
return;
}
status = transactionManager.commit(xid);
}
@Override
public void rollback() throws TransactionException {
check();
RootContext.unbind();
if (role == GlobalTransactionRole.Participant) {
// Participant has no responsibility of committing
return;
}
status = transactionManager.rollback(xid);
}
@Override
public GlobalStatus getStatus() throws TransactionException {
check();
status = transactionManager.getStatus(xid);
return status;
}
}
- DefaultGlobalTransaction構造函數建立transactionManager 對象,this.transactionManager = DefaultTransactionManager.get();
- DefaultGlobalTransaction的begin/commit/rollback通過TransactionManager的begin/commit/rollback實作。
public class DefaultTransactionManager implements TransactionManager {
private static class SingletonHolder {
private static final TransactionManager INSTANCE = new DefaultTransactionManager();
}
public static TransactionManager get() {
return SingletonHolder.INSTANCE;
}
private DefaultTransactionManager() {
}
@Override
public String begin(String applicationId, String transactionServiceGroup,
String name, int timeout) throws TransactionException {
GlobalBeginRequest request = new GlobalBeginRequest();
request.setTransactionName(name);
request.setTimeout(timeout);
GlobalBeginResponse response = (GlobalBeginResponse) syncCall(request);
return response.getXid();
}
@Override
public GlobalStatus commit(String xid) throws TransactionException {
long txId = XID.getTransactionId(xid);
GlobalCommitRequest globalCommit = new GlobalCommitRequest();
globalCommit.setTransactionId(txId);
GlobalCommitResponse response = (GlobalCommitResponse) syncCall(globalCommit);
return response.getGlobalStatus();
}
@Override
public GlobalStatus rollback(String xid) throws TransactionException {
long txId = XID.getTransactionId(xid);
GlobalRollbackRequest globalRollback = new GlobalRollbackRequest();
globalRollback.setTransactionId(txId);
GlobalRollbackResponse response = (GlobalRollbackResponse) syncCall(globalRollback);
return response.getGlobalStatus();
}
@Override
public GlobalStatus getStatus(String xid) throws TransactionException {
long txId = XID.getTransactionId(xid);
GlobalStatusRequest queryGlobalStatus = new GlobalStatusRequest();
queryGlobalStatus.setTransactionId(txId);
GlobalStatusResponse response = (GlobalStatusResponse) syncCall(queryGlobalStatus);
return response.getGlobalStatus();
}
private AbstractTransactionResponse syncCall(AbstractTransactionRequest request) throws TransactionException {
try {
return (AbstractTransactionResponse) TmRpcClient.getInstance().sendMsgWithResponse(request);
} catch (TimeoutException toe) {
throw new TransactionException(TransactionExceptionCode.IO, toe);
}
}
}
- DefaultTransactionManager 是單例實作全局唯一。
- DefaultTransactionManager 是TM實作begin/commit/rollback的核心邏輯。
- DefaultTransactionManager 的begin/commit/rollback通過和TC通信實作。
- DefaultTransactionManager 的syncCall實作和TC通信。