天天看点

Seata源码分析之Resource

目录

一、Resource

二、TCCResource

三、DataSourceProxy

四、ConnectionProxy

五、ConnectionContext

六、StatementProxy

七、PreparedStatementProxy

八、ExecuteTemplate

一、Resource

Resource能被ResourceManager管理并且能够关联globalTransaction。

public interface Resource {

    // 主从的datasource应该有相同的groupId
    String getResourceGroupId();

    // 当前datasource或者tccResourceId的resourceId
    String getResourceId();

    // 获取resource的类型, 有AT、TCC两种
    BranchType getBranchType();
}
           

二、TCCResource

TCCResource实现Resource接口,为用户提供复杂业务sql自定义手动模式

public class TCCResource implements Resource {

    private String resourceGroupId = "DEFAULT";
    private String appName;
    private String actionName;
    private Object targetBean;
    private Method prepareMethod;
    private String commitMethodName;
    private Method commitMethod;
    private String rollbackMethodName;
    private Method rollbackMethod;

    @Override
    public String getResourceGroupId() {
        return resourceGroupId;
    }

    @Override
    public String getResourceId() {
        return actionName;
    }

    @Override
    public BranchType getBranchType() {
        return BranchType.TCC;
    }
}
           

三、DataSourceProxy

DataSourceProxy实现Resource接口,BranchType为AT自动模式。它继承AbstractDataSourceProxy代理类,所有的DataSource相关的方法调用传入的targetDataSource代理类的方法,除了创建connection方法为创建ConnectionProxy代理类。对象初始化时获取连接的jdbcUrl作为resourceId,并注册至DefaultResourceManager进行管理。同时还提供获取原始连接不被代理的getPlainConnection方法。

public class DataSourceProxy extends AbstractDataSourceProxy implements Resource {

    private String resourceGroupId;

    private static final String DEFAULT_RESOURCE_GROUP_ID = "DEFAULT";

    private String jdbcUrl;

    private String dbType;

    public DataSourceProxy(DataSource targetDataSource, String resourceGroupId) {
        super(targetDataSource);
        init(targetDataSource, resourceGroupId);
    }

    private void init(DataSource dataSource, String resourceGroupId) {
        this.resourceGroupId = resourceGroupId;
        try (Connection connection = dataSource.getConnection()) {
            jdbcUrl = connection.getMetaData().getURL();
            dbType = JdbcUtils.getDbType(jdbcUrl, null);
        } catch (SQLException e) {
            throw new IllegalStateException("can not init dataSource", e);
        }
        DefaultResourceManager.get().registerResource(this);
    }

    public Connection getPlainConnection() throws SQLException {
        return targetDataSource.getConnection();
    }

    public String getDbType() {
        return dbType;
    }

    @Override
    public ConnectionProxy getConnection() throws SQLException {
        Connection targetConnection = targetDataSource.getConnection();
        return new ConnectionProxy(this, targetConnection);
    }

    @Override
    public ConnectionProxy getConnection(String username, String password) throws SQLException {
        Connection targetConnection = targetDataSource.getConnection(username, password);
        return new ConnectionProxy(this, targetConnection);
    }

    @Override
    public String getResourceGroupId() {
        return resourceGroupId;
    }

    @Override
    public String getResourceId() {
        if (jdbcUrl.contains("?")) {
            return jdbcUrl.substring(0, jdbcUrl.indexOf("?"));
        } else {
            return jdbcUrl;
        }
    }

    @Override
    public BranchType getBranchType() {
        return BranchType.AT;
    }
}
           

四、ConnectionProxy

ConnectionProxy继承AbstractConnectionProxy代理类,创建Statement对象方法为创建StatementProxy类,创建PreparedStatement对象方法为创建PreparedStatementProxy代理类。持有ConnectionContext对象,保存当前connection的相关context数据。主要分析它的commit,rollback和setAutoCommit方法。

public class ConnectionProxy extends AbstractConnectionProxy {

    private ConnectionContext context = new ConnectionContext();
    private static final int DEFAULT_REPORT_RETRY_COUNT = 5;
  
    public ConnectionProxy(DataSourceProxy dataSourceProxy, Connection targetConnection) {
        super(dataSourceProxy, targetConnection);
    }

    public ConnectionContext getContext() {
        return context;
    }

    public void bind(String xid) {
        context.bind(xid);
    }

    public void setGlobalLockRequire(boolean isLock) {
        context.setGlobalLockRequire(isLock);
    }

    public boolean isGlobalLockRequire() {
        return context.isGlobalLockRequire();
    }

    // 检查lockKeys是否锁住query
    public void checkLock(String lockKeys) throws SQLException {
        // Just check lock without requiring lock by now.
        try {
            boolean lockable = DefaultResourceManager.get().lockQuery(BranchType.AT, getDataSourceProxy().getResourceId(), context.getXid(), lockKeys);
            if (!lockable) {
                throw new LockConflictException();
            }
        } catch (TransactionException e) {
            recognizeLockKeyConflictException(e);
        }
    }

    public void appendUndoLog(SQLUndoLog sqlUndoLog) {
        context.appendUndoItem(sqlUndoLog);
    }

    public void appendLockKey(String lockKey) {
        context.appendLockKey(lockKey);
    }

    // connection事务提交
    public void commit() throws SQLException {
        // 如果开启了全球事务
        if (context.inGlobalTransaction()) {
	    // 处理全球事务提交逻辑
            processGlobalTransactionCommit();
        } else if (context.isGlobalLockRequire()) {
	    // 如果设置了全球锁,处理全球锁提交逻辑
            processLocalCommitWithGlobalLocks();
        } else {
	    // 默认代理连接直接提交
            targetConnection.commit();
        }
    }

    // 处理全球锁提交逻辑
    private void processLocalCommitWithGlobalLocks() throws SQLException {
       // 检查锁先
        checkLock(context.buildLockKeys());
        try {
            targetConnection.commit();
        } catch (Throwable ex) {
            throw new SQLException(ex);
        }
        context.reset();
    }

    // 处理全球事务提交逻辑
    private void processGlobalTransactionCommit() throws SQLException {
        try {
	     // 注册
            register();
        } catch (TransactionException e) {
            recognizeLockKeyConflictException(e);
        }

        try {
            if (context.hasUndoLog()) {
	        // 如果有UndoLogs,则调用UndoLogManager异步删除UndoLogs
                UndoLogManager.flushUndoLogs(this);
            }
	    // 代理连接提交
            targetConnection.commit();
        } catch (Throwable ex) {
	    // 提交报错,报告第一步提交失败错误,让其他分支数据回滚
            report(false);
            if (ex instanceof SQLException) {
                throw new SQLException(ex);
            }
        }
	// 报告成功
        report(true);
        context.reset();
    }

    // 分支注册,获取branchId
    private void register() throws TransactionException {
        Long branchId = DefaultResourceManager.get().branchRegister(BranchType.AT, getDataSourceProxy().getResourceId(),
                null, context.getXid(), null, context.buildLockKeys());
        context.setBranchId(branchId);
    }

    // connection事务回滚调用
    public void rollback() throws SQLException {
        // 数据直接回滚
        targetConnection.rollback();
	// 如果开启了全球事务,且注册了当前分支至当前全球事务管理中
	// 提交报错,报告第一步提交失败错误,让其他分支数据回滚
        if (context.inGlobalTransaction()) {
            if (context.isBranchRegistered()) {
                report(false);
            }
        }
        context.reset();
    }

    @Override
    public void setAutoCommit(boolean autoCommit) throws SQLException {
        // change autocommit from false to true, we should commit() first according to JDBC spec.
        if ((autoCommit) && !getAutoCommit()) {
            commit();
        }
        targetConnection.setAutoCommit(autoCommit);
    }
 
    // 报告当前分支事务是否成功,让TC发起其他分支回滚或提交请求,重试次数默认为5次
    private void report(boolean commitDone) throws SQLException {
        int retry = REPORT_RETRY_COUNT;
        while (retry > 0) {
            try {
                DefaultResourceManager.get().branchReport(BranchType.AT, context.getXid(), context.getBranchId(),
                        (commitDone ? BranchStatus.PhaseOne_Done : BranchStatus.PhaseOne_Failed), null);
                return;
            } catch (Throwable ex) {
                LOGGER.error("Failed to report [" + context.getBranchId() + "/" + context.getXid() + "] commit done ["
                        + commitDone + "] Retry Countdown: " + retry);
                retry--;

                if (retry == 0) {
                    throw new SQLException("Failed to report branch status " + commitDone, ex);
                }
            }
        }
    }
}
           

五、ConnectionContext

ConnectionContext持有当前连接数据对象,xid当前全球事务唯一id,没有全球事务为null,branchId当前事务连接分支,受全球事务管控。isGlobalLockRequire锁,lockKeysBuffer当前操作的表和主键字符串拼接而成的字符串集合。SQLUndoLog当前业务处理生成的待回滚日志。

public class ConnectionContext {
    private String xid;
    private Long branchId;
    private boolean isGlobalLockRequire;
    //table and primary key should not be duplicated
    private Set<String> lockKeysBuffer = new HashSet<>();
    private List<SQLUndoLog> sqlUndoItemsBuffer = new ArrayList<>();

    boolean isGlobalLockRequire() {
        return isGlobalLockRequire;
    }

    void setGlobalLockRequire(boolean isGlobalLockRequire) {
        this.isGlobalLockRequire = isGlobalLockRequire;
    }

    void appendLockKey(String lockKey) {
        lockKeysBuffer.add(lockKey);
    }

    void appendUndoItem(SQLUndoLog sqlUndoLog) {
        sqlUndoItemsBuffer.add(sqlUndoLog);
    }

    public boolean inGlobalTransaction() {
        return xid != null;
    }

    public boolean isBranchRegistered() {
        return branchId != null;
    }

    void bind(String xid) {
        if (xid == null) {
            throw new IllegalArgumentException("xid should not be null");
        }
        if (!inGlobalTransaction()) {
            setXid(xid);
        } else {
            if (!this.xid.equals(xid)) {
                throw new ShouldNeverHappenException();
            }
        }
    }

    public boolean hasUndoLog() {
        return sqlUndoItemsBuffer.size() > 0;
    }

    public String getXid() {
        return xid;
    }
    void setXid(String xid) {
        this.xid = xid;
    }

    public Long getBranchId() {
        return branchId;
    }

    void setBranchId(Long branchId) {
        this.branchId = branchId;
    }

    void reset(){
        this.reset(null);
    }

    void reset(String xid) {
        this.xid = xid;
        branchId = null;
        this.isGlobalLockRequire = false;
        lockKeysBuffer.clear();
        sqlUndoItemsBuffer.clear();
    }

    public String buildLockKeys() {
        if (lockKeysBuffer.isEmpty()) {
            return null;
        }
        StringBuilder appender = new StringBuilder();
        Iterator<String> iterable = lockKeysBuffer.iterator();
        while (iterable.hasNext()) {
            appender.append(iterable.next());
            if (iterable.hasNext()) {
                appender.append(";");
            }
        }
        return appender.toString();
    }

    public List<SQLUndoLog> getUndoItems() {
        return sqlUndoItemsBuffer;
    }
}
           

六、StatementProxy

StatementProxy继承AbstractStatementProxy类,泛型继承Statement,可以为PreparedStatement或者为CallableStatement,目前只提供了PreparedStatement的实现,未提供CallableStatement调用存储过程的实现。AbstractStatementProxy代理statement的方法,只有当executeBatch方法如果是开启了全球事务,则不支持。StatementProxy的executeQuery,executeUpdate和execute执行sql方法,都是调用ExecuteTemplate的execute模板方法,对sql进行解析,添加日志处理等工作。StatementCallback回调方法为原生statement的对应方法。

public class StatementProxy<T extends Statement> extends AbstractStatementProxy<T> {

    public StatementProxy(AbstractConnectionProxy connectionWrapper, T targetStatement, String targetSQL)
        throws SQLException {
        super(connectionWrapper, targetStatement, targetSQL);
    }

    public StatementProxy(AbstractConnectionProxy connectionWrapper, T targetStatement) throws SQLException {
        this(connectionWrapper, targetStatement, null);
    }

    @Override
    public ConnectionProxy getConnectionProxy() {
        return (ConnectionProxy) super.getConnectionProxy();
    }

    @Override
    public ResultSet executeQuery(String sql) throws SQLException {
        this.targetSQL = sql;
        return ExecuteTemplate.execute(this, new StatementCallback<ResultSet, T>() {
            @Override
            public ResultSet execute(Statement statement, Object... args) throws SQLException {
                return statement.executeQuery((String) args[0]);
            }
        }, sql);
    }

    @Override
    public int executeUpdate(String sql) throws SQLException {
        this.targetSQL = sql;
        return ExecuteTemplate.execute(this, new StatementCallback<Integer, T>() {
            @Override
            public Integer execute(Statement statement, Object... args) throws SQLException {
                return statement.executeUpdate((String) args[0]);
            }
        }, sql);
    }

    @Override
    public boolean execute(String sql) throws SQLException {
        this.targetSQL = sql;
        return ExecuteTemplate.execute(this, new StatementCallback<Boolean, T>() {
            @Override
            public Boolean execute(T statement, Object... args) throws SQLException {
                return statement.execute((String) args[0]);
            }
        }, sql);
    }

    @Override
    public int executeUpdate(String sql, int autoGeneratedKeys) throws SQLException {
        this.targetSQL = sql;
        return ExecuteTemplate.execute(this, new StatementCallback<Integer, T>() {
            @Override
            public Integer execute(T statement, Object... args) throws SQLException {
                return statement.executeUpdate((String) args[0],(int)args[1]);
            }
        }, sql,autoGeneratedKeys);
    }

    @Override
    public int executeUpdate(String sql, int[] columnIndexes) throws SQLException {
        this.targetSQL = sql;
        return ExecuteTemplate.execute(this, new StatementCallback<Integer, T>() {
            @Override
            public Integer execute(T statement, Object... args) throws SQLException {
                return statement.executeUpdate((String) args[0],(int [])args[1]);
            }
        }, sql,columnIndexes);
    }

    @Override
    public int executeUpdate(String sql, String[] columnNames) throws SQLException {
        this.targetSQL = sql;
        return ExecuteTemplate.execute(this, new StatementCallback<Integer, T>() {
            @Override
            public Integer execute(T statement, Object... args) throws SQLException {
                return statement.executeUpdate((String) args[0],(String[])args[1]);
            }
        }, sql,columnNames);
    }

    @Override
    public boolean execute(String sql, int autoGeneratedKeys) throws SQLException {
        this.targetSQL = sql;
        return ExecuteTemplate.execute(this, new StatementCallback<Boolean, T>() {
            @Override
            public Boolean execute(T statement, Object... args) throws SQLException {
                return statement.execute((String) args[0],(int)args[1]);
            }
        }, sql,autoGeneratedKeys);
    }

    @Override
    public boolean execute(String sql, int[] columnIndexes) throws SQLException {
        this.targetSQL = sql;
        return ExecuteTemplate.execute(this, new StatementCallback<Boolean, T>() {
            @Override
            public Boolean execute(T statement, Object... args) throws SQLException {
                return statement.execute((String) args[0],(int[])args[1]);
            }
        }, sql,columnIndexes);
    }

    @Override
    public boolean execute(String sql, String[] columnNames) throws SQLException {
        this.targetSQL = sql;
        return ExecuteTemplate.execute(this, new StatementCallback<Boolean, T>() {
            @Override
            public Boolean execute(T statement, Object... args) throws SQLException {
                return statement.execute((String) args[0],(String[])args[1]);
            }
        }, sql,columnNames);
    }
}
           

七、PreparedStatementProxy

PreparedStatementProxy继承AbstractPreparedStatementProxy,AbstractPreparedStatementProxy继承StatementProxy<PreparedStatement>类,实现PreparedStatement和ParametersHolder接口。ParametersHolder持有当前PreparedStatement设置的参数,为模板方法的sql解析和日志生成提供方便。执行execute,executeQuery和executeUpdate同样调用模板方法。

public class PreparedStatementProxy extends AbstractPreparedStatementProxy
    implements PreparedStatement, ParametersHolder {

    @Override
    public ArrayList<Object>[] getParameters() {
        return parameters;
    }

    private void init() throws SQLException {
        int paramCount = targetStatement.getParameterMetaData().getParameterCount();
        this.parameters = new ArrayList[paramCount];
        for (int i = 0; i < paramCount; i++) {
            parameters[i] = new ArrayList<>();
        }
    }

    public PreparedStatementProxy(AbstractConnectionProxy connectionProxy, PreparedStatement targetStatement,
                                  String targetSQL) throws SQLException {
        super(connectionProxy, targetStatement, targetSQL);
        init();
    }

    @Override
    public boolean execute() throws SQLException {
        return ExecuteTemplate.execute(this, new StatementCallback<Boolean, PreparedStatement>() {
            @Override
            public Boolean execute(PreparedStatement statement, Object... args) throws SQLException {
                return statement.execute();
            }
        });
    }

    @Override
    public ResultSet executeQuery() throws SQLException {
        return ExecuteTemplate.execute(this, new StatementCallback<ResultSet, PreparedStatement>() {
            @Override
            public ResultSet execute(PreparedStatement statement, Object... args) throws SQLException {
                return statement.executeQuery();
            }
        });
    }

    @Override
    public int executeUpdate() throws SQLException {
        return ExecuteTemplate.execute(this, new StatementCallback<Integer, PreparedStatement>() {
            @Override
            public Integer execute(PreparedStatement statement, Object... args) throws SQLException {
                return statement.executeUpdate();
            }
        });
    }
}
           

八、ExecuteTemplate

ExecuteTemplate为具体statement的execute,executeQuery和executeUpdate执行提供模板方法。

public class ExecuteTemplate {

    public static <T, S extends Statement> T execute(StatementProxy<S> statementProxy,
                                                     StatementCallback<T, S> statementCallback,
                                                     Object... args) throws SQLException {
        return execute(null, statementProxy, statementCallback, args);
    }

    public static <T, S extends Statement> T execute(SQLRecognizer sqlRecognizer,
                                                     StatementProxy<S> statementProxy,
                                                     StatementCallback<T, S> statementCallback,
                                                     Object... args) throws SQLException {

        // 如果当前context没有开启全球事务,而且没有全球锁,则回调targetStatement执行原生方法
        if (!RootContext.inGlobalTransaction() && !RootContext.requireGlobalLock()) {
            // Just work as original statement
            return statementCallback.execute(statementProxy.getTargetStatement(), args);
        }
	// 通过SQLVisitorFactory找到sqlRecognizer
        if (sqlRecognizer == null) {
            sqlRecognizer = SQLVisitorFactory.get(
                    statementProxy.getTargetSQL(),
                    statementProxy.getConnectionProxy().getDbType());
        }
        Executor<T> executor = null;
        if (sqlRecognizer == null) {
            executor = new PlainExecutor<T, S>(statementProxy, statementCallback);
        } else {
	    // 根据不同的sql类型获取不同的Executor执行器。
            switch (sqlRecognizer.getSQLType()) {
                case INSERT:
                    executor = new InsertExecutor<T, S>(statementProxy, statementCallback, sqlRecognizer);
                    break;
                case UPDATE:
                    executor = new UpdateExecutor<T, S>(statementProxy, statementCallback, sqlRecognizer);
                    break;
                case DELETE:
                    executor = new DeleteExecutor<T, S>(statementProxy, statementCallback, sqlRecognizer);
                    break;
                case SELECT_FOR_UPDATE:
                    executor = new SelectForUpdateExecutor<T, S>(statementProxy, statementCallback, sqlRecognizer);
                    break;
                default:
                    executor = new PlainExecutor<T, S>(statementProxy, statementCallback);
                    break;
            }
        }
        T rs = null;
        try {
	    // 调用执行器执行返回结果
            rs = executor.execute(args);
        } catch (Throwable ex) {
            if (!(ex instanceof SQLException)) {
                // Turn other exception into SQLException
                ex = new SQLException(ex);
            }
            throw (SQLException)ex;
        }
        return rs;
    }
}
           

继续阅读