天天看點

分布式事務中間件 Fescar—RM 子產品源碼解讀

前言

在SOA、微服務架構流行的年代,許多複雜業務上需要支援多資源占用場景,而在分布式系統中因為某個資源不足而導緻其它資源占用復原的系統設計一直是個難點。我所在的團隊也遇到了這個問題,為解決這個問題上,團隊采用的是阿裡開源的分布式中間件Fescar的解決方案,并詳細了解了Fescar内部的工作原理,解決在使用Fescar中間件過程中的一些疑慮的地方,也為後續團隊在繼續使用該中間件奠定理論基礎。

目前分布式事務解決方案基本是圍繞兩階段送出模式來設計的,按對業務是有侵入分為:對業務無侵入的基于XA協定的方案,但需要資料庫支援XA協定并且性能較低;對業務有侵入的方案包括:TCC等。Fescar就是基于兩階段送出模式設計的,以高效且對業務零侵入的方式,解決微服務場景下面臨的分布式事務問題。Fescar設計上将整體分成三個大子產品,即TM、RM、TC,具體解釋如下:

  1. TM(Transaction Manager):全局事務管理器,控制全局事務邊界,負責全局事務開啟、全局送出、全局復原。
  2. RM(Resource Manager):資料總管,控制分支事務,負責分支注冊、狀态彙報,并接收事務協調器的指令,驅動分支(本地)事務的送出和復原。
  3. TC(Transaction Coordinator):事務協調器,維護全局事務的運作狀态,負責協調并驅動全局事務的送出或復原。

本文将深入到Fescar的RM子產品源碼去介紹Fescar是如何在完成分支送出和復原的基礎上又做到零侵入,進而極大友善業務方進行業務系統開發。

一、從配置開始解讀

上圖是Fescar源碼examples子產品dubbo-order-service.xml内的配置,資料源采用druid的DruidDataSource,但實際jdbcTemplate執行時并不是用該資料源,而用的是Fescar對DruidDataSource的代理DataSourceProxy,是以,與RM相關的代碼邏輯基本上都是從DataSourceProxy這個代理資料源開始的。

Fescar采用2PC來完成分支事務的送出與復原,具體怎麼做到的呢,下面就分别介紹Phase1、Phase2具體做了些什麼。

二、Phase1—分支(本地)事務執行

Fescar将一個本地事務做為一個分布式事務分支,是以若幹個分布在不同微服務中的本地事務共同組成了一個全局事務,結構如下。

那麼,一個本地事務中SQL是如何執行呢?在Spring中,本質上都是從jdbcTemplate開始的,比如下面的SQL語句:

jdbcTemplate.update("update storage_tbl set count = count - ? where commodity_code = ?", new Object[] {count, commodityCode});           

一般JdbcTemplate執行流程如下圖所示:

由于在配置中,JdbcTemplate資料源被配置成了Fescar實作DataSourceProxy,進而控制了後續的資料庫連接配接使用的是Fescar提供的ConnectionProxy,Statment使用的是Fescar實作的StatmentProxy,最終Fescar就順理成章地實作了在本地事務執行前後增加所需要的邏輯,比如:完成分支事務的快照記錄和分支事務執行狀态的上報等等。

DataSourceProxy擷取ConnectionProxy:

ConnectionProxy擷取StatmentProxy:

在擷取到StatmentProxy後,可以調用excute方法執行sql了

而真正excute實作邏輯如下:

  1. 首先會檢查目前本地事務是否處于全局事務中,如果不處于,直接使用預設的Statment執行,避免因引入Fescar導緻非全局事務中的SQL執行性能下降。
  2. 解析Sql,有緩存機制,因為有些sql解析會比較耗時,可能會導緻在應用啟動後剛開始的那段時間裡處理全局事務中的sql執行效率降低。
  3. 對于INSERT、UPDATE、DELETE、SELECT..FOR UPDATE這四種類型的sql會專門實作的SQL執行器進行處理,其它SQL直接是預設的Statment執行。
  4. 傳回執行結果,如有異常則直接抛給上層業務代碼進行處理。

再來看一下關鍵的INSERT、UPDATE、DELETE、SELECT..FOR UPDATE這四種類型的sql如何執行的,先看一下具體類圖結構:

為結省篇幅,選擇UpdateExecutor實作源碼看一下,先看入口BaseTransactionalExecutor.execute,該方法将ConnectionProxy與Xid(事務ID)進行綁定,這樣後續判斷目前本地事務是否處理全局事務中隻需要看ConnectionProxy中Xid是否為空。

然後,執行AbstractDMLBaseExecutor中實作的doExecute方法

基本邏輯如下:

  1. 先判斷是否為Auto-Commit模式
  2. 如果非Auto-Commit模式,則先查詢Update前對應行記錄的快照beforeImage,再執行Update語句,完成後再查詢Update後對應行記錄的快照afterImage,最後将beforeImage、afterImage生成UndoLog追加到Connection上下文ConnectionContext中。(注:擷取beforeImage、afterImage方法在UpdateExecutor類下,一般是構造一條Select...For Update語句擷取執行前後的行記錄,同時會檢查是否有全局鎖沖突,具體可參考源碼)
  3. 如果是Auto-Commit模式,先将送出模式設定成非自動Commit,再執行2中的邏輯,再執行connectionProxy.commit()方法,由于執行2過程和commit時都可能會出現全局鎖沖突問題,增加了一個循環等待重試邏輯,最後将connection的模式設定成Auto-Commit模式

如果本地事務執行過程中發生異常,業務上層會接收到該異常,至于是給TM子產品傳回成功還是失敗,由業務上層實作決定,如果傳回失敗,則TM裁決對全局事務進行復原;如果本地事務執行過程未發生異常,不管是非Auto-Commit還是Auto-Commit模式,最後都會調用connectionProxy.commit()對本地事務進行送出,在這裡會建立分支事務、上報分支事務的狀态以及将UndoLog持久化到undo_log表中,具體代碼如下圖:

基本邏輯:

  1. 判斷目前本地事務是否處于全局事務中(也就判斷ConnectionContext中的xid是否為空)。
  2. 如果不處于全局事務中,則調用targetConnection對本地事務進行commit。
  3. 如果處于全局事務中,首先建立分支事務,再将ConnectionContext中的UndoLog寫入到undo_log表中,然後調用targetConnection對本地事務進行commit,将UndoLog與業務SQL一起送出,最後上報分支事務的狀态(成功 or 失敗),并将ConnectionContext上下文重置。

綜上所述,RM子產品通過對JDBC資料源進行代理,幹預業務SQL執行過程,加入了很多流程,比如業務SQL解析、業務SQL執行前後的資料快照查詢并組織成UndoLog、全局鎖檢查、分支事務注冊、UndoLog寫入并随本地事務一起Commit、分支事務狀态上報等。通過這種方式,Fescar真正做到了對業務代碼無侵入,隻需要通過簡單的配置,業務方就可以輕松享受Fescar所帶來的功能。Phase1整體流程引用Fescar官方圖總結如下:

三、Phase2-分支事務送出或復原

階段2完成的是全局事物的最終送出或復原,當全局事務中所有分支事務全部完成并且都執行成功,這時TM會發起全局事務送出,TC收到全全局事務送出消息後,會通知各分支事務進行送出;同理,當全局事務中所有分支事務全部完成并且某個分支事務失敗了,TM會通知TC協調全局事務復原,進而TC通知各分支事務進行復原。

在業務應用啟動過程中,由于引入了Fescar用戶端,RmRpcClient會随應用一起啟動,該RmRpcClient采用Netty實作,可以接收TC消息和向TC發送消息,是以RmRpcClient是與TC收發消息的關鍵子產品。

public class RMClientAT {

    public static void init(String applicationId, String transactionServiceGroup) {
        RmRpcClient rmRpcClient = RmRpcClient.getInstance(applicationId, transactionServiceGroup);
        AsyncWorker asyncWorker = new AsyncWorker();
        asyncWorker.init();
        DataSourceManager.init(asyncWorker);
        rmRpcClient.setResourceManager(DataSourceManager.get());
        rmRpcClient.setClientMessageListener(new RmMessageListener(new RMHandlerAT()));
        rmRpcClient.init();
    }
}           

上述代碼展示是的RmRpcClient初始化過程,有三個關鍵類RMHandlerAT、AsyncWorker和DataSourceManager。RMHandlerAT具有了分支送出和復原兩個方法,分支送出或復原的邏輯可以從這裡開始看;AsyncWorker是一個異步Worker,主要是完成分支事務異步送出的功能,具有失敗重試功能;DataSourceManager對資料源管理和維護。

下面分成兩部分來講:分支事務送出、分去事務復原。

3.1、分支事務送出

在接收到TC發起的全局送出消息後,經RmRpcClient對通信協定的處理,再交由RMHandlerAT來完成對分支事務的送出,分支事務送出從RMHandlerAT.doBranchCommit()開始,但最後由AsyncWorker異步Worker完成,直接看AsyncWorker中的代碼實作:

分支事務送出關鍵邏輯在doBranchCommits方法中:

該方法主要是批量删除UndoLog日志,但并未使用ConnectionProxy去執行删除SQL,可能原因是:1、完全沒必要 2、考慮效率優先

同樣,對于分支事務送出也引用Fescar官方一張圖來結尾:

3.2、分支事務復原

同樣,分支事務復原是從RMHandlerAT.doBranchRollback開始的,然後到了dataSourceManager.branchRollback,最後完成分支事務復原邏輯的是UndoLogManager.undo方法。

@Override
    protected void RMHandlerAT:doBranchRollback(BranchRollbackRequest request, BranchRollbackResponse response) throws TransactionException {
        String xid = request.getXid();
        long branchId = request.getBranchId();
        String resourceId = request.getResourceId();
        String applicationData = request.getApplicationData();
        LOGGER.info("AT Branch rolling back: " + xid + " " + branchId + " " + resourceId);
        BranchStatus status = dataSourceManager.branchRollback(xid, branchId, resourceId, applicationData);
        response.setBranchStatus(status);
        LOGGER.info("AT Branch rollback result: " + status);
    }
    
     @Override
    public BranchStatus DataSourceManager:branchRollback(String xid, long branchId, String resourceId, String applicationData) throws TransactionException {
        DataSourceProxy dataSourceProxy = get(resourceId);
        if (dataSourceProxy == null) {
            throw new ShouldNeverHappenException();
        }
        try {
            UndoLogManager.undo(dataSourceProxy, xid, branchId);
        } catch (TransactionException te) {
            if (te.getCode() == TransactionExceptionCode.BranchRollbackFailed_Unretriable) {
                return BranchStatus.PhaseTwo_RollbackFailed_Unretryable;
            } else {
                return BranchStatus.PhaseTwo_RollbackFailed_Retryable;
            }
        }
        return BranchStatus.PhaseTwo_Rollbacked;
    }           

UndoLogManager.undo方法源碼如下:

從上圖可以看出,整個復原到全局事務之前狀态的代碼邏輯集中在如下代碼中:

AbstractUndoExecutor undoExecutor = UndoExecutorFactory.getUndoExecutor(dataSourceProxy.getDbType(), sqlUndoLog);
undoExecutor.executeOn(conn);           

首先通過UndoExecutorFactory擷取到對應的UndoExecutor,然後再執行UndoExecutor的executeOn方法完成復原操作。目前三種類型的UndoExecutor結構如下:

undoExecutor.executeOn源碼如下:

至此,整個分支事務復原就結束了,分支事務復原整體時序圖如下:

引入Fescar官方對分支事務復原原理介紹圖作為結尾:

綜合上述,Fescar在Phase2通過UndoLog自動完成分支事務送出與復原,在這個過程中不需要業務方做任何處理,業務方無感覺,因些在該階段對業務代碼也是無侵入的。

四、總結

本文主要介紹了RM子產品的相關代碼,将RM子產品按2PC模式分成Phase1和Phase2分别進行介紹,從Fescar源碼上看,整個源碼結構清晰,有利于研發人員快速學習Fescar的原理。在使用方面,隻需進行簡單的配置,就可以享受Fescar帶來的便捷功能,對業務做到了無侵入;同時在性能方面,Fescar在分支事務送出過程中采用異步模式,減少了全局鎖的占用時間,進而提升了整體性能。後續,将繼續學習Fescar的其它子產品(TM、TC)與全局鎖的實作邏輯,并做相關總結介紹。

參考

作者介紹

王慎波,社群ID wangshenbo,阿裡巴巴進階開發工程師,專注于供應鍊平台的研發,對供應鍊系統中遇到的複雜業務場景的技術解決方案有思考和總結,長期關注分布式系統、分布式事務、領域驅動設計和微服務架構等。