源碼版本為1.3.0
分支事務執行前,需要把分支注冊到seata server
代碼 AbstractCore # branchRegister
@Override
public Long branchRegister(BranchType branchType, String resourceId, String clientId, String xid,
String applicationData, String lockKeys) throws TransactionException {
GlobalSession globalSession = assertGlobalSessionNotNull(xid, false);
return SessionHolder.lockAndExecute(globalSession, () -> {
globalSessionStatusCheck(globalSession);
globalSession.addSessionLifecycleListener(SessionHolder.getRootSessionManager());
BranchSession branchSession = SessionHelper.newBranchByGlobal(globalSession, branchType, resourceId,
applicationData, lockKeys, clientId);
branchSessionLock(globalSession, branchSession);
try {
globalSession.addBranch(branchSession);
最重要的一步就是擷取該分支事務的全局鎖
如果使用mysql來做分布式鎖就是這樣的 代碼在 DataBaseLocker # acquireLock
LockStoreDataBaseDAO # acquireLock(List<LockDO> lockDOs)
@Override
public boolean acquireLock(List<LockDO> lockDOs) {
Connection conn = null;
PreparedStatement ps = null;
ResultSet rs = null;
Set<String> dbExistedRowKeys = new HashSet<>();
boolean originalAutoCommit = true;
if (lockDOs.size() > 1) {
lockDOs = lockDOs.stream().filter(LambdaUtils.distinctByKey(LockDO::getRowKey)).collect(Collectors.toList());
}
try {
conn = lockStoreDataSource.getConnection();
if (originalAutoCommit = conn.getAutoCommit()) {
conn.setAutoCommit(false);
}
//check lock
StringJoiner sj = new StringJoiner(",");
for (int i = 0; i < lockDOs.size(); i++) {
sj.add("?");
}
boolean canLock = true;
//query
String checkLockSQL = LockStoreSqlFactory.getLogStoreSql(dbType).getCheckLockableSql(lockTable, sj.toString());
ps = conn.prepareStatement(checkLockSQL);
for (int i = 0; i < lockDOs.size(); i++) {
ps.setString(i + 1, lockDOs.get(i).getRowKey());
}
rs = ps.executeQuery();
String currentXID = lockDOs.get(0).getXid();
while (rs.next()) {
String dbXID = rs.getString(ServerTableColumnsName.LOCK_TABLE_XID);
if (!StringUtils.equals(dbXID, currentXID)) {//如果存在行記錄而且還不是一個全局事務ID,擷取鎖失敗
if (LOGGER.isInfoEnabled()) {
String dbPk = rs.getString(ServerTableColumnsName.LOCK_TABLE_PK);
String dbTableName = rs.getString(ServerTableColumnsName.LOCK_TABLE_TABLE_NAME);
Long dbBranchId = rs.getLong(ServerTableColumnsName.LOCK_TABLE_BRANCH_ID);
LOGGER.info("Global lock on [{}:{}] is holding by xid {} branchId {}", dbTableName, dbPk, dbXID,
dbBranchId);
}
canLock &= false;
break;
}
dbExistedRowKeys.add(rs.getString(ServerTableColumnsName.LOCK_TABLE_ROW_KEY));
}
if (!canLock) {
conn.rollback();
return false;
}
List<LockDO> unrepeatedLockDOs = null;
if (CollectionUtils.isNotEmpty(dbExistedRowKeys)) {
unrepeatedLockDOs = lockDOs.stream().filter(lockDO -> !dbExistedRowKeys.contains(lockDO.getRowKey()))
.collect(Collectors.toList());
} else {
unrepeatedLockDOs = lockDOs;
}
if (CollectionUtils.isEmpty(unrepeatedLockDOs)) {
conn.rollback();
return true;
}
//lock
if (unrepeatedLockDOs.size() == 1) {
LockDO lockDO = unrepeatedLockDOs.get(0);
if (!doAcquireLock(conn, lockDO)) {
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Global lock acquire failed, xid {} branchId {} pk {}", lockDO.getXid(), lockDO.getBranchId(), lockDO.getPk());
}
conn.rollback();
return false;
}
} else {
if (!doAcquireLocks(conn, unrepeatedLockDOs)) {
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Global lock batch acquire failed, xid {} branchId {} pks {}", unrepeatedLockDOs.get(0).getXid(),
unrepeatedLockDOs.get(0).getBranchId(), unrepeatedLockDOs.stream().map(lockDO -> lockDO.getPk()).collect(Collectors.toList()));
}
conn.rollback();
return false;
}
}
conn.commit();
return true;
來看看語句
/**
* The constant CHECK_LOCK_SQL.
*/
private static final String CHECK_LOCK_SQL = "select " + ALL_COLUMNS + " from " + LOCK_TABLE_PLACE_HOLD
+ " where " + ServerTableColumnsName.LOCK_TABLE_ROW_KEY + " in (" + IN_PARAMS_PLACE_HOLD + ")";
CREATE TABLE IF NOT EXISTS `lock_table`
(
`row_key` VARCHAR(128) NOT NULL,
`xid` VARCHAR(96),
`transaction_id` BIGINT,
`branch_id` BIGINT NOT NULL,
`resource_id` VARCHAR(256),
`table_name` VARCHAR(32),
`pk` VARCHAR(36),
`gmt_create` DATETIME,
`gmt_modified` DATETIME,
PRIMARY KEY (`row_key`),
KEY `idx_branch_id` (`branch_id`)
) ENGINE = InnoDB
DEFAULT CHARSET = utf8;