天天看點

RabbitMQ_3、釋出/訂閱

釋出訂閱

​​官方例子​​

一次向多個消費者發送消息

Send 類

/**
 * Actively declare a non-autodelete exchange with no extra arguments.
 *
 * @param exchange the name of the exchange
 * @param type the exchange type
 * @param durable true if we are declaring a durable exchange (the exchange will survive a server restart)
 * @return a declaration-confirm method to indicate the exchange was successfully declared
 * @throws java.io.IOException if an error is encountered
 * @see com.rabbitmq.client.AMQP.Exchange.Declare
 * @see com.rabbitmq.client.AMQP.Exchange.DeclareOk
 */
Exchange.DeclareOk exchangeDeclare(String exchange, String type, boolean durable) throws IOException;

/**
 * Publish/subscribe example: message producer.
 *
 * <p>Connects to the broker, declares the fanout exchange {@code exchange_fanout},
 * publishes one UTF-8 encoded message to it with an empty routing key, prints a
 * confirmation line and then closes the channel and connection.
 *
 * @author rzk
 * @version 1.0.0
 */
public class Send {

    // Name of the exchange the message is published to
    private final static String EXCHANGE_NAME = "exchange_fanout";

    /**
     * Publishes a single message to the fanout exchange.
     *
     * @param argv command-line arguments (unused)
     * @throws Exception if connecting, declaring the exchange or publishing fails
     */
    public static void main(String[] argv) throws Exception {
        ConnectionFactory connectionFactory = brokerFactory();

        // Both resources are closed automatically, channel first, then connection
        try (Connection conn = connectionFactory.newConnection();
             Channel ch = conn.createChannel()) {
            // Declare the exchange (this does not bind anything to it)
            ch.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);

            String message = " 釋出  ";
            byte[] payload = message.getBytes(StandardCharsets.UTF_8);

            // Publish to the exchange with an empty routing key and no properties
            ch.basicPublish(EXCHANGE_NAME, "", null, payload);
            System.out.println(" [x] Sent '" + message + "'");
        }
    }

    // Builds the connection factory with the broker address and credentials.
    private static ConnectionFactory brokerFactory() {
        ConnectionFactory cf = new ConnectionFactory();
        // NOTE(review): "*" looks like a redacted placeholder for the broker host — confirm
        cf.setHost("*");
        cf.setUsername("yeb");
        cf.setVirtualHost("/yeb");
        cf.setPassword("yeb");
        cf.setPort(5672);
        return cf;
    }
}

Recv01

/**
 * Publish/subscribe example: first message consumer.
 *
 * <p>Declares the fanout exchange {@code exchange_fanout}, creates a server-named
 * queue, binds that queue to the exchange and prints every message delivered to it.
 * The connection and channel are deliberately left open so the consumer keeps
 * receiving deliveries after {@code main} returns.
 *
 * @author rzk
 * @version 1.0.0
 */
public class Recv01 {
    // Name of the exchange this consumer subscribes to
    private final static String EXCHANGE_NAME = "exchange_fanout";

    /**
     * Starts the consumer.
     *
     * @param argv command-line arguments (unused)
     * @throws Exception if connecting, declaring, binding or subscribing fails
     */
    public static void main(String[] argv) throws Exception {
        // Create the connection factory
        ConnectionFactory factory = new ConnectionFactory();
        // NOTE(review): "*" looks like a redacted placeholder for the broker host — confirm
        factory.setHost("*");
        factory.setUsername("yeb");
        factory.setVirtualHost("/yeb");
        factory.setPassword("yeb");
        factory.setPort(5672);
        // Open a connection through the factory
        Connection connection = factory.newConnection();
        // Open a channel on that connection
        Channel channel = connection.createChannel();
        // Declare the exchange so it exists regardless of whether producer or consumer starts first
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
        // Obtain a queue with a server-generated name (exclusive queue)
        String queueName = channel.queueDeclare().getQueue();
        // Bind the queue to the exchange with an empty routing key
        channel.queueBind(queueName, EXCHANGE_NAME, "");

        // Invoked for every delivery: decode the body as UTF-8 and print it
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" [x] Received '" + message + "'");
        };
        /*
         * Start consuming from the queue.
         * autoAck = true: deliveries are acknowledged automatically. The callback above
         * never calls basicAck, so subscribing in manual-ack mode (autoAck = false)
         * would leave every delivery unacknowledged on the broker.
         * The second callback handles consumer cancellation and intentionally does nothing.
         */
        channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
    }
}

Recv02

/**
 * Publish/subscribe example: second message consumer.
 *
 * <p>Declares the fanout exchange {@code exchange_fanout}, creates its own
 * server-named queue, binds that queue to the exchange and prints every message
 * delivered to it. The connection and channel are deliberately left open so the
 * consumer keeps receiving deliveries after {@code main} returns.
 *
 * @author rzk
 * @version 1.0.0
 */
public class Recv02 {
    // Name of the exchange this consumer subscribes to
    private final static String EXCHANGE_NAME = "exchange_fanout";

    /**
     * Starts the consumer.
     *
     * @param argv command-line arguments (unused)
     * @throws Exception if connecting, declaring, binding or subscribing fails
     */
    public static void main(String[] argv) throws Exception {
        // Create the connection factory
        ConnectionFactory factory = new ConnectionFactory();
        // NOTE(review): "*" looks like a redacted placeholder for the broker host — confirm
        factory.setHost("*");
        factory.setUsername("yeb");
        factory.setVirtualHost("/yeb");
        factory.setPassword("yeb");
        factory.setPort(5672);
        // Open a connection through the factory
        Connection connection = factory.newConnection();
        // Open a channel on that connection
        Channel channel = connection.createChannel();
        // Declare the exchange so it exists regardless of whether producer or consumer starts first
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
        // Obtain a queue with a server-generated name (exclusive queue)
        String queueName = channel.queueDeclare().getQueue();
        // Bind the queue to the exchange with an empty routing key
        channel.queueBind(queueName, EXCHANGE_NAME, "");

        // Invoked for every delivery: decode the body as UTF-8 and print it
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" [x] Received '" + message + "'");
        };
        // Start consuming with autoAck = true: the callback never calls basicAck, so
        // manual-ack mode (autoAck = false) would leave every delivery unacknowledged.
        // The second callback handles consumer cancellation and intentionally does nothing.
        channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
    }
}