天天看點

rabbitmq-rpc

rabbitmq-rpc

前面講的幾個交換器,我們都是單向的消息發送,生産者将消息發送給消費者就不管了。在實際業務中,我們有時候要等耐消費者将結果傳回給我們,或者說我麼需要消費者上的一個功能,一個方法,一個接口傳回給我們的值。但是往往我們的系統是不同的子系統,分布在不同的電腦,不能直接通過方法來調用,是以需要使用到RPC(Remote Procedure Call)遠端過程調用模式。

RabbitMQ實作RPC的方式很簡單,生産者發送一條帶有标簽(消息ID(correlation_id)+回調隊列名稱)的消息到發送隊列,消費者(也稱RPC服務端)從發送隊列擷取消息并處理業務,解析标簽的資訊将業務結果發送到指定的回調隊列,生産者從回調隊列中根據标簽的資訊擷取發送消息的傳回結果。

sample:

生産者

package com.enjoy.rabbitmqtest;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.TimeoutException;

public class RpcProducer {
    public  static final String EXCHANGE = "exchange_rpc";
    public  static  final String URL = "localhost";
    public  static  final String USER = "test";
    public  static  final String PASS_WORD = "test";
    public  static  final String VIRTUL = "/test";
    public  static final String ROUTING_KEY ="routekey.rpc";
    public  static final String QUEUE = "queue_rpc";

    public static void main(String[] args) {
        //連接配接工廠
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(URL);
        factory.setUsername(USER);
        factory.setPassword(PASS_WORD);
        factory.setVirtualHost(VIRTUL);
        //擷取連接配接
        try {
            Connection conn = factory.newConnection();
            //擷取信道
            Channel channel = conn.createChannel();
            //建立交換器
            channel.exchangeDeclare(EXCHANGE,BuiltinExchangeType.DIRECT.getType());

            //聲明隊列
            channel.queueDeclare(QUEUE, false, false, false, null);

            //将信道和交換器、路由鍵綁定
            channel.queueBind(QUEUE, EXCHANGE, ROUTING_KEY);

            //此處注意:我們聲明了要回複的隊列。隊列名稱由RabbitMQ自動建立。
            //這樣做的好處是:每個用戶端有屬于自己的唯一回複隊列,生命周期同用戶端
            String replyQueue = channel.queueDeclare().getQueue();

            //發送消息到交換器
            for(int i=0;i<3;i++) {
                //消息相關屬性
                String correlationId = UUID.randomUUID().toString();
                String message = "hello rpc-"+i;
                //屬性
                AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
                        .replyTo(replyQueue)//回複隊列
                        .correlationId(correlationId)//回複correlationId
                        .build();
                System.out.println("Producer發送消息[correlationId="+correlationId
                        + ",mesage="+message+"]");
                channel.basicPublish(EXCHANGE, ROUTING_KEY, false, props, message.getBytes());
                Thread.sleep(1000);
            }

            //聲明消費者
            DefaultConsumer consumer = new DefaultConsumer(channel){
                public void handleDelivery(String consumerTag,
                               Envelope envelope,
                               AMQP.BasicProperties properties,
                               byte[] body){
                       System.out.println(System.currentTimeMillis()+",Producer收到回執消息[correlationId="+properties.getCorrelationId()
                               + "mesage="+new String(body)+"]");
                }
            };

            //消費隊列消息
            channel.basicConsume(replyQueue,true,consumer);

//            channel.close();
            conn.close();
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
           

消費者

package com.enjoy.rabbitmqtest;

import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class RpcConsumer {
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(RpcProducer.URL);
        factory.setUsername(RpcProducer.USER);
        factory.setPassword(RpcProducer.PASS_WORD);
        factory.setVirtualHost(RpcProducer.VIRTUL);
        Connection conn = factory.newConnection();
        //擷取信道
        final  Channel channel = conn.createChannel();

        /*綁定,将隊列和交換器通過路由鍵進行綁定*/
        channel.queueBind(RpcProducer.QUEUE, RpcProducer.EXCHANGE,RpcProducer.ROUTING_KEY);

        System.out.println("waiting for message........");

        //聲明消費者
        DefaultConsumer consumer = new DefaultConsumer(channel){
            //這是一個回到函數,伺服器端擷取到消息,就會調用此方法處理消息
            public void handleDelivery(String consumerTag,
                                       Envelope envelope,
                                       AMQP.BasicProperties properties,
                                       byte[] body) throws IOException {
//                System.out.println("consumerTag="+consumerTag+",envelope="+envelope
//                +",properties="+properties);
                String message = new String(body);
                System.out.println("Consumer收到消息[correlationId="+properties.getCorrelationId()
                        +",message="+message+"]");
                AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
                //我們在将要回複的消息屬性中,放入從用戶端傳遞過來的correlateId
                builder.correlationId(properties.getCorrelationId());
                AMQP.BasicProperties props = builder.build();
                //發送給回複隊列的消息,exchange="",routingKey=回複隊列名稱
                //因為RabbitMQ對于隊列,始終存在一個預設exchange="",routingKey=隊列名稱的綁定關系
                channel.basicPublish("",properties.getReplyTo(),
                        false,props,("OK-"+properties.getCorrelationId()).getBytes());

            }
        };
        /*消費者正式開始在指定隊列上消費消息*/
        channel.basicConsume(RpcProducer.QUEUE,true,consumer);

//        channel.close();
//        conn.close();
    }
}
           

結果:

生産者:

Producer發送消息[correlationId=cecf5421-0dd5-4bb1-bc4a-de84b2fdd898,mesage=hello rpc-0]

Producer發送消息[correlationId=3819cd8e-16e3-4899-a772-2846a14cef09,mesage=hello rpc-1]

Producer發送消息[correlationId=1502de20-4377-4ba0-b4e8-592c71d82197,mesage=hello rpc-2]

1571543302869,Producer收到回執消息[correlationId=cecf5421-0dd5-4bb1-bc4a-de84b2fdd898mesage=OK-cecf5421-0dd5-4bb1-bc4a-de84b2fdd898]

1571543302869,Producer收到回執消息[correlationId=3819cd8e-16e3-4899-a772-2846a14cef09mesage=OK-3819cd8e-16e3-4899-a772-2846a14cef09]

1571543302869,Producer收到回執消息[correlationId=1502de20-4377-4ba0-b4e8-592c71d82197mesage=OK-1502de20-4377-4ba0-b4e8-592c71d82197]

消費者:

waiting for message........

Consumer收到消息[correlationId=cecf5421-0dd5-4bb1-bc4a-de84b2fdd898,message=hello rpc-0]

Consumer收到消息[correlationId=3819cd8e-16e3-4899-a772-2846a14cef09,message=hello rpc-1]

Consumer收到消息[correlationId=1502de20-4377-4ba0-b4e8-592c71d82197,message=hello rpc-2]