天天看點

Fescar TC-commit流程

開篇

 這篇文章的目的主要是講解Fescar TC執行commit的流程,目的是講解清楚commit流程中的一些步驟。

 遺憾的是因為commit本身Fescar的分支事務注冊上報,如果事先不了解Fescar的分支事務,有些邏輯了解起來會有一些奇怪,對于branchSession本身還未了解,是以隻能單獨講解commit流程。

背景

說明:

  • 分支事務中資料的 本地鎖 由本地事務管理,在分支事務 Phase1 結束時釋放。

    同時,随着本地事務結束,連接配接 也得以釋放。

  • 分支事務中資料的 全局鎖 在事務協調器側管理,在決議 Phase2 全局送出時,全局鎖馬上可以釋放。隻有在決議全局復原的情況下,全局鎖 才被持有至分支的 Phase2 結束。

這個設計,極大地減少了分支事務對資源(資料和連接配接)的鎖定時間,給整體并發和吞吐的提升提供了基礎。

這裡需要重點指出的是:Phase1階段的commit()操作是各個分支事務本地的事務操作。Phase2階段的操作是全局的commit()和rollback()。TC-commit流程指的就是Phase2階段。

TC commit流程介紹

  • 1.根據transactionId查找begin階段生成的GlobalSession對象。
  • 2.對GlobalSession對象進行清理操作,删除分支事務的鎖并清理GlobalSession對象。
  • 3.TC通知所有RM(各分支事務的資料總管)進行全局送出操作(doGlobalCommit)。

TC commit源碼分析

public class DefaultCoordinator extends AbstractTCInboundHandler
    implements TransactionMessageHandler, ResourceManagerInbound {

    @Override
    protected void doGlobalCommit(GlobalCommitRequest request, GlobalCommitResponse response, RpcContext rpcContext)
        throws TransactionException {
        response.setGlobalStatus(core.commit(XID.generateXID(request.getTransactionId())));
    }
}           
  • DefaultCoordinator的doGlobalCommit()作為全局復原入口
  • core.commit()根據XID去執行全局commit()操作。

Commit 主流程

public class DefaultCore implements Core {
    public GlobalStatus commit(String xid) throws TransactionException {
         // 1.查找GlobalSession
        GlobalSession globalSession = SessionHolder.findGlobalSession(XID.getTransactionId(xid));
        
        if (globalSession == null) {
            return GlobalStatus.Finished;
        }
        
        GlobalStatus status = globalSession.getStatus();
        // 2.關閉全局session并執行清理工作
        globalSession.closeAndClean(); // Highlight: Firstly, close the session, then no more branch can be registered.

        // 3.執行GlobalCommit通知動作
        if (status == GlobalStatus.Begin) {
            if (globalSession.canBeCommittedAsync()) {
                asyncCommit(globalSession);
            } else {
                doGlobalCommit(globalSession, false);
            }

        }

        // 傳回GlobalCommit後的狀态
        return globalSession.getStatus();
    }
}           
  • DefaultCore是全局復原的核心邏輯。
  • SessionHolder.findGlobalSession查找全局的GlobalSession對象。
  • GlobalSession執行closeAndClean操作。
  • DefaultCore執行doGlobalCommit通知TC執行全局復原操作。

查找GlobalSession

public class SessionHolder {
    public static GlobalSession findGlobalSession(Long transactionId) throws TransactionException {
        return getRootSessionManager().findGlobalSession(transactionId);
    }
}

public class DefaultSessionManager extends AbstractSessionManager {}

public abstract class AbstractSessionManager implements SessionManager, SessionLifecycleListener {

    protected Map<Long, GlobalSession> sessionMap = new ConcurrentHashMap<>();

    public GlobalSession findGlobalSession(Long transactionId) throws TransactionException {
        return sessionMap.get(transactionId);
    }
}           
  • findGlobalSession()方法從DefaultSessionManager當中擷取GlobalSession。
  • DefaultSessionManager的父類AbstractSessionManager的findGlobalSession從sessionMap擷取GlobalSession對象。

GlobalSession的closeAndClean

public class GlobalSession implements SessionLifecycle, SessionStorable {

    public void closeAndClean() throws TransactionException {
        close();
        clean();
    }

    public void close() throws TransactionException {
        if (active) {
            for (SessionLifecycleListener lifecycleListener : lifecycleListeners) {
                lifecycleListener.onClose(this);
            }
        }
    }

    private void clean() throws TransactionException {
        for (BranchSession branchSession : branchSessions) {
            branchSession.unlock();
        }
    }
}



public class DefaultSessionManager extends AbstractSessionManager {}
public abstract class AbstractSessionManager implements SessionManager, SessionLifecycleListener {

    public void onClose(GlobalSession globalSession) throws TransactionException {
        globalSession.setActive(false);
    }
}           
  • GlobalSession的執行closeAndClean操作,先執行close再執行clean。
  • lifecycleListener.onClose()執行DefaultSessionManager的onClose()。
  • DefaultSessionManager的onClose()把設定active辨別為false。
  • clean()操作對所有的分支事務branchSession釋放鎖。這部分邏輯比較複雜單獨列出。

BranchSession的unlock

public class BranchSession implements Lockable, Comparable<BranchSession>, SessionStorable {

    public boolean unlock() throws TransactionException {
        if (lockHolder.size() == 0) {
            return true;
        }
        Iterator<Map.Entry<Map<String, Long>, Set<String>>> it = lockHolder.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Map<String, Long>, Set<String>> entry = it.next();
            Map<String, Long> bucket = entry.getKey();
            Set<String> keys = entry.getValue();
            synchronized (bucket) {
                for (String key : keys) {
                    Long v = bucket.get(key);
                    if (v == null) {
                        continue;
                    }
                    if (v.longValue() == getTransactionId()) {
                        bucket.remove(key);
                    }
                }
            }
        }
        lockHolder.clear();
        return true;
    }
}           
  • BranchSession的unlock()操作對BranchSession 進行清理。
  • BranchSession内部的資料由于暫未閱讀該部分代碼是以暫時不能解釋清楚。
  • 全局清除lockHolder。

TC執行GlobalCommit

public class DefaultCore implements Core {

    public void doGlobalCommit(GlobalSession globalSession, boolean retrying) throws TransactionException {
        // 周遊所有的BranchSession執行復原操作
        for (BranchSession branchSession : globalSession.getSortedBranches()) {
            BranchStatus currentStatus = branchSession.getStatus();
            if (currentStatus == BranchStatus.PhaseOne_Failed) {
                continue;
            }
            try {
                BranchStatus branchStatus = resourceManagerInbound.branchCommit(XID.generateXID(
                                            branchSession.getTransactionId()), branchSession.getBranchId(),
                    branchSession.getResourceId(), branchSession.getApplicationData());

                switch (branchStatus) {
                    case PhaseTwo_Committed:
                        globalSession.removeBranch(branchSession);
                        continue;
                    case PhaseTwo_CommitFailed_Unretryable:
                        if (globalSession.canBeCommittedAsync()) {
                            LOGGER.error("By [{}], failed to commit branch {}", branchStatus, branchSession);
                            continue;
                        } else {
                            globalSession.changeStatus(GlobalStatus.CommitFailed);
                            globalSession.end();
                            LOGGER.error("Finally, failed to commit global[{}] since branch[{}] commit failed",
                                globalSession.getTransactionId(), branchSession.getBranchId());
                            return;
                        }
                    default:
                        if (!retrying) {
                            queueToRetryCommit(globalSession);
                            return;
                        }
                        if (globalSession.canBeCommittedAsync()) {
                            LOGGER.error("By [{}], failed to commit branch {}", branchStatus, branchSession);
                            continue;
                        } else {
                            LOGGER.error(
                                "Failed to commit global[{}] since branch[{}] commit failed, will retry later.",
                                globalSession.getTransactionId(), branchSession.getBranchId());
                            return;
                        }

                }

            } catch (Exception ex) {
                LOGGER.info("Exception committing branch {}", branchSession, ex);
                if (!retrying) {
                    queueToRetryCommit(globalSession);
                    if (ex instanceof TransactionException) {
                        throw (TransactionException) ex;
                    } else {
                        throw new TransactionException(ex);
                    }
                }

            }

        }
        if (globalSession.hasBranch()) {
            return;
        }
        globalSession.changeStatus(GlobalStatus.Committed);
        globalSession.end();
    }
}           
  • 對所有的BranchSession執行branchCommit通知。
  • 針對branchCommit傳回狀态進行判斷,有一些邏輯在裡面,後續閱讀了Branch相關資料後再補充狀态轉移圖。

繼續閱讀