天天看點

RabbitMQ 實戰 - 消息確認機制:Spring 整合實戰

【服務端確認(publisher confirm / return)】

1. 確認 message 成功發送到交換器(exchange):結果由 ConfirmCallback 回調通知

2. 確認 message 成功從 exchange 路由到隊列(queue):路由失敗時由 ReturnCallback 回調通知

# 導入依賴

<!-- spring-rabbit: Spring AMQP integration for RabbitMQ.
     Its transitive amqp-client is excluded so that the version declared
     explicitly below is the one that ends up on the classpath. -->
<dependency>
    <groupId>org.springframework.amqp</groupId>
    <artifactId>spring-rabbit</artifactId>
    <version>1.7.14.RELEASE</version>
    <exclusions>
        <exclusion>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<!-- amqp-client: RabbitMQ Java client, pinned explicitly (replaces the excluded transitive one) -->
<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>4.12.0</version>
</dependency>
           

# 消息提供者配置

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xmlns:p="http://www.springframework.org/schema/p"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:aop="http://www.springframework.org/schema/aop"
       xmlns:tx="http://www.springframework.org/schema/tx"
       xsi:schemaLocation="http://www.springframework.org/schema/rabbit
              https://www.springframework.org/schema/rabbit/spring-rabbit.xsd
              http://www.springframework.org/schema/beans
             https://www.springframework.org/schema/beans/spring-beans-4.3.xsd
             http://www.springframework.org/schema/context
             https://www.springframework.org/schema/context/spring-context-4.3.xsd
              http://www.springframework.org/schema/aop
              https://www.springframework.org/schema/aop/spring-aop-4.3.xsd
              http://www.springframework.org/schema/tx
              https://www.springframework.org/schema/tx/spring-tx-4.3.xsd">

    <!-- Producer-side context: connection factory with publisher confirms/returns,
         the RabbitTemplate used for sending, and the queue/exchange/binding declarations. -->

    <context:component-scan base-package="com.learn" />

    <!-- Message converter handed to the template below (JSON via Jackson 2) -->
    <bean id="msgConverter" class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter" />


    <!-- 1. Connection factory.
         publisher-confirms="true" turns on publisher confirms; transactions can no longer
         be used once confirms are enabled.
         publisher-returns="true" turns on returns for unroutable messages. -->
    <rabbit:connection-factory id="connectionFactory"
                               host="192.168.199.237"
                               port="5672"
                               username="app_client"
                               password="client123"
                               virtual-host="/app"
                               publisher-confirms="true"
                               publisher-returns="true" />

    <!-- Callback bean that receives both publisher confirms and returned messages -->
    <bean id="confirmCallback" class="com.learn.config.MsgSendConfirmCallback"></bean>

    <!-- 2. RabbitTemplate: default exchange and routing key, plus the confirm/return callbacks.
         mandatory="true" is what makes unroutable messages come back through the return callback. -->
    <rabbit:template id="template"
                     message-converter="msgConverter"
                     connection-factory="connectionFactory"
                     exchange="my_direct_exchange" reply-timeout="2000"
                     routing-key="my.binding"
                     confirm-callback="confirmCallback"
                     return-callback="confirmCallback"
                     mandatory="true" />

    <!-- Admin that declares the queue, exchange and binding defined below -->
    <rabbit:admin connection-factory="connectionFactory" auto-startup="true"/>

    <!-- 1. Queue definition -->
    <rabbit:queue id="my_queue"
                  name="my_queue"
                  auto-declare="true"
                  durable="true"
                  auto-delete="false"
                  exclusive="false"/>

    <!-- 2. Exchange definition.
         auto-delete must be "false" here: the consumer configuration declares the same
         exchange with auto-delete="false", and redeclaring an existing exchange with
         different attributes is rejected by the broker. -->
    <rabbit:direct-exchange id="my_direct_exchange"
                            name="my_direct_exchange"
                            auto-declare="true"
                            durable="true"
                            auto-delete="false">
        <rabbit:bindings>
            <rabbit:binding queue="my_queue" key="my.binding" />
        </rabbit:bindings>
    </rabbit:direct-exchange>


</beans>

           

# 消息確認回調

package com.learn.config;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.beans.factory.annotation.Autowired;

import javax.annotation.PostConstruct;

/**
 * Publisher-side callback that reports the outcome of every send made through the
 * {@link RabbitTemplate}.
 *
 * <ul>
 *   <li>{@link #confirm} is invoked with the broker's ack/nack for a published message.</li>
 *   <li>{@link #returnedMessage} is invoked with a message that was handed back to the publisher.</li>
 * </ul>
 *
 * <p>Results are only printed to standard output; no retry or persistence is performed here.
 */
public class MsgSendConfirmCallback implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * Registers this instance as the template's confirm and return callback and turns on the
     * mandatory flag.
     *
     * <p>The producer XML configuration already wires the same bean through
     * {@code confirm-callback}, {@code return-callback} and {@code mandatory}, so this is
     * redundant with that configuration. NOTE(review): RabbitTemplate is expected to accept
     * re-registration of the same callback instance; confirm before wiring a different one.
     */
    @PostConstruct
    public void init() {
        rabbitTemplate.setConfirmCallback(this);
        rabbitTemplate.setReturnCallback(this);
        rabbitTemplate.setMandatory(true);
    }

    /**
     * Prints whether the broker acknowledged the published message.
     *
     * @param correlationData correlation data supplied at send time; printed as-is (may be {@code null})
     * @param ack             {@code true} when the broker acked the message, {@code false} when it nacked
     * @param cause           failure reason reported for a nack
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        if (ack) {
            System.out.println("消息發送成功: " + correlationData);
        } else {
            System.out.println("消息發送失敗,id= " + correlationData + " 原因:" + cause);
        }
    }

    /**
     * Prints the details of a message that the broker returned to the publisher.
     *
     * @param message    the returned message
     * @param replyCode  reply code sent by the broker
     * @param replyText  textual description of the reply code
     * @param exchange   exchange the message was published to
     * @param routingKey routing key the message was published with
     */
    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        // Decode explicitly as UTF-8 instead of relying on the JVM's platform default charset.
        String body = new String(message.getBody(), java.nio.charset.StandardCharsets.UTF_8);
        System.out.println("returned message: " + body);
        System.out.println("應答碼: " + replyCode);
        System.out.println("描述:" + replyText);
        System.out.println("消息使用的交換器 exchange : " + exchange);
        System.out.println("消息使用的路由鍵 routing : " + routingKey);
    }

}
           

【消費者端確認】

# 消息消費者配置

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xmlns:p="http://www.springframework.org/schema/p"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:aop="http://www.springframework.org/schema/aop"
       xmlns:tx="http://www.springframework.org/schema/tx"
       xsi:schemaLocation="http://www.springframework.org/schema/rabbit
              https://www.springframework.org/schema/rabbit/spring-rabbit.xsd
              http://www.springframework.org/schema/beans
             https://www.springframework.org/schema/beans/spring-beans-4.3.xsd
             http://www.springframework.org/schema/context
             https://www.springframework.org/schema/context/spring-context-4.3.xsd
              http://www.springframework.org/schema/aop
              https://www.springframework.org/schema/aop/spring-aop-4.3.xsd
              http://www.springframework.org/schema/tx
              https://www.springframework.org/schema/tx/spring-tx-4.3.xsd">

    <!-- Consumer-side context: connection factory, queue/exchange/binding declarations,
         and a listener container running in manual-acknowledge mode. -->

    <context:component-scan base-package="com.learn" />

    <!-- 1. Connection factory -->
    <rabbit:connection-factory id="connectionFactory"
                               host="192.168.199.237"
                               port="5672"
                               username="app_client"
                               password="client123"
                               virtual-host="/app"
                               connection-timeout="3000"
                               channel-cache-size="5"/>

    <!-- 2. RabbitMQ admin: declares the queue and exchange defined below -->
    <rabbit:admin connection-factory="connectionFactory" />

    <!-- 3. Queue definition -->
    <rabbit:queue name="my_queue" auto-declare="true" auto-delete="false" exclusive="false" durable="true"/>

    <!-- 4. Exchange definition + queue binding.
         The binding key is "my.binding", the routing key the publisher's template sends with,
         so that messages published to my_direct_exchange are routed to my_queue. -->
    <rabbit:direct-exchange name="my_direct_exchange" auto-declare="true" durable="true" auto-delete="false">
        <rabbit:bindings>
            <rabbit:binding queue="my_queue" key="my.binding" />
        </rabbit:bindings>
    </rabbit:direct-exchange>

    <!-- 5. Handler bean that processes the messages sent by the producer -->
    <bean id="confirmListener" class="com.learn.listeners.ConsumeConfirmListener" />

    <!-- 6. Listener container. acknowledge="manual" means the listener itself must
         ack / nack every delivery on the channel. -->
    <rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual">
        <!-- Listener bean, the handler method to invoke, and the queue it consumes from -->
        <rabbit:listener ref="confirmListener" method="onMessage" queue-names="my_queue"/>
    </rabbit:listener-container>


</beans>

           

# 監聽器 - 開啟消費端手動確認

package com.learn.listeners;

import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
import org.springframework.stereotype.Component;

import java.io.IOException;

/**
 * Consumer-side listener that acknowledges deliveries manually.
 *
 * <p>Intended for a listener container configured with {@code acknowledge="manual"}: every
 * delivery is settled exactly once on the channel, either with an ack or with a nack.
 */
public class ConsumeConfirmListener implements ChannelAwareMessageListener {

    /**
     * Prints the message body and acknowledges the delivery; if that fails with an
     * {@link IOException}, the delivery is negatively acknowledged without requeueing.
     *
     * @param message the delivered message
     * @param channel the channel the message was delivered on, used to ack / nack it
     * @throws Exception if the negative acknowledgement itself fails
     */
    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        final long deliveryTag = message.getMessageProperties().getDeliveryTag();
        try {
            // Decode explicitly as UTF-8 instead of relying on the JVM's platform default charset.
            String body = new String(message.getBody(), java.nio.charset.StandardCharsets.UTF_8);
            System.out.println("consumer received msg: " + body);
            // Manual ack of this single delivery (multiple = false).
            channel.basicAck(deliveryTag, false);
        } catch (IOException e) {
            e.printStackTrace();
            // Settle the delivery exactly once (multiple = false, requeue = false).
            // The previous code followed this with basicReject on the same delivery tag;
            // settling an already-settled tag is a channel-level protocol error.
            channel.basicNack(deliveryTag, false, false);
        }
    }

}