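Introduction
This article walks through the Executor object that the RM uses to perform a rollback, namely the undoExecutor. The undo log is executed by the undoExecutor.
undoExecutor source code analysis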
public class UndoExecutorFactory {
public static AbstractUndoExecutor getUndoExecutor(
String dbType, SQLUndoLog sqlUndoLog) {
if (!dbType.equals(JdbcConstants.MYSQL)) {
throw new NotSupportYetException(dbType);
}
switch (sqlUndoLog.getSqlType()) {
case INSERT:
return new MySQLUndoInsertExecutor(sqlUndoLog);
case UPDATE:
return new MySQLUndoUpdateExecutor(sqlUndoLog);
case DELETE:
return new MySQLUndoDeleteExecutor(sqlUndoLog);
default:
throw new ShouldNeverHappenException();
}
}
}
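Notes:
- UndoExecutorFactory returns the undoExecutor that matches the SQL type recorded in the undo log. Only MySQL is supported; any other dbType raises NotSupportYetException.
- AbstractUndoExecutor is the abstract base class of all rollback executors.
- MySQLUndoDeleteExecutor rolls back a delete operation.
- MySQLUndoInsertExecutor rolls back an insert operation.
- MySQLUndoUpdateExecutor rolls back an update operation.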
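The dispatch performed by the factory can be summarised as a mapping from the original statement type to the statement that compensates it. The following is a minimal sketch, not the library's code: the class `UndoDispatchSketch`, the enum `SqlType`, and the method `compensatingVerb` are hypothetical names introduced here for illustration only.

```java
public class UndoDispatchSketch {
    // Hypothetical stand-in for the SQL type stored in an undo log entry.
    public enum SqlType { INSERT, UPDATE, DELETE }

    // Returns the SQL verb of the statement that undoes the given operation.
    public static String compensatingVerb(SqlType type) {
        switch (type) {
            case INSERT:
                return "DELETE"; // an inserted row is removed again
            case UPDATE:
                return "UPDATE"; // old column values are written back
            case DELETE:
                return "INSERT"; // the deleted row is re-inserted
            default:
                throw new IllegalStateException("unexpected type: " + type);
        }
    }

    public static void main(String[] args) {
        for (SqlType type : SqlType.values()) {
            System.out.println(type + " is undone by " + compensatingVerb(type));
        }
    }
}
```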
AbstractUndoExecutor
public abstract class AbstractUndoExecutor {
protected SQLUndoLog sqlUndoLog;
protected abstract String buildUndoSQL();
public AbstractUndoExecutor(SQLUndoLog sqlUndoLog) {
this.sqlUndoLog = sqlUndoLog;
}
public void executeOn(Connection conn) throws SQLException {
dataValidation(conn);
try {
// Build the undo SQL template
String undoSQL = buildUndoSQL();
// Obtain the PreparedStatement
PreparedStatement undoPST = conn.prepareStatement(undoSQL);
// Fetch the rows to roll back
TableRecords undoRows = getUndoRows();
// Iterate over the rows to roll back and collect the fields of each one
for (Row undoRow : undoRows.getRows()) {
ArrayList<Field> undoValues = new ArrayList<>();
Field pkValue = null;
for (Field field : undoRow.getFields()) {
if (field.getKeyType() == KeyType.PrimaryKey) {
pkValue = field;
} else {
undoValues.add(field);
}
}
// Bind the parameters for this row
undoPrepare(undoPST, undoValues, pkValue);
// Execute the rollback statement
undoPST.executeUpdate();
}
} catch (Exception ex) {
if (ex instanceof SQLException) {
throw (SQLException) ex;
} else {
throw new SQLException(ex);
}
}
}
protected void undoPrepare(PreparedStatement undoPST,
ArrayList<Field> undoValues,
Field pkValue) throws SQLException {
int undoIndex = 0;
for (Field undoValue : undoValues) {
undoIndex++;
undoPST.setObject(undoIndex, undoValue.getValue(), undoValue.getType());
}
// PK is at last one.
// INSERT INTO a (x, y, z, pk) VALUES (?, ?, ?, ?)
// UPDATE a SET x=?, y=?, z=? WHERE pk = ?
// DELETE FROM a WHERE pk = ?
undoIndex++;
undoPST.setObject(undoIndex, pkValue.getValue(), pkValue.getType());
}
protected abstract TableRecords getUndoRows();
protected void dataValidation(Connection conn) throws SQLException {
// Validate if data is dirty.
}
}
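- AbstractUndoExecutor defines the overall template flow for executing a rollback.
- Build the undo SQL template with buildUndoSQL().
- Obtain the PreparedStatement with conn.prepareStatement(undoSQL).
- Iterate over the rows to roll back and, for each row, separate the primary key field from the other fields.
- Bind the parameters for each row with undoPrepare(undoPST, undoValues, pkValue); the primary key is always bound last.
- Execute the rollback with undoPST.executeUpdate().
- buildUndoSQL() and getUndoRows() are implemented by the subclasses.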
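The parameter order used by the template can be illustrated without a database connection. The sketch below assumes a simplified field type: `BindOrderSketch`, `SimpleField`, and `bindOrder` are hypothetical names, not part of the analysed source. It mirrors the split done in executeOn and the ordering applied in undoPrepare: non-key values in row order, then the primary key.

```java
import java.util.ArrayList;
import java.util.List;

public class BindOrderSketch {
    // Hypothetical simplified field: column name, value, and a primary key flag.
    public static final class SimpleField {
        final String name;
        final Object value;
        final boolean primaryKey;

        public SimpleField(String name, Object value, boolean primaryKey) {
            this.name = name;
            this.value = value;
            this.primaryKey = primaryKey;
        }
    }

    // Returns the values in the order they would be bound to the "?" placeholders.
    public static List<Object> bindOrder(List<SimpleField> row) {
        List<Object> ordered = new ArrayList<>();
        SimpleField pk = null;
        for (SimpleField field : row) {
            if (field.primaryKey) {
                pk = field;               // remembered, bound after everything else
            } else {
                ordered.add(field.value); // non-key values keep their row order
            }
        }
        if (pk == null) {
            throw new IllegalArgumentException("row has no primary key field");
        }
        ordered.add(pk.value);            // the primary key goes last
        return ordered;
    }

    public static void main(String[] args) {
        List<SimpleField> row = new ArrayList<>();
        row.add(new SimpleField("id", 7, true));
        row.add(new SimpleField("name", "tom", false));
        row.add(new SimpleField("age", 30, false));
        System.out.println(bindOrder(row)); // prints [tom, 30, 7]
    }
}
```

Unlike the sketch, the quoted undoPrepare does not check for a missing primary key before dereferencing pkValue.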
MySQLUndoInsertExecutor
public class MySQLUndoInsertExecutor extends AbstractUndoExecutor {
@Override
protected String buildUndoSQL() {
TableRecords afterImage = sqlUndoLog.getAfterImage();
List<Row> afterImageRows = afterImage.getRows();
if (afterImageRows == null || afterImageRows.size() == 0) {
throw new ShouldNeverHappenException("Invalid UNDO LOG");
}
Row row = afterImageRows.get(0);
StringBuffer mainSQL = new StringBuffer(
"DELETE FROM " + sqlUndoLog.getTableName());
StringBuffer where = new StringBuffer(" WHERE ");
boolean first = true;
for (Field field : row.getFields()) {
if (field.getKeyType() == KeyType.PrimaryKey) {
where.append(field.getName() + " = ? ");
}
}
return mainSQL.append(where).toString();
}
@Override
protected void undoPrepare(PreparedStatement undoPST,
ArrayList<Field> undoValues, Field pkValue)
throws SQLException {
undoPST.setObject(1, pkValue.getValue(), pkValue.getType());
}
public MySQLUndoInsertExecutor(SQLUndoLog sqlUndoLog) {
super(sqlUndoLog);
}
@Override
protected TableRecords getUndoRows() {
return sqlUndoLog.getAfterImage();
}
}
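- An insert is rolled back by the inverse operation, a delete. MySQLUndoInsertExecutor builds the DELETE statement.
- The WHERE condition of the DELETE statement is the primary key of the inserted row, taken from the after image.
- undoPrepare is overridden to bind only the primary key; the rest of the rollback flow is defined in the parent class AbstractUndoExecutor.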
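As a rough illustration of the statement this executor produces, the sketch below builds the same shape of SQL from a table name and a primary key column. `UndoInsertSqlSketch`, `buildDeleteSql`, and the table and column names are assumptions made for the example.

```java
public class UndoInsertSqlSketch {
    // Builds a DELETE statement with a single placeholder for the primary key.
    public static String buildDeleteSql(String tableName, String pkColumn) {
        return "DELETE FROM " + tableName + " WHERE " + pkColumn + " = ?";
    }

    public static void main(String[] args) {
        // prints: DELETE FROM account WHERE id = ?
        System.out.println(buildDeleteSql("account", "id"));
    }
}
```

Because the statement has one placeholder, the same PreparedStatement can be reused for every row of the after image, binding a different primary key value each time.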
MySQLUndoDeleteExecutor
public class MySQLUndoDeleteExecutor extends AbstractUndoExecutor {
public MySQLUndoDeleteExecutor(SQLUndoLog sqlUndoLog) {
super(sqlUndoLog);
}
@Override
protected String buildUndoSQL() {
TableRecords beforeImage = sqlUndoLog.getBeforeImage();
List<Row> beforeImageRows = beforeImage.getRows();
if (beforeImageRows == null || beforeImageRows.size() == 0) {
throw new ShouldNeverHappenException("Invalid UNDO LOG");
}
Row row = beforeImageRows.get(0);
StringBuffer insertColumns = new StringBuffer();
StringBuffer insertValues = new StringBuffer();
Field pkField = null;
boolean first = true;
for (Field field : row.getFields()) {
if (field.getKeyType() == KeyType.PrimaryKey) {
pkField = field;
continue;
} else {
if (first) {
first = false;
} else {
insertColumns.append(", ");
insertValues.append(", ");
}
insertColumns.append(field.getName());
insertValues.append("?");
}
}
if (first) {
first = false;
} else {
insertColumns.append(", ");
insertValues.append(", ");
}
insertColumns.append(pkField.getName());
insertValues.append("?");
return "INSERT INTO " + sqlUndoLog.getTableName()
+ "(" + insertColumns.toString() + ") VALUES ("
+ insertValues.toString() + ")";
}
@Override
protected TableRecords getUndoRows() {
return sqlUndoLog.getBeforeImage();
}
}
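- A delete is rolled back by the inverse operation, an insert. MySQLUndoDeleteExecutor builds the INSERT statement from the before image.
- The generated statement has the form INSERT INTO tableName(column1, column2, pk) VALUES (?, ?, ?), with the primary key column placed last so that it matches the binding order in undoPrepare.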
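The column ordering can be sketched with java.util.StringJoiner in place of the manual `first` flag. This is an illustrative rewrite under assumed inputs, not the library's implementation: `UndoDeleteSqlSketch`, `buildInsertSql`, and the sample table are hypothetical.

```java
import java.util.Arrays;
import java.util.List;
import java.util.StringJoiner;

public class UndoDeleteSqlSketch {
    // Builds an INSERT statement whose last column and placeholder belong to the primary key.
    public static String buildInsertSql(String table, List<String> nonKeyColumns, String pkColumn) {
        StringJoiner columns = new StringJoiner(", ");
        StringJoiner marks = new StringJoiner(", ");
        for (String column : nonKeyColumns) {
            columns.add(column);
            marks.add("?");
        }
        columns.add(pkColumn); // primary key column goes last
        marks.add("?");
        return "INSERT INTO " + table + " (" + columns + ") VALUES (" + marks + ")";
    }

    public static void main(String[] args) {
        // prints: INSERT INTO account (name, money, id) VALUES (?, ?, ?)
        System.out.println(buildInsertSql("account", Arrays.asList("name", "money"), "id"));
    }
}
```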
MySQLUndoUpdateExecutor
public class MySQLUndoUpdateExecutor extends AbstractUndoExecutor {
@Override
protected String buildUndoSQL() {
TableRecords beforeImage = sqlUndoLog.getBeforeImage();
List<Row> beforeImageRows = beforeImage.getRows();
if (beforeImageRows == null || beforeImageRows.size() == 0) {
throw new ShouldNeverHappenException("Invalid UNDO LOG"); // TODO
}
Row row = beforeImageRows.get(0);
StringBuffer mainSQL = new StringBuffer(
"UPDATE " + sqlUndoLog.getTableName() + " SET ");
StringBuffer where = new StringBuffer(" WHERE ");
boolean first = true;
for (Field field : row.getFields()) {
if (field.getKeyType() == KeyType.PrimaryKey) {
where.append(field.getName() + " = ?");
} else {
if (first) {
first = false;
} else {
mainSQL.append(", ");
}
mainSQL.append(field.getName() + " = ?");
}
}
return mainSQL.append(where).toString();
}
public MySQLUndoUpdateExecutor(SQLUndoLog sqlUndoLog) {
super(sqlUndoLog);
}
@Override
protected TableRecords getUndoRows() {
return sqlUndoLog.getBeforeImage();
}
}
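- An update is rolled back by another update that writes the old values back. MySQLUndoUpdateExecutor builds the UPDATE statement from the before image.
- The generated statement has the form UPDATE tableName SET column1 = ?, column2 = ? WHERE pk = ?.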
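A compact sketch of the same construction follows, again with hypothetical names (`UndoUpdateSqlSketch`, `buildUpdateSql`) and an assumed sample table. The SET clause lists the non-key columns in row order, and the primary key appears only in the WHERE clause, so its placeholder is the last one.

```java
import java.util.Arrays;
import java.util.List;
import java.util.StringJoiner;

public class UndoUpdateSqlSketch {
    // Builds an UPDATE statement that restores the non-key columns of one row.
    public static String buildUpdateSql(String table, List<String> nonKeyColumns, String pkColumn) {
        StringJoiner assignments = new StringJoiner(", ");
        for (String column : nonKeyColumns) {
            assignments.add(column + " = ?");
        }
        return "UPDATE " + table + " SET " + assignments + " WHERE " + pkColumn + " = ?";
    }

    public static void main(String[] args) {
        // prints: UPDATE account SET name = ?, money = ? WHERE id = ?
        System.out.println(buildUpdateSql("account", Arrays.asList("name", "money"), "id"));
    }
}
```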