天天看點

java實作rabbitmq簡單隊列模型,生産者 消費者 消息隊列

生産者向隊列發送消息,随機消費者從隊列中接收消息

java實作rabbitmq簡單隊列模型,生産者 消費者 消息隊列
  1. 建立使用者和虛拟主機

通過rabbitmq提供的使用者管理界面可以很輕松的建立使用者和虛拟主機,并且需要将使用者綁定到對應的虛拟主機。自帶有

guest

使用者和

/

虛拟主機,也可以直接用這兩個既有資訊。我們建立了名為

wuwl

的使用者和

/vh

的虛拟主機,注意虛拟主機需要以

/

開頭。

java實作rabbitmq簡單隊列模型,生産者 消費者 消息隊列
  1. 導入依賴
<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.10.0</version>
</dependency>
           
  1. 建立連接配接,這裡把ConnectionUtils 視為一個工具類,生産者消費者都需要連接配接對象
public class ConnectionUtils {
    public static Connection getConnection() throws IOException, TimeoutException {
        // 建立連接配接mq的連接配接工廠對象
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // 設定連接配接rabbitmq主機
        connectionFactory.setHost("192.168.20.128");
        // 設定端口号
        connectionFactory.setPort(5672);
        // 設定虛拟主機
        connectionFactory.setVirtualHost("/vh");
        // 設定使用者名和密碼
        connectionFactory.setUsername("wuwl");
        connectionFactory.setPassword("123456");

        // 擷取連接配接對象
        return connectionFactory.newConnection();
    }
}
           
  1. 建立生産者
public class Provider {
    public void send() throws IOException, TimeoutException {
        Connection connection = null;
        Channel channel = null;
        try {
            connection = ConnectionUtils.getConnection();
            // 擷取連接配接通道
            channel = connection.createChannel();

            /** 通道綁定對應的消息隊列
             * 參數一:隊列名稱,如果隊列不存在,自動建立隊列
             * 參數二:定義隊列是否持久化
             * 參數三:是否獨占隊列
             * 參數四:是否在消費完成後自動删除隊列
             */
            channel.queueDeclare("wuwl",true,false,false,null);

            /**
             * 釋出消息
             * 參數一: 交換機名稱
             * 參數二:隊列名稱
             * 參數三:傳遞消息額外設定
             * 參數四:消息的具體内容
             */
            channel.basicPublish("","wuwl",null,"hello rabbitmq".getBytes());
        }finally {
            if (channel !=null && channel.isOpen()) {
                channel.close();
            }
            if (connection != null && connection.isOpen()) {
                connection.close();
            }
        }
    }

    public static void main(String[] args) throws IOException, TimeoutException {
        Provider provider = new Provider();
        provider.send();
    }
}
           
  1. 建立消費者,消費者的通道和連接配接對象不關閉則一直處于監聽狀态,隻有生産者發送新的消息至消息隊列,消費者會立即消費掉
public class Consumer {
    public void consume() throws IOException, TimeoutException {
        Connection connection = null;
        Channel channel = null;
        try {
            connection = ConnectionUtils.getConnection();
            // 擷取連接配接通道
            channel = connection.createChannel();

            /** 通道綁定對應的消息隊列
             * 參數一:隊列名稱,如果隊列不存在,自動建立隊列
             * 參數二:定義隊列是否持久化
             * 參數三:是否獨占隊列
             * 參數四:是否在消費完成後自動删除隊列
             */
            channel.queueDeclare("wuwl", true, false, false, null);

            /**
             * 消費
             * 參數一: 交換機名稱
             * 參數二:隊列名稱
             * 參數三:傳遞消息額外設定
             * 參數四:消息的具體内容
             */
            channel.basicConsume("wuwl",true,new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    System.out.println("消費消息:" + new String(body));
                }
            });
        } finally {
//            if (channel != null && channel.isOpen()) {
//                channel.close();
//            }
//            if (connection != null && connection.isOpen()) {
//                connection.close();
//            }
        }
    }

    public static void main(String[] args) throws IOException, TimeoutException {
        Consumer consumer = new Consumer();
        consumer.consume();
    }

}

           
  1. 測試

    單獨運作生産者,我們可以在管理界面看到我們定義的queue,以及未被消費的效益一條

java實作rabbitmq簡單隊列模型,生産者 消費者 消息隊列

啟動消費者後,消息被消費,持續監聽中

java實作rabbitmq簡單隊列模型,生産者 消費者 消息隊列

此時隊列中待消費的消息數量為0

java實作rabbitmq簡單隊列模型,生産者 消費者 消息隊列

再次通過生産者發送一條消息至消息隊列,消費者直接消費,隊列中待消費消息數量為0

java實作rabbitmq簡單隊列模型,生産者 消費者 消息隊列