天天看點

RabbitMQ(一):Java簡單操作RabbitMQ

Java簡單操作RabbitMQ

    • 前言
      • 一、添加pom依賴
      • 二、生産者
      • 三、消費者

前言

目前正在學習rabbitmq,簡單記錄一下自己的代碼

一、添加pom依賴

<dependency>
   <groupId>com.rabbitmq</groupId>
   <artifactId>amqp-client</artifactId>
</dependency>
           

二、生産者

public class Send {
	// 定義一個消息隊列名稱
    private final static String QUEUE_NAME = "FIRST-MQ";
    public static void main(String[] args) throws IOException, TimeoutException {
        // 建立新的連接配接
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("106.**.**.82");
        factory.setPort(5672);
        // 設定虛拟主機 一個mq的服務可以設定多個虛拟機,每個虛拟機相當于獨立的mq
        factory.setVirtualHost("/");
        factory.setUsername("user");
        factory.setPassword("pwd");
        Connection connection = null;
        Channel channel = null;
        try{
            connection = factory.newConnection();
            // 建立内部會話通道,生産者和mq服務通信都在channel中完成
            channel = connection.createChannel();
            // 聲明交換機,這裡暫時沒有

            // 聲明建立隊列,如果沒有,則建立
            // 參數:String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
            /**
             * queue:隊列名稱
             * durable:是否持久化,存到磁盤中
             * exclusive:是否排他,是否獨占連接配接,該隊列隻允許在這個connection中通路,如果關閉連接配接,則删除隊列
             * autoDelete:不再使用時自動删除。和exclusive 一同設為true 可以實作臨時隊列。
             * args:擴充參數,比如存活時間
             */
            channel.queueDeclare(QUEUE_NAME,false,false,false, null);
            String message = "HELLO WORLD";
            // 發送消息
            // 參數:String exchange, String routingKey, BasicProperties props, byte[] body
            /**
             * exchange:交換機 如果不指定(""),就預設交換機
             * routingKey:路由key;交換機根據路由key将消息轉發到指定的隊列,如果使用預設交換機,routingKey為隊列名稱
             * props:額外屬性
             * body:消息内容
             */
            channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
            System.out.println("send message:"+message);
        } catch (TimeoutException | IOException e) {
            e.printStackTrace();
        } finally {
            // 先關閉通道
            channel.close();
            // 再關閉連接配接
            connection.close();
        }
    }
}
           

三、消費者

public class Recv {

    private final static String QUEUE_NAME = "FIRST-MQ";

    public static void main(String[] args) {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("106.*5.**.8*");
        factory.setPort(5672);
        factory.setVirtualHost("/");
        factory.setUsername("user");
        factory.setPassword("pwd");

        try{
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();
            // 聲明隊列
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            System.out.println(" Waiting for messages. ");

            // 定義實作消費方法
            Consumer consumer = new DefaultConsumer(channel) {
                /**
                 * 當接收到消息後,此方法将被調用
                 * @description:
                 * @param consumerTag: (非必須)消費者标簽:辨別消費者,在監聽隊列的時候設定channel.basicConsume中設定
                 * @param envelope: 信封,通過envelope 可以擷取很多資訊,交換機、消息id(可用于消息接收ack)、路由Key等等
                 * @param properties:消息的屬性,發送消息時 設定的額外屬性
                 * @param body :消息内容
                 */
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                                           AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String message = new String(body, StandardCharsets.UTF_8);
                    System.out.println("receive:" + message);
                    TimeUnit.SECONDS.sleep(1);
                    // 消息id
                    envelope.getDeliveryTag();
                    // 手動回複ack
                    // channel.basicAck(envelope.getDeliveryTag(),false);
                }
            };
            // 接收消息 監聽隊列
            // 參數:String queue, boolean autoAck, Consumer callback
            /**
             * queue:隊列
             * autoAck:自動回複:當消費者接收到消息後,告訴mq消息已經接收。TRUE:自動回複,false:程式設計回複
             * callback:消費方法,當消費者接收消息執行的方法。
             */
            channel.basicConsume(QUEUE_NAME, true, consumer);
            TimeUnit.SECONDS.sleep(15);
        } catch (IOException | TimeoutException | InterruptedException e) {
            e.printStackTrace();
        }
    }
}