準備工作:
activemq的消息确認機制就是文檔中說的ack機制有:
AUTO_ACKNOWLEDGE = 1 自動确認
CLIENT_ACKNOWLEDGE = 2 用戶端手動确認
DUPS_OK_ACKNOWLEDGE = 3 自動批量确認
SESSION_TRANSACTED = 0 事務送出并确認
INDIVIDUAL_ACKNOWLEDGE = 4 單條消息确認 activemq 獨有
ACK模式描述了Consumer與broker确認消息的方式(時機),比如當消息被Consumer接收之後,Consumer将在何時确認消息。
對于broker而言,隻有接收到ACK指令,才會認為消息被正确的接收或者處理成功了,通過ACK,可以在consumer(/producer)
與Broker之間建立一種簡單的“擔保”機制.
手動确認和單條消息确認需要手動的在用戶端調用message.acknowledge()
消息重發機制RedeliveryPolicy 有幾個屬性如下:
RedeliveryPolicy redeliveryPolicy= new RedeliveryPolicy();
//是否在每次嘗試重新發送失敗後,增長這個等待時間
redeliveryPolicy.setUseExponentialBackOff(true);
//重發次數,預設為6次 這裡設定為10次
redeliveryPolicy.setMaximumRedeliveries(10);
//重發時間間隔,預設為1秒
redeliveryPolicy.setInitialRedeliveryDelay(1);
//第一次失敗後重新發送之前等待500毫秒,第二次失敗再等待500 * 2毫秒,這裡的2就是value
redeliveryPolicy.setBackOffMultiplier(2);
//是否避免消息碰撞
redeliveryPolicy.setUseCollisionAvoidance(false);
//設定重發最大拖延時間-1 表示沒有拖延隻有UseExponentialBackOff(true)為true時生效
redeliveryPolicy.setMaximumRedeliveryDelay(-1);
以下情況會導緻消息重發:
1.在使用事務的Session中,調用rollback()方法;
2.在使用事務的Session中,調用commit()方法之前就關閉了Session;
3.在Session中使用CLIENT_ACKNOWLEDGE簽收模式或者INDIVIDUAL_ACKNOWLEDGE模式,并且調用了recover()方法。
可以通過設定ActiveMQConnectionFactory來定制想要的再次傳送政策。
需要注意的是:使用手動簽收模式,如果用戶端沒有調用message.acknowledge()方法是不會立刻重發消息的,隻有目前Coustomer重新開機時才能重新接受消息
spring boot 整合activemq 需要jar
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
因自己的需求,自己在預設的配置類中增加自己的配置
ActiveMQ4Config如下:
package com.zyc.activemq;
import javax.jms.Queue;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.RedeliveryPolicy;
import org.apache.activemq.command.ActiveMQQueue;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.annotation.EnableJms;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.core.JmsTemplate;
@EnableJms
@Configuration
public class ActiveMQ4Config {
@Bean
public Queue queue(){
return new ActiveMQQueue("queue1");
}
@Bean
public RedeliveryPolicy redeliveryPolicy(){
RedeliveryPolicy redeliveryPolicy= new RedeliveryPolicy();
//是否在每次嘗試重新發送失敗後,增長這個等待時間
redeliveryPolicy.setUseExponentialBackOff(true);
//重發次數,預設為6次 這裡設定為10次
redeliveryPolicy.setMaximumRedeliveries(10);
//重發時間間隔,預設為1秒
redeliveryPolicy.setInitialRedeliveryDelay(1);
//第一次失敗後重新發送之前等待500毫秒,第二次失敗再等待500 * 2毫秒,這裡的2就是value
redeliveryPolicy.setBackOffMultiplier(2);
//是否避免消息碰撞
redeliveryPolicy.setUseCollisionAvoidance(false);
//設定重發最大拖延時間-1 表示沒有拖延隻有UseExponentialBackOff(true)為true時生效
redeliveryPolicy.setMaximumRedeliveryDelay(-1);
return redeliveryPolicy;
}
@Bean
public ActiveMQConnectionFactory activeMQConnectionFactory (@Value("${activemq.url}")String url,RedeliveryPolicy redeliveryPolicy){
ActiveMQConnectionFactory activeMQConnectionFactory =
new ActiveMQConnectionFactory(
"admin",
"admin",
url);
activeMQConnectionFactory.setRedeliveryPolicy(redeliveryPolicy);
return activeMQConnectionFactory;
}
@Bean
public JmsTemplate jmsTemplate(ActiveMQConnectionFactory activeMQConnectionFactory,Queue queue){
JmsTemplate jmsTemplate=new JmsTemplate();
jmsTemplate.setDeliveryMode(2);//進行持久化配置 1表示非持久化,2表示持久化
jmsTemplate.setConnectionFactory(activeMQConnectionFactory);
jmsTemplate.setDefaultDestination(queue); //此處可不設定預設,在發送消息時也可設定隊列
jmsTemplate.setSessionAcknowledgeMode(4);//用戶端簽收模式
return jmsTemplate;
}
//定義一個消息監聽器連接配接工廠,這裡定義的是點對點模式的監聽器連接配接工廠
@Bean(name = "jmsQueueListener")
public DefaultJmsListenerContainerFactory jmsQueueListenerContainerFactory(ActiveMQConnectionFactory activeMQConnectionFactory) {
DefaultJmsListenerContainerFactory factory =
new DefaultJmsListenerContainerFactory();
factory.setConnectionFactory(activeMQConnectionFactory);
//設定連接配接數
factory.setConcurrency("1-10");
//重連間隔時間
factory.setRecoveryInterval(1000L);
factory.setSessionAcknowledgeMode(4);
return factory;
}
}
消費者如下:使用異步監聽(使用監聽器形式)
package com.zyc.activemq.consumer;
import javax.jms.JMSException;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;
@Component
public class Consumer {
private final static Logger logger = LoggerFactory
.getLogger(Consumer.class);
@JmsListener(destination = "queue1", containerFactory = "jmsQueueListener")
public void receiveQueue(final TextMessage text, Session session)
throws JMSException {
try {
logger.debug("Consumer收到的封包為:" + text.getText());
text.acknowledge();// 使用手動簽收模式,需要手動的調用,如果不在catch中調用session.recover()消息隻會在重新開機服務後重發
} catch (Exception e) {
session.recover();// 此不可省略 重發資訊使用
}
}
}
生産者如下:
package com.zyc.activemq.producer;
import javax.jms.Destination;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.stereotype.Component;
@Component
public class Producer {
@Autowired
private JmsTemplate jmsTemplate;
/**
* 發送消息,estination是發送到的隊列,message是待發送的消息
* @param destination
* @param message
*/
public void sendMessage(Destination destination, final String message) {
System.out.println(jmsTemplate.getDeliveryMode());
jmsTemplate.convertAndSend(destination, message);
}
/**
* 發送消息,message是待發送的消息
* @param message
*/
public void sendMessage(final String message) {
System.out.println(jmsTemplate.getDeliveryMode());
jmsTemplate.convertAndSend("queue1",message);
}
}
application.properties配置檔案如下:
spring.datasource.url=jdbc:mysql://localhost:3306/mydb
spring.datasource.username=root
spring.datasource.password=123456
spring.datasource.driver-class-name=com.mysql.jdbc.Driver
activemq.url=failover:(tcp://127.0.0.1:61616)
測試如下:如果不知道springboot junit 測試可參考 springboot junit測試
package com.zyc;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import org.springframework.test.context.junit4.SpringRunner;
import com.zyc.activemq.producer.Producer;
@RunWith(SpringRunner.class)
@SpringBootTest(classes = App.class)
public class ApplicationTest {
@Autowired
private Producer producer;
@Test
public void testActivemq(){
producer.sendMessage("look this is a message==zycc==");
while(true){}
}
}
本文參考了
http://shift-alt-ctrl.iteye.com/blog/2020182
http://blog.csdn.net/varyall/article/details/49907995