天天看點

springboot整合activemq,應答模式,消息重發機制,消息持久化

準備工作:

activemq的消息确認機制就是文檔中說的ack機制有:

AUTO_ACKNOWLEDGE = 1    自動确認

CLIENT_ACKNOWLEDGE = 2    用戶端手動确認   

DUPS_OK_ACKNOWLEDGE = 3    自動批量确認

SESSION_TRANSACTED = 0    事務送出并确認

INDIVIDUAL_ACKNOWLEDGE = 4    單條消息确認 activemq 獨有

ACK模式描述了Consumer與broker确認消息的方式(時機),比如當消息被Consumer接收之後,Consumer将在何時确認消息。

對于broker而言,隻有接收到ACK指令,才會認為消息被正确的接收或者處理成功了,通過ACK,可以在consumer(/producer)

與Broker之間建立一種簡單的“擔保”機制.

手動确認和單條消息确認需要手動的在用戶端調用message.acknowledge()

消息重發機制RedeliveryPolicy 有幾個屬性如下:

RedeliveryPolicy redeliveryPolicy= new RedeliveryPolicy();
	        //是否在每次嘗試重新發送失敗後,增長這個等待時間
	        redeliveryPolicy.setUseExponentialBackOff(true);
	        //重發次數,預設為6次   這裡設定為10次
	        redeliveryPolicy.setMaximumRedeliveries(10);
	        //重發時間間隔,預設為1秒
	        redeliveryPolicy.setInitialRedeliveryDelay(1);
	        //第一次失敗後重新發送之前等待500毫秒,第二次失敗再等待500 * 2毫秒,這裡的2就是value
	        redeliveryPolicy.setBackOffMultiplier(2);
	        //是否避免消息碰撞
	        redeliveryPolicy.setUseCollisionAvoidance(false);
	        //設定重發最大拖延時間-1 表示沒有拖延隻有UseExponentialBackOff(true)為true時生效
	        redeliveryPolicy.setMaximumRedeliveryDelay(-1);
           

以下情況會導緻消息重發:

1.在使用事務的Session中,調用rollback()方法;

2.在使用事務的Session中,調用commit()方法之前就關閉了Session;

3.在Session中使用CLIENT_ACKNOWLEDGE簽收模式或者INDIVIDUAL_ACKNOWLEDGE模式,并且調用了recover()方法。

可以通過設定ActiveMQConnectionFactory來定制想要的再次傳送政策。

需要注意的是:使用手動簽收模式,如果用戶端沒有調用message.acknowledge()方法是不會立刻重發消息的,隻有目前Coustomer重新開機時才能重新接受消息

spring boot 整合activemq 需要jar

<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-activemq</artifactId>
		</dependency>
           

因自己的需求,自己在預設的配置類中增加自己的配置

ActiveMQ4Config如下:

package com.zyc.activemq;

import javax.jms.Queue;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.RedeliveryPolicy;
import org.apache.activemq.command.ActiveMQQueue;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.annotation.EnableJms;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.core.JmsTemplate;

@EnableJms  
@Configuration  
public class ActiveMQ4Config {  

	@Bean
	public Queue queue(){
		return new ActiveMQQueue("queue1");
	}

	@Bean
	public RedeliveryPolicy redeliveryPolicy(){
	        RedeliveryPolicy  redeliveryPolicy=   new RedeliveryPolicy();
	        //是否在每次嘗試重新發送失敗後,增長這個等待時間
	        redeliveryPolicy.setUseExponentialBackOff(true);
	        //重發次數,預設為6次   這裡設定為10次
	        redeliveryPolicy.setMaximumRedeliveries(10);
	        //重發時間間隔,預設為1秒
	        redeliveryPolicy.setInitialRedeliveryDelay(1);
	        //第一次失敗後重新發送之前等待500毫秒,第二次失敗再等待500 * 2毫秒,這裡的2就是value
	        redeliveryPolicy.setBackOffMultiplier(2);
	        //是否避免消息碰撞
	        redeliveryPolicy.setUseCollisionAvoidance(false);
	        //設定重發最大拖延時間-1 表示沒有拖延隻有UseExponentialBackOff(true)為true時生效
	        redeliveryPolicy.setMaximumRedeliveryDelay(-1);
	        return redeliveryPolicy;
	}
    @Bean
    public ActiveMQConnectionFactory activeMQConnectionFactory (@Value("${activemq.url}")String url,RedeliveryPolicy redeliveryPolicy){  
        ActiveMQConnectionFactory activeMQConnectionFactory =  
                new ActiveMQConnectionFactory(
                       "admin",
                        "admin",
                        url);
        activeMQConnectionFactory.setRedeliveryPolicy(redeliveryPolicy);
        return activeMQConnectionFactory;
    }
    
    @Bean
    public JmsTemplate jmsTemplate(ActiveMQConnectionFactory activeMQConnectionFactory,Queue queue){
    	JmsTemplate jmsTemplate=new JmsTemplate();
    	jmsTemplate.setDeliveryMode(2);//進行持久化配置 1表示非持久化,2表示持久化
    	jmsTemplate.setConnectionFactory(activeMQConnectionFactory);
    	jmsTemplate.setDefaultDestination(queue); //此處可不設定預設,在發送消息時也可設定隊列
    	jmsTemplate.setSessionAcknowledgeMode(4);//用戶端簽收模式
    	return jmsTemplate;
    }
    
    //定義一個消息監聽器連接配接工廠,這裡定義的是點對點模式的監聽器連接配接工廠
    @Bean(name = "jmsQueueListener")
    public DefaultJmsListenerContainerFactory jmsQueueListenerContainerFactory(ActiveMQConnectionFactory activeMQConnectionFactory) {
        DefaultJmsListenerContainerFactory factory =
                new DefaultJmsListenerContainerFactory();
        factory.setConnectionFactory(activeMQConnectionFactory);
        //設定連接配接數
        factory.setConcurrency("1-10");
        //重連間隔時間
        factory.setRecoveryInterval(1000L);
        factory.setSessionAcknowledgeMode(4);
        return factory;
    }

}  
           

消費者如下:使用異步監聽(使用監聽器形式)

package com.zyc.activemq.consumer;

import javax.jms.JMSException;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;

@Component
public class Consumer {

	private final static Logger logger = LoggerFactory
			.getLogger(Consumer.class);
	
	@JmsListener(destination = "queue1", containerFactory = "jmsQueueListener")
	public void receiveQueue(final TextMessage text, Session session)
			throws JMSException {
		try {
			logger.debug("Consumer收到的封包為:" + text.getText());
			text.acknowledge();// 使用手動簽收模式,需要手動的調用,如果不在catch中調用session.recover()消息隻會在重新開機服務後重發
		} catch (Exception e) {	
			session.recover();// 此不可省略 重發資訊使用
		}
	}
}
           

生産者如下:

package com.zyc.activemq.producer;

import javax.jms.Destination;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.stereotype.Component;

@Component
public class Producer {
	
	@Autowired
	private JmsTemplate jmsTemplate;
	/**
	 * 發送消息,estination是發送到的隊列,message是待發送的消息
	 * @param destination
	 * @param message
	 */
	public void sendMessage(Destination destination, final String message) {
		System.out.println(jmsTemplate.getDeliveryMode());
		jmsTemplate.convertAndSend(destination, message);
	}
	/**
	 * 發送消息,message是待發送的消息
	 * @param message
	 */
	public void sendMessage(final String message) {
		System.out.println(jmsTemplate.getDeliveryMode());
			jmsTemplate.convertAndSend("queue1",message);
	}

}
           

application.properties配置檔案如下:

spring.datasource.url=jdbc:mysql://localhost:3306/mydb
spring.datasource.username=root
spring.datasource.password=123456
spring.datasource.driver-class-name=com.mysql.jdbc.Driver

activemq.url=failover:(tcp://127.0.0.1:61616)
           

測試如下:如果不知道springboot junit 測試可參考 springboot junit測試

package com.zyc;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import org.springframework.test.context.junit4.SpringRunner;
import com.zyc.activemq.producer.Producer;

@RunWith(SpringRunner.class)
@SpringBootTest(classes = App.class)
public class ApplicationTest {

	@Autowired
	private Producer producer;
	@Test
	public void testActivemq(){
		producer.sendMessage("look this is a message==zycc==");
		while(true){}
	}
}
           

本文參考了

http://shift-alt-ctrl.iteye.com/blog/2020182

http://blog.csdn.net/varyall/article/details/49907995

繼續閱讀