開篇
這篇文章的目的主要是講解Fescar TC執行rollback的流程,目的是講解清楚rollback流程中的一些步驟。
遺憾的是因為rollback本身涉及Fescar的分支事務注冊上報,如果事先不了解Fescar的分支事務,有些邏輯了解起來會有一些奇怪,對于branchSession本身還未了解,是以隻能單獨講解rollback流程。
背景
說明:
-
分支事務中資料的 本地鎖 由本地事務管理,在分支事務 Phase1 結束時釋放。
同時,随着本地事務結束,連接配接 也得以釋放。
- 分支事務中資料的 全局鎖 在事務協調器側管理,在決議 Phase2 全局送出時,全局鎖馬上可以釋放。隻有在決議全局復原的情況下,全局鎖 才被持有至分支的 Phase2 結束。
這個設計,極大地減少了分支事務對資源(資料和連接配接)的鎖定時間,給整體并發和吞吐的提升提供了基礎。
這裡需要重點指出的是:Phase1階段的commit()操作是各個分支事務本地的事務操作。Phase2階段的操作是全局的commit()和rollback()。TC-rollback流程指的就是Phase2階段。
TC rollback流程介紹
rollback主流程
- 1.根據transactionId查找begin階段生成的GlobalSession對象。
- 2.對GlobalSession對象進行close操作。
- 3.TC通知所有RM(各分支事務的資料總管)進行全局復原操作(doGlobalRollback)。
TC rollback源碼分析
public class DefaultCore implements Core {
public GlobalStatus rollback(String xid) throws TransactionException {
// 查找全局GlobalSession對象
GlobalSession globalSession =
SessionHolder.findGlobalSession(XID.getTransactionId(xid));
if (globalSession == null) {
return GlobalStatus.Finished;
}
GlobalStatus status = globalSession.getStatus();
// Highlight: Firstly, close the session,
// then no more branch can be registered.
// 關閉全局的GlobalSession對象。
globalSession.close();
if (status == GlobalStatus.Begin) {
globalSession.changeStatus(GlobalStatus.Rollbacking);
// 執行全局的rollback操作
doGlobalRollback(globalSession, false);
}
return globalSession.getStatus();
}
}
- 查找全局的GlobalSession對象。
- 關閉GlobalSession對象。
- 執行全局的rollback操作。
GlobalSession關閉操作
public class GlobalSession implements SessionLifecycle, SessionStorable {
public void close() throws TransactionException {
if (active) {
for (SessionLifecycleListener lifecycleListener : lifecycleListeners) {
lifecycleListener.onClose(this);
}
}
}
public void onEnd(GlobalSession globalSession) throws TransactionException {
removeGlobalSession(globalSession);
}
public void onStatusChange(GlobalSession globalSession, GlobalStatus status) throws TransactionException {
updateGlobalSessionStatus(globalSession, status);
}
public void updateGlobalSessionStatus(GlobalSession session, GlobalStatus status) throws TransactionException {
transactionStoreManager.writeSession(LogOperation.GLOBAL_UPDATE, session);
}
public void changeStatus(GlobalStatus status) throws TransactionException {
for (SessionLifecycleListener lifecycleListener : lifecycleListeners) {
lifecycleListener.onStatusChange(this, status);
}
this.status = status;
}
}
- GlobalSession的close操作調用生命周期監聽器lifecycleListener.onClose()。
- lifecycleListener指的是DefaultSessionManager對象。
DefaultSessionManager操作
public class SessionHolder {
public static GlobalSession findGlobalSession(Long transactionId)
throws TransactionException {
return getRootSessionManager().findGlobalSession(transactionId);
}
}
public class DefaultSessionManager extends AbstractSessionManager {}
public abstract class AbstractSessionManager implements
SessionManager, SessionLifecycleListener {
protected Map<Long, GlobalSession> sessionMap = new ConcurrentHashMap<>();
public GlobalSession findGlobalSession(Long transactionId)
throws TransactionException {
return sessionMap.get(transactionId);
}
public void onClose(GlobalSession globalSession) throws TransactionException {
globalSession.setActive(false);
}
public void removeGlobalSession(GlobalSession session)
throws TransactionException {
transactionStoreManager.writeSession(LogOperation.GLOBAL_REMOVE, session);
sessionMap.remove(session.getTransactionId());
}
}
- 生命周期監聽器的onClose()設定GlobalSession對象的active狀态為false。
- findGlobalSession()方法從DefaultSessionManager傳回GlobalSession對象。
public class DefaultCore implements Core {
public void doGlobalRollback(GlobalSession globalSession, boolean retrying) throws TransactionException {
for (BranchSession branchSession : globalSession.getReverseSortedBranches()) {
BranchStatus currentBranchStatus = branchSession.getStatus();
if (currentBranchStatus == BranchStatus.PhaseOne_Failed) {
continue;
}
try {
BranchStatus branchStatus = resourceManagerInbound.branchRollback(XID.generateXID(branchSession.getTransactionId()), branchSession.getBranchId(),
branchSession.getResourceId(), branchSession.getApplicationData());
switch (branchStatus) {
case PhaseTwo_Rollbacked:
globalSession.removeBranch(branchSession);
LOGGER.error("Successfully rolled back branch " + branchSession);
continue;
case PhaseTwo_RollbackFailed_Unretryable:
changeToRollbackFailedStatus(globalSession);
globalSession.end();
LOGGER.error("Failed to rollback global[" + globalSession.getTransactionId() + "] since branch[" + branchSession.getBranchId() + "] rollback failed");
return;
default:
LOGGER.info("Failed to rollback branch " + branchSession);
if (!retrying) {
queueToRetryRollback(globalSession);
}
return;
}
} catch (Exception ex) {
LOGGER.info("Exception rollbacking branch " + branchSession, ex);
if (!retrying) {
queueToRetryRollback(globalSession);
if (ex instanceof TransactionException) {
throw (TransactionException) ex;
} else {
throw new TransactionException(ex);
}
}
}
}
if (globalSession.hasBranch()) {
changeToRollbackFailedStatus(globalSession);
} else {
changeToRollbackedStatus(globalSession);
}
globalSession.end();
}
}
- doGlobalRollback()周遊GlobalSession當中所有的branchSession執行復原操作。
- 内部涉及到GlobalSession的狀态遷移,這部分後面統一通過狀态遷移實作。