前言
fescar釋出已有時日,分布式事務一直是業界備受關注的領域,fescar釋出一個月左右便受到了近5000個star足以說明其熱度。當然,在fescar出來之前,已經有比較成熟的分布式事務的解決方案開源了,比較典型的方案如LCN(https://github.com/codingapi/tx-lcn)的2pc型無侵入事務,目前lcn已發展到5.0,已支援和fescar事務模型類似的TCX型事務。還有如TCC型事務實作hmily(https://github.com/yu199195/hmily)、tcc-transaction(https://github.com/changmingxie/tcc-transaction)等。在微服務架構流行的當下、阿裡這種開源大戶背景下,fescar的釋出無疑又掀起了研究分布式事務的熱潮。fescar脫胎于阿裡雲商業分布式事務服務GTS,線上上環境提供這種公共服務其模式肯定經受了非常嚴苛的考驗。其分布式事務模型TXC又仿于傳統事務模型XA方案,主要差別在于資料總管的定位一個在應用層一個在資料庫層。部落客覺得fescar的txc模型實作非常有研究的價值,是以今天我們來好好翻一翻fescar項目的代碼。本文篇幅較長,浏覽并了解本文大概耗時30~60分鐘左右。
項目位址
fescar:https://github.com/alibaba/fescar
本博文所述代碼為fescar的0.1.2-SNAPSHOT版本,根據fescar後期的疊代計劃,其項目結構和子產品實作都可能有很大的改變,特此說明。
fescar的TXC模型

上圖為fescar官方針對TXC模型制作的示意圖。不得不說大廠的圖制作的真的不錯,結合示意圖我們可以看到TXC實作的全貌。TXC的實作通過三個元件來完成。也就是上圖的三個深黃色部分,其作用如下,:
- TM:全局事務管理器,在标注開啟fescar分布式事務的服務端開啟,并将全局事務發送到TC事務控制端管理
- TC:事務控制中心,控制全局事務的送出或者復原。這個元件需要獨立部署維護,目前隻支援單機版本,後續疊代計劃會有叢集版本
- RM:資料總管,主要負責分支事務的上報,本地事務的管理
一段話簡述其實作過程:服務起始方發起全局事務并注冊到TC。在調用協同服務時,協同服務的事務分支事務會先完成階段一的事務送出或復原,并生成事務復原的undo_log日志,同時注冊目前協同服務到TC并上報其事務狀态,歸并到同一個業務的全局事務中。此時若沒有問題繼續下一個協同服務的調用,期間任何協同服務的分支事務復原,都會通知到TC,TC在通知全局事務包含的所有已完成一階段送出的分支事務復原。如果所有分支事務都正常,最後回到全局事務發起方時,也會通知到TC,TC在通知全局事務包含的所有分支删除復原日志。在這個過程中為了解決寫隔離和度隔離的問題會涉及到TC管理的全局鎖。
本博文的目标是深入代碼細節,探究其基本思路是如何實作的。首先會從項目的結構來簡述每個子產品的作用,繼而結合官方自帶的examples執行個體來探究整個分布式事務的實作過程。
項目結構解析
項目拉下來,用IDE打開後的目錄結構如下,下面先大緻的看下每個子產品的實作
- common :公共元件,提供常用輔助類,靜态變量、擴充機制類加載器、以及定義全局的異常等
- config : 配置加載解析子產品,提供了配置的基礎接口,目前隻有檔案配置實作,後續會有nacos等配置中心的實作
- core : 核心子產品主要封裝了TM、RM和TC通訊用RPC相關内容
- dubbo :dubbo子產品主要适配dubbo通訊架構,使用dubbo的filter機制來傳統全局事務的資訊到分支
- examples :簡單的示範執行個體子產品,等下從這個子產品入手探索
- rm-datasource :資源管理子產品,比較核心的一個子產品,個人認為這個子產品命名為core要更合理一點。代理了JDBC的一些類,用來解析sql生成復原日志、協調管理本地事務
- server : TC元件所在,主要協調管理全局事務,負責全局事務的送出或者復原,同時管理維護全局鎖。
- spring :和spring內建的子產品,主要是aop邏輯,是整個分布式事務的入口,研究fescar的突破口
- tm : 全局事務事務管理子產品,管理全局事務的邊界,全局事務開啟復原點都在這個子產品控制
通過【examples】子產品的執行個體看下效果
第一步、先啟動TC也就是【Server】子產品,main方法直接啟動就好,預設服務端口8091
第二步、回到examples子產品,将訂單,業務,賬戶、倉庫四個服務的配置檔案配置好,主要是mysql資料源和zookeeper連接配接位址,這裡要注意下,預設dubbo的zk注冊中心依賴沒有,啟動的時候回抛找不到class的異常,需要添加如下的依賴:
<dependency>
<groupId>com.101tec</groupId>
<artifactId>zkclient</artifactId>
<version>0.10</version>
<exclusions>
<exclusion>
<artifactId>slf4j-log4j12</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
</exclusions>
</dependency>
第三步、在BusinessServiceImpl中的模拟抛異常的地方打個斷點,依次啟動OrderServiceImpl、StorageServiceImpl、AccountServiceImpl、BusinessServiceImpl四個服務、等進斷點後,檢視資料庫account_tbl表,金額已減去400元,變成了599元。然後放開斷點、BusinessServiceImpl子產品模拟的異常觸發,全局事務復原,account_tbl表的金額就又復原到999元了
如上,我們已經體驗到fescar事務的控制能力了,下面我們具體看下它是怎麼控制的。
fescar事務過程分析
首先分析配置檔案
這個是一個鐵律,任何一個技術或架構要內建,配置檔案肯定是一個突破口。從上面的例子我們了解到,執行個體子產品的配置檔案中配置了一個全局事務掃描器執行個體,如:
<bean class="com.alibaba.fescar.spring.annotation.GlobalTransactionScanner">
<constructor-arg value="dubbo-demo-app"/>
<constructor-arg value="my_test_tx_group"/>
</bean>
這個執行個體在項目啟動時會掃描所有執行個體,具體實作見【spring】子產品。并将标注了@GlobalTransactional注解的方法織入GlobalTransactionalInterceptor的invoke方法邏輯。同時應用啟動時,會初始化TM(TmRpcClient)和RM(RmRpcClient)的執行個體,這個時候,服務已經和TC事務控制中心勾搭上了。在往下看就涉及到TM子產品的事務模闆類TransactionalTemplate。
【TM】子產品啟動全局事務
全局事務的開啟,送出、復原都被封裝在TransactionalTemplate中完成了,代碼如:
public Object execute(TransactionalExecutor business) throws TransactionalExecutor.ExecutionException {
// 1. get or create a transaction
GlobalTransaction tx = GlobalTransactionContext.getCurrentOrCreate();
// 2. begin transaction
try {
tx.begin(business.timeout(), business.name());
} catch (TransactionException txe) {
throw new TransactionalExecutor.ExecutionException(tx, txe,
TransactionalExecutor.Code.BeginFailure);
}
Object rs = null;
try {
// Do Your Business
rs = business.execute();
} catch (Throwable ex) {
// 3. any business exception, rollback.
try {
tx.rollback();
// 3.1 Successfully rolled back
throw new TransactionalExecutor.ExecutionException(tx, TransactionalExecutor.Code.RollbackDone, ex);
} catch (TransactionException txe) {
// 3.2 Failed to rollback
throw new TransactionalExecutor.ExecutionException(tx, txe,
TransactionalExecutor.Code.RollbackFailure, ex);
}
}
// 4. everything is fine, commit.
try {
tx.commit();
} catch (TransactionException txe) {
// 4.1 Failed to commit
throw new TransactionalExecutor.ExecutionException(tx, txe,
TransactionalExecutor.Code.CommitFailure);
}
return rs;
}
更詳細的實作在【TM】子產品中被分成了兩個Class實作,如下:
DefaultGlobalTransaction :全局事務具體的開啟,送出、復原動作
DefaultTransactionManager :負責使用TmRpcClient向TC控制中心發送指令,如開啟全局事務(GlobalBeginRequest)、送出(GlobalCommitRequest)、復原(GlobalRollbackRequest)、查詢狀态(GlobalStatusRequest)等。
以上是TM子產品核心内容點,TM子產品完成全局事務開啟後,接下來就開始看看全局事務iD,xid是如何傳遞、RM元件是如何介入的
【dubbo】全局事務xid的傳遞
首先是xid的傳遞,目前已經實作了dubbo架構實作的微服務架構下的傳遞,其他的像spring cloud和motan等的想要實作也很容易,通過一般RPC通訊架構都有的filter機制,将xid從全局事務的發起節點傳遞到服務協從節點,從節點接收到後綁定到目前線程上線文環境中,用于在分支事務執行sql時判斷是否加入全局事務。fescar的實作見【dubbo】子產品如下:
@Activate(group = { Constants.PROVIDER, Constants.CONSUMER }, order = 100)
public class TransactionPropagationFilter implements Filter {
private static final Logger LOGGER = LoggerFactory.getLogger(TransactionPropagationFilter.class);
@Override
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
String xid = RootContext.getXID();
String rpcXid = RpcContext.getContext().getAttachment(RootContext.KEY_XID);
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("xid in RootContext[" + xid + "] xid in RpcContext[" + rpcXid + "]");
}
boolean bind = false;
if (xid != null) {
RpcContext.getContext().setAttachment(RootContext.KEY_XID, xid);
} else {
if (rpcXid != null) {
RootContext.bind(rpcXid);
bind = true;
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("bind[" + rpcXid + "] to RootContext");
}
}
}
try {
return invoker.invoke(invocation);
} finally {
if (bind) {
String unbindXid = RootContext.unbind();
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("unbind[" + unbindXid + "] from RootContext");
}
if (!rpcXid.equalsIgnoreCase(unbindXid)) {
LOGGER.warn("xid in change during RPC from " + rpcXid + " to " + unbindXid);
if (unbindXid != null) {
RootContext.bind(unbindXid);
LOGGER.warn("bind [" + unbindXid + "] back to RootContext");
}
}
}
}
}
}
上面代碼rpcXid不為空時,就加入到了RootContext的ContextCore中,這裡稍微深入講下。ContextCore是一個可擴充實作的接口,目前預設的實作是ThreadLocalContextCore,基于ThreadLocal來儲存維護目前的xid。這裡fescar提供了可擴充的機制,實作在【common】子產品中,通過一個自定義的類加載器EnhancedServiceLoader加載需要擴充的服務類,這樣隻需要在擴充類加上@LoadLevel注解。标記order屬性聲明高優先級别,就可以達到擴充實作的目的。
【RM】子產品本地資源管理的介入
fescar針對本地事務相關的接口,通過代理機制都實作了一遍代理類,如資料源(DataSourceProxy)、ConnectionProxy、StatementProxy等。這個在配置檔案中也可以看出來,也就是說,我們要使用fescar分布式事務,一定要配置fescar提供的代理資料源。如:
配置好代理資料源後,從DataSourceProxy出發,本地針對資料庫的所有操作過程我們就可以随意控制了。從上面xid傳遞,已經知道了xid被儲存在RootContext中了,那麼請看下面的代碼,就非常清楚了:
首先看StatementProxy的一段代碼
在看ExecuteTemplate中的代碼
和【TM】子產品中的事務管理模闆類TransactionlTemplate類似,這裡非常關鍵的邏輯代理也被封裝在了ExecuteTemplate模闆類中。因重寫了Statement有了StatementProxy實作,在執行原JDBC的executeUpdate方法時,會調用到ExecuteTemplate的execute邏輯。在sql真正執行前,會判斷RootCOntext目前上下文中是否包含xid,也就是判斷目前是否是全局分布式事務。如果不是,就直接使用本地事務,如果是,這裡RM就會增加一些分布式事務相關的邏輯了。這裡根據sql的不同的類型,fescar封裝了五個不同的執行器來處理,分别是UpdateExecutor、DeleteExecutor、InsertExecutor、SelectForUpdateExecutor、PlainExecutor,結構如下圖:
PlainExecutor:
原生的JDBC接口實作,未做任何處理,提供給全局事務中的普通的select查詢使用
UpdateExecutor、DeleteExecutor、InsertExecutor:
三個DML增删改執行器實作,主要在sql執行的前後對sql語句進行了解析,實作了如下兩個抽象接口方法:
protected abstract TableRecords beforeImage() throws SQLException;
protected abstract TableRecords afterImage(TableRecords beforeImage) throws SQLException;
在這個過程中通過解析sql生成了提供復原操作的undo_log日志,日志目前是儲存在msyql中的,和業務sql操作共用同一個事務。表的結構如下:
rollback_info儲存的undo_log詳細資訊,是longblob類型的,結構如下:
{
"branchId":3958194,
"sqlUndoLogs":[
{
"afterImage":{
"rows":[
{
"fields":[
{
"keyType":"PrimaryKey",
"name":"ID",
"type":4,
"value":10
},
{
"keyType":"NULL",
"name":"COUNT",
"type":4,
"value":98
}
]
}
],
"tableName":"storage_tbl"
},
"beforeImage":{
"rows":[
{
"fields":[
{
"keyType":"PrimaryKey",
"name":"ID",
"type":4,
"value":10
},
{
"keyType":"NULL",
"name":"COUNT",
"type":4,
"value":100
}
]
}
],
"tableName":"storage_tbl"
},
"sqlType":"UPDATE",
"tableName":"storage_tbl"
}
],
"xid":"192.168.7.77:8091:3958193"
}
這裡貼的是一個update的操作,undo_log記錄的非常的詳細,通過全局事務xid關聯branchid,記錄資料操作的表名,操作字段名,以及sql執行前後的記錄數,如這個記錄,表名=storage_tbl,sql執行前ID=10,count=100,sql執行後id=10,count=98。如果整個全局事務失敗,需要復原的時候就可以生成:
update storage_tbl set count = 100 where id = 10;
這樣的復原sql語句執行了。
SelectForUpdateExecutor:
fescar的AT模式在本地事務之上預設支援讀未送出的隔離級别,但是通過SelectForUpdateExecutor執行器,可以支援讀已送出的隔離級别。代碼如:
@Override
public Object doExecute(Object... args) throws Throwable {
SQLSelectRecognizer recognizer = (SQLSelectRecognizer) sqlRecognizer;
Connection conn = statementProxy.getConnection();
ResultSet rs = null;
Savepoint sp = null;
LockRetryController lockRetryController = new LockRetryController();
boolean originalAutoCommit = conn.getAutoCommit();
StringBuffer selectSQLAppender = new StringBuffer("SELECT ");
selectSQLAppender.append(getTableMeta().getPkName());
selectSQLAppender.append(" FROM " + getTableMeta().getTableName());
String whereCondition = null;
ArrayList<Object> paramAppender = new ArrayList<>();
if (statementProxy instanceof ParametersHolder) {
whereCondition = recognizer.getWhereCondition((ParametersHolder) statementProxy, paramAppender);
} else {
whereCondition = recognizer.getWhereCondition();
}
if (!StringUtils.isEmpty(whereCondition)) {
selectSQLAppender.append(" WHERE " + whereCondition);
}
selectSQLAppender.append(" FOR UPDATE");
String selectPKSQL = selectSQLAppender.toString();
try {
if (originalAutoCommit) {
conn.setAutoCommit(false);
}
sp = conn.setSavepoint();
rs = statementCallback.execute(statementProxy.getTargetStatement(), args);
while (true) {
// Try to get global lock of those rows selected
Statement stPK = null;
PreparedStatement pstPK = null;
ResultSet rsPK = null;
try {
if (paramAppender.isEmpty()) {
stPK = statementProxy.getConnection().createStatement();
rsPK = stPK.executeQuery(selectPKSQL);
} else {
pstPK = statementProxy.getConnection().prepareStatement(selectPKSQL);
for (int i = 0; i < paramAppender.size(); i++) {
pstPK.setObject(i + 1, paramAppender.get(i));
}
rsPK = pstPK.executeQuery();
}
TableRecords selectPKRows = TableRecords.buildRecords(getTableMeta(), rsPK);
statementProxy.getConnectionProxy().checkLock(selectPKRows);
break;
} catch (LockConflictException lce) {
conn.rollback(sp);
lockRetryController.sleep(lce);
} finally {
if (rsPK != null) {
rsPK.close();
}
if (stPK != null) {
stPK.close();
}
if (pstPK != null) {
pstPK.close();
}
}
}
} finally {
if (sp != null) {
conn.releaseSavepoint(sp);
}
if (originalAutoCommit) {
conn.setAutoCommit(true);
}
}
return rs;
}
關鍵代碼見:
TableRecords selectPKRows = TableRecords.buildRecords(getTableMeta(), rsPK);
statementProxy.getConnectionProxy().checkLock(selectPKRows);
通過selectPKRows表操作記錄拿到lockKeys,然後到TC控制器端查詢是否被全局鎖定了,如果被鎖定了,就重新嘗試,直到鎖釋放傳回查詢結果。
分支事務的注冊和上報
在本地事務送出前,fescar會注冊和上報分支事務相關的資訊,見ConnectionProxy類的commit部分代碼:
@Override
public void commit() throws SQLException {
if (context.inGlobalTransaction()) {
try {
register();
} catch (TransactionException e) {
recognizeLockKeyConflictException(e);
}
try {
if (context.hasUndoLog()) {
UndoLogManager.flushUndoLogs(this);
}
targetConnection.commit();
} catch (Throwable ex) {
report(false);
if (ex instanceof SQLException) {
throw (SQLException) ex;
} else {
throw new SQLException(ex);
}
}
report(true);
context.reset();
} else {
targetConnection.commit();
}
}
從這段代碼我們可以看到,首先是判斷是了是否是全局事務,如果不是,就直接送出了,如果是,就先向TC控制器注冊分支事務,為了寫隔離,在TC端會涉及到全局鎖的擷取。然後儲存了用于復原操作的undo_log日志,繼而真正送出本地事務,最後向TC控制器上報事務狀态。此時,階段一的本地事務已完成了。
【server】子產品協調全局
關于server子產品,我們可以聚焦在DefaultCoordinator這個類,這個是AbstractTCInboundHandler控制處理器預設實作。主要實作了全局事務開啟,送出,復原,狀态查詢,分支事務注冊,上報,鎖檢查等接口,如:
回到一開始的TransactionlTemplate,如果整個分布式事務失敗需要復原了,首先是TM向TC發起復原的指令,然後TC接收到後,解析請求後會被路由到預設控制器類的doGlobalRollback方法内,最終在TC控制器端執行的代碼如下:
@Override
public void doGlobalRollback(GlobalSession globalSession, boolean retrying) throws TransactionException {
for (BranchSession branchSession : globalSession.getReverseSortedBranches()) {
BranchStatus currentBranchStatus = branchSession.getStatus();
if (currentBranchStatus == BranchStatus.PhaseOne_Failed) {
continue;
}
try {
BranchStatus branchStatus = resourceManagerInbound.branchRollback(XID.generateXID(branchSession.getTransactionId()), branchSession.getBranchId(),
branchSession.getResourceId(), branchSession.getApplicationData());
switch (branchStatus) {
case PhaseTwo_Rollbacked:
globalSession.removeBranch(branchSession);
LOGGER.error("Successfully rolled back branch " + branchSession);
continue;
case PhaseTwo_RollbackFailed_Unretryable:
GlobalStatus currentStatus = globalSession.getStatus();
if (currentStatus.name().startsWith("Timeout")) {
globalSession.changeStatus(GlobalStatus.TimeoutRollbackFailed);
} else {
globalSession.changeStatus(GlobalStatus.RollbackFailed);
}
globalSession.end();
LOGGER.error("Failed to rollback global[" + globalSession.getTransactionId() + "] since branch[" + branchSession.getBranchId() + "] rollback failed");
return;
default:
LOGGER.info("Failed to rollback branch " + branchSession);
if (!retrying) {
queueToRetryRollback(globalSession);
}
return;
}
} catch (Exception ex) {
LOGGER.info("Exception rollbacking branch " + branchSession, ex);
if (!retrying) {
queueToRetryRollback(globalSession);
if (ex instanceof TransactionException) {
throw (TransactionException) ex;
} else {
throw new TransactionException(ex);
}
}
}
}
GlobalStatus currentStatus = globalSession.getStatus();
if (currentStatus.name().startsWith("Timeout")) {
globalSession.changeStatus(GlobalStatus.TimeoutRollbacked);
} else {
globalSession.changeStatus(GlobalStatus.Rollbacked);
}
globalSession.end();
}
如上代碼可以看到,復原時從全局事務會話中疊代每個分支事務,然後通知每個分支事務復原。分支服務接收到請求後,首先會被路由到RMHandlerAT中的doBranchRollback方法,繼而調用了RM中的branchRollback方法,代碼如下:
@Override
public BranchStatus branchRollback(String xid, long branchId, String resourceId, String applicationData) throws TransactionException {
DataSourceProxy dataSourceProxy = get(resourceId);
if (dataSourceProxy == null) {
throw new ShouldNeverHappenException();
}
try {
UndoLogManager.undo(dataSourceProxy, xid, branchId);
} catch (TransactionException te) {
if (te.getCode() == TransactionExceptionCode.BranchRollbackFailed_Unretriable) {
return BranchStatus.PhaseTwo_RollbackFailed_Unretryable;
} else {
return BranchStatus.PhaseTwo_RollbackFailed_Retryable;
}
}
return BranchStatus.PhaseTwo_Rollbacked;
}
RM分支事務端最後執行的是UndoLogManager的undo方法,通過xid和branchid從資料庫查詢出復原日志,完成資料復原操作,整個過程都是同步完成的。如果全局事務是成功的,TC也會有類似的上述協調過程,隻不過是異步的将本次全局事務相關的undo_log清除了而已。至此,就完成了2階段的送出或復原,也就完成了完整的全局事務事務的控制。
感興趣的可以自己來我的Java架構群,可以擷取免費的學習資料,群号:
855801563對Java技術,架構技術感興趣的同學,歡迎加群,一起學習,互相讨論。