天天看點

消息隊列之簡單隊列簡單隊列

文章目錄

  • 簡單隊列
    • 1、簡單隊列的使用

簡單隊列

1、簡單隊列的使用

pom添加依賴

<!--添加rabbitmq依賴-->
    <!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->
    <dependency>
      <groupId>com.rabbitmq</groupId>
      <artifactId>amqp-client</artifactId>
      <version>5.8.0</version>
    </dependency>
           
//連接配接工具類
public class ConnectionUtils {
    /**
     * 擷取MQ連接配接
     */

    public static Connection getConnection() throws IOException, TimeoutException {
        //定義一個連接配接共存
        ConnectionFactory factory = new ConnectionFactory();
        //設定服務位址
        factory.setHost("8.129.166.53");
        //AMQP 5672
        factory.setPort(5672);
        //vhost
        factory.setVirtualHost("/my_virtual");
        //使用者名
        factory.setUsername("wjm");
        //密碼
        factory.setPassword("qwe123");
        return factory.newConnection();
    }
}
           
//測試類
package com.study;

import com.rabbitmq.client.*;
import com.study.rabbitmq.util.ConnectionUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = {"classpath:spring-bean.xml"})
public class TestRabbitMQ {
    private static Logger logger= LogManager.getLogger(LogManager.ROOT_LOGGER_NAME);
    private static final String QUEUE_NAME="test_simple_queue";

    @Test
    public void testSend() throws IOException, TimeoutException {
        //擷取一個連接配接
        Connection connection = ConnectionUtils.getConnection();
        //從連接配接中擷取一個通道
        Channel channel = connection.createChannel();
        //建立隊列聲明
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);

        String msg = "第一次測試資料!";
        channel.basicPublish("",QUEUE_NAME,null,msg.getBytes());
        System.out.println("--send msg:" + msg);

        channel.close();
        connection.close();
    }

    @Test
    public void testGet() throws IOException, TimeoutException {
        //擷取一個連接配接
        Connection connection = ConnectionUtils.getConnection();
        //建立頻道
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
        DefaultConsumer  consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
                    throws IOException {
                String msgString = new String(body, "utf-8");
                logger.info(msgString);
                System.out.println("消費者擷取消息:" + msgString);
            }
        };
        /** 3.監聽隊列 */
        channel.basicConsume(QUEUE_NAME, true, consumer);
    }
}

           

登入網頁用戶端可在Queue中檢視