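Introduction
This article walks through how the Executor invoked from ExecuteTemplate runs.
Executor class diagram
Notes:
- BaseTransactionalExecutor parses the SQL statement and retrieves the table metadata.
- AbstractDMLBaseExecutor executes the SQL; this is where doExecute lives.
- InsertExecutor, DeleteExecutor, and UpdateExecutor capture the images before and after the SQL runs.
InsertExecutor execution flow
- Taking InsertExecutor as the example, the whole execution is a chain in which each parent class calls into its subclass.
- Reading the flow alongside the interfaces in the InsertExecutor class diagram makes the call chain easier to follow.
Executor source walkthrough - execution flow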
StatementProxy
public class StatementProxy<T extends Statement> extends AbstractStatementProxy<T> {
    @Override
    public ResultSet executeQuery(String sql) throws SQLException {
        this.targetSQL = sql;
        return ExecuteTemplate.execute(this, new StatementCallback<ResultSet, T>() {
            @Override
            public ResultSet execute(Statement statement, Object... args) throws SQLException {
                return statement.executeQuery((String) args[0]);
            }
        }, sql);
    }
}
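- This is the core call logic of StatementProxy.
- executeQuery delegates to ExecuteTemplate.execute, passing the proxy itself, a callback, and the SQL.
- The anonymous StatementCallback is called back later from inside the Executor to run the real statement.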
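The callback arrangement described above can be sketched in isolation. This is a minimal illustration, not Seata code: Callback, execute, and the trace list are hypothetical stand-ins for StatementCallback, ExecuteTemplate.execute, and whatever work the template does around the call.

```java
import java.util.ArrayList;
import java.util.List;

class CallbackSketch {
    /** Hypothetical stand-in for StatementCallback: runs the real operation. */
    interface Callback<T> {
        T execute(String target, Object... args);
    }

    /** Records the order of steps so the flow can be inspected. */
    static final List<String> trace = new ArrayList<>();

    /** Hypothetical stand-in for ExecuteTemplate.execute: wraps the callback with extra work. */
    static <T> T execute(String target, Callback<T> callback, Object... args) {
        trace.add("before");
        T result = callback.execute(target, args);
        trace.add("after");
        return result;
    }

    /** Mirrors executeQuery: passes the SQL through args and defers the real call to the callback. */
    static String executeQuery(String sql) {
        return execute("stmt", (target, args) -> {
            trace.add("callback");
            return target + ":" + args[0];
        }, sql);
    }
}
```

The caller decides what the real operation is, while the template decides when it runs and what happens around it. That split is what lets the Executor add image capture around an unchanged JDBC call.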
ExecuteTemplate
public class ExecuteTemplate {
    public static <T, S extends Statement> T execute(StatementProxy<S> statementProxy,
                                                     StatementCallback<T, S> statementCallback,
                                                     Object... args) throws SQLException {
        return execute(null, statementProxy, statementCallback, args);
    }

    public static <T, S extends Statement> T execute(SQLRecognizer sqlRecognizer,
                                                     StatementProxy<S> statementProxy,
                                                     StatementCallback<T, S> statementCallback,
                                                     Object... args) throws SQLException {
        if (!RootContext.inGlobalTransaction()) {
            // Just work as original statement
            return statementCallback.execute(statementProxy.getTargetStatement(), args);
        }
        // sqlRecognizer is passed in as null on this path, so this branch is taken
        if (sqlRecognizer == null) {
            // SQLVisitorFactory.get is a core piece of code and will be analyzed separately
            sqlRecognizer = SQLVisitorFactory.get(
                statementProxy.getTargetSQL(),
                statementProxy.getConnectionProxy().getDbType());
        }
        Executor<T> executor = null;
        if (sqlRecognizer == null) {
            executor = new PlainExecutor<T, S>(statementProxy, statementCallback);
        } else {
            switch (sqlRecognizer.getSQLType()) {
                case INSERT:
                    executor = new InsertExecutor<T, S>(statementProxy, statementCallback, sqlRecognizer);
                    break;
                case UPDATE:
                    executor = new UpdateExecutor<T, S>(statementProxy, statementCallback, sqlRecognizer);
                    break;
                case DELETE:
                    executor = new DeleteExecutor<T, S>(statementProxy, statementCallback, sqlRecognizer);
                    break;
                case SELECT_FOR_UPDATE:
                    executor = new SelectForUpdateExecutor(statementProxy, statementCallback, sqlRecognizer);
                    break;
                default:
                    executor = new PlainExecutor<T, S>(statementProxy, statementCallback);
                    break;
            }
        }
        T rs = null;
        try {
            rs = executor.execute(args);
        } catch (Throwable ex) {
            if (ex instanceof SQLException) {
                throw (SQLException) ex;
            } else {
                // Turn everything into SQLException
                throw new SQLException(ex);
            }
        }
        return rs;
    }
}
- The code above shows the whole process by which ExecuteTemplate runs InsertExecutor.
- Obtain the SQLRecognizer object: sqlRecognizer = SQLVisitorFactory.get().
- Create the InsertExecutor object: executor = new InsertExecutor().
- Execute the InsertExecutor and return the result: rs = executor.execute(args);
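The selection logic can be tried out on its own with a small sketch. Everything here is an assumption made for illustration: the SqlType enum, the keyword-based recognize method (the real SQLVisitorFactory parses the SQL rather than matching prefixes), and the use of executor names as plain strings.

```java
import java.util.Locale;

class DispatchSketch {
    enum SqlType { INSERT, UPDATE, DELETE, SELECT_FOR_UPDATE }

    /** Naive keyword classification; returns null when unrecognized, like a null sqlRecognizer. */
    static SqlType recognize(String sql) {
        String s = sql.trim().toLowerCase(Locale.ROOT);
        if (s.startsWith("insert")) return SqlType.INSERT;
        if (s.startsWith("update")) return SqlType.UPDATE;
        if (s.startsWith("delete")) return SqlType.DELETE;
        if (s.startsWith("select") && s.endsWith("for update")) return SqlType.SELECT_FOR_UPDATE;
        return null;
    }

    /** Follows the same branch order as the template: transaction check, recognizer, then SQL type. */
    static String chooseExecutor(boolean inGlobalTransaction, String sql) {
        if (!inGlobalTransaction) return "original statement";
        SqlType type = recognize(sql);
        if (type == null) return "PlainExecutor";
        switch (type) {
            case INSERT: return "InsertExecutor";
            case UPDATE: return "UpdateExecutor";
            case DELETE: return "DeleteExecutor";
            case SELECT_FOR_UPDATE: return "SelectForUpdateExecutor";
            default: return "PlainExecutor";
        }
    }
}
```

A plain SELECT inside a global transaction falls through to PlainExecutor, and any statement outside a global transaction bypasses the executors entirely.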
BaseTransactionalExecutor
public abstract class BaseTransactionalExecutor<T, S extends Statement> implements Executor {
    protected StatementProxy<S> statementProxy;
    protected StatementCallback<T, S> statementCallback;
    protected SQLRecognizer sqlRecognizer;
    private TableMeta tableMeta;

    public BaseTransactionalExecutor(StatementProxy<S> statementProxy,
                                     StatementCallback<T, S> statementCallback, SQLRecognizer sqlRecognizer) {
        this.statementProxy = statementProxy;
        this.statementCallback = statementCallback;
        this.sqlRecognizer = sqlRecognizer;
    }

    @Override
    public Object execute(Object... args) throws Throwable {
        String xid = RootContext.getXID();
        statementProxy.getConnectionProxy().bind(xid);
        return doExecute(args);
    }

    protected abstract Object doExecute(Object... args) throws Throwable;
}
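- Note the constructor of BaseTransactionalExecutor, which stores statementProxy, statementCallback, and sqlRecognizer.
- execute() is the entry point: it binds the XID to the connection proxy and then calls the abstract method doExecute().
- doExecute() is implemented in the subclasses.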
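This is the template method pattern: the base class fixes the entry point and leaves one step to subclasses. A minimal sketch follows, assuming hypothetical names (BaseExecutor, CountingExecutor, and a steps list used only to make the order visible).

```java
import java.util.ArrayList;
import java.util.List;

class TemplateMethodSketch {
    static abstract class BaseExecutor {
        final List<String> steps = new ArrayList<>();

        /** Entry point: shared setup first, then the subclass-specific step. */
        final Object execute(Object... args) {
            steps.add("bind xid");
            return doExecute(args);
        }

        /** The step each subclass fills in. */
        protected abstract Object doExecute(Object... args);
    }

    /** Hypothetical subclass that just reports how many arguments it received. */
    static class CountingExecutor extends BaseExecutor {
        @Override
        protected Object doExecute(Object... args) {
            steps.add("doExecute");
            return args.length;
        }
    }
}
```

Because execute is final in this sketch, a subclass cannot skip the shared setup; it can only change what happens in doExecute.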
AbstractDMLBaseExecutor
public abstract class AbstractDMLBaseExecutor<T, S extends Statement> extends BaseTransactionalExecutor<T, S> {
    private static final Logger LOGGER = LoggerFactory.getLogger(AbstractDMLBaseExecutor.class);

    public AbstractDMLBaseExecutor(StatementProxy<S> statementProxy,
                                   StatementCallback<T, S> statementCallback, SQLRecognizer sqlRecognizer) {
        super(statementProxy, statementCallback, sqlRecognizer);
    }

    @Override
    public T doExecute(Object... args) throws Throwable {
        AbstractConnectionProxy connectionProxy = statementProxy.getConnectionProxy();
        if (connectionProxy.getAutoCommit()) {
            return executeAutoCommitTrue(args);
        } else {
            return executeAutoCommitFalse(args);
        }
    }

    protected T executeAutoCommitFalse(Object[] args) throws Throwable {
        TableRecords beforeImage = beforeImage();
        T result = statementCallback.execute(statementProxy.getTargetStatement(), args);
        TableRecords afterImage = afterImage(beforeImage);
        prepareUndoLog(beforeImage, afterImage);
        return result;
    }

    protected T executeAutoCommitTrue(Object[] args) throws Throwable {
        T result = null;
        AbstractConnectionProxy connectionProxy = statementProxy.getConnectionProxy();
        LockRetryController lockRetryController = new LockRetryController();
        try {
            connectionProxy.setAutoCommit(false);
            while (true) {
                try {
                    result = executeAutoCommitFalse(args);
                    connectionProxy.commit();
                    break;
                } catch (LockConflictException lockConflict) {
                    connectionProxy.getTargetConnection().rollback();
                    lockRetryController.sleep(lockConflict);
                }
            }
        } catch (Exception e) {
            // If the finally block also throws, this exception would be lost, so log it here
            LOGGER.error("exception occur", e);
            throw e;
        } finally {
            connectionProxy.setAutoCommit(true);
        }
        return result;
    }
}
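- BaseTransactionalExecutor calls the doExecute method of its subclass AbstractDMLBaseExecutor.
- doExecute() in AbstractDMLBaseExecutor runs either executeAutoCommitFalse or executeAutoCommitTrue, depending on the connection's auto-commit setting.
- executeAutoCommitTrue is itself built on executeAutoCommitFalse: it turns auto-commit off, runs executeAutoCommitFalse and commits inside a loop that retries on LockConflictException, and restores auto-commit at the end.
- executeAutoCommitFalse() runs beforeImage(), statementCallback.execute(), afterImage(), and prepareUndoLog(), in that order.
- beforeImage() captures the pre-execution image, afterImage() captures the post-execution image, and prepareUndoLog() saves the rollback (undo) log.
- statementCallback.execute() performs the actual statement operation.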
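The retry-around-commit structure can be sketched without a database. This is a simplified illustration under several assumptions: LockConflict is a hypothetical unchecked exception, the conflict is simulated at commit time, and a maxRetries bound replaces LockRetryController, whose exact retry policy is not shown in the excerpt.

```java
import java.util.ArrayList;
import java.util.List;

class AutoCommitSketch {
    /** Hypothetical stand-in for LockConflictException. */
    static class LockConflict extends RuntimeException {}

    final List<String> log = new ArrayList<>();
    boolean autoCommit = true;
    int conflictsToSimulate; // how many commits should fail with a lock conflict

    AutoCommitSketch(int conflictsToSimulate) {
        this.conflictsToSimulate = conflictsToSimulate;
    }

    /** The fixed four-step sequence: image, statement, image, undo log. */
    String executeAutoCommitFalse(String sql) {
        log.add("beforeImage");
        log.add("execute " + sql);
        log.add("afterImage");
        log.add("prepareUndoLog");
        return "ok";
    }

    /** Simulated commit that fails while conflicts remain. */
    void commit() {
        if (conflictsToSimulate > 0) {
            conflictsToSimulate--;
            throw new LockConflict();
        }
        log.add("commit");
    }

    /** Turns auto-commit off, retries the whole unit on conflict, and always restores auto-commit. */
    String executeAutoCommitTrue(String sql, int maxRetries) {
        autoCommit = false;
        try {
            for (int attempt = 0; ; attempt++) {
                try {
                    String result = executeAutoCommitFalse(sql);
                    commit();
                    return result;
                } catch (LockConflict e) {
                    log.add("rollback");
                    if (attempt >= maxRetries) throw e; // give up after the allowed retries
                }
            }
        } finally {
            autoCommit = true;
        }
    }
}
```

Each retry repeats all four steps, not just the commit. After a rollback the captured images may no longer match the table, so they have to be taken again.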
InsertExecutor
public class InsertExecutor<T, S extends Statement> extends AbstractDMLBaseExecutor<T, S> {
    public InsertExecutor(StatementProxy statementProxy,
                          StatementCallback statementCallback, SQLRecognizer sqlRecognizer) {
        super(statementProxy, statementCallback, sqlRecognizer);
    }

    @Override
    protected TableRecords beforeImage() throws SQLException {
        return TableRecords.empty(getTableMeta());
    }

    @Override
    protected TableRecords afterImage(TableRecords beforeImage) throws SQLException {
        SQLInsertRecognizer recogizier = (SQLInsertRecognizer)sqlRecognizer;
        List<String> insertColumns = recogizier.getInsertColumns();
        TableMeta tmeta = getTableMeta();
        TableRecords afterImage = null;
        if (tmeta.containsPK(insertColumns)) {
            // insert values including PK
            List<Object> pkValues = null;
            String pk = tmeta.getPkName();
            for (int paramIdx = 0; paramIdx < insertColumns.size(); paramIdx++) {
                if (insertColumns.get(paramIdx).equalsIgnoreCase(pk)) {
                    if (statementProxy instanceof PreparedStatementProxy) {
                        pkValues = ((PreparedStatementProxy) statementProxy).getParamsByIndex(paramIdx);
                    } else {
                        List<List<Object>> insertRows = recogizier.getInsertRows();
                        pkValues = new ArrayList<>(insertRows.size());
                        for (List<Object> row : insertRows) {
                            pkValues.add(row.get(paramIdx));
                        }
                    }
                    break;
                }
            }
            if (pkValues == null) {
                throw new ShouldNeverHappenException();
            }
            afterImage = getTableRecords(pkValues);
        } else {
            // PK is just auto generated
            Map<String, ColumnMeta> pkMetaMap = getTableMeta().getPrimaryKeyMap();
            if (pkMetaMap.size() != 1) {
                throw new NotSupportYetException();
            }
            ColumnMeta pkMeta = pkMetaMap.values().iterator().next();
            if (!pkMeta.isAutoincrement()) {
                throw new ShouldNeverHappenException();
            }
            ResultSet genKeys = null;
            try {
                genKeys = statementProxy.getTargetStatement().getGeneratedKeys();
            } catch (SQLException e) {
                // java.sql.SQLException: Generated keys not requested. You need to
                // specify Statement.RETURN_GENERATED_KEYS to
                // Statement.executeUpdate() or Connection.prepareStatement().
                if ("S1009".equalsIgnoreCase(e.getSQLState())) {
                    genKeys = statementProxy.getTargetStatement().executeQuery("SELECT LAST_INSERT_ID()");
                } else {
                    throw e;
                }
            }
            List<Object> pkValues = new ArrayList<>();
            while (genKeys.next()) {
                Object v = genKeys.getObject(1);
                pkValues.add(v);
            }
            afterImage = getTableRecords(pkValues);
        }
        if (afterImage == null) {
            throw new SQLException("Failed to build after-image for insert");
        }
        return afterImage;
    }
}
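- InsertExecutor mainly implements beforeImage() and afterImage().
- beforeImage() captures the image before the insert, which is always empty because the rows do not exist yet.
- afterImage() captures the image after the insert by looking up the new rows by primary key, taken either from the inserted values or from the generated keys.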
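The branch that reads primary-key values straight from the insert rows can be sketched as a standalone helper. The method name pkValues and its null return for a missing PK column are assumptions made for illustration; the column and row lists play the roles of getInsertColumns() and getInsertRows().

```java
import java.util.ArrayList;
import java.util.List;

class InsertPkSketch {
    /** Returns the PK value of every inserted row, or null when the PK column is not listed. */
    static List<Object> pkValues(List<String> insertColumns,
                                 List<List<Object>> insertRows,
                                 String pkName) {
        for (int idx = 0; idx < insertColumns.size(); idx++) {
            // Column names are compared case-insensitively, as in the original code
            if (insertColumns.get(idx).equalsIgnoreCase(pkName)) {
                List<Object> values = new ArrayList<>(insertRows.size());
                for (List<Object> row : insertRows) {
                    values.add(row.get(idx));
                }
                return values;
            }
        }
        return null; // a caller would fall back to generated keys here
    }
}
```

For columns (name, ID) and rows ("a", 1) and ("b", 2) with PK name "id", the helper returns the values 1 and 2, which would then be used to query the after-image.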
Looking ahead
This article has covered the Executor execution flow. Follow-up articles will cover the common code that the Executors rely on.