天天看點

分布式事務解決方案——TCC

TCC是Try、Confirm、Cancel三個詞語的縮寫,TCC要求每個分支事務實作三個操作:預處理Try、确認Confirm、撤銷Cancel。

1、Try 階段是做業務檢查(一緻性)及資源預留(隔離),此階段僅是一個初步操作,它和後續的Confirm一起才能真正構成一個完整的業務邏輯。

2、Confirm 階段是做确認送出,Try階段所有分支事務執行成功後開始執行Confirm。通常情況下,TCC認為Confirm階段是不會出錯的,若Confirm階段真的出錯了,則重試或人工處理。

3、Cancel 階段是在業務執行錯誤需要復原的狀态下執行分支事務的業務取消,預留資源釋放。通常情況下,TCC認為Cancel階段也是一定成功的,若Cancel階段真的出錯了,則重試或人工處理。

TM(事務管理器)首先發起所有的分支事務的try操作,任何一個分支事務的try操作執行失敗,TM将會發起所有分支事務的Cancel操作;若try操作全部成功,TM将會發起所有分支事務的Confirm操作,其中Confirm/Cancel操作若執行失敗,TM會進行重試。

所有try都成功

分布式事務解決方案——TCC

有一個try失敗

分布式事務解決方案——TCC

特别提醒

  1. 所有分支事務的try階段執行成功,則會執行confirm階段。TCC認為confirm階段一定會執行成功,如果confirm執行失敗,則會重試或者人工處理錯誤。
  1. 任何一個分支事務的try階段執行失敗,則會執行cancel階段。TCC認為cancel階段一定會執行成功,如果cancel執行失敗,則會重試或者人工處理錯誤。

TCC需要注意三種異常處理分别是:空復原、幂等、懸挂。

空復原

分布式事務解決方案——TCC

事務管理器調用服務的try操作,可能會出現因為丢包而導緻的網絡逾時,導緻應用的try階段沒執行,事務管理器認為執行try逾時,會觸發cancel操作。這就導緻cancel比try先執行。

懸挂

分布式事務解決方案——TCC

1、事務協調器在調用 TCC 服務的一階段 Try 操作時,可能會出現因網絡擁堵而導緻的逾時。

2、此時事務管理器會觸發二階段復原,調用 TCC 服務的 Cancel 操作,Cancel 執行正常。

3、在此之後,擁堵在網絡上的一階段 Try 資料包被 TCC 服務收到,出現了二階段 Cancel 請求比一階段 Try 請求先執行的情況。

4、此 TCC 服務在執行晚到的 Try 之後,将永遠不會再收到二階段的 Confirm 或者 Cancel ,造成 TCC 服務懸挂。

幂等

try、confirm、cancel都會被重複調用,需要做幂等處理。

處理空復原、懸挂、幂等

可以使用日志表,來解決 空復原、懸挂、幂等。

建立3張表

try階段日志表local_try_log
字段 注釋
tx_no 全局事務id
create_time 建立時間
confirm階段日志表local_confirm_log
字段 注釋
tx_no 全局事務id
create_time 建立時間
cancel階段日志表local_cancel_log
字段 注釋
tx_no 全局事務id
create_time 建立時間

例子

從銀行1轉賬10元給銀行2,使用TCC方案

方案1

銀行1

try:
    幂等校驗,查找try日志(全局事務id是主鍵)
    懸挂處理,查找confirm、cancel日志(全局事務id是主鍵)
    檢查餘額是否夠10元
    鎖定10元
    插入try日志(全局事務id是主鍵)
confirm:
    幂等校驗,查找confirm日志(全局事務id是主鍵)
    扣減10元
    删除鎖定10元
    插入confirm日志(全局事務id是主鍵)
cancel:
    cancel幂等校驗,查找cancel日志(全局事務id是主鍵)
    空復原處理,查找try日志(全局事務id是主鍵)
    增加餘額10元(復原)
    删除鎖定10元
    插入cancel日志(全局事務id是主鍵)
           

銀行2

try:
    幂等校驗,查找try日志(全局事務id是主鍵)
    懸挂處理,查找confirm、cancel日志(全局事務id是主鍵)
    插入待激活10元
    插入try日志(全局事務id是主鍵)
confirm:
    幂等校驗,查找confirm日志(全局事務id是主鍵)
    正式增加30元
    删除待激活10元
    插入confirm日志(全局事務id是主鍵)
cancel:
    空
           

由于業務很簡單,上面的流程還可以取消鎖定,解鎖的操作,直接在銀行1的try中扣減10元,流程如下。

方案2

銀行1

try:
    幂等校驗,查找try日志(全局事務id是主鍵)
    懸挂處理,查找confirm、cancel日志(全局事務id是主鍵)
    檢查餘額是否夠10元
    扣減10元
    插入try日志(全局事務id是主鍵)
confirm:
    空
cancel:
    cancel幂等校驗,查找cancel日志(全局事務id是主鍵)
    空復原處理,查找try日志(全局事務id是主鍵)
    增加餘額10元(復原)
    插入cancel日志(全局事務id是主鍵)
           

銀行2

try:
    空
confirm:
    幂等校驗,查找confirm日志(全局事務id是主鍵)
    正式增加30元
    插入confirm日志(全局事務id是主鍵)
cancel:
    空
           

Hmily實作方案2的代碼

服務1

@Service
@Slf4j
public class Bank1ServiceImpl implements Bank1Service {

    @Autowired
    AccountInfoDao accountInfoDao;
    @Autowired
    HmilyLogDao hmilyLogDao;

    @Autowired
    Bank2Client bank2Client;

    @Override
    @Transactional(rollbackFor = Exception.class)
    @Hmily(confirmMethod = "confirm", cancelMethod = "cancel")
    public void updateAccountBalance(String msg, Double amount) {
        // 全局事務id
        String transId = HmilyTransactionContextLocal.getInstance().get().getTransId();
        log.info("bank1 try 開始,transId={}", transId);

        // 幂等判斷
        int existTry = hmilyLogDao.isExistTry(transId);
        // 通故全局事務id查找到try日志,表明已經隻執行過try
        if (existTry > 0) {
            log.info("已經執行過try,無需重複執行try,transId={}", transId);
            return;
        }

        // 懸挂處理
        int existConfirm = hmilyLogDao.isExistConfirm(transId);
        int existCancel = hmilyLogDao.isExistCancel(transId);
        // 通故全局事務id查找到confirm、cancel日志,表明已經隻執行過confirm、cancel
        if (existConfirm > 0 || existCancel > 0) {
            log.info("confirm,cancel有一個已經執行過,try不能再次執行,transId={}", transId);
            return;
        }

        // 制造空復原
        if (StringUtils.equals("制造空復原", msg)) {
            throw new RuntimeException("try方法沒修改資料庫就抛出異常,cancel方法會執行,形成空復原,transId=" + transId);
        }

        // blank1減金額
        accountInfoDao.subtractAccountBalance("1", amount);

        // 添加try日志記錄,try日志和扣減餘額在同一個本地事務中,要麼都成功,要麼都失敗
        // 日志的元件id必須是全局事務id,如果同一個事物重複調用try,到這一步會報主鍵重複
        hmilyLogDao.addTry(transId);

        // 遠端調用
        Boolean result = bank2Client.transfer(msg, amount);
        if (!result) {
            throw new RuntimeException("調用bank2失敗");
        }

        // bank1調用bank2成功後,發生異常,模拟復原
        if (StringUtils.equals("bank1調用bank2成功後,發生異常,模拟復原", msg)) {
            throw new RuntimeException("bank1調用bank2成功後,發生異常,模拟復原,transId=" + transId);
        }
    }

    public void confirm(String accountNo, Double amount) {
        String transId = HmilyTransactionContextLocal.getInstance().get().getTransId();
        log.info("bank1 confirm 開始執行,transId={}", transId);
    }

    @Transactional(rollbackFor = Exception.class)
    public void cancel(String msg, Double amount) {
        // 全局事務id
        String transId = HmilyTransactionContextLocal.getInstance().get().getTransId();
        log.info("bank1 cancel 開始執行,transId={}", transId);

        // 幂等判斷
        int existCancel = hmilyLogDao.isExistCancel(transId);
        if (existCancel > 0) {
            log.info("cancel已經執行過,無需重複執行,transId={}", transId);
            return;
        }

        // 處理空復原
        int existTry = hmilyLogDao.isExistTry(transId);
        if (existTry == 0) {
            log.info("try未執行過,不能執行cancel,transId={}", transId);
            return;
        }

        // bank1復原,加錢
        accountInfoDao.addAccountBalance(msg, amount);
        // 添加日志
        hmilyLogDao.addCancel(transId);
    }

}
           
@Service
@Slf4j
public class Bank2ServiceImpl implements Bank2Service {

    @Autowired
    AccountInfoDao accountInfoDao;

    @Autowired
    HmilyLogDao hmilyLogDao;

    @Override
    @Hmily(confirmMethod = "confirm", cancelMethod = "cancel")
    public void updateAccountBalance(String msg, Double amount) {
        String transId = HmilyTransactionContextLocal.getInstance().get().getTransId();
        log.info("bank2 try 開始執行,transId:{}",transId);
    }

    @Transactional(rollbackFor = Exception.class)
    public void confirm(String msg, Double amount) {
        // 全局事務id
        String transId = HmilyTransactionContextLocal.getInstance().get().getTransId();
        log.info("bank2 confirm 開始執行,transId:{}",transId);

        int existConfirm = hmilyLogDao.isExistConfirm(transId);
        if (existConfirm > 0) {
            log.info("bank2 confirm 已經執行過,無需再次執行,transId", transId);
            return;
        }

        // bank2加錢
        accountInfoDao.addAccountBalance("2", amount);
        // 添加confirm日志
        hmilyLogDao.addConfirm(transId);

        // bank2 confirm,抛出異常,會重試
        if (StringUtils.equals("confirm抛出異常會重試", msg)) {
            throw new RuntimeException("confirm抛出異常會重試,transId=" + transId);
        }
    }

    public void cancel(String msg, Double amount) {
        String transId = HmilyTransactionContextLocal.getInstance().get().getTransId();
        log.info("bank2 cancel 開始執行,transId:{}",transId);
    }

}