天天看點

(轉)RabbitMQ學習之spring整合發送異步消息(注解實作)

http://blog.csdn.net/zhu_tianwei/article/details/40919249

本例使用的Exchange類型為DirectExchange,routingKey的名稱預設為Queue的名稱,並以注解方式實作異步發送消息。

1.生産者配置ProducerConfiguration.java

[java]  view plain  copy    print?

  1. package cn.slimsmart.rabbitmq.demo.spring.async;  
  2. import java.util.concurrent.atomic.AtomicInteger;  
  3. import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;  
  4. import org.springframework.amqp.rabbit.connection.ConnectionFactory;  
  5. import org.springframework.amqp.rabbit.core.RabbitTemplate;  
  6. import org.springframework.beans.factory.annotation.Autowired;  
  7. import org.springframework.beans.factory.config.BeanPostProcessor;  
  8. import org.springframework.context.annotation.Bean;  
  9. import org.springframework.context.annotation.Configuration;  
  10. import org.springframework.scheduling.annotation.Scheduled;  
  11. import org.springframework.scheduling.annotation.ScheduledAnnotationBeanPostProcessor;  
  12. import com.rabbitmq.client.AMQP;  
  13. @Configuration  
  14. public class ProducerConfiguration {  
  15.     // Queue name. The routing key defaults to the queue name; the exchange type used is DirectExchange.  
  16.     protected final String helloWorldQueueName = "spring-queue-async";  
  17.     // Create the connection factory for the RabbitMQ broker (host, credentials, and port are hard-coded for this demo)  
  18.     @Bean  
  19.     public ConnectionFactory connectionFactory() {  
  20.         CachingConnectionFactory connectionFactory = new CachingConnectionFactory("192.168.36.102");  
  21.         connectionFactory.setUsername("admin");  
  22.         connectionFactory.setPassword("admin");  
  23.         connectionFactory.setPort(AMQP.PROTOCOL.PORT);  
  24.         return connectionFactory;  
  25.     }  
  26.     // Create the RabbitTemplate (message template) used to publish messages  
  27.     @Bean  
  28.     public RabbitTemplate rabbitTemplate() {  
  29.         RabbitTemplate template = new RabbitTemplate(connectionFactory());  
  30.         template.setRoutingKey(this.helloWorldQueueName);  // the queue name is used as the template's routing key
  31.         return template;  
  32.     }  
  33.     // Create the scheduled producer bean  
  34.     @Bean  
  35.     public ScheduledProducer scheduledProducer() {  
  36.         return new ScheduledProducer();  
  37.     }  
  38.     @Bean  
  39.     public BeanPostProcessor postProcessor() {  
  40.         return new ScheduledAnnotationBeanPostProcessor();  // needed so that the @Scheduled method below is picked up
  41.     }  
  42.     static class ScheduledProducer {  
  43.         @Autowired  
  44.         private volatile RabbitTemplate rabbitTemplate;  
  45.         // Auto-incrementing counter appended to each outgoing message  
  46.         private final AtomicInteger counter = new AtomicInteger();  
  47.         @Scheduled(fixedRate = 3000)  // fixed rate of 3000 (milliseconds by default for @Scheduled)
  48.         public void sendMessage() {  
  49.             rabbitTemplate.convertAndSend("Hello World " + counter.incrementAndGet());  // no explicit routing key here; the one set on the template applies
  50.         }  
  51.     }  
  52. }  

2.生産者啟動類Producer.java

[java]  view plain  copy    print?

  1. package cn.slimsmart.rabbitmq.demo.spring.async;  
  2. import org.springframework.context.annotation.AnnotationConfigApplicationContext;  
  3. public class Producer {  // Launcher for the producer side of the demo
  4.     public static void main(String[] args) {  
  5.         new AnnotationConfigApplicationContext(ProducerConfiguration.class);  // builds the Spring context from ProducerConfiguration; the reference is not kept
  6.     }  
  7. }  

3.接收消息處理類ReceiveMsgHandler.java

[java]  view plain  copy    print?

  1. package cn.slimsmart.rabbitmq.demo.spring.async;  
  2. public class ReceiveMsgHandler {  // Message handler; ConsumerConfiguration wraps an instance of it in a MessageListenerAdapter
  3.     public void handleMessage(String text) {  // text: the received message body as a String
  4.         System.out.println("Received: " + text);  // prints the received text to standard output
  5.     }  
  6. }  

4.消費者配置ConsumerConfiguration.java

[java]  view plain  copy    print?

  1. package cn.slimsmart.rabbitmq.demo.spring.async;  
  2. import org.springframework.amqp.core.AmqpAdmin;  
  3. import org.springframework.amqp.core.Queue;  
  4. import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;  
  5. import org.springframework.amqp.rabbit.connection.ConnectionFactory;  
  6. import org.springframework.amqp.rabbit.core.RabbitAdmin;  
  7. import org.springframework.amqp.rabbit.core.RabbitTemplate;  
  8. import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;  
  9. import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;  
  10. import org.springframework.context.annotation.Bean;  
  11. import org.springframework.context.annotation.Configuration;  
  12. import com.rabbitmq.client.AMQP;  
  13. @Configuration  
  14. public class ConsumerConfiguration {  
  15.     // 指定隊列名稱 routingkey的名稱預設為Queue的名稱,使用Exchange類型為DirectExchange  
  16.     protected String springQueueDemo = "spring-queue-async";  
  17.     // 建立連結  
  18.     @Bean  
  19.     public ConnectionFactory connectionFactory() {  
  20.         CachingConnectionFactory connectionFactory = new CachingConnectionFactory(  
  21.                 "192.168.36.102");  
  22.         connectionFactory.setUsername("admin");  
  23.         connectionFactory.setPassword("admin");  
  24.         connectionFactory.setPort(AMQP.PROTOCOL.PORT);  
  25.         return connectionFactory;  
  26.     }  
  27.     // 建立rabbitAdmin 代理類  
  28.     @Bean  
  29.     public AmqpAdmin amqpAdmin() {  
  30.         return new RabbitAdmin(connectionFactory());  
  31.     }  
  32.     // 建立rabbitTemplate 消息模闆類  
  33.     @Bean  
  34.     public RabbitTemplate rabbitTemplate() {  
  35.         RabbitTemplate template = new RabbitTemplate(connectionFactory());  
  36.         // The routing key is set to the name of the queue by the broker for the  
  37.         // default exchange.  
  38.         template.setRoutingKey(this.springQueueDemo);  
  39.         // Where we will synchronously receive messages from  
  40.         template.setQueue(this.springQueueDemo);  
  41.         return template;  
  42.     }  
  43.     //  
  44.     // Every queue is bound to the default direct exchange  
  45.     public Queue helloWorldQueue() {  
  46.         return new Queue(this.springQueueDemo);  
  47.     }  
  48.     @Bean  
  49.     public SimpleMessageListenerContainer listenerContainer() {  
  50.         SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();  
  51.         container.setConnectionFactory(connectionFactory());  
  52.         container.setQueueNames(this.springQueueDemo);  
  53.         container.setMessageListener(new MessageListenerAdapter(  
  54.                 new ReceiveMsgHandler()));  
  55.         return container;  
  56.     }  
  57. }  

5.消費者啟動類Consumer.java

[java]  view plain  copy    print?

  1. package cn.slimsmart.rabbitmq.demo.spring.async;  
  2. import org.springframework.context.annotation.AnnotationConfigApplicationContext;  
  3. public class Consumer {  // Launcher for the consumer side of the demo
  4.     public static void main(String[] args) {  
  5.         new AnnotationConfigApplicationContext(ConsumerConfiguration.class);  // builds the Spring context from ConsumerConfiguration; the reference is not kept
  6.     }  
  7. }  

先啟動消費者(Consumer)接收消息,再啟動生産者(Producer)發送消息,消費者的輸出如下:

[plain]  view plain  copy    print?

  1. Received: Hello World 1  
  2. Received: Hello World 2  
  3. Received: Hello World 3  
  4. Received: Hello World 4  
  5. Received: Hello World 5  
  6. Received: Hello World 6  
  7. Received: Hello World 7  
  8. ......  

若啟動時報 spring-queue-async 消息隊列不存在,請先在 RabbitMQ 管理控制台手動建立該隊列。

轉載于:https://www.cnblogs.com/telwanggs/p/7124778.html