天天看点

Springboot MongoDB 事务

作者:DOKER

从版本4开始,MongoDB支持 事务。事务是建立在 会话之上的,因此,需要一个活跃的 ClientSession。

除非你在你的应用程序上下文中指定一个 MongoTransactionManager,否则事务支持是 DISABLED(禁用的)。你可以使用 setSessionSynchronization(ALWAYS) 来参与正在进行的非本地 MongoDB 事务。

为了获得对事务的完全程序化控制,你可能想在 MongoOperations 上使用会话回调。

下面的例子显示了在一个 SessionCallback 中的程序化事务控制。

Example 124. 程序性事务

ClientSession session = client.startSession(options);                   

template.withSession(session)
    .execute(action -> {

        session.startTransaction();                                     

        try {

            Step step = // ...;
            action.insert(step);

            process(step);

            action.update(Step.class).apply(Update.set("state", // ...

            session.commitTransaction();                                

        } catch (RuntimeException e) {
            session.abortTransaction();                                 
        }
    }, ClientSession::close)                                            
           

获得一个新的 ClientSession。

开始事务。

如果一切按预期进行,就提交修改。

出现意外,所以要回滚一切。

完成后不要忘记关闭会话。

前面的例子让你完全控制事务行为,同时在回调中使用会话范围的 MongoOperations 实例,以确保会话被传递给每个服务器调用。为了避免这种方法带来的一些开销,你可以使用 TransactionTemplate 来消除手动事务流的一些噪音。

1. 事务和TransactionTemplate

Spring Data MongoDB事务支持一个 TransactionTemplate。下面的例子展示了如何创建和使用 TransactionTemplate。

Example 125. 事务和 TransactionTemplate

template.setSessionSynchronization(ALWAYS);                                     

// ...

TransactionTemplate txTemplate = new TransactionTemplate(anyTxManager);         

txTemplate.execute(new TransactionCallbackWithoutResult() {

    @Override
    protected void doInTransactionWithoutResult(TransactionStatus status) {     

        Step step = // ...;
        template.insert(step);

        process(step);

        template.update(Step.class).apply(Update.set("state", // ...
    };
});
           

在 Template API 配置中启用事务同步。

使用提供的 PlatformTransactionManager 创建 TransactionTemplate。

在回调中,ClientSession 和事务已经被注册。

在运行期间改变 MongoTemplate 的状态(就像你可能认为在前面列表的第1项中可能发生的那样)会导致线程和可见性问题。

2. 事务和MongoTransactionManager

MongoTransactionManager 是通往众所周知的Spring事务支持的网关。它可以让应用程序使用 Spring的事务托管功能。MongoTransactionManager 将一个 ClientSession 绑定到线程上。MongoTemplate 会检测会话,并相应地对这些与事务相关的资源进行操作。MongoTemplate 也可以参与到其他正在进行的事务中。下面的例子展示了如何用 MongoTransactionManager 创建和使用事务。

Example 126. 事务和 MongoTransactionManager

@Configuration
static class Config extends AbstractMongoClientConfiguration {

    @Bean
    MongoTransactionManager transactionManager(MongoDatabaseFactory dbFactory) {  
        return new MongoTransactionManager(dbFactory);
    }

    // ...
}

@Component
public class StateService {

    @Transactional
    void someBusinessFunction(Step step) {                                        

        template.insert(step);

        process(step);

        template.update(Step.class).apply(Update.set("state", // ...
    };
});
           

在应用 application context 中注册 MongoTransactionManager。

将方法标记为事务性。

@Transactional(readOnly = true) 建议 MongoTransactionManager 也启动一个事务,将 ClientSession 添加到发出的请求中。

3. 响应式事务

与支持响应式 ClientSession 一样,ReactiveMongoTemplate 提供了专门的方法,用于在事务中进行操作,而不必担心根据操作结果提交或停止操作。

除非你在你的 application context 中指定一个 ReactiveMongoTransactionManager,否则事务支持是 DISABLED(禁用的)。你可以使用 setSessionSynchronization(ALWAYS) 来参与正在进行的非本地MongoDB事务。

使用普通的MongoDB响应式驱动API,在一个事务性流程中的 delete 可能看起来像这样。

Example 127. 原生驱动的支持

Mono<DeleteResult> result = Mono
    .from(client.startSession())                                                             

    .flatMap(session -> {
        session.startTransaction();                                                          

        return Mono.from(collection.deleteMany(session, ...))                                

            .onErrorResume(e -> Mono.from(session.abortTransaction()).then(Mono.error(e)))   

            .flatMap(val -> Mono.from(session.commitTransaction()).then(Mono.just(val)))     

            .doFinally(signal -> session.close());                                           
      });
           

首先,我们显然需要启动session。

一旦我们有了 ClientSession,就开始事务。

通过向操作传递 ClientSession,在事务中进行操作。

如果操作异常完成,我们需要停止事务并保留错误。

当然,也可以在成功的情况下提交更改。仍然保留操作结果。

最后,我们需要确保关闭会话。

上述操作的罪魁祸首是在保留 main flow DeleteResult,而不是通过 commitTransaction() 或 abortTransaction() 发布的事务结果,这导致了相当复杂的设置。

4. 事务和TransactionalOperator

Spring Data MongoDB事务支持一个 TransactionalOperator。下面的例子展示了如何创建和使用一个 TransactionalOperator。

Example 128. 事务和 TransactionalOperator

template.setSessionSynchronization(ALWAYS);                                          

// ...

TransactionalOperator rxtx = TransactionalOperator.create(anyTxManager,
                                   new DefaultTransactionDefinition());              


Step step = // ...;
template.insert(step);

Mono<Void> process(step)
    .then(template.update(Step.class).apply(Update.set("state", …))
    .as(rxtx::transactional)                                                         
    .then();
           

为事务性参与启用事务同步。

使用提供的 ReactiveTransactionManager 创建 TransactionalOperator。

TransactionalOperator.transactional(…) 为所有上游操作提供事务管理。

5. 事务和ReactiveMongoTransactionManager

ReactiveMongoTransactionManager 是通往众所周知的 Spring事务支持 的网关。它允许应用程序利用Spring的管理事务功能。ReactiveMongoTransactionManager 将 ClientSession 绑定到 subscriber Context。ReactiveMongoTemplate 会检测会话,并对这些与事务相关的资源进行相应操作。 ReactiveMongoTemplate 也可以参与其他正在进行的事务。下面的例子展示了如何用 ReactiveMongoTransactionManager 创建和使用事务。

Example 129. 事务和 ReactiveMongoTransactionManager

@Configuration
public class Config extends AbstractReactiveMongoConfiguration {

    @Bean
    ReactiveMongoTransactionManager transactionManager(ReactiveMongoDatabaseFactory factory) {  
        return new ReactiveMongoTransactionManager(factory);
    }

    // ...
}

@Service
public class StateService {

    @Transactional
    Mono<UpdateResult> someBusinessFunction(Step step) {                                  

        return template.insert(step)
            .then(process(step))
            .then(template.update(Step.class).apply(Update.set("state", …));
    };
});
           

在application context中注册 ReactiveMongoTransactionManager。

将方法标记为事务性的。

@Transactional(readOnly = true) 建议 ReactiveMongoTransactionManager 也启动一个事务,将 ClientSession 添加到发出的请求中。

6. 事务内部的特殊行为

在事务内部,MongoDB服务器有一个稍微不同的行为。

Connection Settings

MongoDB驱动提供了一个专门的副本集名称配置选项,使驱动进入自动检测模式。这个选项有助于识别主要的副本集节点和事务中的命令路由。

确保在MongoDB的URI中添加 replicaSet。请参考 连接字符串选项 以了解更多细节。

Collection Operations

MongoDB不支持集合操作,例如在事务中创建集合。这也会影响到第一次使用时发生的即时集合创建。因此,请确保所有需要的结构都已到位。

Transient Errors

MongoDB可以为在事务性操作中出现的错误添加特殊标签。这些标签可能表示暂时性的故障,这些故障可能通过重试操作而消失。我们强烈推荐 Spring Retry 用于这些目的。然而,我们可以覆写 MongoTransactionManager#doCommit(MongoTransactionObject),以实现MongoDB参考手册中所述的重试提交操作行为。

Count

MongoDB的 count 操作是基于集合统计的,可能无法反映事务中的实际情况。当在一个多文档事务中发出 count 命令时,服务器会响应 error 50851。一旦 MongoTemplate 检测到一个活动的事务,所有暴露的 count() 方法都会被转换,并使用 $match 和 $count 操作符委托给聚合框架,保留 Query 设置,如 collation。

在 aggregation count helper 中使用 geo 命令时,有一些限制。以下运算符不能使用,必须用不同的运算符代替。

  • $where → $expr
  • $near → $geoWithin with $center
  • $nearSphere → $geoWithin with $centerSphere

使用 Criteria.near(…​) 和 Criteria.nearSphere(…​) 的查询必须改写为 Criteria.within(…​) 各自的 Criteria.withinSphere(…​)。同样适用于 repository 查询方法中的 near 查询关键字,必须改为 within。也请参见MongoDB JIRA ticket DRIVERS-518 以进一步参考。

下面的片段显示了会话绑定闭包内的 count 用法。

session.startTransaction();

template.withSession(session)
    .execute(action -> {
        action.count(query(where("state").is("active")), Step.class)
        ...           

上面的片段具体化为以下命令。

db.collection.aggregate(
   [
      { $match: { state: "active" } },
      { $count: "totalEntityCount" }
   ]
)           

而不是。

db.collection.find( { state: "active" } ).count()