天天看点

Spring ActiveMQ 整合: JMS 事务管理

1.为什么要用事务?

       消息事务是在生产者producer到broker或broker到consumer过程中同一个session中发生的,保证几条消息在发送过程中的原子性。

       可以在connection的createSession方法中指定一个布尔值开启,如果消息确认机制是事务确认,那么在发送message的过程中session就会开启事务(实际上broker的),不用用户显示调用 beginTransaction,这时所有通过session发送的消息都被缓存下来,用户调用session.commit时会发送所有消息,当发送出现异常时用户可以调用rollback进行回滚操作,只有在开启事务状态下有效。

为什么commit之后,不会有持久的消息重新传送呢?

       原因在于commit操作会自动将为签收确认的消息进行签收确认,如果是当前接收但未签收确认的消息,都会被确认处理。因而在commit之后不会有持久化的消息出现。

2.activeMQ支持的事务:

ActiveMQ有支持两种事务,

  • JMS transactions - the commit() / rollback() methods on a Session (which is like doing commit() / rollback() on a JDBC connection)
  • XA Transactions - where the​​XASession​​​ acts as an​​XAResource​​ by communicating with the Message Broker, rather like a JDBC Connection takes place in an XA transaction by communicating with the database.

在支持事务的session中,producer发送message时在message中带有transaction ID。broker收到message后判断是否有transaction ID,如果有就把message保存在transaction store中,等待commit或者rollback消息。所以ActiveMq的事务是针对broker而不是producer的,不管session是否commit,broker都会收到message。

如果producer发送模式选择了persistent,那么message过期后会进入死亡队列。在message进入死亡队列之前,ActiveMQ会删除message中的transaction ID,这样过期的message就不在事务中了,不会保存在transaction store中,会直接进入死亡队列。具体删除transaction ID的地方是在:org.apache.activemq.util.BrokerSupport的doResend,将transaction ID保存在了originalTransactionID中,删除了transaction ID。

JMS transactions.

JMS transactions事务的配置:

      ①建立JMS事务,并引入关联链接事务。

         ②.设置一个jmsTamplat,并关联监听容器。

[html] 
    ​​view plain​​​
     ​​​copy​​
1. <!-- jms事务 -->
2. <bean id="jmsTransactionManager"
3. class="org.springframework.jms.connection.JmsTransactionManager">
4. <property name="connectionFactory" ref="connectionFactory" />
5. </bean>
6. <tx:annotation-driven transaction-manager="jmsTransactionManager" />
7. 
8. 
9. <!-- 消息监听容器 消息接收监听器用于异步接收消息 -->
10. <bean id="jmsContainerOne" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
11. <property name="connectionFactory" ref="connectionFactory" />
12. <property name="destination" ref="destinationOne" />
13. <property name="messageListener" ref="consumerMessageListenerOfOne" />
14. <!-- <property name="sessionTransacted" value="true"/> -->  <!-- 给listener添加事务,只负责接收消息的回滚 (有了transactionManager就不用这个了,这个功能不全) 设置后好像并没有起作用 不知道为啥 -->
15. <!-- <property name="transactionManager" ref="jtaTransactionManager"/> --> <!-- 接收消息和数据库访问处于同一事务中 jta -->
16. <property name="transactionManager" ref="jmsTransactionManager" /> <!--jms事务 -->
17. <property name="sessionAcknowledgeMode" value="4"></property>   <!-- 应答模式是 INDIVIDUAL_ACKNOWLEDGE 
18. <!-- ActiveMQ:设置多个并行的消费者 -->
19. <property name="concurrency" value="2-3" />
20. </bean>      

上面配置文件配置完成后,在接收着那边接受消息失败后,进行事务回滚。

[html] 
    ​​view plain​​​
     ​​​copy​​ 
1. session.rollback();      

   具体实现:

[html] 
    ​​view plain​​​
     ​​​copy​​ 
1. public void onMessage(Message message, Session session) {
2. textMsg = (TextMessage) message;
3. try {
4. System.out.println(1);
5. endStr = textMsg.getText();
6. endInt = Integer.parseInt(endStr);
7. System.out.println("消息:==="+endInt);
8. //只要被确认后   就会出队,接受失败没有确认成功,会在原队列里面
9. textMsg.acknowledge();
10. 
11. } catch (Exception e) {
12. try {
13. session.rollback();
14. System.out.println("测试回滚");
15. e.printStackTrace();
16. System.out.println("异常信息是:===:" + e.getMessage());
17. }
18. }      

  另外,activeMQ还有一种JtaTransactionManager 事务控制。