天天看點

Spring Cloud LCN 分布式事務

1,分布式事務

分布式事務産生的原因:當系統從單體應用按照領域,做高内聚、低耦合的拆分後,一個單體應用變成了多個分布式子系統。分布式子系統互相之間的協作。由于不在一個程序内,不是一個資料庫連接配接,當服務編排中,各子系統協同完成業務時,需要保證所有協作子系統要麼都成功,要麼都失敗,這即滿足在分布式環境下,多個系統的操作原子性的分布式事務。大型分布式應用由于資料的快速擴張,以及資料的高可用,通常需要跨網絡線性的伸縮。那麼在很多時候,需要考慮的不僅僅是資料的一緻性,本文僅讨論在特殊的、必須實作分布式事務的場景下,分布式事務的實作理論及LCN架構。

分布式事務常見算法:

2PC: two-phase commit,2階段送出。即将分布式事務分成2個階段:prepare、commit。事務協調者向所有參與發起prepare,所有參與者應答。當所有參與者應答yes,協調者向所有參與者發起commit,否則向所有參與者發起rollback。這要求所有參與者都實作prepare、commit和rollback接口。

2PC算法中,參與方在等待協調者送出事務通知時是阻塞狀态,協調者單點的話,問題會很嚴重。但即使協調者存在叢集的情況下,避免了嚴重的阻塞問題。但是2PC算法本身還存在出現資料不一緻的可能。比如:協調者叢集接收到所有參與者的yes應答,則通知所有參與者commit;但是當通知發送時,某個參與者由于網絡原因,不能正常接收并處理事務送出,而其他參與者都已經成功commit;此時,所有參與者的操作就不再滿足原子性,導緻資料出現不一緻。

3PC: three-phase commit,3階段送出。即将分布式事務分成3個階段:canCommit、preCommit、doCommit。在2階段算法的基礎上,進一步細分,同時引入了參與方逾時等算法。通過preCommit階段後,預設認為是可以送出事務的,那麼當參與方等待逾時後,會預設送出事務。(畢竟前面已經确認過,所有系統都是可以送出事務的。canCommit類似2階段的prepare階段)。有效解決了2PC中阻塞的問題。但是類似2PC,也存在資料不一緻的可能性。比如:當第三階段,協調者發送終端指令,但是某一個參與方因網絡問題未收到,執行了commit;資料就不一緻了。

TCC:Try/Confirm/Cancel;事務協調者(一般就是事務發起方)調用所有參與者的try;任一失敗,則調用所有的cancel;如都成功,則一起執行confirm;TCC的各方法作用:try中,處理預訂資源(比如A賬戶100元餘額;try後變成:50餘額,50當機額;);在confirm中,根據try中預訂的資源(當機額)處理業務;在Cancel中,處理還原資料。(實際使用中,要根據業務場景設計Try、Confirm、Cancel代碼;并且要考慮幂等性問題)

TCC的特點:

1,實際上每個參與方的try、confirm、cancel方法,每個方法都是在獨立的本地事務中,每個方法執行就是一個完整的本地事務。

2,try隻有一次,不管成功還是失敗,決定後面是執行confirm還是執行cancel;

3,所有參與方的confirm方法(或者cancel)在執行時,必須成功,如果不成功,則一直重試。(是以需要幂等性設計)。

TCC的實作複雜度比較高,實際使通過對業務實作進行設計,以滿足特殊場景的需要。避免了資料庫層面的過高性能消耗。在必要的時候,TCC是個不錯的選擇方案。

2,LCN

2.1 LCN是什麼

LCN是國産開源的分布式事務處理架構。LCN即:lock(鎖定事務單元)、confirm(确認事務子產品狀态)、notify(通知事務)。

LCN的實作是基于3PC的算法,結合TCC的補償機制。

LCN正常執行序列圖(來源于官方):

Spring Cloud LCN 分布式事務

LCN異常執行序列圖(來源于官方):

Spring Cloud LCN 分布式事務

3,Spring Cloud 整合LCN

3.1,下載下傳LCN工程;

在LCN的github下載下傳:https://github.com/codingapi/tx-lcn/

3.2,配置tx-manager事務協調器

修改其屬性檔案: (修改下載下傳事務協調伺服器的端口、接入的服務注冊中心、使用的redis庫等的叢集或單點配置)

#######################################txmanager-start#################################################
#服務端口
server.port=8899

#tx-manager不得修改
spring.application.name=tx-manager

spring.mvc.static-path-pattern=/**
spring.resources.static-locations=classpath:/static/
#######################################txmanager-end#################################################


#zookeeper位址
#spring.cloud.zookeeper.connect-string=127.0.0.1:2181
#spring.cloud.zookeeper.discovery.preferIpAddress = true

#eureka 位址
eureka.client.service-url.defaultZone=http://eurekaserver1:/eureka/,http://eurekaserver2:8082/eureka/,http://eurekaserver3:/eureka/
eureka.instance.prefer-ip-address=true

#######################################redis-start#################################################
#redis 配置檔案,根據情況選擇叢集或者單機模式

##redis 叢集環境配置
##redis cluster
#spring.redis.cluster.nodes=127.0.0.1:7001,127.0.0.1:7002,127.0.0.1:7003
#spring.redis.cluster.commandTimeout=5000

##redis 單點環境配置
#redis
#redis主機位址
spring.redis.host=192.168.6.211
#redis主機端口
spring.redis.port=6379
#redis連結密碼
spring.redis.password=
spring.redis.pool.maxActive=10
spring.redis.pool.maxWait=-1
spring.redis.pool.maxIdle=5
spring.redis.pool.minIdle=0
spring.redis.timeout=0
#####################################redis-end###################################################

#######################################LCN-start#################################################
#業務子產品與TxManager之間通訊的最大等待時間(機關:秒)
#通訊時間是指:發起方與響應方之間完成一次的通訊時間。
#該字段代表的是Tx-Client子產品與TxManager子產品之間的最大通訊時間,超過該時間未響應本次請求失敗。
tm.transaction.netty.delaytime = 5

#業務子產品與TxManager之間通訊的心跳時間(機關:秒)
tm.transaction.netty.hearttime = 15

#存儲到redis下的資料最大儲存時間(機關:秒)
#該字段僅代表的事務子產品資料的最大儲存時間,補償資料會永久儲存。
tm.redis.savemaxtime=30

#socket server Socket對外服務端口
#TxManager的LCN協定的端口
tm.socket.port=9999

#最大socket連接配接數
#TxManager最大允許的建立連接配接數量
tm.socket.maxconnection=100

#事務自動補償 (true:開啟,false:關閉)
# 說明:
# 開啟自動補償以後,必須要配置 tm.compensate.notifyUrl 位址,僅當tm.compensate.notifyUrl 在請求補償确認時傳回success或者SUCCESS時,才會執行自動補償,否則不會自動補償。
# 關閉自動補償,當出現資料時也會 tm.compensate.notifyUrl 位址。
# 當tm.compensate.notifyUrl 無效時,不影響TxManager運作,僅會影響自動補償。
tm.compensate.auto=false

#事務補償記錄回調位址(rest api 位址,post json格式)
#請求補償是在開啟自動補償時才會請求的位址。請求分為兩種:1.補償決策,2.補償結果通知,可通過通過action參數區分compensate為補償請求、notify為補償通知。
#*注意當請求補償決策時,需要補償服務傳回"SUCCESS"字元串以後才可以執行自動補償。
#請求補償結果通知則隻需要接受通知即可。
#請求補償的樣例資料格式:
#{"groupId":"TtQxTwJP","action":"compensate","json":"{\"address\":\"133.133.5.100:8081\",\"className\":\"com.example.demo.service.impl.DemoServiceImpl\",\"currentTime\":1511356150413,\"data\":\"C5IBLWNvbS5leGFtcGxlLmRlbW8uc2VydmljZS5pbXBsLkRlbW9TZXJ2aWNlSW1wbAwSBHNhdmUbehBqYXZhLmxhbmcuT2JqZWN0GAAQARwjeg9qYXZhLmxhbmcuQ2xhc3MYABABJCo/cHVibGljIGludCBjb20uZXhhbXBsZS5kZW1vLnNlcnZpY2UuaW1wbC5EZW1vU2VydmljZUltcGwuc2F2ZSgp\",\"groupId\":\"TtQxTwJP\",\"methodStr\":\"public int com.example.demo.service.impl.DemoServiceImpl.save()\",\"model\":\"demo1\",\"state\":0,\"time\":36,\"txGroup\":{\"groupId\":\"TtQxTwJP\",\"hasOver\":1,\"isCompensate\":0,\"list\":[{\"address\":\"133.133.5.100:8899\",\"isCompensate\":0,\"isGroup\":0,\"kid\":\"wnlEJoSl\",\"methodStr\":\"public int com.example.demo.service.impl.DemoServiceImpl.save()\",\"model\":\"demo2\",\"modelIpAddress\":\"133.133.5.100:8082\",\"channelAddress\":\"/133.133.5.100:64153\",\"notify\":1,\"uniqueKey\":\"bc13881a5d2ab2ace89ae5d34d608447\"}],\"nowTime\":0,\"startTime\":1511356150379,\"state\":1},\"uniqueKey\":\"be6eea31e382f1f0878d07cef319e4d7\"}"}
#請求補償的傳回資料樣例資料格式:
#SUCCESS
#請求補償結果通知的樣例資料格式:
#{"resState":true,"groupId":"TtQxTwJP","action":"notify"}
tm.compensate.notifyUrl=http://ip:port/path

#補償失敗,再次嘗試間隔(秒),最大嘗試次數3次,當超過3次即為補償失敗,失敗的資料依舊還會存在TxManager下。
tm.compensate.tryTime=30

#各事務子產品自動補償的時間上限(毫秒)
#指的是子產品執行自動逾時的最大時間,該最大時間若過段會導緻事務機制異常,該時間必須要子產品之間通訊的最大超過時間。
#例如,若子產品A與子產品B,請求逾時的最大時間是5秒,則建議改時間至少大于5秒。
tm.compensate.maxWaitTime=5000
#######################################LCN-end#################################################

logging.level.com.codingapi=debug
           

3.3 啟動事務協調者

啟動事務協調者,讓事務協調者注入進入eureka;(注意,配置中的redis等必須正常啟動)

啟動成功後,檢查tx-manager協調者,見下圖:

Spring Cloud LCN 分布式事務

3.4 事務參與方配置

假定:事務參與方已經是正常運作的服務提供者。樣例中的資料庫是mysql,連接配接池采用druid;

3.4.1 pom檔案引入LCN db插件和springcloud支援:

<properties>
        <lcn.last.version>4.1.0</lcn.last.version>
    </properties>


        <!-- 引入LCN-->
        <dependency>
            <groupId>com.codingapi</groupId>
            <artifactId>transaction-springcloud</artifactId>
            <version>${lcn.last.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>*</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>com.codingapi</groupId>
            <artifactId>tx-plugins-db</artifactId>
            <version>${lcn.last.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>*</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
           

3.4.2 yml添加配置

tm:
  manager:
    url: http://127.0.0.1:8899/tx/manager/
           

3.4.3 添加TxManagerTxUrlService到spring中

package com.mark.springcloud.service.impl;

import com.codingapi.tx.config.service.TxManagerTxUrlService;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;

/**
 * 添加從注冊中心擷取url;注意通過注解放入容器。
 */
@Service
public class TxManagerTxUrlServiceImpl implements TxManagerTxUrlService{
    @Value("${tm.manager.url}")
    private String url;
    @Override
    public String getTxUrl() {
        return url;
    }
}
           

3.4.4 事務參與方服務:

package com.mark.springcloud.service.impl;

import java.util.List;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import com.codingapi.tx.annotation.ITxTransaction;
import com.mark.springcloud.dao.DeptDao;
import com.mark.springcloud.entities.Dept;
import com.mark.springcloud.service.DeptService;
/**
 * 注意需要實作 ITxTransaction;
 */
@Service
public class DeptServiceImpl implements DeptService, ITxTransaction {
    @Autowired
    private DeptDao dao;

    //注意需要開啟事務
    @Override
    @Transactional
    public boolean add(Dept dept) {
        boolean rtnValue = dao.addDept(dept);
        return rtnValue;
    }
}
           

3.4.5 啟動事務參與方

啟動spring boot應用。

3.5 事務發起方配置

正常情況下,一個服務一般即可能是事務的發起方也是事務的參與方。(3.4.1-3.4.3在測試事務發起方、參與方都是同樣配置。是以直接略過,隻描述發起方特有代碼)

3.5.1 參照樣例,實作TxManagerHttpRequestService

package com.mark.springcloud.controller;

import com.codingapi.tx.netty.service.TxManagerHttpRequestService;
import com.lorne.core.framework.utils.http.HttpUtils;
import org.springframework.stereotype.Service;

/**
 * 常見TxManagerHttpRequestService重寫get、post方法;
 */

@Service
public class TxManagerHttpRequestServiceImpl implements TxManagerHttpRequestService{

    @Override
    public String httpGet(String url) {
        System.out.println("httpGet-start");
        String res = HttpUtils.get(url);
        System.out.println("httpGet-end");
        return res;
    }

    @Override
    public String httpPost(String url, String params) {
        System.out.println("httpPost-start");
        String res = HttpUtils.post(url,params);
        System.out.println("httpPost-end");
        return res;
    }
}
           

3.5.2 事務發起方服務處理

package com.mark.springcloud.controller;

import java.util.List;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import com.codingapi.tx.annotation.TxTransaction;
import com.mark.springcloud.entities.Dept;
import com.mark.springcloud.service.DeptClientService;

@RestController
public class DeptController_Consumer
{
    @Autowired
    private DeptClientService service;
    //@TxTransaction(isStart = true)注解修飾該方法為事務發起方,開啟事務組。
    @TxTransaction(isStart = true)
    @RequestMapping(value = "/consumer/dept/add")
    public Object add(Dept dept)
    {
        Object rtnObj = this.service.add(dept);
        int x = (int)(Math.random()*);
        //事務發起方随機數小于5時,抛出異常,則事務參與方事務會復原。否則正常執行,事務參與方事務正常送出。
        if (x < ) {
            int m = /;
        }
        return rtnObj;
    }
}
           

3.5.3 啟動事務發起方

啟動spring boot 應用。

4,測試事務

調用事務發起方服務,事務正常受事務協調者控制,當發起方和參與方都正常執行無異常時,事務正常送出,否則復原。

4,總結

Spring Cloud 內建LCN進行分布式事務控制使用簡單,整個原理也很清晰。但是必須意識到,分布式事務在微服務環境下不可避免的有較大開銷,請盡量采用通過消息隊列實作的最終一緻性設計。

繼續閱讀