天天看點

SpringBoot使用RabbitMQ(二)

如未看第一章的建議先看第一章基本用法: SpringBoot使用RabbitMQ ,這裡講解SpringBoot使用RabbitMQ進行有回調的用法和消費者端手動簽收消息的用法。

一、yml檔案配置

server:
  port: 8080
spring:
  application:
    name: rabbit-confirm
  rabbitmq:
    template:
      # 使用return-callback時必須設定mandatory為true
      mandatory: true
    # 消息發送到交換機确認機制,是否确認回調
    publisher-confirms: true
    # 消息發送到交換機确認機制,是否傳回回調
    publisher-returns: true
    listener:
      simple:
        # 并發消費者初始化值
        concurrency: 5
        # 最大值
        max-concurrency: 10
        # 每個消費者每次監聽時可拉取處理的消息數量
        prefetch: 20
        # 确認模式設定為手動簽收
        acknowledge-mode: manual           

二、定義配置類

/**
 * @author Gjing
 **/
@Configuration
public class ConfirmConfiguration {

    /**
     * 聲明confirm.message隊列
     */
    @Bean
    public Queue confirmQueue() {
        return new Queue("confirm.message");
    }

    /**
     * 聲明一個名為exchange-2的交換機
     */
    @Bean
    public TopicExchange exchange2() {
        return new TopicExchange("exchange-2");
    }

    /**
     * 将confirm.message的隊列綁定到exchange-2交換機
     */
    @Bean
    public Binding bindMessage1() {
        return BindingBuilder.bind(confirmQueue()).to(exchange2()).with("confirm.message");
    }
}           

三、定義生産者

/**
 * @author Gjing
 **/
@Component
@Slf4j
public class ConfirmProducer {
    @Resource
    private RabbitTemplate rabbitTemplate;

    /**
     * 如果消息沒有到exchange,則confirm回調,ack=false
     * 如果消息到達exchange,則confirm回調,ack=true
     * exchange到queue成功,則不回調return
     * exchange到queue失敗,則回調return(需設定mandatory=true,否則不回回調,消息就丢了)
     */
    private final RabbitTemplate.ConfirmCallback confirmCallback = (correlationData, ack, cause) -> {
        if (!ack) {
            log.error("消息發送失敗:correlationData: {},cause: {}", correlationData, cause);
        }else {
            log.info("消息發送成功:correlationData: {},ack: {}", correlationData, ack);
        }
    };

    private final RabbitTemplate.ReturnCallback returnCallback = (message, replyCode, replyText, exchange, routeKey) ->
            log.error("消息丢失: exchange: {},routeKey: {},replyCode: {},replyText: {}", exchange, routeKey, replyCode, replyText);

    /**
     * 發送消息
     * @param message 消息内容
     */
    public void send(String message) {
        // 建構回調傳回的資料
        CorrelationData correlationData = new CorrelationData();
        correlationData.setId(TimeUtil.localDateTimeToStamp(LocalDateTime.now()) + "");

        Message message1 = MessageBuilder.withBody(message.toString().getBytes())
                .setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN)
                // 将CorrelationData的id 與 Message的correlationId綁定,然後關系儲存起來,然後人工處理
                .setCorrelationId(correlationData.getId())
                .build();
        rabbitTemplate.setConfirmCallback(confirmCallback);
        rabbitTemplate.setReturnCallback(returnCallback);
        rabbitTemplate.convertAndSend("exchange-2", "confirm.message", message1, correlationData);
    }
}           

四、定義消費者

/**
 * @author Gjing
 **/
@Component
@Slf4j
public class ConfirmConsumer {

    @RabbitListener(bindings = @QueueBinding(value = @Queue(value = "confirm.message",durable = "true")
            ,exchange = @Exchange(value = "exchange-2",type = "topic")
            ,key = "confirm.message"))
    public void receive(String message, Message message1, Channel channel) throws IOException {
        log.info("消費者收到消息:{}", message);
        long deliverTag = message1.getMessageProperties().getDeliveryTag();
        //第一個deliveryTag參數為每條資訊帶有的tag值,第二個multiple參數為布爾類型
        //為true時會将小于等于此次tag的所有消息都确認掉,如果為false則隻确認目前tag的資訊,可根據實際情況進行選擇。
        channel.basicAck(deliverTag, false);
    }
}           

五、建立controller調用

/**
 * @author Gjing
 **/
@RestController
public class ConfirmController {
    @Resource
    private ConfirmProducer confirmProducer;

    @PostMapping("/confirm-message")
    public void confirmMessage() {
        confirmProducer.send("hello confirm message");
    }
}           

六、執行結果

SpringBoot使用RabbitMQ(二)

以上為個人了解,如有誤歡迎各位指正