天天看點

rabbitTemplate rabbitmq 釋出模式與消息确認rabbitTemplate rabbitmq 釋出模式與消息确認人工确認消息發送和消費成功

rabbitTemplate rabbitmq 釋出模式與消息确認

2、組成

  • 生産者
  • 消息隊列
  • 消費者
  • 交換機:
    • 隔離生産者和消息隊列,充當二者中間體。
    • 接受相應的消息并綁定到指定隊列

3、三種釋出模式

  • 根據交換機類型不同,分為3種:
    • Direct<直接>:1對1-----一個消息隻能被一個消費者消費
    • Topic<主題>:1對多-----一個消息可以被多個消費者消費**(糾正:一個消息可被多個隊列接收,若多個消費者監聽同一個隊列,會以輪詢方式被多個消費者接收,本質依然是1對1,類Direct。詳述見下方評論!)**

      将路由和某個模式比對,# 比對一個或者多個,* 比對一個。例如 Good.insert Good.delete,則Good.#都能獲得

    • Fanout<分列>:廣播

4、Direct釋出模式

  • 核心依賴及application.properties(除端口外不變)

<dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
           

配置檔案

spring.rabbitmq.host=101.132.43.162
spring.rabbitmq.port=5672
spring.rabbitmq.username=rebbitmq
spring.rabbitmq.password=3321
           
  • 生産者工程

    • 4.1、建立消息隊列

兩種方式:代碼建立

/**
     * 模式為direct,直接發送到對應的消息隊列
     * @return
     */
    @Bean
    public Queue directQueue(){
        return new Queue("direct");
    }

           

(2)RabbitMQ管理界面手動添加隊列

rabbitTemplate rabbitmq 釋出模式與消息确認rabbitTemplate rabbitmq 釋出模式與消息确認人工确認消息發送和消費成功
  • 4.2、發送端

API:amqpTemplate.convertAndSend(“隊列名”,“消息内容”)

此處隊列名必須與建立的隊列一緻。

public void send(String str) {
    MessageProperties messageProperties = new MessageProperties();
    messageProperties.setMessageId(UUID.randomUUID().toString());
    Message message = new Message(str.getBytes(), messageProperties);
    rabbitTemplate.convertAndSend("direct",message);
}
           
  • 4.3、測試類
  • 消費者工程

  • 4.1、建立消息隊列

同上:略

  • 4.2、接收端

@RabbitListener(queues = “direct”):監聽器監聽指定隊列

@RabbitHandler
@RabbitListener(queues = "direct")
public void Handler(Message message, Channel channel) throws IOException {
    String messageId = message.getMessageProperties().getMessageId();
    System.out.println("消息編号:"+messageId);
    String s = new String(message.getBody(), StandardCharsets.UTF_8);
    System.out.println("消息消費:"+s);
    try {
        channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);
    }catch (Exception e){
        channel.basicReject(message.getMessageProperties().getDeliveryTag(),false);
    }

}
           

5、Topic釋出模式

  • 生産者工程

  • 5.1、建立消息隊列

API:BindingBuilder.bind(指定隊列).to(交換機).with(路由鍵);

路由鍵相當于隊列名

@Bean(name = "message")
public Queue queue(){
    return  new Queue("topic.message");
}
@Bean(name="messages")
public Queue queue1(){
    return new Queue("topic.messages");
}

@Bean
public TopicExchange topicExchange(){
    return new TopicExchange("exchange");
}
@Bean
public FanoutExchange fanoutExchange(){
    return new FanoutExchange("fanoutExchange");
}

@Bean
public Binding binding(@Qualifier("message") Queue message, TopicExchange topicExchange){
    return BindingBuilder.bind(message).to(topicExchange).with("topic.message");
}


@Bean
public Binding binding1(@Qualifier("messages") Queue queue,TopicExchange topicExchange){
    return BindingBuilder.bind(queue).to(topicExchange).with("topic.#");
}
           

![img](mq使用.assets/20190312154751707.png

  • 5.2、發送端:

API:amqpTemplate.convertAndSend(“交換機名”,“路由鍵”,“消息内容”)

RabbitMQ将會根據第二個參數去尋找有沒有比對此規則的隊列,如果有,則把消息給它,如果有不止一個,則把消息分發給比對的隊列(每個隊列都有消息!)

@Override
public void send1(String str){
    MessageProperties messageProperties = new MessageProperties();
    messageProperties.setMessageId(UUID.randomUUID().toString());
    Message message = new Message(str.getBytes(), messageProperties);
    rabbitTemplate.convertAndSend("exchange","topic.message",message);
}
           
  • 5.3、測試類
  • 消費者工程

@RabbitHandler
@RabbitListener(queues = "topic.message")
public void Handler1(Message message,Channel channel) throws IOException {
    String messageId = message.getMessageProperties().getMessageId();
    System.out.println("topic.message消息編号" + messageId);
    String str = new String(message.getBody(), StandardCharsets.UTF_8);
    System.out.println("topic.message消息體" + str);
    channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);
}

@RabbitHandler
@RabbitListener(queues = "topic.messages")
public void Handler2(Message message,Channel channel) throws IOException {
    String messageId = message.getMessageProperties().getMessageId();
    System.out.println("topic.messages消息編号" + messageId);
    String str = new String(message.getBody(), StandardCharsets.UTF_8);
    System.out.println("topic.messages消息體" + str);
    channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);
}
           

注意:topic模式中隻有消費端監聽的路由鍵符合發送端的路由規則(路由鍵決定)的隊列才會收到消息。

6、Fanout釋出模式

  • 廣播:發送到路由器的消息會使得綁定到該路由器的每一個Queue接收到消息,這個時候就算指定了Key,或者規則(即上文中convertAndSend方法的參數2),也會被忽略!
  • 交換機類型:FanoutExchange
  • API:amqpTemplate.convertAndSend(“交換機名”,“ ”,“消息内容”);//路由鍵被忽略
  • 消費端:隻要是綁定到該交換機上的都能收到消息。

    在配置類中增加

@Bean(name = "fanoutMessage")
public Queue queueC(){
    return  new Queue("fanoutMessage");
}

   @Bean
    public FanoutExchange fanoutExchange(){
        return new FanoutExchange("fanoutExchange");
    }

  @Bean
    public Binding binding2(@Qualifier("fanoutMessage") Queue queue,FanoutExchange fanoutExchange){
        return BindingBuilder.bind(queue).to(fanoutExchange);
    }
           

生産者服務

@Override
public void send2(String str){
    MessageProperties messageProperties = new MessageProperties();
    messageProperties.setMessageId(UUID.randomUUID().toString());
    Message message = new Message(str.getBytes(), messageProperties);
    rabbitTemplate.convertAndSend("fanoutExchange","",message);
}
           

消費者服務

@RabbitHandler
@RabbitListener(queues = "fanoutMessage")
public void Handler3(Message message,Channel channel) throws IOException {
    String messageId = message.getMessageProperties().getMessageId();
    System.out.println("fanoutMessage消息編号" + messageId);
    String str = new String(message.getBody(), StandardCharsets.UTF_8);
    System.out.println("fanoutMessage消息體" + str);
    channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);
}
           

人工确認消息發送和消費成功

在application.properties中增加配置

#設定消息确認成功确認後進行回調
#NONE值是禁用釋出确認模式,是預設值
#CORRELATED值是釋出消息成功到交換器後會觸發回調方法,如1示例
#SIMPLE值經測試有兩種效果,其一效果和CORRELATED值一樣會觸發回調方法,其二在釋出消息成功後使用rabbitTemplate調用waitForConfirms或waitForConfirmsOrDie方法等待broker節點傳回發送結果,根據傳回結果來判定下一步的邏輯,要注意的點是waitForConfirmsOrDie方法如果傳回false則會關閉channel,則接下來無法發送消息到broker;
#調用confirmCallback接口
spring.rabbitmq.publisher-confirm-type=correlated
#設定成功到達路由器後不會調用returnsCallback 方法,否則則會調用returnsCallback
spring.rabbitmq.publisher-returns=true

           

消息隊列配置

package cn.atiaozao.springbootmqprovider.config;

import com.rabbitmq.client.AMQP;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class MqConfig {

    /**
     * 模式為direct,直接發送到對應的消息隊列
     * @return
     */
    @Bean
    public Queue directQueue(){
        return new Queue("direct");
    }
    
    @Bean(name = "message")
    public Queue queue(){
        return  new Queue("topic.message");
    }
      @Bean(name="messages")
    public Queue queue1(){
        return new Queue("topic.messages");
    }
    
    @Bean
    public TopicExchange topicExchange(){
        return new TopicExchange("exchange");
    }
 
  //消息綁定
    @Bean
    public Binding binding(@Qualifier("message") Queue message, TopicExchange topicExchange){
        return BindingBuilder.bind(message).to(topicExchange).with("topic.message");
    }
		
	//表達式綁定
    @Bean
    public Binding binding1(@Qualifier("messages") Queue queue,TopicExchange topicExchange){
        return BindingBuilder.bind(queue).to(topicExchange).with("topic.#");
    }
}
           

消息服務方法

package cn.atiaozao.springbootmqprovider.service.impl;

import cn.atiaozao.springbootmqprovider.service.RabbitMqService;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.util.UUID;

@Component
public class RabiitmqServiceImpl implements RabbitMqService {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @Autowired
    private ConfirmMessageImpl confirmMessageImpl;

    /**
     * 使用@PostConstruct 在注入完成之後進行設定消息push 确認的回調方法
     *
     */
    @PostConstruct
    public void init(){
        //消息成功到達指定隊列的回調方法
        rabbitTemplate.setConfirmCallback(confirmMessageImpl);
        //消息如果未成功到達隊列的
        rabbitTemplate.setReturnCallback(confirmMessageImpl);
    }
    @Override

    public void send(String str) {
        MessageProperties messageProperties = new MessageProperties();
        messageProperties.setMessageId(UUID.randomUUID().toString());
        Message message = new Message(str.getBytes(), messageProperties);
        rabbitTemplate.convertAndSend("direct",message);
    }
    @Override
    public void send1(String str){
        MessageProperties messageProperties = new MessageProperties();
        messageProperties.setMessageId(UUID.randomUUID().toString());
        Message message = new Message(str.getBytes(), messageProperties);
        rabbitTemplate.convertAndSend("exchange","topic.message",message);
    }

    @Override
    public void send2(String str){
        MessageProperties messageProperties = new MessageProperties();
        messageProperties.setMessageId(UUID.randomUUID().toString());
        Message message = new Message(str.getBytes(), messageProperties);
        rabbitTemplate.convertAndSend("exchange","",message);
    }
}
           

消息成功到達路由器後的确認回調方法

package cn.atiaozao.springbootmqprovider.service.impl;

import lombok.extern.java.Log;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;

@Log
@Component
public class ConfirmMessageImpl implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {
    @Override
    public void confirm(CorrelationData correlationData, boolean b, String s) {
        System.out.println("消息确認成功"+b+"|傳回錯誤消息"+s+"|附帶資料"+correlationData);
    }


    @Override
    public void returnedMessage(Message message, int i, String s, String s1, String s2) {
        System.out.println("消息發送失敗|"+message.getBody());
    }
}
           

消息消費服務

#設定消息隊列使用手工确認的方式
spring.rabbitmq.listener.simple.acknowledge-mode=manual
spring.rabbitmq.listener.direct.acknowledge-mode=manual
           

消息消費監聽

package cn.atiaozao.springbootmqconsumer.consumer;

import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.logging.Handler;


@Component
public class DirectConsumerHander {

    @RabbitHandler
    @RabbitListener(queues = "direct")
    public void Handler(Message message, Channel channel) throws IOException {
        String messageId = message.getMessageProperties().getMessageId();
        System.out.println("消息編号:"+messageId);
        String s = new String(message.getBody(), StandardCharsets.UTF_8);
        System.out.println("消息消費:"+s);
        try {
            //消息人工确認
            channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);
        }catch (Exception e){
            channel.basicReject(message.getMessageProperties().getDeliveryTag(),false);
        }

    }
    @RabbitHandler
    @RabbitListener(queues = "topic.message")
    public void Handler1(Message message,Channel channel) throws IOException {
        String messageId = message.getMessageProperties().getMessageId();
        System.out.println("topic.message消息編号" + messageId);
        String str = new String(message.getBody(), StandardCharsets.UTF_8);
        System.out.println("topic.message消息體" + str);
        //消息人工确認
        channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);
    }

    @RabbitHandler
    @RabbitListener(queues = "topic.messages")
    public void Handler2(Message message,Channel channel) throws IOException {
        String messageId = message.getMessageProperties().getMessageId();
        System.out.println("topic.messages消息編号" + messageId);
        String str = new String(message.getBody(), StandardCharsets.UTF_8);
        System.out.println("topic.messages消息體" + str);
        //消息人工确認
        channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);
    }

}