天天看点

RabbitMQ的Routing 路由模式(Direct)

RabbitMQ的Routing 路由模式

  • 模式说明:
    1. 队列与交换机的绑定,不能是任意绑定了,而是要指定一个 RoutingKey(路由key)
    2. 消息的发送方在向 Exchange 发送消息时,也必须指定消息的 RoutingKey
    3. Exchange 不再把消息交给每一个绑定的队列,而是根据消息的 Routing Key 进行判断,只有队列的Routingkey 与消息的 Routing key 完全一致,才会接收到消息
  • Routing 路由模式的图片
    RabbitMQ的Routing 路由模式(Direct)
  • 图解:
    • P:生产者,向 Exchange 发送消息,发送消息时,会指定一个routing key
    • X:Exchange(交换机),接收生产者的消息,然后把消息递交给与 routing key 完全匹配的队列
    • C1:消费者,其所在队列指定了需要 routing key 为 error 的消息
    • C2:消费者,其所在队列指定了需要 routing key 为 info、error、warning 的消息
  • 其实这个Routing 路由模式是Exchange常见类型之一的

    Direct

  • 代码编写生产者(Direct模式)步骤
    1. 创建工厂
    2. 设置参数
    3. 创建连接
    4. 创建管道channel
    5. 创建交换机
    6. 创建队列(

      test_direct_queue1

      test_direct_queue2

      )
    7. 绑定队列和交换机
    8. 发送消息
    9. 关闭连接
package com.yang;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Producer_Routing {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1 .创建工厂
        ConnectionFactory factory = new ConnectionFactory();
        //2. 设置参数
        factory.setHost("192.168.20.146");
        factory.setPort(5672);
        factory.setUsername("test");
        factory.setPassword("test");
        factory.setVirtualHost("/test");
        //3. 创建连接
        Connection connection = factory.newConnection();
        //4. 创建管道channel
        Channel channel = connection.createChannel();
        //5. 创建交换机
        String exchangeName = "test_direct";
        channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT,true,false,null);
        //6. 创建队列
        String queueName1 = "test_direct_queue1";
        String queueName2 = "test_direct_queue2";
        channel.queueDeclare(queueName1,true,false,false,null);
        channel.queueDeclare(queueName2,true,false,false,null);
        //7. 绑定交换机和队列
        channel.queueBind(queueName1,exchangeName,"error");

        channel.queueBind(queueName2,exchangeName,"info");
        channel.queueBind(queueName2,exchangeName,"error");
        channel.queueBind(queueName2,exchangeName,"warning");
        //8. 发送消息
        String body = "日志信息:张三调用了delete方法...出错误了。。。日志级别:error...";
        channel.basicPublish(exchangeName,"error",null,body.getBytes());
        //9. 释放资源
    }
}

           
  • 尝试运行
    RabbitMQ的Routing 路由模式(Direct)
  • 说明因为我们两个队列的

    routingKey

    都有

    error

    (步骤7),而我们发送的消息的

    routingKey

    同样写着

    error

    (步骤8)
  • 假如我们发送的步骤8的是

    info

    那么

    queueName2

    会增加一条队列变成2,而

    queueName1

    不变(原有都是1基础上的改变)
//8. 发送消息
        String body = "日志信息:张三调用了普通方法..日志级别:info...";
        channel.basicPublish(exchangeName,"info",null,body.getBytes());
           
  • 结果
    RabbitMQ的Routing 路由模式(Direct)
  • 代码编写消费者(Direct模式)步骤
    1. 创建工厂
    2. 设置参数
    3. 创建连接
    4. 创建管道channel
    5. 创建交换机
    6. 创建队列(

      test_direct_queue1

      test_direct_queue2

      ) (生产者创建过)
    7. 绑定队列和交换机 (生产者已经绑定过)
    8. 接收消息
    9. 关闭连接 (因为要一直接收)
package com.yang;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Consumer_Routing1 {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1. 创建工厂
        ConnectionFactory factory = new ConnectionFactory();
        //2. 设置参数
        factory.setHost("192.168.20.146");
        factory.setPort(5672);
        factory.setUsername("test");
        factory.setPassword("test");
        factory.setVirtualHost("/test");
        //3. 创建连接
        Connection connection = factory.newConnection();
        //4. 创建channel管道
        Channel channel = connection.createChannel();
        //5. 创建交换机(不需要)
        //6. 创建队列(不需要)
        //7. 绑定队列和交换机(不需要)

        //8. 接收消息
        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println(new String(body));
            }
        };
        String queueName1 = "test_direct_queue1";
        String queueName2 = "test_direct_queue2";
        channel.basicConsume(queueName1,true,consumer);
        //9.关闭连接(不需要)
        
    }
}

           
  • 第二个队列只需要队列名改一下就行了!
  • 两个的执行结果
    1. Consumer_Routing1(队列:

      test_direct_queue1

      )
      RabbitMQ的Routing 路由模式(Direct)
    2. Consumer_Routing2(队列:

      test_direct_queue2

      )
      RabbitMQ的Routing 路由模式(Direct)
  • 小结
    • Routing 模式要求队列在绑定交换机时要指定 routing key,消息会转发到符合 routing key 的队列。

以上就是Routing 路由模式的全部内容