天天看点

学习消息队列RabbitMQ

文章目录

    • 一、简介
    • 二、导入RabbitMQ的依赖包:
    • 三、RabbitMQ几种使用方式
      • 1、Hello World
        • 添加发布者代码:
        • 添加接收者代码:
      • 2、工作队列(work queue)
        • 发布者代码:
        • 消费者1代码:
        • 消费者2代码:
      • 3、订阅模式:
        • (1)订阅之Fanout模型
          • 生产者代码:
          • 消费者1代码:
          • 消费者2代码:
        • (2)订阅之Direct模型:
          • 发布者代码:
          • 消费者代码:
        • (3)订阅之Topic模型:
          • 发布者代码:
          • 消费者代码:

一、简介

RabbitMQ是一个消息代理。它的核心思想非常简单:接收并转发消息。你可以把它想象成一个邮局:当你把邮件丢进邮箱时,你非常确定邮递员先生会把它送到收件人手中。在这个比喻中,RabbitMQ就是邮箱、邮局和邮递员。

RabbitMQ和邮局的主要区别是它处理的不是纸张。它接收、存储并转发二进制数据块,也就是message,消息。

RabbitMQ官网: https://www.rabbitmq.com/

RabbitMQ官方教程: https://www.rabbitmq.com/#getstarted

学习教程:https://www.rabbitmq.com/tutorials/tutorial-one-python.html

RabbitMQ常用的交换器类型有direct、topic、fanout三种。

二、导入RabbitMQ的依赖包:

<!--消息队列RabbitMQ的依赖包-->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.amqp</groupId>
    <artifactId>spring-rabbit-test</artifactId>
    <scope>test</scope>
</dependency>
           

三、RabbitMQ几种使用方式

1、Hello World

学习消息队列RabbitMQ

P(producer/ publisher):生产者,发送消息的服务

C(consumer):消费者,接收消息的服务

红色区域就是MQ中的Queue,可以把它理解成一个邮箱

  • 首先信件来了不强求必须马上马去拿
  • 其次,它是有最大容量的(受主机和磁盘的限制,是一个缓存区)
  • 允许多个消费者监听同一个队列,争抢消息

添加发布者代码:

@Slf4j
public class RabbitMQProducer {
    private final static String QUEEN_NAME = "Hello World";
    private final static String MESSAGE = "第一个RabbitMQ";

    public static void main(String[] args) {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");//设置主机名
        try {
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();
            channel.queueDeclare(QUEEN_NAME, false, false, false, null);
            channel.basicPublish("", QUEEN_NAME, null, MESSAGE.getBytes());
            log.info("发送消息:'{}'", MESSAGE);
            channel.close();
            connection.close();
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }
    }

}
           

添加接收者代码:

@Slf4j
public class RabbitMQConsumer {
    private final static String QUEEN_NAME = "Hello World";

    public static void main(String[] args) {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try {
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();
            channel.queueDeclare(QUEEN_NAME, false, false, false, null);
            /**
             *  @Author: chen
             *  @Date: 2021/8/17 17:28
             *  @Description: 第一种方式利用匿名类创建
             */
//            Consumer consumer = new DefaultConsumer(channel){
//                @Override
//                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//                    String message = new String(body, "UTF-8");
//                    log.info("消费者收到消息:'{}'", message);
//                }
//            };
//            channel.basicConsume(QUEEN_NAME, true, consumer);
            /**
             *  @Author: chen
             *  @Date: 2021/8/17 17:28
             *  @Description: 第二种方式利用Lambda创建
             */
            DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                String message = new String(delivery.getBody(), "UTF-8");
                log.info("消费者收到消息:'{}'", message);
            };
            channel.basicConsume(QUEEN_NAME, true, deliverCallback, consumerTag -> {});
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }

    }
}
           

发送者会通过RabbitMQ发送一条信息,而接收者会把它打印出来。接收者将会一直运行,等待接收消息。

布置成功之后就可以在浏览器输入:http://服务器的ip:15672/ 可以找到登陆入口。

2、工作队列(work queue)

学习消息队列RabbitMQ

Worker模型中也只有一个工作队列。但它是一种竞争消费模式。可以看到同一个队列我们绑定上了多个消费者,消费者争抢着消费消息,这可以有效的避免消息堆积。

比如对于短信微服务集群来说就可以使用这种消息模型,来了请求大家抢着消费掉。

如何实现这种架构:对于上面的HelloWorld这其实就是相同的服务我们启动了多次罢了,自然就是这种架构。

发布者代码:

/**
 *  @Author: chen
 *  @Date: 2021/8/18 10:53
 *  @Description: 学习地址:https://www.rabbitmq.com/tutorials/tutorial-two-java.html
 */
public class NewTask {
    //定义一个消息队列的名称
    private static final String QUEEN_NAME = "work";

    public static void main(String[] args) {
        //开启连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        //设置主机名称或IP
        factory.setHost("localhost");
        try {
            //开启连接,抛出异常
            Connection connection = factory.newConnection();
            //开启连接通道
            Channel channel = connection.createChannel();
            //设置队列参数
            boolean autoAck = true; //消息确认与持久性,消费者发回确认消息,告诉 RabbitMQ 特定消息已被接收、处理,并且 RabbitMQ 可以自由删除它。
            channel.queueDeclare(QUEEN_NAME, autoAck, false, false, null);
            //发布消息
            String []msg = {"one", "two", "three", "four", "five", "six", "seven", "eight", "night", "ten"};
            String message = String.join("-", msg);//从控制台编译
            System.out.println(message);
            //我们需要将我们的消息标记为持久性 - 通过将MessageProperties(实现BasicProperties)设置为值PERSISTENT_TEXT_PLAIN
            channel.basicPublish("", QUEEN_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("UTF-8"));
            //关闭通道
            channel.close();
            //关闭连接
            connection.close();
        } catch (IOException | TimeoutException e) {
            e.printStackTrace();
        }
    }
}
           

消费者1代码:

@Slf4j
public class WorkConsumer {
    //定义消息队列名称
    private static final String QUEEN_NAME = "work";

    public static void main(String[] args) {
        //开启连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        //设置主机名称或IP
        factory.setHost("localhost");
        try {
            //开启连接,抛出异常
            Connection connection = factory.newConnection();
            //开启连接通道
            Channel channel = connection.createChannel();
            //设置队列参数
            boolean autoAck = true; //消息确认与持久性,消费者发回确认消息,告诉 RabbitMQ 特定消息已被接收、处理,并且 RabbitMQ 可以自由删除它。
            channel.queueDeclare(QUEEN_NAME, autoAck, false, false, null);
            System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

            channel.basicQos(1);    //公平调度,一次只接受一条未确认的消息

            DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                String message = new String(delivery.getBody(), "UTF-8");
                System.out.println("接收到的信息:" + message);
                try {
                    //do somethings
                    doWork(message);
                } finally {
                    System.out.println(" [x] Done");
                    //当您的客户端退出时,消息将被重新传送(这可能看起来像随机重新传送),但 RabbitMQ 将消耗越来越多的内存,因为它无法释放任何未确认的消息。所以利用basicAck()必须在接收交付的同一通道上发送确认
                    channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);//确认消息,设置为false
                }
            };
            channel.basicConsume(QUEEN_NAME, false, deliverCallback, consumerTag -> {});
        } catch (IOException | TimeoutException e) {
            e.printStackTrace();
        }
    }

    //模拟执行时间的假任务
    private static void doWork(String message) {
        int count = 0;
        for (char ch
        : message.toCharArray()){
            //遇到'-'符号耗时1s
            if (ch == '-'){
                try {
                    log.info("等待时间:{}秒" , ++count);
                    Thread.sleep(1000);
                } catch (InterruptedException e){
                    e.printStackTrace();
                }
            }
        }
    }
}
           

消费者2代码:

@Slf4j
public class WorkConsumer2 {
    //定义消息队列名称
    private static final String QUEEN_NAME = "work";

    public static void main(String[] args) {
        //开启连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        //设置主机名称或IP
        factory.setHost("localhost");
        try {
            //开启连接,抛出异常
            Connection connection = factory.newConnection();
            //开启连接通道
            Channel channel = connection.createChannel();
            //设置队列参数
            boolean autoAck = true; //消息确认与持久性,消费者发回确认消息,告诉 RabbitMQ 特定消息已被接收、处理,并且 RabbitMQ 可以自由删除它。
            channel.queueDeclare(QUEEN_NAME, autoAck, false, false, null);
            System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

            channel.basicQos(1);    //公平调度,一次只接受一条未确认的消息

            DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                String message = new String(delivery.getBody(), "UTF-8");
                System.out.println("接收到的信息:" + message);
                try {
                    //do somethings
                    doWork(message);
                } finally {
                    System.out.println(" [x] Done");
                    //当您的客户端退出时,消息将被重新传送(这可能看起来像随机重新传送),但 RabbitMQ 将消耗越来越多的内存,因为它无法释放任何未确认的消息。所以利用basicAck()必须在接收交付的同一通道上发送确认
                    channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);//确认消息,设置为false
                }
            };
            channel.basicConsume(QUEEN_NAME, false, deliverCallback, consumerTag -> {});
        } catch (IOException | TimeoutException e) {
            e.printStackTrace();
        }
    }

    //模拟执行时间的假任务
    private static void doWork(String message) {
        int count = 0;
        for (char ch
                : message.toCharArray()){
            //遇到'-'符号耗时1s
            if (ch == '-'){
                try {
                    log.info("工人做这项工作时间:{}秒" , ++count);
                    Thread.sleep(1000);
                } catch (InterruptedException e){
                    e.printStackTrace();
                }
            }
        }
    }
}
           

总结:在工作队列当中,假如将消费者比作工人,队列表示工人需要被分派的任务,那么当工人没有完成一项工作的时候,是不会重新获取其他任务,同时如果因为某个原因导致任务中断,那么RabbitMQ就不会将这项任务在消息队列中去除,而是会被分派给空闲的工人,直到这项任务被完成,也可称为消息应答或消息确认。

3、订阅模式:

订阅模型借助一个新的概念:Exchange(交换机)实现,不同的订阅模型本质上是根据交换机(Exchange)的类型划分的。

订阅模型有三种

Fanout(广播模型): 将消息发送给绑定给交换机的所有队列(因为他们使用的是同一个RoutingKey)。

Direct(定向): 把消息发送给拥有指定Routing Key (路由键)的队列。

Topic(通配符): 把消息传递给拥有 符合Routing Patten(路由模式)的队列。

(1)订阅之Fanout模型

这个模型的特点就是它在发送消息的时候,并没有指明Rounting Key , 或者说他指定了Routing Key,但是所有的消费者都知道,大家都能接收到消息,就像听广播。

临时队列:

在 Java 客户端中,当我们不向queueDeclare()提供参数时, 我们会创建一个具有生成名称的非持久、独占、自动删除队列:

此时queueName包含一个随机队列名称。例如,它可能看起来像amq.gen-JzTY20BRgKO-HjmUJj0wLg。

绑定:

学习消息队列RabbitMQ

我们已经创建了一个fanout交换和一个队列。现在我们需要告诉交换器向我们的队列发送消息。交换和队列之间的这种关系称为绑定。

从现在开始,logs交换会将消息附加到我们的队列中。

学习消息队列RabbitMQ

发出日志消息的生产者程序与之前的教程看起来没有太大区别。最重要的变化是我们现在想要将消息发布到我们的logs交换而不是无名的交换。我们需要在发送时提供一个routingKey,但它的值在fanout交换时被忽略。

生产者代码:
/**
 *  @Author: chen
 *  @Date: 2021/8/18 15:02
 *  @Description: RabbitMQ订阅模型:Fanout
 *  学习地址:https://www.rabbitmq.com/tutorials/tutorial-three-java.html
 */
public class EmitLog {
    //创建交换机名称
    private static final String EXCHANGE_NAME = "logs";

    public static void main(String[] args) {
        //创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        //设置主机名或者IP
        factory.setHost("localhost");
        try {
            //创建连接
            Connection connection = factory.newConnection();
            //开启连接通道
            Channel channel = connection.createChannel();
            //定义交换机类型为Fanout
            channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
            //设置发布的消息
            String []msg = {"one", "two", "three", "four", "five", "six", "seven", "eight", "night", "ten"};
            for (int i = 0; i < 10; i++) {
                channel.basicPublish(EXCHANGE_NAME, "", null, msg[i].getBytes("UTF-8"));
            }
            System.out.println(msg);
        } catch (IOException | TimeoutException e) {
            e.printStackTrace();
        }

    }
}
           
消费者1代码:
public class LogsConsumer {
    //创建交换机名称
    private static final String EXCHANGE_NAME = "logs";

    public static void main(String[] args) {
        //创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        //设置主机名或IP
        factory.setHost("localhost");
        try {
            //创建连接
            Connection connection = factory.newConnection();
            //开启通道
            Channel channel = connection.createChannel();
            //定义交换机名称
            channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
            //创建随机队列
            String queueName = channel.queueDeclare().getQueue();
            //绑定随机队列与交换机
            channel.queueBind(queueName, EXCHANGE_NAME, "");

            System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

            DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                String message =  new String(delivery.getBody(),"UTF-8");
                System.out.println(" [x] Received '" + message + "'");
            };
            channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {});

        } catch (IOException | TimeoutException e) {
            e.printStackTrace();
        }
    }
}
           
消费者2代码:
public class LogsConsumer2 {
    //创建交换机名称
    private static final String EXCHANGE_NAME = "logs";

    public static void main(String[] args) {
        //创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        //设置主机名或IP
        factory.setHost("localhost");
        try {
            //创建连接
            Connection connection = factory.newConnection();
            //开启通道
            Channel channel = connection.createChannel();
            //定义交换机名称
            channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
            //创建随机队列
            String queueName = channel.queueDeclare().getQueue();
            //绑定随机队列与交换机
            channel.queueBind(queueName, EXCHANGE_NAME, "");

            System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

            DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                String message =  new String(delivery.getBody(),"UTF-8");
                System.out.println(" [x] Received '" + message + "'");
            };
            channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {});

        } catch (IOException | TimeoutException e) {
            e.printStackTrace();
        }
    }
}
           

总结:这种发布/订阅模式是一种广播机制,并没有设置路由key模式。它在发布者代码中创建一个交换机,并没有没有在发布者代码中创建队列,而是在消费者中创建临时队列,通过临时队列与交换机相连,从而达到广播效果。

(2)订阅之Direct模型:

我们使用的是fanout交换,它没有给我们很大的灵活性——它只能进行无意识的广播。

我们将改用direct交换。direct交换背后的路由算法很简单 - 消息进入其绑定密钥与消息的路由密钥完全匹配的队列 。

学习消息队列RabbitMQ

P:生产者,向Exchange发送消息,发送消息时,会指定一个routing key。

X:Exchange(交换机),接收生产者的消息,然后把消息递交给 与routing key完全匹配的队列

C1:消费者,其所在队列指定了需要routing key 为 error 的消息

C2:消费者,其所在队列指定了需要routing key 为 info、error、warning 的消息

拥有不同的RoutingKey的消费者,会收到来自交换机的不同信息,而不是大家都使用同一个Routing Key 和广播模型区分开来。

发布者代码:
/**
 *  @Author: chen
 *  @Date: 2021/8/18 16:14
 *  @Description: 订阅模式:Direct
 */
public class EmitLogDirect {
    //创建交换机名称
    private static final String EXCHANGE_NAME = "direct_logs";

    public static void main(String[] args) {
        //创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        //设置主机名或IP
        factory.setHost("localhost");
        try {
            //创建连接
            Connection connection = factory.newConnection();
            //开启连接通道
            Channel channel = connection.createChannel();
            //定义交换机类型与名称
            channel.exchangeDeclare(EXCHANGE_NAME, "direct");
            //设置绑定key与信息
            String []bindKey = {"info","warn", "error", "test"};
            String []message = {"Hello","Warning","Error","ok"};
            //发布消息
            for (int i = 0; i < bindKey.length; i++) {
                channel.basicPublish(EXCHANGE_NAME, bindKey[i], null, message[i].getBytes("UTF-8"));
                System.out.println(" [x] Sent '" + bindKey[i] + "':'" + message[i] + "'");
            }
        } catch (IOException | TimeoutException e) {
            e.printStackTrace();
        }

    }
}
           
消费者代码:
public class LogsDirectConsumer {
    //创建交换机名称
    private static final String EXCHANGE_NAME = "direct_logs";

    //创建线程订阅'info','warn','error','test'
    public static Runnable runnable = () -> {
        //创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        //设置主机名或IP
        factory.setHost("localhost");
        try {
            //创建连接
            Connection connection = factory.newConnection();
            //开启连接通道
            Channel channel = connection.createChannel();
            //定义交换机类型与名称
            channel.exchangeDeclare(EXCHANGE_NAME, "direct");
            //创建临时队列
            String queueName = channel.queueDeclare().getQueue();
            //绑定队列、交换机和路由密钥
            String []bindKey = {"info","warn", "error", "test"};
            for (String severity :
                    bindKey) {
                channel.queueBind(queueName, EXCHANGE_NAME, severity);
            }
            System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

            DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                String message = new String(delivery.getBody(), "UTF-8");
                System.out.println(" [x] Received '" +
                        delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
            };
            channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
        } catch (IOException | TimeoutException e) {
            e.printStackTrace();
        }
    };

    //创建线程只订阅'info','warn','error'
    public static Runnable runnable1 = () -> {
        //创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        //设置主机名或IP
        factory.setHost("localhost");
        try {
            //创建连接
            Connection connection = factory.newConnection();
            //开启连接通道
            Channel channel = connection.createChannel();
            //定义交换机类型与名称
            channel.exchangeDeclare(EXCHANGE_NAME, "direct");
            //创建临时队列
            String queueName = channel.queueDeclare().getQueue();
            //绑定队列、交换机和路由密钥
            String []bindKey = {"info","warn", "error"};
            for (String severity :
                    bindKey) {
                channel.queueBind(queueName, EXCHANGE_NAME, severity);
            }
            System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

            DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                String message = new String(delivery.getBody(), "UTF-8");
                System.out.println(" [x] Received queue2 '" +
                        delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
            };
            channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
        } catch (IOException | TimeoutException e) {
            e.printStackTrace();
        }
    };

    public static void main(String[] args) {
        new Thread(runnable, "queue1").start();//订阅'info','warn','error','test'
        new Thread(runnable1, "queue2").start();//只订阅'info','warn','error'
        new Thread(runnable, "queue3").start();//订阅'info','warn','error','test'
    }
}
           

总结:在direct订阅模型中,生产者订阅交换机发布消息,产生routingKey(用于消费者绑定);而在消费者中,根据消费者的需要,可以随意订阅自己所需的信息(通过绑定routingKey实现),即选择性接收。

(3)订阅之Topic模型:

学习消息队列RabbitMQ

类似于Direct模型。区别是Topic的Routing Key支持通配符。

topic交换功能强大,可以像其他交换一样运行。

当队列与“ # ”(散列)绑定键绑定时——它将接收所有消息,而不管路由键——就像在fanout交换中一样。

当绑定中不使用特殊字符“ * ”(星号)和“ # ”(散列)时,主题交换的行为就像direct交换一样。

通配符如下:

  • (星号)可以正好代替一个词。
  • # (hash) 可以代替零个或多个单词。

根据上图所示,以下的代码是根据上图进行编写的。

我们发送描述动物的信息。这是由三个字符以及两个点组合的路由密钥进行发送。消费者那边用*.orange.*、*.*.rabbit以及lazy.#进行适配;生产者那边发布动物的消息以及设置路由key:quick.orange.rabbit、lazy.orange.elephant、quick.orange.fox、lazy.brown.fox、lazy.pink.rabbit、quick.brown.fox、quick.orange.male.rabbit、lazy.orange.male.rabbit

发布者代码:
/**
 * @Author: chen
 * @Date: 2021/8/19 9:42
 * @Description: RabbitMQ订阅模型: Topics
 * 学习网址: https://www.rabbitmq.com/tutorials/tutorial-five-java.html
 */
public class EmitLogTopic {
    private static final String EXCHANGE_NAME = "topic_logs";

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, "topic");

        String[] routingKey = {"quick.orange.rabbit", "lazy.orange.elephant", "quick.orange.fox", "lazy.brown.fox",
                "lazy.pink.rabbit", "quick.brown.fox", "quick.orange.male.rabbit", "lazy.orange.male.rabbit"};

        String[] message = {"快.橙子.兔子", "慢.橙色.大象", "快.橙色.狐狸", "慢.棕色.狐狸",
                "慢.粉色.兔子", "快.棕色.狐狸", "快.橙色.雄性.兔子", "慢.橙色.雄性.兔子"};

        for (int i = 0; i < routingKey.length; i++) {
            channel.basicPublish(EXCHANGE_NAME, routingKey[i], null, message[i].getBytes("UTF-8"));
            System.out.println(" [x] Sent '" + routingKey[i] + "':'" + message[i] + "'");
        }
        channel.close();
        connection.close();
    }

}
           
消费者代码:
public class LogsTopicsConsumer {
    private static final String EXCHANGE_NAME = "topic_logs";

    public static void consumer(String []routingKey){
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = null;
        try {
            connection = factory.newConnection();
            Channel channel = connection.createChannel();

            channel.exchangeDeclare(EXCHANGE_NAME, "topic");

            String queueName = channel.queueDeclare().getQueue();

            for (String key :
                    routingKey) {
                channel.queueBind(queueName, EXCHANGE_NAME, key);
            }

            System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

            DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                String message = new String(delivery.getBody(), "UTF-8");
                System.out.println(" [x] Received '" +
                        delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
            };
            channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
        } catch (IOException | TimeoutException e) {
            e.printStackTrace();
        }
    }


    public static void main(String[] args) {
        String[] routingKey1 = {"*.orange.*"};
        String[] routingKey2 = {"*.*.rabbit"};
        String[] routingKey3 = {"lazy.#"};
        consumer(routingKey1);
        consumer(routingKey2);
        consumer(routingKey3);
    }

}
           

结果分析:

路由键设置为“ quick.orange.rabbit ”的消息将发送到两个队列。

消息“ lazy.orange.elephant ”也会发给他们两个。

“ quick.orange.fox ”只会进入第一个队列,而“ lazy.brown.fox ”只会进入第二个队列。“ lazy.pink.rabbit ”只会被传送到第二个队列一次,即使它匹配了两个绑定。“ quick.brown.fox ”不匹配任何绑定,因此将被丢弃。

如果我们违反合同并发送一到四个字的消息,例如 “ quick.orange.male.rabbit ”,这些消息不会匹配任何绑定并且会丢失。

另一方面,“ lazy.orange.male.rabbit ”,即使它有四个单词,也会匹配最后一个绑定,并将被传递到第二个队列。

下一篇blog文章:SpringBoot + RabbitMQ整合