天天看点

seata源码解析:事务状态及全局锁的存储

seata源码解析:事务状态及全局锁的存储

事务状态的存储

在seata中,无论你使用哪种事务模式,都会将全局事务状态和分支事务状态存储下来。有三种存储模式可供你选择,db,redis,file。

本文只会详细分析db这一种存储模式,其他的类似。

当使用db这一种模式时,seata server都会将全局事务的状态存在global_table表中,将分支事务的状态存在branch_table表中,如下所示

-- the table to store GlobalSession data
CREATE TABLE IF NOT EXISTS `global_table`
(
    `xid`                       VARCHAR(128) NOT NULL,
    `transaction_id`            BIGINT,
    `status`                    TINYINT      NOT NULL,
    `application_id`            VARCHAR(32),
    `transaction_service_group` VARCHAR(32),
    `transaction_name`          VARCHAR(128),
    `timeout`                   INT,
    `begin_time`                BIGINT,
    `application_data`          VARCHAR(2000),
    `gmt_create`                DATETIME,
    `gmt_modified`              DATETIME,
    PRIMARY KEY (`xid`),
    KEY `idx_gmt_modified_status` (`gmt_modified`, `status`),
    KEY `idx_transaction_id` (`transaction_id`)
) ENGINE = InnoDB
  DEFAULT CHARSET = utf8;

-- the table to store BranchSession data
CREATE TABLE IF NOT EXISTS `branch_table`
(
    `branch_id`         BIGINT       NOT NULL,
    `xid`               VARCHAR(128) NOT NULL,
    `transaction_id`    BIGINT,
    `resource_group_id` VARCHAR(32),
    `resource_id`       VARCHAR(256),
    `branch_type`       VARCHAR(8),
    `status`            TINYINT,
    `client_id`         VARCHAR(64),
    `application_data`  VARCHAR(2000),
    `gmt_create`        DATETIME(6),
    `gmt_modified`      DATETIME(6),
    PRIMARY KEY (`branch_id`),
    KEY `idx_xid` (`xid`)
) ENGINE = InnoDB
  DEFAULT CHARSET = utf8;      

global_table表中status表示的含义为全局事务的状态,对应的枚举类为GlobalStatus

branch_table表中status表示的含义为分支事务的状态,对应的枚举类为BranchStatus

当事务发生问题时,我们就可以通过global_table和branch_table的status字段判断事务是哪一步执行失败了。

全局事务的状态表

seata源码解析:事务状态及全局锁的存储

分支事务的状态表

seata源码解析:事务状态及全局锁的存储

状态解释来自官网:http://seata.io/zh-cn/docs/user/appendix/global-transaction-status.html

源码分析

前面的文章我们说过当执行如下命令时,会调用Server.java中的main方法

seata-server.sh -p 8091      
public class Server {

    public static void main(String[] args) throws IOException {

        // 省略部分代码...

        // SessionHolder负责事务日志的持久化存储
        // 设置存储模式,有三种可选类型,file,db,redis
        SessionHolder.init(parameterParser.getStoreMode());

        // 创建事务协调器
        DefaultCoordinator coordinator = new DefaultCoordinator(nettyRemotingServer);
        // 初始化5个定时任务
        coordinator.init();

        // 省略部分代码...
        System.exit(0);
    }
}      

SessionHolder#init根据传入的存储模式初始化4个不同的SessionManager,SessionManager是用来存储全局事务和分支事务状态的。其中全局事务用GlobalSession来表示,分支事务用BranchSession来表示,一个GlobalSession包含多个BranchSession

// 保存了所有的GlobalSession
private static SessionManager ROOT_SESSION_MANAGER;
// 需要异步commit的GlobalSession
private static SessionManager ASYNC_COMMITTING_SESSION_MANAGER;
// 需要重试commit的GlobalSession
private static SessionManager RETRY_COMMITTING_SESSION_MANAGER;
// 需要重试roollback的GlobalSession
private static SessionManager RETRY_ROLLBACKING_SESSION_MANAGER;      

在init方法中通过SPI的方式实例化对应的类

// SessionHolder
public static void init(String mode) {
    if (StringUtils.isBlank(mode)) {
        mode = CONFIG.getConfig(ConfigurationKeys.STORE_MODE);
    }
    StoreMode storeMode = StoreMode.get(mode);
    if (StoreMode.DB.equals(storeMode)) {
        // 通过spi加载SessionManager
        ROOT_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class, StoreMode.DB.getName());
        ASYNC_COMMITTING_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class, StoreMode.DB.getName(),
            new Object[] {ASYNC_COMMITTING_SESSION_MANAGER_NAME});
        RETRY_COMMITTING_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class, StoreMode.DB.getName(),
            new Object[] {RETRY_COMMITTING_SESSION_MANAGER_NAME});
        RETRY_ROLLBACKING_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class, StoreMode.DB.getName(),
            new Object[] {RETRY_ROLLBACKING_SESSION_MANAGER_NAME});
    } else if (StoreMode.FILE.equals(storeMode)) {
        // 省略其他存储方式的加载逻辑
    }
    // 删除已经完成的GlobalSession
    reload(storeMode);
}      

4种类型的SessionManager都是同一个实例,只是调用的构造方法不同而已。以DataBaseSessionManager为例,ROOT_SESSION_MANAGER调用了无参数构造函数,而其他SessionManager传入了taskName属性

为什么要搞4个SessionManager?

其实就是用不同的SessionManager管理不同状态的任务,这样逻辑比较清晰。

当不同的SessionManager调用allSessions方法时,返回的就是对应状态的GlobalSession,逻辑比较清晰

DataBaseSessionManager#allSessions

seata源码解析:事务状态及全局锁的存储

我们来看一下SessionManager的继承关系

seata源码解析:事务状态及全局锁的存储

SessionLifecycleListener看接口名字就是基于观察者模式设计的,当GlobalSession状态发生改变的时候,会发布通知给监听者,然后监听者做相应动作。目前SessionLifecycleListener接口的实现类只有各种SessionManager,当收到状态改变的通知时,将其状态存储下来

AbstractSessionManager则是一个抽象类,当SessionLifecycleListener接口方法被回调时,调用SessionManager定义的动作方法

public abstract class AbstractSessionManager implements SessionManager, SessionLifecycleListener {

    protected TransactionStoreManager transactionStoreManager;

    // 省略部分代码
    // 重写了SessionManager接口方法
    @Override
    public void addGlobalSession(GlobalSession session) throws TransactionException {
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("MANAGER[" + name + "] SESSION[" + session + "] " + LogOperation.GLOBAL_ADD);
        }
        writeSession(LogOperation.GLOBAL_ADD, session);
    }

    // 重写了SessionManager接口方法
    @Override
    public void updateGlobalSessionStatus(GlobalSession session, GlobalStatus status) throws TransactionException {
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("MANAGER[" + name + "] SESSION[" + session + "] " + LogOperation.GLOBAL_UPDATE);
        }
        writeSession(LogOperation.GLOBAL_UPDATE, session);
    }


    // 重写了SessionLifecycleListener接口方法
    @Override
    public void onBegin(GlobalSession globalSession) throws TransactionException {
        addGlobalSession(globalSession);
    }

    // 重写了SessionLifecycleListener接口方法
    @Override
    public void onStatusChange(GlobalSession globalSession, GlobalStatus status) throws TransactionException {
        updateGlobalSessionStatus(globalSession, status);
    }

}      

可以看到AbstractSessionManager并没有实现SessionManager接口的方法,而是直接抛出异常,说明具体的存储逻辑交给子类来实现了

AbstractSessionManager有3个实现类,说明seata支持3种存储模式。而最终存储的工作是交给TransactionStoreManager来实现的

seata源码解析:事务状态及全局锁的存储

这些增加事务和事务状态变化的持久化操作非常简单,就是执行插入sql和更新sql

说回我们的启动流程,在DefaultCoordinator#init方法中,初始化5个定时任务

  1. retryRollbacking:分支事务回滚失败时,不断重试
  2. retryCommitting:分支事务提交失败时,不断重试
  3. asyncCommitting:执行异步commit,用在at模式,因为at模式的commit操作其实就是删除undolog,可以异步执行
  4. timeoutCheck:当事务处于开始状态,将状态设置为超时回滚,将其放入重试回滚管理器,让其回滚全局事务
  5. undoLogDelete:向rm端发送请求,删除7天(默认)之前的undolog
seata源码解析:事务状态及全局锁的存储

这些重试操作执行的逻辑和全局事务的提交/回滚逻辑一致,就不介绍了

// DefaultCoordinator
public void init() {
    // 重试rollback定时任务
    retryRollbacking.scheduleAtFixedRate(() -> {
        boolean lock = SessionHolder.retryRollbackingLock();
        if (lock) {
            try {
                handleRetryRollbacking();
            } catch (Exception e) {
                LOGGER.info("Exception retry rollbacking ... ", e);
            } finally {
                SessionHolder.unRetryRollbackingLock();
            }
        }
    }, 0, ROLLBACKING_RETRY_PERIOD, TimeUnit.MILLISECONDS);

    // 重试commit定时任务
    retryCommitting.scheduleAtFixedRate(() -> {
        boolean lock = SessionHolder.retryCommittingLock();
        if (lock) {
            try {
                handleRetryCommitting();
            } catch (Exception e) {
                LOGGER.info("Exception retry committing ... ", e);
            } finally {
                SessionHolder.unRetryCommittingLock();
            }
        }
    }, 0, COMMITTING_RETRY_PERIOD, TimeUnit.MILLISECONDS);

    // 省略部分代码
}      

全局锁

在前面的文章中我们说过,在Seata AT模式中,我们用全局锁来避免脏写,同时也可以用全局锁将默认的隔离级别从读未提交升高为读已提交。

全局锁是存在TC端的,所以在TC端要提供相应的接口,来进行加锁,解锁,相应的资源是否被加锁等操作!

当分支事务提交的时候,需要把修改的资源锁定。当全局事务提交后才会把相应的资源解锁。

各种事务的加锁状态会存在lock_table表中

-- the table to store lock data
CREATE TABLE IF NOT EXISTS `lock_table`
(
    `row_key`        VARCHAR(128) NOT NULL, -- resource_id+table_name+pk的组合
    `xid`            VARCHAR(128),
    `transaction_id` BIGINT,
    `branch_id`      BIGINT       NOT NULL,
    `resource_id`    VARCHAR(256),
    `table_name`     VARCHAR(32),
    `pk`             VARCHAR(36),
    `gmt_create`     DATETIME,
    `gmt_modified`   DATETIME,
    PRIMARY KEY (`row_key`),
    KEY `idx_branch_id` (`branch_id`)
) ENGINE = InnoDB
  DEFAULT CHARSET = utf8;      

用个例子演示一下全局锁的工作流程,

假如account_info表中有如下2个账户数据,其中id是主键

id user_id
1 1001
2 1002

假如说执行如下sql

update account_info set balance = 100      

当进行commit的时候,需要加全局锁,此时根据update语句构建出select语句,找出影响的主键,即1和2

通过主键构建出lockKey=account_info:1,2

lockKey的构建规则如下

如果主键是一列(id列),则形式如下

// 表名:主键值1,主键值2
account_info:1,2      

如果主键是多列(id列+user_id列),则形式如下

// 表名:主键值1_主键值a,主键值1_主键值b
account_info:1_1001,2_1002      

当然有可能一个事务中,有可能有多个表,多个表的构建规则如下(中间通过;分隔即可)

// 表名1:主键值1,主键值2;表名2:主键值1,主键值2
account_flow:1,2;account_info:1,2      

加锁

当加锁是会往lock_table中假如如下2个记录(省略无关的列)

row_key xid
jdbc:mysql://myhost:3306/db_account_1^^^account_info^^^1 127.21.0.14:18091:6449339005964652705
jdbc:mysql://myhost:3306/db_account_2^^^account_info^^^1 127.21.0.14:18091:6449339005964652705

row_key的构建规则如下

rowKey = resourceId + "^^^" + tableName + "^^^" +      

tcc模式下resourceId为@TwoPhaseBusinessAction注解的name属性

而在at和xa模式中resourceId都为数据库连接url

解锁

根据xid和row_key删除记录

查询是否能加锁

根据row_key从lock_table中查询记录,如果记录中的xid和查询传递过来的xid不一致则加锁失败,如果没有记录或者记录的xid和传递过来的xid一致,则加锁成功

seata源码解析:事务状态及全局锁的存储

源码分析

TC端定义的锁操作接口

public interface LockManager {

    // 对分支事务的资源加锁
    boolean acquireLock(BranchSession branchSession) throws TransactionException;

    // 对分支事务的资源解锁
    boolean releaseLock(BranchSession branchSession) throws TransactionException;

    // 对全局事务中的所有分支事务的资源解锁
    boolean releaseGlobalSessionLock(GlobalSession globalSession) throws TransactionException;

    // 根据xid resourceId lockKey 查询是否已经加锁
    boolean isLockable(String xid, String resourceId, String lockKey) throws TransactionException;

    // 清除所有锁
    void cleanAllLocks() throws TransactionException;

}      

老规矩,我们还是只分析db这种存储模式,最终的加解锁操作会交给Locker

seata源码解析:事务状态及全局锁的存储

加锁

// AbstractLockManager
public boolean acquireLock(BranchSession branchSession) throws TransactionException {
    if (branchSession == null) {
        throw new IllegalArgumentException("branchSession can't be null for memory/file locker.");
    }
    String lockKey = branchSession.getLockKey();
    if (StringUtils.isNullOrEmpty(lockKey)) {
        // no lock
        return true;
    }
    // get locks of branch
    // 创建 RowLock 集合,一条加锁记录对应一个RowLock对象
    List<RowLock> locks = collectRowLocks(branchSession);
    if (CollectionUtils.isEmpty(locks)) {
        // no lock
        return true;
    }
    return getLocker(branchSession).acquireLock(locks);
}      

把需要加锁的资源转换成RowLock集合

protected List<RowLock> collectRowLocks(BranchSession branchSession) {
    List<RowLock> locks = new ArrayList<>();
    if (branchSession == null || StringUtils.isBlank(branchSession.getLockKey())) {
        return locks;
    }
    String xid = branchSession.getXid();
    // 得到资源id,也就是数据库连接url
    String resourceId = branchSession.getResourceId();
    long transactionId = branchSession.getTransactionId();

    String lockKey = branchSession.getLockKey();

    return collectRowLocks(lockKey, resourceId, xid, transactionId, branchSession.getBranchId());
}      

转换RowLock的过程,基本就是对lockKey的解析过程,

protected List<RowLock> collectRowLocks(String lockKey, String resourceId, String xid, Long transactionId,
                                        Long branchID) {
    List<RowLock> locks = new ArrayList<RowLock>();

    // 对多个记录加锁,中间使用;分隔
    String[] tableGroupedLockKeys = lockKey.split(";");
    for (String tableGroupedLockKey : tableGroupedLockKeys) {
        // 表名和记录主键值之间用:分隔
        int idx = tableGroupedLockKey.indexOf(":");
        if (idx < 0) {
            return locks;
        }
        // 要加锁的表名
        String tableName = tableGroupedLockKey.substring(0, idx);
        // 加锁的记录主键值,如果需要一次加锁多条记录,记录之间用,分隔
        String mergedPKs = tableGroupedLockKey.substring(idx + 1);
        if (StringUtils.isBlank(mergedPKs)) {
            return locks;
        }
        String[] pks = mergedPKs.split(",");
        if (pks == null || pks.length == 0) {
            return locks;
        }
        // 一个主键创建一个RowLock对象
        for (String pk : pks) {
            if (StringUtils.isNotBlank(pk)) {
                RowLock rowLock = new RowLock();
                rowLock.setXid(xid);
                rowLock.setTransactionId(transactionId);
                rowLock.setBranchId(branchID);
                rowLock.setTableName(tableName);
                rowLock.setPk(pk);
                rowLock.setResourceId(resourceId);
                locks.add(rowLock);
            }
        }
    }
    return locks;
}      

构造好RowLock集合,接下来就是调用DataBaseLocker执行真正的加锁操作

首先将RowLock转为LockDO

protected List<LockDO> convertToLockDO(List<RowLock> locks) {
    List<LockDO> lockDOs = new ArrayList<>();
    if (CollectionUtils.isEmpty(locks)) {
        return lockDOs;
    }
    for (RowLock rowLock : locks) {
        LockDO lockDO = new LockDO();
        lockDO.setBranchId(rowLock.getBranchId());
        lockDO.setPk(rowLock.getPk());
        lockDO.setResourceId(rowLock.getResourceId());
        // rowKey = resourceId + "^^^" + tableName + "^^^" + pk
        lockDO.setRowKey(getRowKey(rowLock.getResourceId(), rowLock.getTableName(), rowLock.getPk()));
        lockDO.setXid(rowLock.getXid());
        lockDO.setTransactionId(rowLock.getTransactionId());
        lockDO.setTableName(rowLock.getTableName());
        lockDOs.add(lockDO);
    }
    return lockDOs;
}      
// LockStoreDataBaseDAO
public boolean acquireLock(List<LockDO> lockDOs) {
    Connection conn = null;
    PreparedStatement ps = null;
    ResultSet rs = null;
    Set<String> dbExistedRowKeys = new HashSet<>();
    boolean originalAutoCommit = true;
    if (lockDOs.size() > 1) {
        // 过滤掉重复的加锁记录
        lockDOs = lockDOs.stream().filter(LambdaUtils.distinctByKey(LockDO::getRowKey)).collect(Collectors.toList());
    }
    try {
        conn = lockStoreDataSource.getConnection();
        if (originalAutoCommit = conn.getAutoCommit()) {
            conn.setAutoCommit(false);
        }
        //check lock
        StringJoiner sj = new StringJoiner(",");
        for (int i = 0; i < lockDOs.size(); i++) {
            sj.add("?");
        }
        boolean canLock = true;
        //query
        // select xid, transaction_id, branch_id, resource_id, table_name, pk, row_key, gmt_create, gmt_modified
        // from lock_table where row_key in ('?', '?', '?')
        String checkLockSQL = LockStoreSqlFactory.getLogStoreSql(dbType).getCheckLockableSql(lockTable, sj.toString());
        ps = conn.prepareStatement(checkLockSQL);
        for (int i = 0; i < lockDOs.size(); i++) {
            ps.setString(i + 1, lockDOs.get(i).getRowKey());
        }
        rs = ps.executeQuery();
        String currentXID = lockDOs.get(0).getXid();
        while (rs.next()) {
            String dbXID = rs.getString(ServerTableColumnsName.LOCK_TABLE_XID);
            // 查出来的记录的row_key和当前的不一样,则说明被别的事务加锁了
            if (!StringUtils.equals(dbXID, currentXID)) {
                if (LOGGER.isInfoEnabled()) {
                    String dbPk = rs.getString(ServerTableColumnsName.LOCK_TABLE_PK);
                    String dbTableName = rs.getString(ServerTableColumnsName.LOCK_TABLE_TABLE_NAME);
                    Long dbBranchId = rs.getLong(ServerTableColumnsName.LOCK_TABLE_BRANCH_ID);
                    LOGGER.info("Global lock on [{}:{}] is holding by xid {} branchId {}", dbTableName, dbPk, dbXID,
                        dbBranchId);
                }
                canLock &= false;
                break;
            }
            // 已经被自己加锁的记录
            dbExistedRowKeys.add(rs.getString(ServerTableColumnsName.LOCK_TABLE_ROW_KEY));
        }

        // 有记录已经加过锁,回滚退出
        if (!canLock) {
            conn.rollback();
            return false;
        }
        // 此次需要加锁的记录
        List<LockDO> unrepeatedLockDOs = null;
        if (CollectionUtils.isNotEmpty(dbExistedRowKeys)) {
            unrepeatedLockDOs = lockDOs.stream().filter(lockDO -> !dbExistedRowKeys.contains(lockDO.getRowKey()))
                .collect(Collectors.toList());
        } else {
            unrepeatedLockDOs = lockDOs;
        }
        if (CollectionUtils.isEmpty(unrepeatedLockDOs)) {
            conn.rollback();
            return true;
        }
        //lock
        // 执行加锁操作
        if (unrepeatedLockDOs.size() == 1) {
            LockDO lockDO = unrepeatedLockDOs.get(0);
            if (!doAcquireLock(conn, lockDO)) {
                if (LOGGER.isInfoEnabled()) {
                    LOGGER.info("Global lock acquire failed, xid {} branchId {} pk {}", lockDO.getXid(), lockDO.getBranchId(), lockDO.getPk());
                }
                conn.rollback();
                return false;
            }
        } else {
            if (!doAcquireLocks(conn, unrepeatedLockDOs)) {
                if (LOGGER.isInfoEnabled()) {
                    LOGGER.info("Global lock batch acquire failed, xid {} branchId {} pks {}", unrepeatedLockDOs.get(0).getXid(),
                        unrepeatedLockDOs.get(0).getBranchId(), unrepeatedLockDOs.stream().map(lockDO -> lockDO.getPk()).collect(Collectors.toList()));
                }
                conn.rollback();
                return false;
            }
        }
        conn.commit();
        return true;
    } catch (SQLException e) {
        throw new StoreException(e);
    } finally {
        IOUtil.close(rs, ps);
        if (conn != null) {
            try {
                if (originalAutoCommit) {
                    conn.setAutoCommit(true);
                }
                conn.close();
            } catch (SQLException e) {
            }
        }
    }
}      

解锁

public boolean unLock(List<LockDO> lockDOs) {
    Connection conn = null;
    PreparedStatement ps = null;
    try {
        conn = lockStoreDataSource.getConnection();
        conn.setAutoCommit(true);

        StringJoiner sj = new StringJoiner(",");
        for (int i = 0; i < lockDOs.size(); i++) {
            sj.add("?");
        }
        //batch release lock
        // delete from lock_table where xid = ? and row_key in (?, ?, ?)
        String batchDeleteSQL = LockStoreSqlFactory.getLogStoreSql(dbType).getBatchDeleteLockSql(lockTable, sj.toString());
        ps = conn.prepareStatement(batchDeleteSQL);
        ps.setString(1, lockDOs.get(0).getXid());
        for (int i = 0; i < lockDOs.size(); i++) {
            ps.setString(i + 2, lockDOs.get(i).getRowKey());
        }
        ps.executeUpdate();
    } catch (SQLException e) {
        throw new StoreException(e);
    } finally {
        IOUtil.close(ps, conn);
    }
    return true;
}      

参考博客