天天看点

RabbitMQ系列(三)RabbitMQ进阶-Queue队列特性 (一)简单队列

RabbitMQ Spring 项目整合

文章目录

    • RabbitMQ Spring 项目整合
        • 一 构建Maven工程
          • 1.新建mave项目
          • 2.配置mq依赖
    • 二 RabbitMQ 简单 Queue队列
        • 1.简单队列描述
        • 2.代码实战
          • 2.1 MqConnectUtil 工具类
          • 2.2 Producer 生产者
          • 2.3 Consumer 消费者
          • 2.4启动消费者

一 构建Maven工程

1.新建mave项目

1.1 新建 maven项目,后期咱们再整合spring boot amqp新建spring boot项目,路要一步一步走,先从简单的maven项目,了解原理,再从spring boot项目 看spring boot封装了什么东西,到底简化了我们的哪些操作

RabbitMQ系列(三)RabbitMQ进阶-Queue队列特性 (一)简单队列

1.2 设置 groupId和artifactId 及maven配置

RabbitMQ系列(三)RabbitMQ进阶-Queue队列特性 (一)简单队列

1.3 选择项目文件夹

RabbitMQ系列(三)RabbitMQ进阶-Queue队列特性 (一)简单队列

1.4 项目新建完成,目录结构如下

RabbitMQ系列(三)RabbitMQ进阶-Queue队列特性 (一)简单队列
2.配置mq依赖

1.5 pom中添加 RabbitMQ 客户端依赖

<dependency>
   <groupId>com.rabbitmq</groupId>
   <artifactId>amqp-client</artifactId>
   <version>3.4.1</version>
</dependency>
           

二 RabbitMQ 简单 Queue队列

1.简单队列描述

简单队列就是 1对1, 1个生产者对应1个消费者

RabbitMQ系列(三)RabbitMQ进阶-Queue队列特性 (一)简单队列

2.代码实战

java目录新建coon包及simple1包

2.1 MqConnectUtil 工具类

coon 包下面新建MqConnectUtil 连接类

package conn;

import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
 * 当前描述:
 *
 * @author: jiazijie
 * @since: 2020/6/10 下午11:12
 */
public class MqConnectUtil {
    /**
     * 默认链接 / vhost
     *
     * @return
     * @throws Exception
     */
    public static Connection getConnectionDefault() throws Exception {
        //定义连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        //设置服务地址
        factory.setHost("127.0.0.1");
        //端口
        factory.setPort(5672);
        //设置账号信息,用户名、密码、vhost
        factory.setVirtualHost("/");
        factory.setUsername("guest");
        factory.setPassword("guest");
        // 通过工程获取连接
        Connection connection = factory.newConnection();
        return connection;
    }


    /**
     * 链接 jzj vhost
     *
     * @return
     * @throws Exception
     */
    public static Connection getConnectionJzj() throws Exception {
        //定义连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        //设置服务地址
        factory.setHost("127.0.0.1");
        //端口
        factory.setPort(5672);
        //设置账号信息,用户名、密码、vhost
        factory.setVirtualHost("jzj");
        factory.setUsername("guest");
        factory.setPassword("guest");
        // 通过工程获取连接
        Connection connection = factory.newConnection();
        return connection;
    }
}
           
2.2 Producer 生产者

Producer 生产者-产生消息->发送到交换机->转发到队列存着

在simple1包下新建 SimpleQueueProducer

package simple1;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import conn.MqConnectUtil;

import java.time.LocalDate;
import java.time.LocalTime;

/**
 * 当前描述:生产者
 *
 * @author: jiazijie
 * @since: 2020/6/10 下午11:14
 */
public class SimpleQueueProducer {
    /**
     * 队列名字
     */
    private final static String QUEUE_NAME = "test_simple_queue";

    public static void main(String[] argv) throws Exception {
        // 获取到连接以及mq通道
        Connection connection = MqConnectUtil.getConnectionDefault();
        // 从连接中创建通道
        Channel channel = connection.createChannel();

        /* 声明(创建)队列  queueDeclare( String queue, boolean durable, boolean exclusive, boolean autoDelete,  Map<String, Object> arguments)
         * queue - 队列名
         * durable - 是否是持久化队列, 队列的声明默认是存放到内存中的,如果rabbitmq重启会丢失
         * exclusie - 是否排外的,仅限于当前队列使用
         * autoDelete - 是否自动删除队列,当最后一个消费者断开连接之后队列是否自动被删除,可以通过界面 查看某个队列的消费者数量,当consumers = 0时队列就会自动删除
         * arguments - 队列携带的参数 比如 ttl-生命周期,x-dead-letter 死信队列等等
         */
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // 消息内容
        String message = "Hello World! Time:" + LocalDate.now() + " " + LocalTime.now();
        System.out.println(message);

        /* 发送消息 String exchange, String routingKey, BasicProperties props, byte[] body
         * exchange - 交换机 ,"" 空时候指定的是 获取的virtualHost 虚拟服务器的 默认的exchang,每个virtualHost都有一个AMQP default type:direct 直接转发
         * queuename - 队列信息
         * props - 参数信息
         * message 消息体 byte[]类型
         */
        channel.basicPublish("", "", null, message.getBytes());

        System.out.println(" **** Producer  Sent Message: '" + message + "'");
        //关闭通道和连接
        channel.close();
        connection.close();
    }
}
           

!!!!! 这个地方很关键

!!!!! 这个地方很关键

!!!!! 这个地方很关键

第一个参数 是exchange 传空就是默认的交换机

第二个参数是 routingKey,这里没有设置routingKey,用了简单队列名字作为routingKey

channel.basicPublish("", SIMPLE_QUEUE_NAME, null, message.getBytes());

运行 Produce三次,产生3条消息,可以看下监控界面 localhost:15672 guest/guest

RabbitMQ系列(三)RabbitMQ进阶-Queue队列特性 (一)简单队列
2.3 Consumer 消费者

Consumer 消费者->监听队列,有消息就从队列中取出来消息进行消费

在 simple1包下,新建SimpleQueueConsumer

package simple1;


import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;
import conn.MqConnectUtil;

/**
 * 当前描述:消费者
 *
 * @author: jiazijie
 * @since: 2020/6/10 下午11:30
 */
public class SimpleQueueConsumer {
    /**
     * 队列名字
     */
    private final static String SIMPLE_QUEUE_NAME = "simple_queue_name_A";

    public static void main(String[] argv) throws Exception {
        Connection connection = MqConnectUtil.getConnectionDefault();
        Channel channel = connection.createChannel();

        /*确保这里的队列是存在的*/
        channel.queueDeclare(SIMPLE_QUEUE_NAME, false, false, false, null);
        System.out.println(" **** Consumer Waiting for messages. To exit press CTRL+C");


        QueueingConsumer consumer = new QueueingConsumer(channel);

        /* 消息确认机制
         * autoAck true:表示自动确认,只要消息从队列中获取,无论消费者获取到消息后是否成功消费,都会认为消息已经成功消费
         * autoAck false:表示手动确认,消费者获取消息后,服务器会将该消息标记为不可用状态,等待消费者的反馈,如果消费者一直没有反馈,那么该消息将一直处于不可用状态
         *          并且服务器会认为该消费者已经挂掉,不会再给其发送消息,直到该消费者反馈
         */
        channel.basicConsume(SIMPLE_QUEUE_NAME, true, consumer);

        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println(" **** Consumer Received '" + message + "'");
        }
    }

}

           

消费者 声明了队列,而且指定了消息确认机制

channel.queueDeclare(SIMPLE_QUEUE_NAME, false, false, false, null);

channel.basicConsume(SIMPLE_QUEUE_NAME, true, consumer);

正常接收 队列中过来的消息

2.4启动消费者

看下监控界面

RabbitMQ系列(三)RabbitMQ进阶-Queue队列特性 (一)简单队列

看下消费者日志

RabbitMQ系列(三)RabbitMQ进阶-Queue队列特性 (一)简单队列

可以看到,Producer生产者生产一条消息,Consumer 就能够消费一条消息

!!!!!!注意 我们在Producer中并没有指定 exchange,传入的exchange也是字符串,它使用的是每一个VirtualHost 下面默认的AMQP default exchange

如下图:

RabbitMQ系列(三)RabbitMQ进阶-Queue队列特性 (一)简单队列

下一篇 我们介绍 RabbitMQ系列(四)RabbitMQ进阶-Queue队列特性 (二)工作队列 Work模式