目录
一、用户开启事务示例
1.GlobalTransaction的api方式
[email protected]注解方式
二、TransactionManager
三、DefaultTransactionManager
四、TransactionManagerHolder
五、GlobalTransaction
六、DefaultGlobalTransaction
七、GlobalTransactionContext
一、用户开启事务示例
seata提供了两种方式供用户开启分布式事务管理,一种是具有代码嵌入的api方式,另一种则是常用的注解方式
1.GlobalTransaction的api方式
public static void main(String[] args) throws SQLException, TransactionException, InterruptedException {
String userId = "U100001";
String commodityCode = "C00321";
int commodityCount = 100;
int money = 999;
AccountService accountService = new AccountServiceImpl();
StorageService storageService = new StorageServiceImpl();
OrderService orderService = new OrderServiceImpl();
orderService.setAccountService(accountService);
//reset data
accountService.reset(userId, String.valueOf(money));
storageService.reset(commodityCode, String.valueOf(commodityCount));
orderService.reset(null, null);
//init seata; only once
String applicationId = "api";
String txServiceGroup = "my_test_tx_group";
TMClient.init(applicationId, txServiceGroup);
RMClient.init(applicationId, txServiceGroup);
//trx
GlobalTransaction tx = GlobalTransactionContext.getCurrentOrCreate();
try {
tx.begin(60000, "testBiz");
System.out.println("begin trx, xid is " + tx.getXid());
//biz operate 3 dataSources
//set >=5 will be rollback(200*5>999) else will be commit
int opCount = 5;
storageService.deduct(commodityCode, opCount);
orderService.create(userId, commodityCode, opCount);
//check data if negative
boolean needCommit = ((StorageServiceImpl)storageService).validNegativeCheck("count", commodityCode)
&& ((AccountServiceImpl)accountService).validNegativeCheck("money", userId);
//if data negative rollback else commit
if (needCommit) {
tx.commit();
} else {
System.out.println("rollback trx, cause: data negative, xid is " + tx.getXid());
tx.rollback();
}
} catch (Exception exx) {
System.out.println("rollback trx, cause: " + exx.getMessage() + " , xid is " + tx.getXid());
tx.rollback();
throw exx;
}
TimeUnit.SECONDS.sleep(10);
}
[email protected]注解方式
@GlobalTransactional(timeoutMills = 300000, name = "dubbo-demo-tx")
public void purchase(String userId, String commodityCode, int orderCount) {
LOGGER.info("purchase begin ... xid: " + RootContext.getXID());
storageService.deduct(commodityCode, orderCount);
orderService.create(userId, commodityCode, orderCount);
throw new RuntimeException("xxx");
}
下面我们分析它是如何通过TransactionManager和GlobalTransaction实现的
二、TransactionManager
TransactionManager接口提供4个方法,开启全球事务,提交全球事务,回滚全球事务和获取当前事务的状态。
public interface TransactionManager {
String begin(String applicationId, String transactionServiceGroup, String name, int timeout)
throws TransactionException;
GlobalStatus commit(String xid) throws TransactionException;
GlobalStatus rollback(String xid) throws TransactionException;
GlobalStatus getStatus(String xid) throws TransactionException;
}
三、DefaultTransactionManager
DefaultTransactionManager实现TransactionManager接口,4个方法分别创建对应的请求对象,调用TmRpcClient客户端使用netty连接将数据传给TC。
public class DefaultTransactionManager implements TransactionManager {
@Override
public String begin(String applicationId, String transactionServiceGroup, String name, int timeout)
throws TransactionException {
GlobalBeginRequest request = new GlobalBeginRequest();
request.setTransactionName(name);
request.setTimeout(timeout);
GlobalBeginResponse response = (GlobalBeginResponse)syncCall(request);
return response.getXid();
}
@Override
public GlobalStatus commit(String xid) throws TransactionException {
GlobalCommitRequest globalCommit = new GlobalCommitRequest();
globalCommit.setXid(xid);
GlobalCommitResponse response = (GlobalCommitResponse)syncCall(globalCommit);
return response.getGlobalStatus();
}
@Override
public GlobalStatus rollback(String xid) throws TransactionException {
GlobalRollbackRequest globalRollback = new GlobalRollbackRequest();
globalRollback.setXid(xid);
GlobalRollbackResponse response = (GlobalRollbackResponse)syncCall(globalRollback);
return response.getGlobalStatus();
}
@Override
public GlobalStatus getStatus(String xid) throws TransactionException {
GlobalStatusRequest queryGlobalStatus = new GlobalStatusRequest();
queryGlobalStatus.setXid(xid);
GlobalStatusResponse response = (GlobalStatusResponse)syncCall(queryGlobalStatus);
return response.getGlobalStatus();
}
private AbstractTransactionResponse syncCall(AbstractTransactionRequest request) throws TransactionException {
try {
return (AbstractTransactionResponse)TmRpcClient.getInstance().sendMsgWithResponse(request);
} catch (TimeoutException toe) {
throw new TransactionException(TransactionExceptionCode.IO, toe);
}
}
}
四、TransactionManagerHolder
TransactionManagerHolder为创建单例TransactionManager的工厂,可以使用EnhancedServiceLoader的spi机制加载用户自定义的类,默认为DefaultTransactionManager。
public class TransactionManagerHolder {
private static class SingletonHolder {
private static TransactionManager INSTANCE = null;
static {
try {
INSTANCE = EnhancedServiceLoader.load(TransactionManager.class);
LOGGER.info("TransactionManager Singleton " + INSTANCE);
} catch (Throwable anyEx) {
LOGGER.error("Failed to load TransactionManager Singleton! ", anyEx);
}
}
}
public static TransactionManager get() {
if (SingletonHolder.INSTANCE == null) {
throw new ShouldNeverHappenException("TransactionManager is NOT ready!");
}
return SingletonHolder.INSTANCE;
}
private TransactionManagerHolder() {
}
}
五、GlobalTransaction
GlobalTransaction接口提供给用户开启事务(超时时间,全局事务名称),提交,回滚,获取状态方法。
public interface GlobalTransaction {
void begin() throws TransactionException;
void begin(int timeout) throws TransactionException;
void begin(int timeout, String name) throws TransactionException;
void commit() throws TransactionException;
void rollback() throws TransactionException;
GlobalStatus getStatus() throws TransactionException;
String getXid();
}
六、DefaultGlobalTransaction
DefaultGlobalTransaction是GlobalTransaction接口的默认实现,它持有TransactionManager对象,默认开启事务超时时间为60秒,默认名称为default,因为调用者的业务方法可能多重嵌套创建多个GlobalTransaction对象开启事务方法,因此
GlobalTransaction有GlobalTransactionRole角色属性,只有Launcher角色的才有开启、提交、回滚事务的权利。
public class DefaultGlobalTransaction implements GlobalTransaction {
private static final int DEFAULT_GLOBAL_TX_TIMEOUT = 60000;
private static final String DEFAULT_GLOBAL_TX_NAME = "default";
private TransactionManager transactionManager;
private String xid;
private GlobalStatus status;
private GlobalTransactionRole role;
DefaultGlobalTransaction() {
this(null, GlobalStatus.UnKnown, GlobalTransactionRole.Launcher);
}
DefaultGlobalTransaction(String xid, GlobalStatus status, GlobalTransactionRole role) {
this.transactionManager = TransactionManagerHolder.get();
this.xid = xid;
this.status = status;
this.role = role;
}
@Override
public void begin(int timeout, String name) throws TransactionException {
// Launcher角色才可以开启
if (role != GlobalTransactionRole.Launcher) {
check();
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Ignore Begin(): just involved in global transaction [" + xid + "]");
}
return;
}
if (xid != null) {
throw new IllegalStateException();
}
if (RootContext.getXID() != null) {
throw new IllegalStateException();
}
// 委派给transactionManager开启事务
xid = transactionManager.begin(null, null, name, timeout);
status = GlobalStatus.Begin;
RootContext.bind(xid);
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Begin new global transaction [" + xid + "]");
}
}
@Override
public void commit() throws TransactionException {
// Launcher角色才可以提交
if (role == GlobalTransactionRole.Participant) {
// Participant has no responsibility of committing
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Ignore Commit(): just involved in global transaction [" + xid + "]");
}
return;
}
if (xid == null) {
throw new IllegalStateException();
}
status = transactionManager.commit(xid);
if (RootContext.getXID() != null) {
if (xid.equals(RootContext.getXID())) {
RootContext.unbind();
}
}
if (LOGGER.isInfoEnabled()) {
LOGGER.info("[" + xid + "] commit status:" + status);
}
}
@Override
public void rollback() throws TransactionException {
// Launcher角色才可以回滚
if (role == GlobalTransactionRole.Participant) {
// Participant has no responsibility of committing
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Ignore Rollback(): just involved in global transaction [" + xid + "]");
}
return;
}
if (xid == null) {
throw new IllegalStateException();
}
status = transactionManager.rollback(xid);
if (RootContext.getXID() != null) {
if (xid.equals(RootContext.getXID())) {
RootContext.unbind();
}
}
if (LOGGER.isInfoEnabled()) {
LOGGER.info("[" + xid + "] rollback status:" + status);
}
}
@Override
public GlobalStatus getStatus() throws TransactionException {
if (xid == null) {
return GlobalStatus.UnKnown;
}
status = transactionManager.getStatus(xid);
return status;
}
@Override
public String getXid() {
return xid;
}
// Participant角色assert,当前线程应该有xid信息。
private void check() {
if (xid == null) {
throw new ShouldNeverHappenException();
}
}
}
七、GlobalTransactionContext
GlobalTransactionContext为操作GlobalTransaction的工具类,提供创建新的GlobalTransaction,获取当前线程有的GlobalTransaction等方法。
public class GlobalTransactionContext {
private GlobalTransactionContext() {
}
// 创建新的DefaultGlobalTransaction
private static GlobalTransaction createNew() {
GlobalTransaction tx = new DefaultGlobalTransaction();
return tx;
}
// 获取绑定到当前线程的GlobalTransaction实例,如果当前线程没有,返回null
private static GlobalTransaction getCurrent() {
String xid = RootContext.getXID();
if (xid == null) {
return null;
}
return new DefaultGlobalTransaction(xid, GlobalStatus.Begin, GlobalTransactionRole.Participant);
}
// 获取绑定到当前线程的GlobalTransaction实例,如果当前线程没有,创建实例
public static GlobalTransaction getCurrentOrCreate() {
GlobalTransaction tx = getCurrent();
if (tx == null) {
return createNew();
}
return tx;
}
// 通过给的xid,重新加载GlobalTransaction
public static GlobalTransaction reload(String xid) throws TransactionException {
GlobalTransaction tx = new DefaultGlobalTransaction(xid, GlobalStatus.UnKnown, GlobalTransactionRole.Launcher) {
@Override
public void begin(int timeout, String name) throws TransactionException {
throw new IllegalStateException("Never BEGIN on a RELOADED GlobalTransaction. ");
}
};
return tx;
}
}