天天看點

Fescar TC-rollback流程

開篇

 這篇文章的目的主要是講解Fescar TC執行rollback的流程,目的是講解清楚rollback流程中的一些步驟。

 遺憾的是因為rollback本身涉及Fescar的分支事務注冊上報,如果事先不了解Fescar的分支事務,有些邏輯了解起來會有一些奇怪,對于branchSession本身還未了解,是以隻能單獨講解rollback流程。

背景

說明:

  • 分支事務中資料的 本地鎖 由本地事務管理,在分支事務 Phase1 結束時釋放。

    同時,随着本地事務結束,連接配接 也得以釋放。

  • 分支事務中資料的 全局鎖 在事務協調器側管理,在決議 Phase2 全局送出時,全局鎖馬上可以釋放。隻有在決議全局復原的情況下,全局鎖 才被持有至分支的 Phase2 結束。

這個設計,極大地減少了分支事務對資源(資料和連接配接)的鎖定時間,給整體并發和吞吐的提升提供了基礎。

這裡需要重點指出的是:Phase1階段的commit()操作是各個分支事務本地的事務操作。Phase2階段的操作是全局的commit()和rollback()。TC-rollback流程指的就是Phase2階段。

TC rollback流程介紹

rollback主流程

  • 1.根據transactionId查找begin階段生成的GlobalSession對象。
  • 2.對GlobalSession對象進行close操作。
  • 3.TC通知所有RM(各分支事務的資料總管)進行全局復原操作(doGlobalRollback)。

TC rollback源碼分析

public class DefaultCore implements Core {
    public GlobalStatus rollback(String xid) throws TransactionException {
        // 查找全局GlobalSession對象
        GlobalSession globalSession = 
         SessionHolder.findGlobalSession(XID.getTransactionId(xid));

        if (globalSession == null) {
            return GlobalStatus.Finished;
        }
        GlobalStatus status = globalSession.getStatus();
        // Highlight: Firstly, close the session, 
        // then no more branch can be registered.
    
        // 關閉全局的GlobalSession對象。
        globalSession.close(); 

        if (status == GlobalStatus.Begin) {
            globalSession.changeStatus(GlobalStatus.Rollbacking);
            // 執行全局的rollback操作
            doGlobalRollback(globalSession, false);

        }
        return globalSession.getStatus();
    }
}           
  • 查找全局的GlobalSession對象。
  • 關閉GlobalSession對象。
  • 執行全局的rollback操作。

GlobalSession關閉操作

public class GlobalSession implements SessionLifecycle, SessionStorable {

    public void close() throws TransactionException {
        if (active) {
            for (SessionLifecycleListener lifecycleListener : lifecycleListeners) {
                lifecycleListener.onClose(this);
            }
        }
    }

    public void onEnd(GlobalSession globalSession) throws TransactionException {
        removeGlobalSession(globalSession);
    }

    public void onStatusChange(GlobalSession globalSession, GlobalStatus status) throws TransactionException {
        updateGlobalSessionStatus(globalSession, status);
    }

    public void updateGlobalSessionStatus(GlobalSession session, GlobalStatus status) throws TransactionException {
        transactionStoreManager.writeSession(LogOperation.GLOBAL_UPDATE, session);
    }

    public void changeStatus(GlobalStatus status) throws TransactionException {
        for (SessionLifecycleListener lifecycleListener : lifecycleListeners) {
            lifecycleListener.onStatusChange(this, status);
        }
        this.status = status;
    }

}           
  • GlobalSession的close操作調用生命周期監聽器lifecycleListener.onClose()。
  • lifecycleListener指的是DefaultSessionManager對象。

DefaultSessionManager操作

public class SessionHolder {
    public static GlobalSession findGlobalSession(Long transactionId) 
      throws TransactionException {
        return getRootSessionManager().findGlobalSession(transactionId);
    }
}

public class DefaultSessionManager extends AbstractSessionManager {}

public abstract class AbstractSessionManager implements 
             SessionManager, SessionLifecycleListener {

    protected Map<Long, GlobalSession> sessionMap = new ConcurrentHashMap<>();

    public GlobalSession findGlobalSession(Long transactionId) 
      throws TransactionException {
        return sessionMap.get(transactionId);
    }

    public void onClose(GlobalSession globalSession) throws TransactionException {
        globalSession.setActive(false);

    }

    public void removeGlobalSession(GlobalSession session) 
                throws TransactionException {
        transactionStoreManager.writeSession(LogOperation.GLOBAL_REMOVE, session);
        sessionMap.remove(session.getTransactionId());

    }
}           
  • 生命周期監聽器的onClose()設定GlobalSession對象的active狀态為false。
  • findGlobalSession()方法從DefaultSessionManager傳回GlobalSession對象。
public class DefaultCore implements Core {

   public void doGlobalRollback(GlobalSession globalSession, boolean retrying) throws TransactionException {
        for (BranchSession branchSession : globalSession.getReverseSortedBranches()) {
            BranchStatus currentBranchStatus = branchSession.getStatus();
            if (currentBranchStatus == BranchStatus.PhaseOne_Failed) {
                continue;
            }
            try {
                BranchStatus branchStatus = resourceManagerInbound.branchRollback(XID.generateXID(branchSession.getTransactionId()), branchSession.getBranchId(),
                    branchSession.getResourceId(), branchSession.getApplicationData());

                switch (branchStatus) {
                    case PhaseTwo_Rollbacked:
                        globalSession.removeBranch(branchSession);
                        LOGGER.error("Successfully rolled back branch " + branchSession);
                        continue;
                    case PhaseTwo_RollbackFailed_Unretryable:
                        changeToRollbackFailedStatus(globalSession);
                        globalSession.end();
                        LOGGER.error("Failed to rollback global[" + globalSession.getTransactionId() + "] since branch[" + branchSession.getBranchId() + "] rollback failed");
                        return;
                    default:
                        LOGGER.info("Failed to rollback branch " + branchSession);
                        if (!retrying) {
                            queueToRetryRollback(globalSession);
                        }
                        return;

                }
            } catch (Exception ex) {
                LOGGER.info("Exception rollbacking branch " + branchSession, ex);
                if (!retrying) {
                    queueToRetryRollback(globalSession);
                    if (ex instanceof TransactionException) {
                        throw (TransactionException) ex;
                    } else {
                        throw new TransactionException(ex);
                    }
                }

            }

        }
        if (globalSession.hasBranch()) {
            changeToRollbackFailedStatus(globalSession);
        } else {
            changeToRollbackedStatus(globalSession);
        }
        globalSession.end();
    }
}           
  • doGlobalRollback()周遊GlobalSession當中所有的branchSession執行復原操作。
  • 内部涉及到GlobalSession的狀态遷移,這部分後面統一通過狀态遷移實作。

繼續閱讀