天天看點

RabbitMQ入門實戰(2)--Java用戶端操作RabbitMQ

本文主要以官網(https://www.rabbitmq.com/getstarted.html)的例子為參考,介紹使用Java用戶端來操作RabbitMQ,文中使用到的軟體版本:RabbitMQ 3.8.9、Java 1.8.0_191。

1、準備

1.1、引入依賴

<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.10.0</version>
</dependency>
      

1.2、編寫工具類

package com.abc.demo.general.rabbit;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class RabbitMQUtil {
    private static ConnectionFactory factory;

    public static Connection getConnection() throws Exception {
        if (factory == null) {
            factory = new ConnectionFactory();
            factory.setHost("10.49.196.10");
            factory.setPort(5672);
            factory.setUsername("guest");
            factory.setPassword("guest");
        }
        return factory.newConnection();
    }

    public static void close(Connection connection) {
        if (connection != null) {
            try {
                connection.close();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    public static void close(Channel channel) {
        if (channel != null) {
            try {
                channel.close();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}
      

2、場景例子

2.1、Hello World

2.1.1、場景描述

RabbitMQ入門實戰(2)--Java用戶端操作RabbitMQ

最簡單的場景,一個生産者,一個消費者。

2.1.2、代碼樣例

生産者:

package com.abc.demo.general.rabbit;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;


/**
 * HelloWorld,一個生産者,一個消費者
 */
public class HelloWorldProducer {
    private final static String QUEUE_NAME = "hello";

    public static void main(String[] args) {
        Connection connection = null;
        Channel channel = null;
        try {
            connection = RabbitMQUtil.getConnection();
            channel = connection.createChannel();
            /**
             * 聲明隊列,如果隊列不存在則建立;如果已存在則設定的參數值需跟原隊列一緻,否則會保持
             * 預設綁定到預設隊列,routingKey就是隊列名稱
             *
             * 是否持久化: 如果為false,則重新開機rabbit後,隊列會消失
             * 是否排他: 即隻允許該channel通路該隊列,一般等于true的話用于一個隊列隻能有一個消費者來消費的場景
             * 是否自動删除: 消費完消息删除該隊列
             * 其他屬性:x-queue-type(quorum、classic),預設為classic
             */
            channel.queueDeclare(QUEUE_NAME, true, false, false, null);
            String message = "Hello World,曹操!";
            //消息的routingKey就是隊列名稱
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
            System.out.println(" [x] Sent '" + message + "'");
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            RabbitMQUtil.close(channel);
            RabbitMQUtil.close(connection);
        }
    }

}
      

消費者:

package com.abc.demo.general.rabbit;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DeliverCallback;

/**
 * java用戶端操作rabbitmq;HelloWorld,一個生産者,一個消費者
 */
public class HelloWorldConsumer {
    private final static String QUEUE_NAME = "hello";

    public static void main(String[] args) throws Exception {
        Connection connection = RabbitMQUtil.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" [x] Received '" + message + "'");
        };
        channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
    }

}
      

2.2、Work Queues

2.2.1、場景描述

RabbitMQ入門實戰(2)--Java用戶端操作RabbitMQ

Work Queues(工作隊列模式),一個生産者,多個消費者,一條消息隻能被一個消費者消費。

2.2.2、代碼樣例

生産者:

package com.abc.demo.general.rabbit;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.MessageProperties;

/**
 * Work Queues(工作隊列模式),一個生産者,多個消費者,一條消息隻能被一個消費者消費
 */
public class WorkQueuesProducer {
    private final static String QUEUE_NAME = "work_queues";

    public static void main(String[] args) {
        Connection connection = null;
        Channel channel = null;
        try {
            connection = RabbitMQUtil.getConnection();
            channel = connection.createChannel();
            //隊列持久化
            channel.queueDeclare(QUEUE_NAME, true, false, false, null);
            for (int i = 0; i < 20; i++) {
                String message = "消息-" + i;
                //發送的消息持久化,重新開機rabbitmq消息也不會丢失
                channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
                System.out.println(" [x] Sent '" + message + "'");
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            RabbitMQUtil.close(channel);
            RabbitMQUtil.close(connection);
        }
    }

}
      

消費者:

package com.abc.demo.general.rabbit;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.client.GetResponse;

import java.io.IOException;

/**
 * Work Queues(工作隊列模式),一個生産者,多個消費者,一條消息隻能被一個消費者消費
 */
public class WorkQueuesConsumer {
    private final static String QUEUE_NAME = "work_queues";

    public static void main(String[] args) throws Exception {
        //模拟三個消費者,rabbitmq預設會把消息輪詢推給每個消費者
        for (int i = 0; i < 3; i++) {
            Connection connection = RabbitMQUtil.getConnection();
            //new Thread(new Worker(connection, i)).start();
            new Thread(new Worker2(connection, i)).start();
            //new Thread(new Worker3(connection, i)).start();
        }
    }

    /**
     * 自動确認Worker
     */
    static class Worker implements Runnable {
        private Connection connection;
        private int index;

        public Worker(Connection connection, int index) {
            this.connection = connection;
            this.index = index;
        }

        @Override
        public void run() {
            try {
                System.out.println("消費者-" + index + " 開始接受消息。");
                DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                    String message = new String(delivery.getBody(), "UTF-8");
                    System.out.println("消費者-" + index + " Received '" + message + "'");

                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                };
                Channel channel = connection.createChannel();
                channel.queueDeclare(QUEUE_NAME, true, false, false, null);
                //自動确認,如果業務處理失敗或該消費者當機,發送到該消費者的消息都會被删除
                channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * 手動确認Worker
     */
    static class Worker2 implements Runnable {
        private Connection connection;
        private int index;

        public Worker2(Connection connection, int index) {
            this.connection = connection;
            this.index = index;
        }

        @Override
        public void run() {
            try {
                System.out.println("消費者-" + index + " 開始接受消息。");
                Channel channel = connection.createChannel();
                channel.queueDeclare(QUEUE_NAME, true, false, false, null);
                channel.basicQos(1);//一次隻接受一條消息
                DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                    try {
                        String message = new String(delivery.getBody(), "UTF-8");
                        System.out.println("消費者-" + index + " Received '" + message + "'");
                        //業務處理...
                        Thread.sleep(1000);
                    } catch (Exception e) {
                        e.printStackTrace();
                    } finally {
                        //在業務處理完成後手動确認;避免一個消費者當機等導緻消息丢失
                        channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
                    }
                };
                //autoAck設為false
                channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> { });
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * 拉模式消費
     */
    static class Worker3 implements Runnable {
        private Connection connection;
        private int index;

        public Worker3(Connection connection, int index) {
            this.connection = connection;
            this.index = index;
        }

        @Override
        public void run() {
            try {
                System.out.println("消費者-" + index + " 開始接受消息。");
                Channel channel = connection.createChannel();
                channel.queueDeclare(QUEUE_NAME, true, false, false, null);

                while (true) {
                    GetResponse response = channel.basicGet(QUEUE_NAME, false);
                    if (response == null) {
                        continue;
                    }
                    String message = new String(response.getBody());
                    System.out.println("消費者-" + index + " Received '" + message + "'");
                    //業務處理...
                    Thread.sleep(1000);
                    channel.basicAck(response.getEnvelope().getDeliveryTag(), false);
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}
      

2.3、Publish/Subscribe

2.3.1、場景描述

RabbitMQ入門實戰(2)--Java用戶端操作RabbitMQ

publish/Subscribe(釋出/訂閱模式),一條消息被發送到多個隊列。

2.3.2、代碼樣例

生産者:

package com.abc.demo.general.rabbit;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

/**
 * Publish/Subscribe(釋出/訂閱模式),一條消息被發送到多個隊列
 */
public class PublishSubscribeProducer {
    private static final String EXCHANGE_NAME = "logs";
    private final static String QUEUE_NAME_1 = "destination_terminal";
    private final static String QUEUE_NAME_2 = "destination_file";

    public static void main(String[] args) {
        Connection connection = null;
        Channel channel = null;
        try {
            connection = RabbitMQUtil.getConnection();
            channel = connection.createChannel();
            //聲明exchange,類型為fanout,消息路由到所有綁定的隊列
            channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
            //聲明隊列
            channel.queueDeclare(QUEUE_NAME_1, true, false, false, null);
            channel.queueDeclare(QUEUE_NAME_2, true, false, false, null);
            //綁定
            channel.queueBind(QUEUE_NAME_1, EXCHANGE_NAME, "");
            channel.queueBind(QUEUE_NAME_2, EXCHANGE_NAME, "");
            for (int i = 0; i < 20; i++) {
                String message = "消息-" + i;
                channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
                System.out.println(" [x] Sent '" + message + "'");
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            RabbitMQUtil.close(channel);
            RabbitMQUtil.close(connection);
        }
    }

}
      

消費者:

package com.abc.demo.general.rabbit;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DeliverCallback;

import java.io.IOException;

/**
 * Publish/Subscribe(釋出/訂閱模式),一條消息被發送到多個隊列
 */
public class PublishSubscribeConsumer {
    private final static String QUEUE_NAME_1 = "destination_terminal";
    private final static String QUEUE_NAME_2 = "destination_file";

    public static void main(String[] args) throws Exception {
        Connection connection = RabbitMQUtil.getConnection();
        Channel channel = connection.createChannel();
        System.out.println("開始接受消息...");
        new Thread(() -> {
            try {
                DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                    String message = new String(delivery.getBody(), "UTF-8");
                    System.out.println("destination_terminal Received '" + message + "'");
                };
                channel.basicConsume(QUEUE_NAME_1, true, deliverCallback, consumerTag -> { });
            } catch (IOException e) {
                e.printStackTrace();
            }
        }).start();

        new Thread(() -> {
            try {
                DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                    String message = new String(delivery.getBody(), "UTF-8");
                    System.out.println("destination_file Received '" + message + "'");
                };
                channel.basicConsume(QUEUE_NAME_2, true, deliverCallback, consumerTag -> { });
            } catch (IOException e) {
                e.printStackTrace();
            }
        }).start();
    }
}
      

2.4、Routing

2.4.1、場景描述

RabbitMQ入門實戰(2)--Java用戶端操作RabbitMQ

Routing(路由模式),隊列通過bingingKey綁定到Exchange,發送消息時指定routingKey,Exchange根據routingKey精确比對bindingKey來路由消息.

2.4.2、代碼樣例

生産者:

package com.abc.demo.general.rabbit;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

/**
 * Routing(路由模式),隊列通過bingingKey綁定到Exchange,發送消息時指定routingKey,Exchange根據routingKey精确比對bindingKey來路由消息
 */
public class RoutingProducer {
    private static final String EXCHANGE_NAME = "direct_logs";
    private final static String QUEUE_NAME_1 = "direct_destination_terminal";
    private final static String QUEUE_NAME_2 = "direct_destination_file";

    public static void main(String[] args) {
        Connection connection = null;
        Channel channel = null;
        try {
            connection = RabbitMQUtil.getConnection();
            channel = connection.createChannel();
            //聲明exchange,類型為direct,根據routingKey精确比對bindingKey來路由消息
            channel.exchangeDeclare(EXCHANGE_NAME, "direct");
            //聲明隊列
            channel.queueDeclare(QUEUE_NAME_1, true, false, false, null);
            channel.queueDeclare(QUEUE_NAME_2, true, false, false, null);
            //隊列1接受info、warn、error日志
            channel.queueBind(QUEUE_NAME_1, EXCHANGE_NAME, "info");
            channel.queueBind(QUEUE_NAME_1, EXCHANGE_NAME, "warn");
            channel.queueBind(QUEUE_NAME_1, EXCHANGE_NAME, "error");
            //隊列2隻接受error日志
            channel.queueBind(QUEUE_NAME_2, EXCHANGE_NAME, "error");
            for (int i = 0; i < 20; i++) {
                String message = "消息-" + i;
                String routingKey = "";
                if (i % 3 == 0) {
                    routingKey = "info";
                } else if (i % 3 == 1) {
                    routingKey = "warn";
                } else if (i % 3 == 2) {
                    routingKey = "error";
                }
                channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes());
                System.out.println(" [x] Sent '" + routingKey + ":" + message + "'");
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            RabbitMQUtil.close(channel);
            RabbitMQUtil.close(connection);
        }
    }

}
      

消費者:

package com.abc.demo.general.rabbit;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DeliverCallback;

import java.io.IOException;

/**
 * Routing(路由模式),隊列通過bingingKey綁定到Exchange,發送消息時指定routingKey,Exchange根據routingKey精确比對bindingKey來路由消息
 */
public class RoutingConsumer {
    private final static String QUEUE_NAME_1 = "direct_destination_terminal";
    private final static String QUEUE_NAME_2 = "direct_destination_file";

    public static void main(String[] args) throws Exception {
        Connection connection = RabbitMQUtil.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME_1, true, false, false, null);
        channel.queueDeclare(QUEUE_NAME_2, true, false, false, null);
        System.out.println("開始接受消息...");
        new Thread(() -> {
            try {
                DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                    String message = new String(delivery.getBody(), "UTF-8");
                    System.out.println("direct_destination_terminal Received '" + delivery.getEnvelope().getRoutingKey() + ":" + message + "'");
                };
                channel.basicConsume(QUEUE_NAME_1, true, deliverCallback, consumerTag -> { });
            } catch (IOException e) {
                e.printStackTrace();
            }
        }).start();

        new Thread(() -> {
            try {
                DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                    String message = new String(delivery.getBody(), "UTF-8");
                    System.out.println("direct_destination_file Received '" + delivery.getEnvelope().getRoutingKey() + ":" + message + "'");
                };
                channel.basicConsume(QUEUE_NAME_2, true, deliverCallback, consumerTag -> { });
            } catch (IOException e) {
                e.printStackTrace();
            }
        }).start();

    }
}
      

2.5、Topics

2.5.1、場景描述

RabbitMQ入門實戰(2)--Java用戶端操作RabbitMQ

Topics(主題模式),隊列通過bingingKey綁定到Exchange,發送消息時指定routingKey,Exchange根據根據routingKey比對bindingKey模式來路由消息. bingingKey模式用"."分隔,"*"代表一個單詞,"#"代表0或多個單詞.

2.5.2、代碼樣例

生産者:

package com.abc.demo.general.rabbit;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

/**
 * Topics(主題模式),隊列通過bingingKey綁定到Exchange,發送消息時指定routingKey,Exchange根據根據routingKey比對bindingKey模式來路由消息
 * bingingKey模式用"."分隔,"*"代表一個單詞,"#"代表0或多個單詞
 */
public class TopicsProducer {
    private static final String EXCHANGE_NAME = "topics_logs";
    private final static String QUEUE_NAME_1 = "topics_destination_terminal";
    private final static String QUEUE_NAME_2 = "topics_destination_file";

    public static void main(String[] args) {
        Connection connection = null;
        Channel channel = null;
        try {
            connection = RabbitMQUtil.getConnection();
            channel = connection.createChannel();
            //聲明exchange,類型為topic,根據routingKey比對bindingKey模式來路由消息
            channel.exchangeDeclare(EXCHANGE_NAME, "topic");
            //聲明隊列
            channel.queueDeclare(QUEUE_NAME_1, true, false, false, null);
            channel.queueDeclare(QUEUE_NAME_2, true, false, false, null);
            //隊列1接受子產品A的所有日志
            channel.queueBind(QUEUE_NAME_1, EXCHANGE_NAME, "moduleA.*");
            //隊列2接受所有error的日志
            channel.queueBind(QUEUE_NAME_2, EXCHANGE_NAME, "*.error");
            for (int i = 0; i < 20; i++) {
                String message = "消息-" + i;
                String routingKey = "";
                if (i % 2 == 0) {
                    routingKey = "moduleA";
                } else {
                    routingKey = "moduleB";
                }
                routingKey += ".";
                if (i % 3 == 0) {
                    routingKey += "info";
                } else if (i % 3 == 1) {
                    routingKey += "warn";
                } else if (i % 3 == 2) {
                    routingKey += "error";
                }
                channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes());
                System.out.println(" [x] Sent '" + routingKey + ":" + message + "'");
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            RabbitMQUtil.close(channel);
            RabbitMQUtil.close(connection);
        }
    }

}
      

消費者:

package com.abc.demo.general.rabbit;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DeliverCallback;

import java.io.IOException;

/**
 * Topics(路由模式),隊列通過bingingKey綁定到Exchange,發送消息時指定routingKey,Exchange根據routingKey比對bindingKey模式來路由消息
 * bingingKey模式用"."分隔,"*"代表一個單詞,"#"代表0或多個單詞
 */
public class TopicsConsumer {
    private final static String QUEUE_NAME_1 = "topics_destination_terminal";
    private final static String QUEUE_NAME_2 = "topics_destination_file";

    public static void main(String[] args) throws Exception {
        Connection connection = RabbitMQUtil.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME_1, true, false, false, null);
        channel.queueDeclare(QUEUE_NAME_2, true, false, false, null);
        System.out.println("開始接受消息...");
        new Thread(() -> {
            try {
                DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                    String message = new String(delivery.getBody(), "UTF-8");
                    System.out.println("topics_destination_terminal Received '" + delivery.getEnvelope().getRoutingKey() + ":" + message + "'");
                };
                channel.basicConsume(QUEUE_NAME_1, true, deliverCallback, consumerTag -> { });
            } catch (IOException e) {
                e.printStackTrace();
            }
        }).start();

        new Thread(() -> {
            try {
                DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                    String message = new String(delivery.getBody(), "UTF-8");
                    System.out.println("topics_destination_file Received '" + delivery.getEnvelope().getRoutingKey() + ":" + message + "'");
                };
                channel.basicConsume(QUEUE_NAME_2, true, deliverCallback, consumerTag -> { });
            } catch (IOException e) {
                e.printStackTrace();
            }
        }).start();
    }
}
      

2.6、Topics

2.6.1、場景描述

RabbitMQ入門實戰(2)--Java用戶端操作RabbitMQ

RPC(遠端調用)

1.用戶端把消息發送到rpc隊列

2.服務端從rpc隊列擷取消息,并把獲得到的消息作為參數來調用函數,然後把結果通過回調隊列發送給用戶端

3.用戶端從回調隊列擷取傳回結果

2.6.2、代碼樣例

服務端:

package com.abc.demo.general.rabbit;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DeliverCallback;

/**
 * RPC(遠端調用)
 * 1.用戶端把消息發送到rpc隊列
 * 2.服務端從rpc隊列擷取消息,并把獲得到的消息作為參數來調用函數,然後把結果通過回調隊列發送給用戶端
 * 3.用戶端從回調隊列擷取傳回結果
 */
public class RPCServer {
    private static final String RPC_QUEUE_NAME = "rpc_queue";

    public static void main(String[] args) throws Exception {
        Connection connection = RabbitMQUtil.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);
        channel.queuePurge(RPC_QUEUE_NAME);
        channel.basicQos(1);

        System.out.println(" [x] Awaiting RPC requests");
        Object monitor = new Object();
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            AMQP.BasicProperties replyProps = new AMQP.BasicProperties
                    .Builder()
                    .correlationId(delivery.getProperties().getCorrelationId())
                    .build();

            String response = "";
            try {
                String message = new String(delivery.getBody(), "UTF-8");
                int n = Integer.parseInt(message);
                System.out.println(" [.] fib(" + message + ")");
                response += fib(n);
            } catch (RuntimeException e) {
                System.out.println(" [.] " + e.toString());
            } finally {
                //發送結果到回調隊列
                channel.basicPublish("", delivery.getProperties().getReplyTo(), replyProps, response.getBytes("UTF-8"));
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
                // RabbitMq consumer worker thread notifies the RPC server owner thread
                synchronized (monitor) {
                    monitor.notify();
                }
            }
        };
        //從rpc隊列中擷取用戶端發過來的消息
        channel.basicConsume(RPC_QUEUE_NAME, false, deliverCallback, (consumerTag -> { }));
        // Wait and be prepared to consume the message from RPC client.
        while (true) {
            synchronized (monitor) {
                try {
                    monitor.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    private static int fib(int n) {
        if (n == 0) {
            return 0;
        }
        if (n == 1) {
            return 1;
        }
        return fib(n - 1) + fib(n - 2);
    }
}
      

用戶端:

package com.abc.demo.general.rabbit;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

/**
 * RPC(遠端調用)
 * 1.用戶端把消息發送到rpc隊列
 * 2.服務端從rpc隊列擷取消息,并把獲得到的消息作為參數來調用函數,然後把結果通過回調隊列發送給用戶端
 * 3.用戶端從回調隊列擷取傳回結果
 */
public class RPCClient {
    private String requestQueueName = "rpc_queue";

    public static void main(String[] args) {
        Connection connection = null;
        Channel channel = null;
        try {
            connection = RabbitMQUtil.getConnection();
            channel = connection.createChannel();
            RPCClient client = new RPCClient();
            for (int i = 0; i < 10; i++) {
                System.out.println(" [x] Requesting fib(" + i + ")");
                String response = client.call(channel, i + "");
                System.out.println(" [.] Got '" + response + "'");
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            RabbitMQUtil.close(channel);
            RabbitMQUtil.close(connection);
        }
    }

    private String call(Channel channel, String message) throws Exception {
        final String corrId = UUID.randomUUID().toString();

        String replyQueueName = channel.queueDeclare().getQueue();
        AMQP.BasicProperties props = new AMQP.BasicProperties
                .Builder()
                .correlationId(corrId)
                .replyTo(replyQueueName)
                .build();

        channel.basicPublish("", requestQueueName, props, message.getBytes("UTF-8"));

        final BlockingQueue<String> response = new ArrayBlockingQueue<>(1);

        String ctag = channel.basicConsume(replyQueueName, true, (consumerTag, delivery) -> {
            if (delivery.getProperties().getCorrelationId().equals(corrId)) {
                response.offer(new String(delivery.getBody(), "UTF-8"));
            }
        }, consumerTag -> {
        });

        String result = response.take();
        channel.basicCancel(ctag);
        return result;
    }
}
      

2.7、Publisher Confirms

2.7.1、場景描述

Publisher Confirms(消息發送确認),發送消息的時候對發送的消息進行确認,對發送成功的消息進行确認,對發送失敗的消息可以進行進一步的處理(如重新發送).

2.7.2、代碼樣例

package com.abc.demo.general.rabbit;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmCallback;
import com.rabbitmq.client.Connection;

import java.time.Duration;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.function.BooleanSupplier;

/**
 * Publisher Confirms(消息發送确認),發送消息的時候對發送的消息進行确認,對發送失敗的消息可以進行進一步的處理
 */
public class PublisherConfirms {
    private final static String QUEUE_NAME = "publisher_confirms";
    private static final int MESSAGE_COUNT = 50_000;

    public static void main(String[] args) {
        publishMessagesIndividually();
        publishMessagesInBatch();
        handlePublishConfirmsAsynchronously();
    }

    /**
     * 單條消息确認,速度較慢,吞吐量不高
     * @throws Exception
     */
    private static void publishMessagesIndividually() {
        Connection connection = null;
        Channel channel = null;
        try {
            connection = RabbitMQUtil.getConnection();
            channel = connection.createChannel();
            channel.queueDeclare(QUEUE_NAME, false, false, true, null);
            //開啟消息确認,預設時關閉的
            channel.confirmSelect();
            long start = System.nanoTime();
            for (int i = 0; i < MESSAGE_COUNT; i++) {
                String body = String.valueOf(i);
                channel.basicPublish("", QUEUE_NAME, null, body.getBytes());
                //逾時或發送失敗抛出異常
                channel.waitForConfirmsOrDie(5_000);
            }
            long end = System.nanoTime();
            System.out.format("Published %,d messages individually in %,d ms%n", MESSAGE_COUNT, Duration.ofNanos(end - start).toMillis());
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            RabbitMQUtil.close(channel);
            RabbitMQUtil.close(connection);
        }
    }

    /**
     * 批量确認,速度較快,吞吐量較大;但如果确認失敗不知道那條消息出問題了
     * @throws Exception
     */
    private static void publishMessagesInBatch() {
        Connection connection = null;
        Channel channel = null;
        try {
            connection = RabbitMQUtil.getConnection();
            channel = connection.createChannel();
            channel.queueDeclare(QUEUE_NAME, false, false, true, null);
            channel.confirmSelect();

            int batchSize = 100;
            int outstandingMessageCount = 0;

            long start = System.nanoTime();
            for (int i = 0; i < MESSAGE_COUNT; i++) {
                String body = String.valueOf(i);
                channel.basicPublish("", QUEUE_NAME, null, body.getBytes());
                outstandingMessageCount++;

                if (outstandingMessageCount == batchSize) {
                    channel.waitForConfirmsOrDie(5_000);
                    outstandingMessageCount = 0;
                }
            }

            if (outstandingMessageCount > 0) {
                channel.waitForConfirmsOrDie(5_000);
            }
            long end = System.nanoTime();
            System.out.format("Published %,d messages in batch in %,d ms%n", MESSAGE_COUNT, Duration.ofNanos(end - start).toMillis());
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            RabbitMQUtil.close(channel);
            RabbitMQUtil.close(connection);
        }
    }

    /**
     * 異步确認,最佳性能和資源使用,但編碼有些複雜
     */
    private static void handlePublishConfirmsAsynchronously() {
        Connection connection = null;
        Channel channel = null;
        try {
            connection = RabbitMQUtil.getConnection();
            channel = connection.createChannel();
            channel.queueDeclare(QUEUE_NAME, false, false, true, null);
            channel.confirmSelect();

            ConcurrentNavigableMap<Long, String> outstandingConfirms = new ConcurrentSkipListMap<>();

            ConfirmCallback ackCallback = (sequenceNumber, multiple) -> {
                if (multiple) {
                    ConcurrentNavigableMap<Long, String> confirmed = outstandingConfirms.headMap(sequenceNumber,true);
                    confirmed.clear();
                } else {
                    outstandingConfirms.remove(sequenceNumber);
                }
            };
            ConfirmCallback nackCallback = (sequenceNumber, multiple) -> {
                String body = outstandingConfirms.get(sequenceNumber);
                System.err.format("Message with body %s has been nack-ed. Sequence number: %d, multiple: %b%n", body, sequenceNumber, multiple);

                /*
                 *消息發送失敗時這邊再次調用發送成功的處理方法,也可以把失敗的消息(擷取失敗消息的方法同ackCallback裡方法)重新發送,不過不能在這裡發送消息(rabbitmq不支援),
                 *可以把失敗的消息發送到ConcurrentLinkedQueue,發送消息的線程從該ConcurrentLinkedQueue取資料來發送消息
                 */
                ackCallback.handle(sequenceNumber, multiple);
            };
            channel.addConfirmListener(ackCallback, nackCallback);

            long start = System.nanoTime();
            for (int i = 0; i < MESSAGE_COUNT; i++) {
                String body = String.valueOf(i);
                outstandingConfirms.put(channel.getNextPublishSeqNo(), body);
                channel.basicPublish("", QUEUE_NAME, null, body.getBytes());
            }

            if (!waitUntil(Duration.ofSeconds(60), () -> outstandingConfirms.isEmpty())) {
                throw new IllegalStateException("All messages could not be confirmed in 60 seconds");
            }

            long end = System.nanoTime();
            System.out.format("Published %,d messages and handled confirms asynchronously in %,d ms%n", MESSAGE_COUNT, Duration.ofNanos(end - start).toMillis());
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            RabbitMQUtil.close(channel);
            RabbitMQUtil.close(connection);
        }
    }

    private static boolean waitUntil(Duration timeout, BooleanSupplier condition) throws InterruptedException {
        int waited = 0;
        while (!condition.getAsBoolean() && waited < timeout.toMillis()) {
            Thread.sleep(100L);
            waited += 100;
        }
        return condition.getAsBoolean();
    }

    /**
     * 異步确認簡單測試
     */
    private static void asynchronousTest() {
        Connection connection = null;
        Channel channel = null;
        try {
            connection = RabbitMQUtil.getConnection();
            channel = connection.createChannel();
            channel.queueDeclare(QUEUE_NAME, false, false, true, null);
            //開啟消息确認,預設時關閉的
            channel.confirmSelect();

            ConfirmCallback ackCallback = (sequenceNumber, multiple) -> {
                System.out.println("ackCallback,sequenceNumber=" + sequenceNumber + ",multiple=" + multiple);
            };
            ConfirmCallback nackCallback = (sequenceNumber, multiple) -> {
                System.out.println("nackCallback,sequenceNumber=" + sequenceNumber + ",multiple=" + multiple);
            };
            channel.addConfirmListener(ackCallback, nackCallback);

            for (int i = 0; i < 100; i++) {
                String body = String.valueOf(i);
                channel.basicPublish("", QUEUE_NAME, null, body.getBytes());
            }

            Thread.sleep(1000 * 30);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            RabbitMQUtil.close(channel);
            RabbitMQUtil.close(connection);
        }
    }

}