天天看點

RabbitMQ:@RabbitListener注解簡化消息監聽

`pom.xml`:

<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven build descriptor for the demo application used in this article. -->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <!-- Inherits from the Spring Boot 2.6.2 starter parent. -->
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.6.2</version>
    </parent>

    <packaging>jar</packaging>

    <!-- Coordinates of this project. -->
    <groupId>com.kaven</groupId>
    <artifactId>springboot</artifactId>
    <version>0.0.1-SNAPSHOT</version>

    <name>springboot</name>
    <description>springboot</description>

    <properties>
        <java.version>1.8</java.version>
    </properties>

    <!-- No dependency below declares a version; presumably they are resolved
         through the parent's dependency management (confirm against the parent POM). -->
    <dependencies>
        <!-- Web starter: the article's REST controller uses Spring Web annotations. -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <!-- Lombok: the User class uses its @Getter/@Setter/@ToString/@AllArgsConstructor annotations. -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
        <!-- AMQP starter: provides the org.springframework.amqp types used by the
             converter, consumer and producer classes. -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <!-- Gson: used by the converter and the producer for JSON (de)serialization. -->
        <dependency>
            <groupId>com.google.code.gson</groupId>
            <artifactId>gson</artifactId>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!-- Spring Boot Maven plugin (no extra configuration supplied here). -->
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>

`application.yml`:

# Connection settings for the RabbitMQ broker used by the demo.
spring:
  rabbitmq:
    # Broker address in host:port form.
    addresses: 192.168.1.9:5672
    # NOTE(review): credentials are stored in plain text here; presumably acceptable
    # only for a local demo - consider externalizing them for any shared environment.
    username: admin
    password: admin

`User` 類（消息負載的實體類）：

package com.kaven.springboot.rabbitmq;

import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;

/**
 * Message payload entity carried through the demo.
 *
 * <p>Accessors, the all-arguments constructor and {@code toString()} are generated by
 * Lombok. The field declaration order is significant: it determines the parameter order
 * of the generated constructor and the order of fields in the generated {@code toString()}.
 */
@AllArgsConstructor
@ToString
@Getter
@Setter
public class User {

    /** User name. */
    private String username;

    /** Password (note that the generated {@code toString()} includes it). */
    private String password;

    /** Code associated with the user. */
    private String code;
}

`Json2UserMessageConverter` 類（消息轉換器，將 `json` 資料轉換成 `User` 對象，`json` 資料由消息體的 `byte[]` 生成）：

package com.kaven.springboot.rabbitmq;

import com.google.gson.Gson;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.support.converter.MessageConversionException;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.stereotype.Component;

import java.nio.charset.StandardCharsets;

@Component("json2UserMessageConverter")
public class Json2UserMessageConverter implements MessageConverter {

    private static final Gson GSON = new Gson();

    @Override
    public Message toMessage(Object object, MessageProperties messageProperties) throws MessageConversionException {
        return new Message(GSON.toJson(object).getBytes(StandardCharsets.UTF_8));
    }

    @Override
    public Object fromMessage(Message message) throws MessageConversionException {
        return GSON.fromJson(new String(message.getBody(), StandardCharsets.UTF_8), User.class);
    }
}      

`Consumer` 類（消息監聽，使用 `@RabbitListener` 注解簡化消息監聽）：

package com.kaven.springboot.rabbitmq;

import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;

/**
 * Listener component that receives {@link User} messages.
 */
@Component
public class Consumer {

    /**
     * Handles one message from {@code queue.user}, which is bound to the topic exchange
     * {@code exchange.user} with the routing pattern {@code *.user}. The payload is
     * converted by the bean named {@code json2UserMessageConverter}.
     *
     * @param user the converted message payload
     */
    @RabbitListener(
            messageConverter = "json2UserMessageConverter",
            bindings = @QueueBinding(
                    key = "*.user",
                    exchange = @Exchange(value = "exchange.user", type = ExchangeTypes.TOPIC),
                    value = @Queue(value = "queue.user")
            )
    )
    public void process(User user) {
        String line = "Consumer - process 接收消息: " + user;
        System.out.println(line);
    }
}

`Producer` 類（用於釋出消息）：

package com.kaven.springboot.rabbitmq;

import com.google.gson.Gson;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.nio.charset.StandardCharsets;
import java.util.UUID;

/**
 * Publishes {@link User} payloads as JSON messages through a {@link RabbitTemplate}.
 */
@Component
public class Producer {

    private static final Gson GSON = new Gson();

    /** Exchange the messages are published to. */
    private static final String EXCHANGE = "exchange.user";

    /** Routing key used for every published message. */
    private static final String ROUTING_KEY = "new.user";

    /** Value passed to {@link MessageProperties#setExpiration(String)}. */
    private static final String EXPIRATION = "10000";

    // Injected through the constructor only. A final field must not also carry a
    // field-injection annotation, which would re-assign it reflectively after construction.
    private final RabbitTemplate rabbitTemplate;

    /**
     * @param rabbitTemplate template used to send messages
     */
    public Producer(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
    }

    /**
     * Serializes {@code user} to UTF-8 JSON and sends it to {@code exchange.user} with
     * routing key {@code new.user}, tagged with a random UUID as correlation data.
     *
     * @param user payload to publish
     */
    public void sendMsg(User user) {
        CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString());

        MessageProperties messageProperties = new MessageProperties();
        messageProperties.setExpiration(EXPIRATION);

        byte[] body = GSON.toJson(user).getBytes(StandardCharsets.UTF_8);
        Message message = new Message(body, messageProperties);

        rabbitTemplate.send(EXCHANGE, ROUTING_KEY, message, correlationId);
    }
}

`ProducerController` 類（用於釋出消息的接口）：

package com.kaven.springboot.rabbitmq;

import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;

/**
 * REST endpoint that hands a {@link User} over to the {@link Producer}.
 */
@RestController
public class ProducerController {

    /** Producer that performs the actual send; injected by the container. */
    @Resource
    private Producer producer;

    /**
     * Handles {@code GET /send}: forwards {@code user} to the producer and returns a
     * fixed confirmation text.
     *
     * @param user the user to publish
     * @return the confirmation text
     */
    @GetMapping(value = "/send")
    public String send(User user) {
        final String reply = "資料發送成功";
        producer.sendMsg(user);
        return reply;
    }
}

啟動類:

package com.kaven.springboot;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
 * Application entry point.
 */
@SpringBootApplication
public class SpringbootApplication {

    /**
     * Starts the Spring application with this class as the primary source.
     *
     * @param args command-line arguments passed through to Spring
     */
    public static void main(String[] args) {
        SpringApplication.run(SpringbootApplication.class, args);
    }
}

啟動應用，使用 `Postman` 請求接口。

RabbitMQ:@RabbitListener注解簡化消息監聽
Consumer - process 接收消息: User(username=kaven, password=itkaven, code=908899)