天天看點

RabbitMQ(五)——Spring boot內建RabbitMQ前言1、項目搭建2、引入配置3、發送消息4、兩種消費消息的方式總結

前言

前面幾篇RabbitMQ的模式幾乎介紹清楚了RabbitMQ的基礎架構,以及消息确認機制,這裡打算将RabbitMQ內建到springboot中,便于日後工作查找

1、項目搭建

在idea中建構一個springboot項目,同時引入web和RabbitMQ子產品。

RabbitMQ(五)——Spring boot內建RabbitMQ前言1、項目搭建2、引入配置3、發送消息4、兩種消費消息的方式總結

2、引入配置

application.properties檔案中加入如下配置,一部分是連接配接配置,另一部分是exchange的名稱和routing key的配置。

#對于rabbitMQ的支援
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest


#rabbitmq
spring.rabbitmq.learn.exchange.name=learn:info:mq:exchange
spring.rabbitmq.learn.queue.name=learn:info:mq:queue
spring.rabbitmq.learn.routing.key.name=learn:info:mq:routing:key
           

建立一個RabbitConfig,将這個類作為一個Configuration交給spring托管,如下所示:

/**
 * autor:liman
 * createtime:2019/10/27
 * comment:
 */
@Configuration
public class RabbitConfig {

    private final Logger logger = LoggerFactory.getLogger(this.getClass());

    @Autowired
    private Environment env;

    @Bean
    public ConnectionFactory connectionFactory() {
        String host = env.getProperty("spring.rabbitmq.host");
        int port = Integer.valueOf(env.getProperty("spring.rabbitmq.port"));
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory(host,port);
        connectionFactory.setUsername(env.getProperty("spring.rabbitmq.username"));
        connectionFactory.setPassword(env.getProperty("spring.rabbitmq.password"));
        connectionFactory.setVirtualHost("/");
        connectionFactory.setPublisherConfirms(true);
        return connectionFactory;
    }

    @Bean
    @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
    //必須是prototype類型
    public RabbitTemplate rabbitTemplate() {
        RabbitTemplate template = new RabbitTemplate(connectionFactory());
        return template;
    }

    /**
     * 針對消費者配置
     * 1. 設定交換機類型
     * 2. 将隊列綁定到交換機
     FanoutExchange: 将消息分發到所有的綁定隊列,無routingkey的概念
     DirectExchange:按照routingkey分發到指定隊列
     TopicExchange:多關鍵字比對
     */
    @Bean
    public DirectExchange defaultExchange() {
        return new DirectExchange(env.getProperty("spring.rabbitmq.learn.exchange.name"));
    }

    /**
     * 擷取隊列A
     * @return
     */
    @Bean
    public Queue learnQueue() {
        return new Queue(env.getProperty("spring.rabbitmq.learn.queue.name"), true); //隊列持久
    }

    @Bean
    public Binding binding() {
        return BindingBuilder.bind(learnQueue()).to(defaultExchange())
                .with(env.getProperty("spring.rabbitmq.learn.routing.key.name"));
    }
}
           

也就是在這個configuration中完成了RabbitTemplate的執行個體化。 

3、發送消息

在一個測試controller中發送消息

/**
 * autor:liman
 * createtime:2019/10/27
 * comment:
 */
@RestController
public class RabbitController {

    private static final Logger log = LoggerFactory.getLogger(RabbitController.class);

    private static final String prefix = "/rabbit";

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Autowired
    private ObjectMapper objectMapper;

    @Autowired
    private Environment env;


    @RequestMapping(value=prefix+"/learn",method = RequestMethod.GET)
    public String sendMessage(String message){
        log.info("待發送消息:{}",message);
        try{
			//這裡即開始發送消息,設定交換機和綁定鍵,完成消息的發送
            rabbitTemplate.setExchange(env.getProperty("spring.rabbitmq.learn.exchange.name"));
            rabbitTemplate.setRoutingKey(env.getProperty("spring.rabbitmq.learn.routing.key.name"));
            Message msg = MessageBuilder.withBody(objectMapper.writeValueAsBytes(message))
                    .setDeliveryMode(MessageDeliveryMode.PERSISTENT)
                    .build();
            rabbitTemplate.convertAndSend(msg);

        }catch (Exception e){
            log.error("發送消息異常:{}",e.fillInStackTrace());
        }
        return "success";
    }

}
           

4、兩種消費消息的方式

其實消息的消費,我個人了解為有兩種方式,一種是@RabbitListener注解,另一種是實作ChannelAwareMessageListener接口

[email protected]注解消費消息

該種消費消息的方式比較簡單,在任意方法中引入@RabbitListener注解即可,這裡作為消費端,隻需要指定敢興趣的queue即可

@Component
public class AnnotationListener {

    private static final Logger log = LoggerFactory.getLogger(AnnotationListener.class);

    @Autowired
    private ObjectMapper objectMapper;

    @RabbitListener(queues = "${spring.rabbitmq.learn.queue.name}")
    public void consumerMessage(@Payload byte[] message){
        String result = null;
        try {
            result = objectMapper.readValue(message, String.class);
            log.info("注解的方式消費消息:{}",result);
        } catch (IOException e) {
            e.printStackTrace();
        }
        log.info("接收到的消息:{}",result);
    }
}
           

4.2實作ChannelAwareMessageListener接口消費消息

該方法似乎比較複雜,分為兩步,新增一個實體類并實作ChannelAwareMessageListener接口

/**
 * autor:liman
 * createtime:2019/10/27
 * comment:
 */
@Component
public class ImplementListener implements ChannelAwareMessageListener {

    private static final Logger log = LoggerFactory.getLogger(ImplementListener.class);

    @Autowired
    private ObjectMapper objectMapper;

    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        long tag = message.getMessageProperties().getDeliveryTag();
        try{
            byte[] body = message.getBody();
            String result = objectMapper.readValue(body, String.class);
            log.info("非注解方式開始消費消息:{}",result);
        }catch (Exception e){
            log.error("消費消息異常,異常資訊為:{}",e.fillInStackTrace());
        }
    }
}
           

在RabbitConfig中綁定消費者,如下所示:

@Autowired
private ImplementListener implementListener;

@Bean
public SimpleMessageListenerContainer getListenerContainer(@Qualifier("learnQueue") Queue queue){
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
    container.setConnectionFactory(connectionFactory());
    container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
    container.setQueues(queue);
    container.setMessageListener(implementListener);
    return container;
}
           

 測試

兩個都能正常消費消息,但是這裡不貼出簡單的消費測試結果,而是在同時引入兩種消費消息的方式的時候的測試結果:

修改了一下消息發送的controller

package com.learn.springbootrabbitmqblog.controller;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.env.Environment;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;

/**
 * autor:liman
 * createtime:2019/10/27
 * comment:
 */
@RestController
public class RabbitController {

    private static final Logger log = LoggerFactory.getLogger(RabbitController.class);

    private static final String prefix = "/rabbit";

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Autowired
    private ObjectMapper objectMapper;

    @Autowired
    private Environment env;


    @RequestMapping(value = prefix + "/learn", method = RequestMethod.GET)
    public String sendMessage(String message) {
        log.info("待發送消息:{}", message);
        try {
            //多線程發送消息
            for(int i =0 ;i<100;i++){
                new Thread(new sendThread(rabbitTemplate,objectMapper,message+i)).start();
            }

        } catch (Exception e) {
            log.error("發送消息異常:{}", e.fillInStackTrace());
        }
        return "success";
    }

    private class sendThread implements Runnable {

        private RabbitTemplate rabbitTemplate;
        private ObjectMapper objectMapper;
        private String message;


        public sendThread(RabbitTemplate rabbitTemplate, ObjectMapper objectMapper, String message) {
            this.rabbitTemplate = rabbitTemplate;
            this.objectMapper = objectMapper;
            this.message = message;
        }

        @Override
        public void run() {
            try {
                rabbitTemplate.setExchange(env.getProperty("spring.rabbitmq.learn.exchange.name"));
                rabbitTemplate.setRoutingKey(env.getProperty("spring.rabbitmq.learn.routing.key.name"));
                Message msg = null;

                msg = MessageBuilder.withBody(objectMapper.writeValueAsBytes(message))
                        .setDeliveryMode(MessageDeliveryMode.PERSISTENT)
                        .build();
                rabbitTemplate.convertAndSend(msg);
            } catch (JsonProcessingException e) {
                e.printStackTrace();
            }

        }
    }

}
           

會通過負載均衡的方式發送消息,運作結果如下

RabbitMQ(五)——Spring boot內建RabbitMQ前言1、項目搭建2、引入配置3、發送消息4、兩種消費消息的方式總結

總結

本篇部落格隻是記錄了springboot中對RabbitMQ簡單的內建,但是還沒有用到較好的業務場景,後續會補充總結springboot中的消息确認機制。

繼續閱讀