Comparing and choosing distributed transaction middleware
- tx-lcn
- EasyTransaction
- ByteTCC
- Seata
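Implementing distributed transactions with Seata
This section focuses on Seata's distributed transaction architecture and walks through each of its transaction modes (AT, TCC, Saga, and XA) in turn.
Setting up Seata Server
Download the Seata package from https://github.com/seata/seata/releases (this book uses Seata 1.0.0). Change to the Seata root directory and run the command for your operating system to start Seata Server.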
Linux/Unix/Mac
sh ./bin/seata-server.sh
Windows
bin\seata-server.bat
Parameters can also be specified at startup:
$ sh ./bin/seata-server.sh -p 8091 -h 127.0.0.1 -m file
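In the command above, -p sets the port the server listens on, -h sets the host address the server registers, and -m sets the transaction log storage mode (file or db).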
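Seata AT mode
- Phase one: the business data and the rollback log (undo log) are committed in the same local transaction, after which the local lock and connection resources are released.
- Phase two: commit is asynchronous and completes very quickly; rollback is performed as reverse compensation driven by the rollback log written in phase one.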
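The two phases above can be sketched with a toy in-memory model. This is only an illustration under simplifying assumptions, not Seata's implementation: the class AtModeSketch, its map-based "table", and its method names are all hypothetical, and global locks, branch registration, and after-image checks are left out.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

/** Toy model of the AT-mode idea (hypothetical, not Seata's real code). */
class AtModeSketch {
    /** One undo record: the row key and its value before the update (the "before image"). */
    static final class UndoRecord {
        final String key;
        final Integer beforeImage; // null means the row did not exist before

        UndoRecord(String key, Integer beforeImage) {
            this.key = key;
            this.beforeImage = beforeImage;
        }
    }

    final Map<String, Integer> table = new HashMap<>();   // stands in for a business table
    final Deque<UndoRecord> undoLog = new ArrayDeque<>(); // stands in for the undo_log table

    /** Phase one: record the before image and write the business data together. */
    void phaseOneUpdate(String key, int newValue) {
        undoLog.push(new UndoRecord(key, table.get(key)));
        table.put(key, newValue);
    }

    /** Phase two, commit: the data is already in place, so only the undo log is discarded. */
    void phaseTwoCommit() {
        undoLog.clear();
    }

    /** Phase two, rollback: replay undo records newest-first to restore the before images. */
    void phaseTwoRollback() {
        while (!undoLog.isEmpty()) {
            UndoRecord record = undoLog.pop();
            if (record.beforeImage == null) {
                table.remove(record.key);
            } else {
                table.put(record.key, record.beforeImage);
            }
        }
    }

    public static void main(String[] args) {
        AtModeSketch branch = new AtModeSketch();
        branch.table.put("bonus:1", 100);
        branch.phaseOneUpdate("bonus:1", 150);
        branch.phaseTwoRollback();
        System.out.println("bonus:1 after rollback = " + branch.table.get("bonus:1"));
    }
}
```

The point of the sketch is that commit has almost nothing left to do in phase two, which is why it can run asynchronously, while rollback depends entirely on what phase one wrote to the undo log.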
Official documentation
http://seata.io/zh-cn/docs/dev/mode/at-mode.html
Code example
Maven dependency
<dependency>
    <groupId>com.alibaba.cloud</groupId>
    <artifactId>spring-cloud-starter-alibaba-seata</artifactId>
</dependency>
Configuration
seata:
  tx-service-group: content-center-seata-service-group
  service:
    vgroup-mapping:
      content-center-seata-service-group: seata-cluster
    grouplist:
      seata-cluster: 127.0.0.1:8091
    disable-global-transaction: false
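Configuration notes
- tx-service-group: the transaction group. It defaults to ${spring.application.name}-seata-service-group; any unique value works.
- vgroup-mapping: the transaction group mapping, which states which Seata Server cluster the tx-service-group maps to.
  - The key is the value of tx-service-group and the value is the cluster name; any unique name works.
- grouplist: the address list of the Seata Servers in the cluster.
  - The key is the cluster name used as the value in vgroup-mapping, and the value is the Seata Server address list.
- disable-global-transaction: whether to disable global transactions. Defaults to false.
In Seata 1.0.0 this setting cannot be read correctly. This is a bug; see https://github.com/seata/seata/issues/2114 for details. Fortunately the default value is already false, so usage is not affected.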
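The two-step lookup described in the configuration notes (transaction group to cluster name, then cluster name to server addresses) can be sketched as follows. This is a hypothetical model for illustration only: the class SeataGroupResolverSketch and its fields are not part of Seata, and the address list is modeled as a plain Java list rather than whatever format the real configuration uses.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical model of the vgroup-mapping and grouplist lookup (not a Seata class). */
class SeataGroupResolverSketch {
    final Map<String, String> vgroupMapping = new HashMap<>();   // tx-service-group -> cluster name
    final Map<String, List<String>> grouplist = new HashMap<>(); // cluster name -> server addresses

    /** Resolves a transaction group to the server addresses of the cluster it maps to. */
    List<String> resolveServers(String txServiceGroup) {
        String cluster = vgroupMapping.get(txServiceGroup);
        if (cluster == null) {
            throw new IllegalStateException("no vgroup-mapping entry for " + txServiceGroup);
        }
        List<String> servers = grouplist.get(cluster);
        if (servers == null) {
            throw new IllegalStateException("no grouplist entry for cluster " + cluster);
        }
        return servers;
    }

    public static void main(String[] args) {
        SeataGroupResolverSketch resolver = new SeataGroupResolverSketch();
        resolver.vgroupMapping.put("content-center-seata-service-group", "seata-cluster");
        resolver.grouplist.put("seata-cluster", Arrays.asList("127.0.0.1:8091"));
        System.out.println(resolver.resolveServers("content-center-seata-service-group"));
    }
}
```

The indirection is what lets several services with different tx-service-group values point at the same cluster, as the caller and callee configurations in this section do.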
Create the Seata transaction log table (undo_log)
-- auto-generated definition
create table undo_log (
    id            bigint auto_increment comment 'increment id' primary key,
    branch_id     bigint       not null comment 'branch transaction id',
    xid           varchar(100) not null comment 'global transaction id',
    context       varchar(128) not null comment 'undo_log context,such as serialization',
    rollback_info longblob     not null comment 'rollback info',
    log_status    int          not null comment '0:normal status,1:defense status',
    log_created   datetime     not null comment 'create datetime',
    log_modified  datetime     not null comment 'modify datetime',
    constraint ux_undo_log unique (xid, branch_id)
) comment 'AT transaction mode undo table' charset = utf8;
Controller code:
private final ShareSeataService shareSeataService;
@PutMapping("/audit/seata1/{id}")
public Share auditByIdSeata1(@PathVariable Integer id, @RequestBody ShareAuditDTO auditDTO) {
    return this.shareSeataService.auditById(id, auditDTO);
}
Service code:
- Audit service code (including the call to the user center API)
@Slf4j
@Service
@RequiredArgsConstructor(onConstructor = @__(@Autowired))
public class ShareSeataService {
    private final ShareMapper shareMapper;
    private final UserCenterFeignClient userCenterFeignClient;
    // Opens the global (distributed) transaction; any Exception triggers a global rollback
    @GlobalTransactional(rollbackFor = Exception.class)
    public Share auditById(Integer id, ShareAuditDTO auditDTO) {
        if (AuditStatusEnum.PASS.equals(auditDTO.getAuditStatusEnum())) {
            // Remote call to the user center; it joins the same global transaction
            userCenterFeignClient.addBonus(id, 50);
            // Throw an exception on purpose: if the user center side rolls back as well,
            // the distributed transaction is working
            // throw new IllegalArgumentException("An exception occurred...");
        }
        this.auditByIdInDB(id, auditDTO);
        return this.shareMapper.selectByPrimaryKey(id);
    }
- Audit service code (the method that updates the database, continuing the same class)
    public void auditByIdInDB(Integer id, ShareAuditDTO auditDTO) {
        Share share = Share.builder()
                .id(id)
                .auditStatus(auditDTO.getAuditStatusEnum().toString())
                .reason(auditDTO.getReason())
                .build();
        this.shareMapper.updateByPrimaryKeySelective(share);
    }
}
The @GlobalTransactional annotation creates the distributed (global) transaction.
Callee code
Maven dependency
<dependency>
    <groupId>com.alibaba.cloud</groupId>
    <artifactId>spring-cloud-starter-alibaba-seata</artifactId>
</dependency>
Configuration
seata:
  tx-service-group: user-center-seata-service-group
  service:
    vgroup-mapping:
      user-center-seata-service-group: seata-cluster
    grouplist:
      seata-cluster: 127.0.0.1:8091
    disable-global-transaction: false
As you can see, the main difference from the caller's configuration is the value of tx-service-group.
Controller code:
@GetMapping("/add-bonus/{id}/{bonus}")
public User addBonus(@PathVariable Integer id, @PathVariable Integer bonus) {
    this.userService.addBonus(id, bonus);
    return this.userService.findById(id);
}
Service code:
@Slf4j
@Service
@RequiredArgsConstructor(onConstructor = @__(@Autowired))
public class UserService {
    private final UserMapper userMapper;
    private final BonusEventLogMapper bonusEventLogMapper;
    public User findById(Integer id) {
        // select * from user where id = #{id}
        return this.userMapper.selectByPrimaryKey(id);
    }
    public void addBonus(Integer userId, Integer bonus) {
        // 1. Add bonus points to the user
        User user = this.userMapper.selectByPrimaryKey(userId);
        user.setBonus(user.getBonus() + bonus);
        this.userMapper.updateByPrimaryKeySelective(user);
        // 2. Record the change in the bonus_event_log table
        this.bonusEventLogMapper.insert(BonusEventLog.builder()
                .userId(userId)
                .value(bonus)
                .event("CONTRIBUTE")
                .createTime(new Date())
                .description("Bonus for contribution..")
                .build());
        log.info("Bonus points added...");
    }
}
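Seata TCC mode
- Phase one: the prepare behavior
- Phase two: the commit or rollback behavior
Three methods need to be implemented:
- Phase one:
  - A method for business pre-processing, that is, the Try-phase method, for example freezing part of a user's balance.
- Phase two:
  - A method that commits the business operation, that is, the Commit method, for example deducting the part of the balance frozen earlier.
  - A method that rolls back the business operation, that is, the Rollback method, for example returning the previously frozen balance to the user.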
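The freeze, deduct, and return example in the list above can be sketched with a plain in-memory account. This is a minimal illustration under stated assumptions: TccAccountSketch and its methods are hypothetical names and do not use Seata's API, and a production TCC participant would also need idempotency and handling for a rollback that arrives before (or without) its Try.

```java
/** Hypothetical in-memory account showing the Try / Commit / Rollback split of TCC. */
class TccAccountSketch {
    private long available;
    private long frozen;

    TccAccountSketch(long available) {
        this.available = available;
    }

    /** Try: reserve the amount by moving it from the available balance to the frozen balance. */
    boolean prepare(long amount) {
        if (amount <= 0 || amount > available) {
            return false;
        }
        available -= amount;
        frozen += amount;
        return true;
    }

    /** Commit: the previously frozen amount is actually deducted. */
    boolean commit(long amount) {
        if (amount <= 0 || amount > frozen) {
            return false;
        }
        frozen -= amount;
        return true;
    }

    /** Rollback: the previously frozen amount is returned to the available balance. */
    boolean rollback(long amount) {
        if (amount <= 0 || amount > frozen) {
            return false;
        }
        frozen -= amount;
        available += amount;
        return true;
    }

    long available() {
        return available;
    }

    long frozen() {
        return frozen;
    }

    public static void main(String[] args) {
        TccAccountSketch account = new TccAccountSketch(100);
        account.prepare(30);
        account.commit(30);
        System.out.println("available=" + account.available() + ", frozen=" + account.frozen());
    }
}
```

Because the Try step only reserves resources, either phase-two outcome can be applied without touching data that other transactions are allowed to see as spendable.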
Official documentation
http://seata.io/zh-cn/docs/dev/mode/tcc-mode.html
Code example
Action interfaces
@LocalTCC
public interface TccActionOne {
    @TwoPhaseBusinessAction(name = "TccActionOne", commitMethod = "commit", rollbackMethod = "rollback")
    boolean prepare(BusinessActionContext actionContext, int a);
    boolean commit(BusinessActionContext actionContext);
    boolean rollback(BusinessActionContext actionContext);
}
Implementation classes
- Implementation 1
@Component
public class TccActionOneImpl implements TccActionOne {
    @Override
    public boolean prepare(BusinessActionContext actionContext, int a) {
        // Local demo logic only; a real implementation could call another microservice's API here
        String xid = actionContext.getXid();
        System.out.println("TccActionOne prepare, xid:" + xid);
        return true;
    }
    @Override
    public boolean commit(BusinessActionContext actionContext) {
        // Local demo logic only; a real implementation could call another microservice's API here
        String xid = actionContext.getXid();
        System.out.println("TccActionOne commit, xid:" + xid);
        ResultHolder.setActionOneResult(xid, "T");
        return true;
    }
    @Override
    public boolean rollback(BusinessActionContext actionContext) {
        // Local demo logic only; a real implementation could call another microservice's API here
        String xid = actionContext.getXid();
        System.out.println("TccActionOne rollback, xid:" + xid);
        ResultHolder.setActionOneResult(xid, "R");
        return true;
    }
}
@LocalTCC
public interface TccActionTwo {
    @TwoPhaseBusinessAction(name = "TccActionTwo", commitMethod = "commit", rollbackMethod = "rollback")
    // The parameter is a String so that the signature matches TccActionTwoImpl.prepare
    boolean prepare(BusinessActionContext actionContext, String b);
    boolean commit(BusinessActionContext actionContext);
    boolean rollback(BusinessActionContext actionContext);
}
- Implementation 2
@Component
public class TccActionTwoImpl implements TccActionTwo {
    @Override
    public boolean prepare(BusinessActionContext actionContext, String b) {
        // Local demo logic only; a real implementation could call another microservice's API here
        String xid = actionContext.getXid();
        System.out.println("TccActionTwo prepare, xid:" + xid);
        return true;
    }
    @Override
    public boolean commit(BusinessActionContext actionContext) {
        // Local demo logic only; a real implementation could call another microservice's API here
        String xid = actionContext.getXid();
        System.out.println("TccActionTwo commit, xid:" + xid);
        ResultHolder.setActionTwoResult(xid, "T");
        return true;
    }
    @Override
    public boolean rollback(BusinessActionContext actionContext) {
        // Local demo logic only; a real implementation could call another microservice's API here
        String xid = actionContext.getXid();
        System.out.println("TccActionTwo rollback, xid:" + xid);
        ResultHolder.setActionTwoResult(xid, "R");
        return true;
    }
}
- Business service that coordinates the two TCC actions
@Service
public class ShareSeataService {
    @Autowired
    TccActionOne tccActionOne;
    @Autowired
    TccActionTwo tccActionTwo;
    @GlobalTransactional
    public void tccTransactionCommit(Map<String, String> paramMap) {
        // First TCC participant; the BusinessActionContext is passed as null and supplied by Seata
        boolean result = tccActionOne.prepare(null, 1);
        if (!result) {
            paramMap.put("xid", RootContext.getXID());
            throw new RuntimeException("TccActionOne failed.");
        }
        // Second TCC participant
        result = tccActionTwo.prepare(null, "two");
        if (!result) {
            paramMap.put("xid", RootContext.getXID());
            throw new RuntimeException("TccActionTwo failed.");
        }
        // Hand the XID back to the caller so it can look up the result
        paramMap.put("xid", RootContext.getXID());
    }
    // tccTransactionRollback, which the controller below calls, is similar and is omitted here
}
- Entry point (controller)
@Slf4j
@RestController
@RequiredArgsConstructor(onConstructor = @__(@Autowired))
public class ShareAdminController {
    private final ShareSeataService shareSeataService;
    @GetMapping("/tcc-commit")
    public String tccTransactionCommit() {
        Map<String, String> map = new HashMap<>();
        this.shareSeataService.tccTransactionCommit(map);
        String xid = map.get("xid");
        // Expected result: T
        return ResultHolder.getActionOneResult(xid);
    }
    @GetMapping("/tcc-rollback")
    public String tccTransactionRollback() {
        Map<String, String> map = new HashMap<>();
        try {
            this.shareSeataService.tccTransactionRollback(map);
        } catch (Throwable t) {
            log.warn("Transaction rolled back..", t);
        }
        String xid = map.get("xid");
        // Expected result: R
        return ResultHolder.getActionOneResult(xid);
    }
}
Define the state machine file (used by Seata's Saga mode, described below):
{
  "Name": "reduceInventoryAndBalance",
  "Comment": "reduce inventory then reduce balance in a transaction",
  "StartState": "ReduceInventory",
  "Version": "0.0.1",
  "States": {
    "ReduceInventory": {
      "Type": "ServiceTask",
      "ServiceName": "inventoryAction",
      "ServiceMethod": "reduce",
      "CompensateState": "CompensateReduceInventory",
      "Next": "ChoiceState",
      "Input": [
        "$.[businessKey]",
        "$.[count]"
      ],
      "Output": {
        "reduceInventoryResult": "$.#root"
      },
      "Status": {
        "#root == true": "SU",
        "#root == false": "FA",
        "$Exception{java.lang.Throwable}": "UN"
      }
    },
    "ChoiceState": {
      "Type": "Choice",
      "Choices": [
        {
          "Expression": "[reduceInventoryResult] == true",
          "Next": "ReduceBalance"
        }
      ],
      "Default": "Fail"
    },
    "ReduceBalance": {
      "Type": "ServiceTask",
      "ServiceName": "balanceAction",
      "ServiceMethod": "reduce",
      "CompensateState": "CompensateReduceBalance",
      "Input": [
        "$.[businessKey]",
        "$.[amount]",
        {
          "throwException": "$.[mockReduceBalanceFail]"
        }
      ],
      "Output": {
        "compensateReduceBalanceResult": "$.#root"
      },
      "Status": {
        "#root == true": "SU",
        "#root == false": "FA",
        "$Exception{java.lang.Throwable}": "UN"
      },
      "Catch": [
        {
          "Exceptions": [
            "java.lang.Throwable"
          ],
          "Next": "CompensationTrigger"
        }
      ],
      "Next": "Succeed"
    },
    "CompensateReduceInventory": {
      "Type": "ServiceTask",
      "ServiceName": "inventoryAction",
      "ServiceMethod": "compensateReduce",
      "Input": [
        "$.[businessKey]"
      ]
    },
    "CompensateReduceBalance": {
      "Type": "ServiceTask",
      "ServiceName": "balanceAction",
      "ServiceMethod": "compensateReduce",
      "Input": [
        "$.[businessKey]"
      ]
    },
    "CompensationTrigger": {
      "Type": "CompensationTrigger",
      "Next": "Fail"
    },
    "Succeed": {
      "Type": "Succeed"
    },
    "Fail": {
      "Type": "Fail",
      "ErrorCode": "PURCHASE_FAILED",
      "Message": "purchase failed"
    }
  }
}
Test code:
public class LocalSagaTransactionStarter {
    public static void main(String[] args) {
        AbstractApplicationContext applicationContext = new ClassPathXmlApplicationContext(new String[] {"spring/seata-saga.xml"});
        StateMachineEngine stateMachineEngine = (StateMachineEngine) applicationContext.getBean("stateMachineEngine");
        transactionCommittedDemo(stateMachineEngine);
        transactionCompensatedDemo(stateMachineEngine);
        new ApplicationKeeper(applicationContext).keep();
    }
    private static void transactionCommittedDemo(StateMachineEngine stateMachineEngine) {
        Map<String, Object> startParams = new HashMap<>(3);
        String businessKey = String.valueOf(System.currentTimeMillis());
        startParams.put("businessKey", businessKey);
        startParams.put("count", 10);
        startParams.put("amount", new BigDecimal("100"));
        //sync test
        StateMachineInstance inst = stateMachineEngine.startWithBusinessKey("reduceInventoryAndBalance", null, businessKey, startParams);
        Assert.isTrue(ExecutionStatus.SU.equals(inst.getStatus()), "saga transaction execute failed. XID: " + inst.getId());
        System.out.println("saga transaction commit succeed. XID: " + inst.getId());
        inst = stateMachineEngine.getStateMachineConfig().getStateLogStore().getStateMachineInstanceByBusinessKey(businessKey, null);
        Assert.isTrue(ExecutionStatus.SU.equals(inst.getStatus()), "saga transaction execute failed. XID: " + inst.getId());
        //async test
        businessKey = String.valueOf(System.currentTimeMillis());
        inst = stateMachineEngine.startWithBusinessKeyAsync("reduceInventoryAndBalance", null, businessKey, startParams, CALL_BACK);
        waittingForFinish(inst);
        Assert.isTrue(ExecutionStatus.SU.equals(inst.getStatus()), "saga transaction execute failed. XID: " + inst.getId());
        System.out.println("saga transaction commit succeed. XID: " + inst.getId());
    }
    private static void transactionCompensatedDemo(StateMachineEngine stateMachineEngine) {
        Map<String, Object> startParams = new HashMap<>(4);
        String businessKey = String.valueOf(System.currentTimeMillis());
        startParams.put("businessKey", businessKey);
        startParams.put("count", 10);
        startParams.put("amount", new BigDecimal("100"));
        startParams.put("mockReduceBalanceFail", "true");
        //sync test
        StateMachineInstance inst = stateMachineEngine.startWithBusinessKey("reduceInventoryAndBalance", null, businessKey, startParams);
        //async test
        businessKey = String.valueOf(System.currentTimeMillis());
        inst = stateMachineEngine.startWithBusinessKeyAsync("reduceInventoryAndBalance", null, businessKey, startParams, CALL_BACK);
        waittingForFinish(inst);
        Assert.isTrue(ExecutionStatus.SU.equals(inst.getCompensationStatus()), "saga transaction compensate failed. XID: " + inst.getId());
        System.out.println("saga transaction compensate succeed. XID: " + inst.getId());
    }
    private static volatile Object lock = new Object();
    private static AsyncCallback CALL_BACK = new AsyncCallback() {
        @Override
        public void onFinished(ProcessContext context, StateMachineInstance stateMachineInstance) {
            synchronized (lock) {
                lock.notifyAll();
            }
        }
        @Override
        public void onError(ProcessContext context, StateMachineInstance stateMachineInstance, Exception exp) {
            synchronized (lock) {
                lock.notifyAll();
            }
        }
    };
    private static void waittingForFinish(StateMachineInstance inst) {
        synchronized (lock) {
            if (ExecutionStatus.RU.equals(inst.getStatus())) {
                try {
                    lock.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}
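The wait/notify handshake in the test code above can also be expressed with a CountDownLatch from the JDK, which does not lose a signal that arrives before the wait starts and is not affected by spurious wakeups. The following is a sketch only: AsyncCompletionSketch is a hypothetical helper, and wiring signalFinished() into the callback's onFinished and onError methods is left as an assumption.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/** Hypothetical helper that lets one thread wait until an asynchronous callback has fired. */
class AsyncCompletionSketch {
    private final CountDownLatch finished = new CountDownLatch(1);

    /** Intended to be called from both the success callback and the error callback. */
    void signalFinished() {
        finished.countDown();
    }

    /**
     * Blocks until signalFinished() has been called or the timeout elapses.
     * Returns true if the signal arrived, false on timeout or interruption.
     */
    boolean awaitFinished(long timeout, TimeUnit unit) {
        try {
            return finished.await(timeout, unit);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return false;
        }
    }

    public static void main(String[] args) {
        AsyncCompletionSketch completion = new AsyncCompletionSketch();
        new Thread(completion::signalFinished).start();
        System.out.println("finished: " + completion.awaitFinished(5, TimeUnit.SECONDS));
    }
}
```

A timeout is used here so that a callback that never fires cannot block the test forever; the value to choose depends on how long the state machine is expected to run.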
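Seata Saga mode
Saga transactions: eventual consistency
Overview
Processing flow
Basic Saga transaction protocol
- Each Saga transaction consists of a series of idempotent, ordered sub-transactions T_i.
- Each T_i has a corresponding idempotent compensating action C_i, which undoes the effects of T_i.
- Compared with TCC, Saga has no "reserve" step: each T_i commits directly to the database.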
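The protocol above can be sketched as a small executor that runs each T_i in order and, when one fails, runs the compensations C_i of the already completed steps in reverse order. This is a minimal illustration rather than Seata's state machine engine: SagaSketch and Step are hypothetical names, steps are plain Runnables, and failures inside a compensation (which a real engine would have to retry) are not handled.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

/** Hypothetical saga executor: forward steps in order, compensations in reverse on failure. */
class SagaSketch {
    /** One sub-transaction T_i together with its compensating action C_i. */
    static final class Step {
        final Runnable action;
        final Runnable compensation;

        Step(Runnable action, Runnable compensation) {
            this.action = action;
            this.compensation = compensation;
        }
    }

    /** Returns true if every step succeeded, false if a step failed and compensation ran. */
    static boolean run(List<Step> steps) {
        Deque<Step> completed = new ArrayDeque<>();
        for (Step step : steps) {
            try {
                step.action.run();    // T_i commits its own local work immediately
                completed.push(step); // remember it so it can be compensated later
            } catch (RuntimeException e) {
                // Undo the completed steps, most recent first; the failed step is not compensated
                while (!completed.isEmpty()) {
                    completed.pop().compensation.run();
                }
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        List<String> trace = new ArrayList<>();
        boolean ok = run(Arrays.asList(
                new Step(() -> trace.add("reduceInventory"), () -> trace.add("compensateReduceInventory")),
                new Step(() -> { throw new IllegalStateException("reduce balance failed"); },
                        () -> trace.add("compensateReduceBalance"))));
        System.out.println("succeeded=" + ok + ", trace=" + trace);
    }
}
```

Since each T_i has already committed by the time a later step fails, the intermediate state is visible to other transactions until compensation finishes, which is the isolation trade-off of this mode.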
Code implementation
// Interface omitted
public class BalanceActionImpl implements BalanceAction {
    private static final Logger LOGGER = LoggerFactory.getLogger(BalanceActionImpl.class);
    @Override
    public boolean reduce(String businessKey, BigDecimal amount, Map<String, Object> params) {
        // Simulated failure, driven by the throwException input of the state machine
        if (params != null && "true".equals(params.get("throwException"))) {
            throw new RuntimeException("reduce balance failed");
        }
        LOGGER.info("reduce balance succeed, amount: " + amount + ", businessKey:" + businessKey);
        return true;
    }
    @Override
    public boolean compensateReduce(String businessKey, Map<String, Object> params) {
        if (params != null && "true".equals(params.get("throwException"))) {
            throw new RuntimeException("compensate reduce balance failed");
        }
        LOGGER.info("compensate reduce balance succeed, businessKey:" + businessKey);
        return true;
    }
}
// Interface omitted
public class InventoryActionImpl implements InventoryAction {
    private static final Logger LOGGER = LoggerFactory.getLogger(InventoryActionImpl.class);
    @Override
    public boolean reduce(String businessKey, int count) {
        LOGGER.info("reduce inventory succeed, count: " + count + ", businessKey:" + businessKey);
        return true;
    }
    @Override
    public boolean compensateReduce(String businessKey) {
        LOGGER.info("compensate reduce inventory succeed, businessKey:" + businessKey);
        return true;
    }
}
Seata XA mode
Official documentation
Code example
- Add the Seata dependency and configuration.
- Write your JDBC code exactly as you normally would.
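Comparing and choosing among the four modes
AT
Advantages:
- Easy to use: low intrusion into business code
Disadvantages:
- Moderate performance
- Uses a global lock
Suitable scenarios:
- Scenarios without especially high performance requirements
- Scenarios where you do not want to modify business code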
TCC
Advantages:
- Performance is much higher than in AT mode
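Disadvantages:
- More intrusive to business code: each action needs the three methods (Try, Commit, Rollback) described above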
Suitable scenarios:
- Core systems and other scenarios with very high performance requirements
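Saga
Advantages:
- Phase one commits the local database transaction: no locks, high performance
- Participants can execute asynchronously in an event-driven manner: high throughput
- A compensation service is simply the "reverse" of the forward service: easy to understand and easy to implement
Disadvantages:
- Because phase one has already committed the local database transaction and there is no "reserve" step, Saga mode cannot guarantee isolation. Ways to cope with the lack of isolation are covered later.
Suitable scenarios:
- Long business processes or processes with many steps
- Participants include other companies' services or legacy systems that cannot provide the three interfaces TCC mode requires
- Typical business systems: financial networks (integration with external financial institutions), internet micro-lending, channel integration, distributed-architecture service integration, and similar systems
- Widely used by banks and other financial institutions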
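XA
Advantages:
- Non-intrusive
Disadvantages:
- Relatively poor performance
- Requires a database that supports XA
Suitable scenarios:
- A strong-consistency solution, suited to scenarios with very high consistency requirements (rarely used)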