天天看點

RabbitMQ:MessageListenerAdapter 消息監聽器適配器使用與源碼分析

`pom.xml`:

<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven build descriptor for the example application. -->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <!-- Inherits from the Spring Boot starter parent, version 2.6.2. -->
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.6.2</version>
    </parent>

    <packaging>jar</packaging>

    <!-- Coordinates of this project. -->
    <groupId>com.kaven</groupId>
    <artifactId>springboot</artifactId>
    <version>0.0.1-SNAPSHOT</version>

    <name>springboot</name>
    <description>springboot</description>

    <properties>
        <java.version>1.8</java.version>
    </properties>

    <!-- No versions are given on the dependencies below; presumably they are managed by the parent POM. -->
    <dependencies>
        <!-- Web starter (the example exposes an HTTP endpoint). -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <!-- Lombok (the example uses its @Getter/@Setter annotations). -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
        <!-- AMQP starter (the example uses the org.springframework.amqp classes). -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>      

`application.properties`:

# Broker address, credentials and virtual host.
spring.rabbitmq.host=192.168.1.9
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=admin
spring.rabbitmq.virtualHost=/
# NOTE(review): quotes are not string delimiters in .properties syntax, so this value is
# presumably the literal two characters "" rather than an empty exchange name; confirm intent.
spring.rabbitmq.exchange=""
# The routing key and the queue name use the same value.
spring.rabbitmq.routingKey=kaven
spring.rabbitmq.queue=kaven      

`RabbitMQProperties` 類(`RabbitMQ` 的參數類):

package com.kaven.springboot.rabbitmq;

import lombok.Getter;
import lombok.Setter;
import org.springframework.boot.context.properties.ConfigurationProperties;

/**
 * Holder for the RabbitMQ settings bound from configuration keys under the
 * {@code spring.rabbitmq} prefix.
 *
 * <p>No accessors are written by hand; the class-level Lombok {@code @Getter}
 * and {@code @Setter} annotations request them for every field.
 */
@Getter
@Setter
@ConfigurationProperties(prefix = "spring.rabbitmq")
public class RabbitMQProperties {

    // Connection settings.
    private String host;
    private int port;
    private String virtualHost;
    private String username;
    private String password;

    // Names used when publishing and consuming.
    private String exchange;
    private String routingKey;
    private String queue;
}

`CustomizeMessageListener` 類(自定義消息監聽器):

package com.kaven.springboot.rabbitmq;

import org.springframework.stereotype.Component;

import java.nio.charset.StandardCharsets;

/**
 * Plain delegate object for message handling. It implements no listener
 * interface; it only exposes a handler method taking the raw payload.
 */
@Component
public class CustomizeMessageListener {

    /**
     * Decodes the payload as UTF-8 and prints it to standard output.
     *
     * @param msgBody raw message bytes
     */
    public void customizeHandleMessage(byte[] msgBody) {
        String text = new String(msgBody, StandardCharsets.UTF_8);
        System.out.printf("自定義處理消息: %s\n", text);
    }
}

`RabbitMQConfig` 類(定義 `RabbitMQ` 元件的配置類):

package com.kaven.springboot.rabbitmq;

import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Scope;

import javax.annotation.Resource;

/**
 * Spring configuration declaring the RabbitMQ beans of this example: the
 * connection factory, the template, the listener container (with its
 * {@link MessageListenerAdapter}), and the exchange, queue and binding.
 */
@Configuration
public class RabbitMQConfig {

    /** Settings object injected by the container; every bean below reads from it. */
    @Resource
    private RabbitMQProperties properties;

    /**
     * Creates a caching connection factory from the configured host and port,
     * then applies the configured virtual host and credentials.
     */
    @Bean
    public ConnectionFactory connectionFactory() {
        String host = properties.getHost();
        int port = properties.getPort();
        CachingConnectionFactory factory = new CachingConnectionFactory(host, port);
        factory.setVirtualHost(properties.getVirtualHost());
        factory.setUsername(properties.getUsername());
        factory.setPassword(properties.getPassword());
        return factory;
    }

    /**
     * Creates a template on top of the given connection factory.
     * The bean is declared with prototype scope; the original author's note
     * states that it must be prototype-scoped.
     */
    @Bean
    @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate template = new RabbitTemplate(connectionFactory);
        return template;
    }

    /**
     * Creates the listener container and registers an adapter that hands each
     * message to the {@code customizeHandleMessage} method of {@code delegate}.
     *
     * @param connectionFactory factory the container uses
     * @param delegate          object that receives the messages
     */
    @Bean
    public SimpleMessageListenerContainer simpleMessageListenerContainer(ConnectionFactory connectionFactory,
                                                                         CustomizeMessageListener delegate) {
        // Create the adapter
        MessageListenerAdapter adapter = new MessageListenerAdapter();
        // Set the delegate object
        adapter.setDelegate(delegate);
        // Set the name of the default listener method
        adapter.setDefaultListenerMethod("customizeHandleMessage");

        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        // Connection factory
        container.setConnectionFactory(connectionFactory);
        // Name of the queue to receive messages from (the parameter is String... queueName)
        container.setQueueNames(properties.getQueue());
        // Acknowledgement behaviour of the container: automatic acknowledgement
        container.setAcknowledgeMode(AcknowledgeMode.AUTO);
        // Tells the broker how many messages to send to each consumer in a single request;
        // it can often be set fairly high to improve throughput
        container.setPrefetchCount(3);
        // Number of concurrent consumers to create (default is 1)
        container.setConcurrentConsumers(3);
        // Upper limit for the number of consumers (defaults to concurrentConsumers);
        // consumers are added on demand; must not be lower than concurrentConsumers,
        // so it is set after the value above
        container.setMaxConcurrentConsumers(5);
        // Register the adapter as the container's MessageListener
        container.setMessageListener(adapter);
        return container;
    }

    /** Declares a direct exchange named after the configured exchange value. */
    @Bean
    public DirectExchange exchange() {
        String exchangeName = properties.getExchange();
        return new DirectExchange(exchangeName);
    }

    /** Declares the configured queue as durable. */
    @Bean
    public Queue queue() {
        final boolean durable = true;
        return new Queue(properties.getQueue(), durable);
    }

    /** Binds the queue to the exchange using the configured routing key. */
    @Bean
    public Binding binding() {
        Queue boundQueue = queue();
        DirectExchange target = exchange();
        return BindingBuilder.bind(boundQueue).to(target).with(properties.getRoutingKey());
    }
}

`Producer` 類(用於發布消息):

package com.kaven.springboot.rabbitmq;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.nio.charset.StandardCharsets;
import java.util.UUID;

/**
 * Publishes text messages through a {@link RabbitTemplate}, using the
 * exchange and routing key held by {@link RabbitMQProperties}.
 */
@Component
public class Producer {

    /** Source of the exchange name and routing key; injected by the container. */
    @Resource
    private RabbitMQProperties properties;

    private final RabbitTemplate rabbitTemplate;

    /**
     * @param rabbitTemplate template used for every send
     */
    @Autowired
    public Producer(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
    }

    /**
     * Sends {@code msg} as a UTF-8 encoded message with a freshly generated
     * correlation id.
     *
     * @param msg text to publish
     */
    public void sendMsg(String msg) {
        // Random UUID string used as the correlation id of this send.
        String id = UUID.randomUUID().toString();
        CorrelationData correlation = new CorrelationData(id);

        // Expiration is set to "10000" (presumably milliseconds; confirm against the broker docs).
        MessageProperties headers = new MessageProperties();
        headers.setExpiration("10000");

        byte[] payload = msg.getBytes(StandardCharsets.UTF_8);
        Message outgoing = new Message(payload, headers);

        rabbitTemplate.send(
                properties.getExchange(),
                properties.getRoutingKey(),
                outgoing,
                correlation);
    }
}

`ProducerController` 類(用於發布消息的接口):

package com.kaven.springboot.rabbitmq;

import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;

/**
 * REST controller exposing a single GET endpoint that publishes a message.
 */
@RestController
public class ProducerController {

    /** Publisher that each request is handed to; injected by the container. */
    @Resource
    private Producer producer;

    /**
     * Handles {@code GET /send}: passes {@code msg} to the producer and
     * returns a fixed confirmation text.
     *
     * @param msg text to publish
     * @return the confirmation text
     */
    @GetMapping("/send")
    public String send(String msg) {
        final String reply = "發送消息成功";
        producer.sendMsg(msg);
        return reply;
    }
}

啟動類:

package com.kaven.springboot;

import com.kaven.springboot.rabbitmq.RabbitMQProperties;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.context.properties.ConfigurationPropertiesScan;

/**
 * Application entry point. Configuration-properties scanning is anchored on
 * the package of {@link RabbitMQProperties}.
 */
@SpringBootApplication
@ConfigurationPropertiesScan(basePackageClasses = RabbitMQProperties.class)
public class SpringbootApplication {

    /**
     * Starts the Spring application with this class as the primary source.
     *
     * @param args command-line arguments, passed through unchanged
     */
    public static void main(String[] args) {
        SpringApplication.run(SpringbootApplication.class, args);
    }
}

啟動應用,請求接口。

RabbitMQ:MessageListenerAdapter消息監聽器擴充卡使用與源碼分析

控制台的輸出如下圖所示,說明消息監聽器適配器起作用了,`CustomizeMessageListener` 類的 `customizeHandleMessage` 方法被調用了。

RabbitMQ:MessageListenerAdapter消息監聽器擴充卡使用與源碼分析

下面這兩行代碼設定了消息監聽器適配器需要將消息監聽業務委托給哪個對象,以及需要調用該對象的哪個方法。

// Set the delegate object
        adapter.setDelegate(delegate);
        // Set the name of the default listener method
        adapter.setDefaultListenerMethod("customizeHandleMessage");      

還有其他方式來指定需要調用該對象的方法名稱。

// Map the configured queue name to the method that should handle its messages
Map<String, String> methodNameMap = new HashMap<>(8);
        methodNameMap.put(properties.getQueue(), "customizeHandleMessage");
        // Set the mapping from queue name or consumer tag to method name
        adapter.setQueueOrTagToMethodName(methodNameMap);      

`MessageListenerAdapter` 本身就是一種消息監聽器。

RabbitMQ:MessageListenerAdapter消息監聽器擴充卡使用與源碼分析

只不過它在消息監聽方法中將消息監聽業務委托給了指定對象的指定方法。

/**
   * Delegates the message to the target listener method.
   *
   * <p>If the delegate is another object that is itself a listener, the message
   * is passed straight to its {@code onMessage} and this method returns.
   * Otherwise the payload is extracted, the handler method name is resolved,
   * and that method is invoked on the delegate; a non-null return value is
   * passed on to {@code handleResult}.
   *
   * @param message the incoming message
   * @param channel the channel, forwarded to channel-aware delegates and to result handling
   * @throws Exception declared by the signature; an {@code AmqpIllegalStateException}
   *         is thrown here when no listener method name can be resolved
   */
  @Override
  public void onMessage(Message message, Channel channel) throws Exception { // NOSONAR
    // Check whether the delegate is something other than this adapter and is itself a listener.
    // In that case the adapter only acts as a pass-through.
    Object delegateListener = getDelegate();
    if (!delegateListener.equals(this)) {
      if (delegateListener instanceof ChannelAwareMessageListener) {
        ((ChannelAwareMessageListener) delegateListener).onMessage(message, channel);
        return;
      }
      else if (delegateListener instanceof MessageListener) {
        ((MessageListener) delegateListener).onMessage(message);
        return;
      }
    }

    // Resolve the handler method
    Object convertedMessage = extractMessage(message);
    String methodName = getListenerMethodName(message, convertedMessage);
    if (methodName == null) {
      throw new AmqpIllegalStateException("No default listener method specified: "
          + "Either specify a non-null value for the 'defaultListenerMethod' property or "
          + "override the 'getListenerMethodName' method.");
    }

    // Invoke the handler method with the appropriate arguments
    Object[] listenerArguments = buildListenerArguments(convertedMessage, channel, message);
    Object result = invokeListenerMethod(methodName, listenerArguments, message);
    if (result != null) {
      handleResult(new InvocationResult(result, null, null, null, null), message, channel);
    }
    else {
      logger.trace("No result object given - no result to handle");
    }
  }      

`MessageListenerAdapter` 擷取需要調用的指定對象方法名稱的源碼:

/**
   * Determines the name of the listener method that will handle the given message.
   *
   * <p>When the {@code queueOrTagToMethodName} map is not empty, it is consulted
   * first with the message's consumer queue and then, if that gives nothing,
   * with its consumer tag. If no entry matches, the configured default listener
   * method name is returned. Per the original note, that default is
   * "handleMessage" when it has not been set explicitly (the field's initial
   * value is not visible in this excerpt).
   *
   * @param originalMessage the message whose properties supply the queue and tag
   * @param extractedMessage the converted payload (not used by this implementation)
   * @return the mapped method name, or the default listener method name
   */
  protected String getListenerMethodName(Message originalMessage, Object extractedMessage) {
    if (this.queueOrTagToMethodName.size() > 0) {
      MessageProperties props = originalMessage.getMessageProperties();
      // Look up by consumer queue first, then fall back to the consumer tag
      String methodName = this.queueOrTagToMethodName.get(props.getConsumerQueue());
      if (methodName == null) {
        methodName = this.queueOrTagToMethodName.get(props.getConsumerTag());
      }
      if (methodName != null) {
        return methodName;
      }
    }
    return getDefaultListenerMethod();
  }

  /** Returns the value of the {@code defaultListenerMethod} field. */
  protected String getDefaultListenerMethod() {
    return this.defaultListenerMethod;
  }      

`MessageListenerAdapter` 調用指定對象的方法時傳入的參數:

Object[] listenerArguments = buildListenerArguments(convertedMessage, channel, message);      
// Builds the argument array for the listener method: a single-element array holding
// the extracted message. The channel and message parameters are not used here.
protected Object[] buildListenerArguments(Object extractedMessage, Channel channel, Message message) {
    return new Object[] { extractedMessage };
  }      

`convertedMessage` 是 `byte[]` 類型,而 `listenerArguments` 是 `Object[]` 類型,並且只有一個數組元素,就是 `convertedMessage`。

RabbitMQ:MessageListenerAdapter消息監聽器擴充卡使用與源碼分析
// Invokes the specified listener method on the delegate.
// A MethodInvoker is configured with the delegate, the method name and the arguments.
// Exceptions thrown by the target method are rethrown as AmqpIOException (for IOException)
// or ListenerExecutionFailedException; any other failure is rethrown as a
// ListenerExecutionFailedException that lists the argument types and values.
  protected Object invokeListenerMethod(String methodName, Object[] arguments, Message originalMessage) {
    try {
      MethodInvoker methodInvoker = new MethodInvoker();
      methodInvoker.setTargetObject(getDelegate());
      methodInvoker.setTargetMethod(methodName);
      methodInvoker.setArguments(arguments);
      methodInvoker.prepare();
      return methodInvoker.invoke();
    }
    catch (InvocationTargetException ex) {
      // Unwrap the exception thrown by the listener method itself
      Throwable targetEx = ex.getTargetException();
      if (targetEx instanceof IOException) {
        throw new AmqpIOException((IOException) targetEx); // NOSONAR lost stack trace
      }
      else {
        throw new ListenerExecutionFailedException("Listener method '" // NOSONAR lost stack trace
            + methodName + "' threw exception", targetEx, originalMessage);
      }
    }
    catch (Exception ex) {
      // Collect the runtime class of each argument for the error message.
      // NOTE(review): a null element in arguments would make getClass() throw here and
      // replace the original exception; confirm arguments can never contain null.
      ArrayList<String> arrayClass = new ArrayList<>();
      if (arguments != null) {
        for (Object argument : arguments) {
          arrayClass.add(argument.getClass().toString());
        }
      }
      throw new ListenerExecutionFailedException("Failed to invoke target method '" + methodName
          + "' with argument type = [" + StringUtils.collectionToCommaDelimitedString(arrayClass)
          + "], value = [" + ObjectUtils.nullSafeToString(arguments) + "]", ex, originalMessage);
    }
  }