天天看點

(需求實戰_01) SpringBoot2.x 整合RabbitMQ_生産端

文章目錄

  • ​​一、依賴配置引入​​
  • ​​1. 引入SpringBoot整合RabbitMQ依賴​​
  • ​​2. 生産者配置檔案​​
  • ​​3. 主配置​​
  • 二、代碼Coding
  • ​​2.1. 生産者代碼​​
  • ​​2.2. 實體對象​​
  • ​​2.3. 測試類​​

一、依賴配置引入

1. 引入SpringBoot整合RabbitMQ依賴

<!-- Spring Boot starter that brings in the RabbitMQ (Spring AMQP) integration -->
       <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>      

2. 生産者配置檔案

# RabbitMQ connection settings
spring:
  rabbitmq:
    # NOTE(review): Spring Boot documents that `host`/`port` are ignored when `addresses` is set,
    # so `port` below may have no effect — confirm whether `host` was intended here.
    addresses: 127.0.0.1
    port: 5672
    username: admin
    password: admin
    # Virtual host
    virtual-host: /admin
    # Connection timeout (presumably milliseconds — TODO confirm)
    connection-timeout: 15000
    ## Enable the publisher confirm mechanism
    # Send confirmation; corresponds to the RabbitTemplate.ConfirmCallback interface.
    # The confirm callback carries two important values:
    # ack (true when the send was confirmed) and the correlation id, a globally unique ID identifying each message.
    # NOTE(review): newer Spring Boot 2.x releases replace this key with `publisher-confirm-type` — verify against the version in use.
    publisher-confirms: true
    # Enable the publisher return mechanism
    publisher-returns: true
    # When true, a message that could not be routed to a suitable queue is handed back to the producer's return listener instead of being dropped automatically.
    # Failed-send fallback; corresponds to the RabbitTemplate.ReturnCallback interface.
    template:
      mandatory: true      

3. 主配置

package com.gblfy.springboot.config;

import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;

/**
 * Main Spring configuration class.
 *
 * <p>Declares no beans of its own; it only enables component scanning for the
 * {@code com.gblfy.springboot.*} package pattern.
 */
@ComponentScan(basePackages = {"com.gblfy.springboot.*"})
@Configuration
public class MainConfig {
}

二、代碼Coding

2.1. 生産者代碼

package com.gblfy.springboot.producer;

import com.gblfy.springboot.entity.Order;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.core.RabbitTemplate.ConfirmCallback;
import org.springframework.amqp.rabbit.core.RabbitTemplate.ReturnCallback;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;

import java.util.Map;

/**
 * Publishes messages to RabbitMQ through the injected {@link RabbitTemplate}.
 *
 * <p>Before every send the confirm and return callbacks defined in this class are
 * registered on the template, and each message is published with its own
 * {@link CorrelationData} so that a broker confirm can be matched to the message
 * that triggered it.
 */
@Component
public class RabbitMQSender {

    // RabbitTemplate injected by Spring.
    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * Sends an arbitrary payload together with extra header properties.
     *
     * <p>The payload and properties are wrapped in a spring-messaging {@link Message},
     * which is then handed to the template's message converter and published to
     * exchange {@code exchange-1} with routing key {@code springboot.abc}.
     *
     * @param message    payload to send
     * @param properties extra properties stored as the message headers
     * @throws Exception declared for callers; propagates whatever the template throws
     */
    public void send(Object message, Map<String, Object> properties) throws Exception {
        // Container for the extra properties that travel with the payload.
        MessageHeaders mhs = new MessageHeaders(properties);
        Message<Object> msg = MessageBuilder.createMessage(message, mhs);

        registerCallbacks();
        rabbitTemplate.convertAndSend("exchange-1", "springboot.abc", msg, newCorrelationData());
    }

    /**
     * Sends an {@link Order} object.
     *
     * <p>The order is published to exchange {@code exchange-2} with routing key
     * {@code springboot.ff}.
     *
     * @param order order to send
     * @throws Exception declared for callers; propagates whatever the template throws
     */
    public void sendOrder(Order order) throws Exception {
        registerCallbacks();
        rabbitTemplate.convertAndSend("exchange-2", "springboot.ff", order, newCorrelationData());
    }

    /**
     * Registers the confirm and return callbacks on the template.
     *
     * <p>The same two callback instances are passed on every call.
     * NOTE(review): RabbitTemplate supports a single callback of each kind; re-registering
     * the same instance is expected to be accepted — confirm for the Spring AMQP version in use.
     */
    private void registerCallbacks() {
        // Publisher confirm: reports whether the message was acked or nacked.
        rabbitTemplate.setConfirmCallback(confirmCallback);
        // Publisher return: reports messages that were handed back by the broker.
        rabbitTemplate.setReturnCallback(returnCallback);
    }

    /**
     * Creates correlation data with a fresh, globally unique id.
     *
     * <p>A fixed id would make the confirms of different messages indistinguishable,
     * so a random UUID is generated for every message.
     */
    private static CorrelationData newCorrelationData() {
        return new CorrelationData(java.util.UUID.randomUUID().toString());
    }

    // Callback: publisher confirm.
    final ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback() {
        @Override
        public void confirm(CorrelationData correlationData, boolean ack, String cause) {
            System.err.println("correlationData: " + correlationData);
            System.err.println("ack: " + ack);
            if (!ack) {
                // Negative acknowledgement: surface the reason so the failure can be diagnosed.
                System.err.println("cause: " + cause);
                System.err.println("異常處理....");
            }
        }
    };

    // Callback: publisher return.
    final ReturnCallback returnCallback = new RabbitTemplate.ReturnCallback() {
        @Override
        public void returnedMessage(org.springframework.amqp.core.Message message, int replyCode, String replyText,
                                    String exchange, String routingKey) {
            System.err.println("return exchange: " + exchange + ", routingKey: "
                    + routingKey + ", replyCode: " + replyCode + ", replyText: " + replyText);
        }
    };
}

2.2. 實體對象

package com.gblfy.springboot.entity;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.io.Serializable;

/**
 * Order payload published by the producer.
 *
 * <p>Accessors, the no-argument and all-argument constructors and the builder are
 * generated by Lombok. The all-argument constructor takes the fields in declaration
 * order: {@code id}, then {@code name}.
 */
@Builder
@NoArgsConstructor
@AllArgsConstructor
@Data
public class Order implements Serializable {

    /** Order id. */
    private String id;

    /** Order name. */
    private String name;
}

2.3. 測試類

package com.gblfy.springboot;

import com.gblfy.springboot.entity.Order;
import com.gblfy.springboot.producer.RabbitMQSender;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;

/**
 * Smoke tests that push messages through {@link RabbitMQSender}.
 *
 * <p>The tests only trigger the sends; they make no assertions about the outcome.
 * NOTE(review): presumably a reachable RabbitMQ broker is required — confirm.
 */
@SpringBootTest
@RunWith(SpringRunner.class)
public class ProducerApplicationTests {

    // Formatter for the "send_time" property (date and time down to milliseconds).
    private static final SimpleDateFormat SEND_TIME_FORMAT = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");

    @Autowired
    private RabbitMQSender rabbitMQSender;

    /** Empty test body; passes as long as the test setup itself succeeds. */
    @Test
    public void contextLoads() {
    }

    /**
     * Sends a text payload with two extra properties: a number and the current send time.
     *
     * @throws Exception propagated from the sender
     */
    @Test
    public void testSender1() throws Exception {
        final String sendTime = SEND_TIME_FORMAT.format(new Date());

        final Map<String, Object> extraProperties = new HashMap<>();
        extraProperties.put("number", "12345");
        extraProperties.put("send_time", sendTime);

        rabbitMQSender.send("Hello RabbitMQ For Spring Boot!", extraProperties);
    }

    /**
     * Sends an {@link Order} object.
     *
     * @throws Exception propagated from the sender
     */
    @Test
    public void testSender2() throws Exception {
        rabbitMQSender.sendOrder(new Order("001", "第一個訂單"));
    }
}