天天看点

消息队列之简单队列

文章目录

  • 简单队列
    • 1、简单队列的使用

简单队列

1、简单队列的使用

pom添加依赖

<!--添加rabbitmq依赖-->
    <!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->
    <dependency>
      <groupId>com.rabbitmq</groupId>
      <artifactId>amqp-client</artifactId>
      <version>5.8.0</version>
    </dependency>
           
//连接工具类
public class ConnectionUtils {

    /**
     * Opens a new connection to the message broker.
     *
     * <p>The host, AMQP port, virtual host, user name and password are all
     * fixed values set inside this method.
     * NOTE(review): the credentials are hard-coded here; consider moving them
     * to external configuration.
     *
     * @return the connection created by the configured connection factory
     * @throws IOException      if propagated from {@code newConnection()}
     * @throws TimeoutException if propagated from {@code newConnection()}
     */
    public static Connection getConnection() throws IOException, TimeoutException {
        // The connection factory that carries every broker setting.
        ConnectionFactory connectionFactory = new ConnectionFactory();

        // Account used to log in, and the virtual host to work in.
        connectionFactory.setUsername("wjm");
        connectionFactory.setPassword("qwe123");
        connectionFactory.setVirtualHost("/my_virtual");

        // Server address and AMQP port.
        connectionFactory.setHost("8.129.166.53");
        connectionFactory.setPort(5672);

        Connection connection = connectionFactory.newConnection();
        return connection;
    }
}
           
//测试类
package com.study;

import com.rabbitmq.client.*;
import com.study.rabbitmq.util.ConnectionUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/**
 * Simple-queue send/receive tests: one test publishes a message to
 * {@code test_simple_queue} and the other consumes from the same queue.
 *
 * <p>Runs with the Spring JUnit4 runner using the context defined in
 * {@code classpath:spring-bean.xml}.
 */
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = {"classpath:spring-bean.xml"})
public class TestRabbitMQ {
    private static final Logger logger = LogManager.getLogger(LogManager.ROOT_LOGGER_NAME);
    private static final String QUEUE_NAME = "test_simple_queue";
    /** Upper bound on how long {@link #testGet()} waits for the first delivery. */
    private static final long RECEIVE_TIMEOUT_SECONDS = 5;

    /**
     * Declares the queue and publishes one text message to it through the
     * default exchange (empty exchange name, routing key = queue name).
     *
     * @throws IOException      if declaring, publishing or closing fails
     * @throws TimeoutException if opening the connection or closing the channel times out
     */
    @Test
    public void testSend() throws IOException, TimeoutException {
        // try-with-resources closes the channel and then the connection even
        // when declare/publish throws, so nothing is leaked on failure.
        try (Connection connection = ConnectionUtils.getConnection();
             Channel channel = connection.createChannel()) {
            // Declare the queue with all boolean flags false and no extra arguments.
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);

            String msg = "第一次测试数据!";
            // Encode explicitly as UTF-8: the consumer decodes as UTF-8, and the
            // platform default charset is not guaranteed to match.
            channel.basicPublish("", QUEUE_NAME, null, msg.getBytes(StandardCharsets.UTF_8));
            System.out.println("--send msg:" + msg);
        }
    }

    /**
     * Declares the queue, registers an auto-acknowledging consumer and waits up
     * to {@code RECEIVE_TIMEOUT_SECONDS} for the first delivery before closing
     * the channel and connection.
     *
     * <p>Deliveries are handled asynchronously by the consumer callback, so the
     * test must block; returning right after {@code basicConsume} could end the
     * test before any message is handled. An empty queue is not treated as a
     * failure: a warning is logged and the test completes.
     *
     * @throws IOException      if declaring, consuming or closing fails
     * @throws TimeoutException if opening the connection or closing the channel times out
     */
    @Test
    public void testGet() throws IOException, TimeoutException {
        try (Connection connection = ConnectionUtils.getConnection();
             Channel channel = connection.createChannel()) {
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

            // Released by the callback once the first message has been handled.
            final CountDownLatch firstDelivery = new CountDownLatch(1);

            DefaultConsumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                                           AMQP.BasicProperties properties, byte[] body)
                        throws IOException {
                    String msgString = new String(body, StandardCharsets.UTF_8);
                    logger.info(msgString);
                    System.out.println("消费者获取消息:" + msgString);
                    firstDelivery.countDown();
                }
            };

            // Listen on the queue; the second argument (true) enables auto-ack.
            channel.basicConsume(QUEUE_NAME, true, consumer);

            try {
                if (!firstDelivery.await(RECEIVE_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
                    logger.warn("No message received from " + QUEUE_NAME
                            + " within " + RECEIVE_TIMEOUT_SECONDS + " seconds");
                }
            } catch (InterruptedException e) {
                // Restore the interrupt flag and fall through to resource cleanup.
                Thread.currentThread().interrupt();
            }
        }
    }
}

           

登录 RabbitMQ 网页管理端后，可在 Queues 页面中查看该队列及其中的消息。