天天看點

Fescar example解析 - GlobalTransaction

開篇

 這篇文章是接着

Fescar example解析 - TM流程

的下一步分析,主要是對TM的處理邏輯的進一步分析,理清楚TM(Transaction Manager )的處理步驟以及代碼調用鍊。

 這篇文章的結論是TM執行事務操作包括begin/commit/rollback都是通過DefaultTransactionManager類來實作,實作形式是TM和TC進行網絡通信,在整個TM->TC的過程中TM擔當了Client端的角色,TC擔當了Server端的角色。

背景介紹

事務資料摘自

Fescar概覽

與XA 的模型類似,我們定義 3 個元件來協定分布式事務的處理過程。

Fescar example解析 - GlobalTransaction
  • Transaction Coordinator (TC): 事務協調器,維護全局事務的運作狀态,負責協調并驅動全局事務的送出或復原。
  • Transaction Manager (TM): 控制全局事務的邊界,負責開啟一個全局事務,并最終發起全局送出或全局復原的決議。
  • Resource Manager (RM): 控制分支事務,負責分支注冊、狀态彙報,并接收事務協調器的指令,驅動分支(本地)事務的送出和復原。

一個典型的分布式事務過程:

  1. TM 向 TC 申請開啟一個全局事務,全局事務建立成功并生成一個全局唯一的 XID。
  2. XID 在微服務調用鍊路的上下文中傳播。
  3. RM 向 TC 注冊分支事務,将其納入 XID 對應全局事務的管轄。
  4. TM 向 TC 發起針對 XID 的全局送出或復原決議。
  5. TC 排程 XID 下管轄的全部分支事務完成送出或復原請求。
Fescar example解析 - GlobalTransaction

執行過程

說明,整個執行流程如下:

  • 1.TransactionalTemplate通過GlobalTransactionContext.getCurrentOrCreate()傳回GlobalTransaction對象。
  • 2.GlobalTransactionContext的createNew()方法建立DefaultGlobalTransaction對象。
  • 3.DefaultGlobalTransaction的構造方法當中建立DefaultTransactionManager對象。
  • 4.TransactionalTemplate通過DefaultGlobalTransaction執行begin/commit/rollback等操作。
  • 5.DefaultGlobalTransaction内部通過DefaultTransactionManager執行begin/commit/rollback等操作。

源碼解析

public class TransactionalTemplate {

    public Object execute(TransactionalExecutor business) throws TransactionalExecutor.ExecutionException {

        // 1. 建立一個GlobalTransaction對象
        GlobalTransaction tx = GlobalTransactionContext.getCurrentOrCreate();

        // 2. 通過GlobalTransaction開始執行事務
        try {
            tx.begin(business.timeout(), business.name());

        } catch (TransactionException txe) {
            throw new TransactionalExecutor.ExecutionException(tx, txe,
                TransactionalExecutor.Code.BeginFailure);

        }
}           

說明:

  • 建立tx對象,GlobalTransaction tx = GlobalTransactionContext.getCurrentOrCreate()。
  • 執行全局事務,tx.beigin()及其他省略的一部分代碼。
public class GlobalTransactionContext {

    private static final ThreadLocal<GlobalTransaction> THREAD_TRANSACTION_CONTEXT = new ThreadLocal<>();

    private GlobalTransactionContext() {
    }

    // 建立GlobalTransaction對象
    private static GlobalTransaction createNew() {
        GlobalTransaction tx = new DefaultGlobalTransaction();
        THREAD_TRANSACTION_CONTEXT.set(tx);
        return THREAD_TRANSACTION_CONTEXT.get();
    }

    public static GlobalTransaction getCurrent() {
        GlobalTransaction tx = THREAD_TRANSACTION_CONTEXT.get();
        if (tx != null) {
            return tx;
        }
        String xid = RootContext.getXID();
        if (xid == null) {
            return null;
        }
        tx = new DefaultGlobalTransaction(xid);
        THREAD_TRANSACTION_CONTEXT.set(tx);
        return THREAD_TRANSACTION_CONTEXT.get();
    }

    public static GlobalTransaction getCurrentOrCreate() {
        GlobalTransaction tx = getCurrent();
        if (tx == null) {
            return createNew();
        }
        return tx;
    }
}           
  • createNew()方法建立GlobalTransaction tx對象,類型是DefaultGlobalTransaction。
  • 儲存tx到線程當中實作線程隔離,THREAD_TRANSACTION_CONTEXT.set(tx)。
  • GlobalTransaction對象負責執行事務的begin()、commit()、rollback()等方法。
public class DefaultGlobalTransaction implements GlobalTransaction {

    private static final int DEFAULT_GLOBAL_TX_TIMEOUT = 60000;
    private static final String DEFAULT_GLOBAL_TX_NAME = "default";

    private TransactionManager transactionManager;

    private String xid;

    private GlobalStatus status = GlobalStatus.UnKnown;

    private GlobalTransactionRole role = GlobalTransactionRole.Launcher;

    DefaultGlobalTransaction(String xid) {
        this.transactionManager = DefaultTransactionManager.get();
        this.xid = xid;
        if (xid != null) {
            status = GlobalStatus.Begin;
            role = GlobalTransactionRole.Participant;
        }
    }

    @Override
    public void begin(int timeout, String name) throws TransactionException {
        if (xid == null && role == GlobalTransactionRole.Launcher) {
            xid = transactionManager.begin(null, null, name, timeout);
            status = GlobalStatus.Begin;
            RootContext.bind(xid);
        } else {
            if (xid == null) {
                throw new ShouldNeverHappenException(role + " is NOT in a global transaction context.");
            }
            LOGGER.info(role + " is already in global transaction " + xid);
        }

    }

    @Override
    public void commit() throws TransactionException {
        check();
        RootContext.unbind();
        if (role == GlobalTransactionRole.Participant) {
            // Participant has no responsibility of committing
            return;
        }
        status = transactionManager.commit(xid);

    }

    @Override
    public void rollback() throws TransactionException {
        check();
        RootContext.unbind();
        if (role == GlobalTransactionRole.Participant) {
            // Participant has no responsibility of committing
            return;
        }
        status = transactionManager.rollback(xid);

    }

    @Override
    public GlobalStatus getStatus() throws TransactionException {
        check();
        status = transactionManager.getStatus(xid);
        return status;
    }
}           
  • DefaultGlobalTransaction構造函數建立transactionManager 對象,this.transactionManager = DefaultTransactionManager.get();
  • DefaultGlobalTransaction的begin/commit/rollback通過TransactionManager的begin/commit/rollback實作。
public class DefaultTransactionManager implements TransactionManager {

    private static class SingletonHolder {
        private static final TransactionManager INSTANCE = new DefaultTransactionManager();
    }

    public static TransactionManager get() {
        return SingletonHolder.INSTANCE;
    }

    private DefaultTransactionManager() {

    }

    @Override
    public String begin(String applicationId, String transactionServiceGroup, 
                        String name, int timeout) throws TransactionException {
        GlobalBeginRequest request = new GlobalBeginRequest();
        request.setTransactionName(name);
        request.setTimeout(timeout);
        GlobalBeginResponse response = (GlobalBeginResponse) syncCall(request);
        return response.getXid();
    }

    @Override
    public GlobalStatus commit(String xid) throws TransactionException {
        long txId = XID.getTransactionId(xid);
        GlobalCommitRequest globalCommit = new GlobalCommitRequest();
        globalCommit.setTransactionId(txId);
        GlobalCommitResponse response = (GlobalCommitResponse) syncCall(globalCommit);
        return response.getGlobalStatus();
    }

    @Override
    public GlobalStatus rollback(String xid) throws TransactionException {
        long txId = XID.getTransactionId(xid);
        GlobalRollbackRequest globalRollback = new GlobalRollbackRequest();
        globalRollback.setTransactionId(txId);
        GlobalRollbackResponse response = (GlobalRollbackResponse) syncCall(globalRollback);
        return response.getGlobalStatus();
    }

    @Override
    public GlobalStatus getStatus(String xid) throws TransactionException {
        long txId = XID.getTransactionId(xid);
        GlobalStatusRequest queryGlobalStatus = new GlobalStatusRequest();
        queryGlobalStatus.setTransactionId(txId);
        GlobalStatusResponse response = (GlobalStatusResponse) syncCall(queryGlobalStatus);
        return response.getGlobalStatus();
    }

    private AbstractTransactionResponse syncCall(AbstractTransactionRequest request) throws TransactionException {
        try {
            return (AbstractTransactionResponse) TmRpcClient.getInstance().sendMsgWithResponse(request);
        } catch (TimeoutException toe) {
            throw new TransactionException(TransactionExceptionCode.IO, toe);
        }
    }
}           
  • DefaultTransactionManager 是單例實作全局唯一。
  • DefaultTransactionManager 是TM實作begin/commit/rollback的核心邏輯。
  • DefaultTransactionManager 的begin/commit/rollback通過和TC通信實作。
  • DefaultTransactionManager 的syncCall實作和TC通信。

繼續閱讀