maven
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>3.5.4</version>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
<version>1.3.5.RELEASE</version>
</dependency>
</dependencies>
生産者
public class Producer {
@SuppressWarnings("resource")
public static void main(String[] args) {
new AnnotationConfigApplicationContext(ProducerConfiguration.class);
}
}
package product;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.amqp.support.converter.SerializerMessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.config.BeanPostProcessor;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Scope;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.scheduling.annotation.ScheduledAnnotationBeanPostProcessor;
import com.rabbitmq.client.AMQP;
@Configuration
public class ProducerConfiguration {
// 指定隊列名稱 routingkey的名稱預設為Queue的名稱,使用Exchange類型為DirectExchange
protected final String VirtualHost = "testhost";
protected final String EXCHANGE = "amq.direct";
protected final String ROUTEKING = "routingKeyB";
protected final String helloWorldQueueName = "testB";
private Map<String, Message> map = new ConcurrentHashMap<>();
public void add(String id, Message message) {
map.put(id, message);
}
public void del(String id) {
map.remove(id);
}
// 建立連結
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory("127.0.0.1");
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
connectionFactory.setPort(AMQP.PROTOCOL.PORT);
connectionFactory.setVirtualHost(VirtualHost);
connectionFactory.setPublisherConfirms(true);
connectionFactory.setPublisherReturns(true);
//connectionFactory.setConnectionListeners(listeners);
return connectionFactory;
}
// 建立rabbitTemplate 消息模闆類
@Bean
@Scope("prototype")
public RabbitTemplate rabbitTemplate() {
RabbitTemplate template = new RabbitTemplate(connectionFactory());
template.setExchange(EXCHANGE);
template.setRoutingKey(ROUTEKING);
template.setMessageConverter(new SerializerMessageConverter());
template.setMandatory(true);
return template;
}
// 建立rabbitAdmin 代理類
@Bean
public AmqpAdmin amqpAdmin() {
return new RabbitAdmin(connectionFactory());
}
//建立一個排程
@Bean
public ScheduledProducer scheduledProducer() {
return new ScheduledProducer();
}
@Bean
public BeanPostProcessor postProcessor() {
return new ScheduledAnnotationBeanPostProcessor();
}
class ScheduledProducer {
@Autowired
private RabbitTemplate rabbitTemplate;
//自增整數
private final AtomicInteger counter = new AtomicInteger();
/**
* 每3秒發送一條消息
*
* Spring3中加強了注解的使用,其中計劃任務也得到了增強,
現在建立一個計劃任務隻需要兩步就完成了:
建立一個Java類,添加一個無參無傳回值的方法,在方法上用@Scheduled注解修飾一下;
在Spring配置檔案中添加三個<task:**** />節點;
參考:http://zywang.iteye.com/blog/949123
*/
@Scheduled(fixedRate = 3000)
public void sendMessage() {
rabbitTemplate = rabbitTemplate();
//Exchange出錯由此方法監聽
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback(){
@Override
public void confirm(CorrelationData correlationData, boolean ack) {
if (!ack) {
System.out.println("生産者發送mq消息确認失敗"+correlationData.getId());
} else {
//處理丢失的消息(nack)
System.out.println("生産者發送mq消息确認成功");
del(correlationData.getId());
}
}
});
//路由鍵routingKey出錯由此方法監聽,
rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback(){
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange,
String routingKey) {
System.out.println("returnedMessage收到回調,成功發送到broker");
System.out.println("confirm收到回調,message:" + message.toString());
System.out.println("confirm收到回調,replyCode:" + replyCode);
System.out.println("confirm收到回調,replyText:" + replyText);
System.out.println("confirm收到回調,exchange:" + exchange);
System.out.println("confirm收到回調,routingKey:" + routingKey);
}
});
String uuid = UUID.randomUUID().toString();
rabbitTemplate.correlationConvertAndSend("Producer " + counter.getAndIncrement(),new CorrelationData(uuid));
add(uuid,getMessage("Producer " + counter,"發送"));
System.out.println("生産者發送成功 " + counter);
System.out.println(map.toString());
}
public Message getMessage(String messageBody, String massageId) {
MessageProperties messageproperties = new MessageProperties();
messageproperties.setMessageId(massageId);
Message message = new Message(messageBody.getBytes(), messageproperties);
return message;
}
}
}
消費者
public class Consumer {
@SuppressWarnings("resource")
public static void main(String[] args) {
new AnnotationConfigApplicationContext(ConsumerConfiguration.class);
}
}
@Configuration
public class ConsumerConfiguration {
// 指定隊列名稱 routingkey的名稱預設為Queue的名稱,使用Exchange類型為DirectExchange
protected static final String VirtualHost = "testhost";
protected static final String EXCHANGE = "amq.direct";
protected static final String ROUTEKING = "routingKey";
protected static final String springQueueDemo = "testB";
// 建立連結
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory(
"127.0.0.1");
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
connectionFactory.setPort(AMQP.PROTOCOL.PORT);
connectionFactory.setVirtualHost(ConsumerConfiguration.VirtualHost);
connectionFactory.setPublisherConfirms(true);
connectionFactory.setPublisherReturns(true);
return connectionFactory;
}
// 建立rabbitAdmin 代理類
@Bean
public AmqpAdmin amqpAdmin() {
return new RabbitAdmin(connectionFactory());
}
// 建立rabbitTemplate 消息模闆類
@Bean
public RabbitTemplate rabbitTemplate() {
RabbitTemplate template = new RabbitTemplate(connectionFactory());
template.setExchange(ConsumerConfiguration.EXCHANGE);
return template;
}
@Bean
public SimpleMessageListenerContainer listenerContainer() {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory());
container.setQueueNames("testB"); // 監聽的隊列
container.setAcknowledgeMode(AcknowledgeMode.MANUAL); // NONE 代表自動确認;MANUAL 代表手動确認
container.setMessageListener(new MessageListenerAdapter(
new ConsumerMsgHandler()));
return container;
}
}
public class ConsumerMsgHandler extends MessageListenerAdapter{
@Override
public void onMessage(Message message, Channel channel) throws Exception {
System.out.println(new String(message.getBody()));
if(message.getMessageProperties().getHeaders().get("error") == null){
//consumerConfiguration.sendMq(message);
// basicAck方法表示确認成功傳回Rabbit伺服器
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
System.out.println("消費者消息已經确認");
}else {
/*multiple:為了減少網絡流量,手動确認可以被批處理,當該參數為 true 時,
則可以一次性确認 delivery_tag 小于等于傳入值的所有消息*/
// basicNack方法表示确認失敗傳回,當它的參數requeue為true時,表示Rabbit伺服器會重發傳入目前方法的資料;
// 當它的參數requeue為false時,表示Rabbit伺服器會丢棄傳入目前方法的資料,不在重發
// channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,true);
// basicReject方法表示确認失敗傳回,直接丢棄傳入目前方法的資料,不在重發
channel.basicReject(message.getMessageProperties().getDeliveryTag(),false);
System.out.println("消費者消息拒絕");
}
}
}
參考連結:https://www.jianshu.com/p/2c5eebfd0e95