天天看點

分布式通訊架構-RMI原理分析

什麼是 RPC

RPC(Remote Procedure Call,遠端過程調用),一般用來實作部署在不同機器上的系統之間的方法調用,使得程式能夠像通路本地系統資源一樣,通過網絡傳輸去通路遠端系統資源。對于用戶端來說, 傳輸層使用什麼協定,序列化、反序列化都是透明的。

了解 Java RMI

RMI 全稱是 remote method invocation – 遠端方法調用,一種用于遠端過程調用的應用程式程式設計接口,是純 java 的網絡分布式應用系統的核心解決方案之一。

RMI 目前使用 Java 遠端消息交換協定 JRMP(Java Remote Messageing Protocol)進行通信,由于 JRMP 是專為 Java對象制定的,是分布式應用系統的百分之百純 java 解決方案,用 Java RMI 開發的應用系統可以部署在任何支援 JRE的平台上,缺點是,由于 JRMP 是專門為 java 對象指定的,是以 RMI 對于非 JAVA 語言開發的應用系統的支援不足,不能與非 JAVA 語言編寫的對象進行通信。

個人了解 RPC 和 RMI 的差別

RPC是一個跨程序遠端調用的思想,它的初衷就是實作兩個應用之間的通信。而實作RPC的方式會有很多種,RMI其實就是RPC具體的實作方式之一。

Java RMI 代碼實踐

//普通接口繼承 Remote
public interface IHelloService extends Remote{

    String sayHello(String msg) throws RemoteException;
}

//接口實作類繼承 UnicastRemoteObject 
public class HelloServiceImpl extends UnicastRemoteObject implements IHelloService {

    @Override
    public String sayHello(String msg) throws RemoteException{
        return "Hello,"+msg;
    }

    protected HelloServiceImpl() throws RemoteException {
        super();
    }
}

//服務端釋出 helloService 執行個體
public class Server {

    public static void main(String[] args) {
        try {
            IHelloService helloService=new HelloServiceImpl();
            LocateRegistry.createRegistry();
            Naming.bind("rmi://127.0.0.1/Hello",helloService);
            System.out.println("服務啟動成功...");
        } catch (Exception e) {
            e.printStackTrace();
        }

    }
}

//用戶端通過服務端釋出的位址,拿到helloService執行個體,并且調用sayHello方法
//(完成了跨程序之間的通信)
public class ClientDemo {

    public static void main(String[] args) {
        try {
            IHelloService helloService=(IHelloService) Naming.lookup("rmi://127.0.0.1/Hello");
            System.out.println(helloService.sayHello("我是架構師"));
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
           

遠端對象必須實作 UnicastRemoteObject,這樣才能保證用戶端通路獲得遠端對象時,該遠端對象會把自身的一個拷貝以 Socket 形式傳輸給用戶端,用戶端獲得的拷貝稱為“stub” , 而服務端本身已經存在的遠端對象稱為“skeleton”,此時用戶端的stub 是用戶端的一個代理,用于與伺服器端進行通信,而 skeleton 是服務端的一個代理,用于接收用戶端的請求之後調用遠端方法來響應用戶端的請求。

自定義 RMI 架構

服務端代碼:

1》服務端對外釋出服務并提供端口号。

public class ServerDemo {

    public static void main(String[] args) {
        TaofutHelloWorld taofutHelloWorld=new TaofutHelloWorldImpl();
        RpcServer rpcServer=new RpcServer();
        //對外釋出服務并提供端口号8888
        rpcServer.publisher(taofutHelloWorld,);

    }
}
           

2》具體進去看看rpcServer做了什麼。

/**
 * 用來對外釋出一個服務,并且監聽用戶端發來的請求
 */
public class RpcServer {

    //定義一個線程池
    private static final ExecutorService executorService= Executors.newCachedThreadPool();
    /**
     * 釋出服務
     * @param service
     * @param port
     */
    public void publisher(final Object service,int port){
        ServerSocket serverSocket=null;
        try {
            serverSocket=new ServerSocket(port);//啟動一個服務監聽
            System.out.println("服務啟動成功...");
            while (true){
                Socket socket=serverSocket.accept();
                //開啟線程去處理用戶端發來的請求
                executorService.execute(new ProcessorHandler(socket,service));
            }
        } catch (IOException e) {
            e.printStackTrace();
        }finally {
            if(serverSocket!=null){
                try {
                    serverSocket.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}
           

3》繼續跟進線程處理的任務。

/**
 * 處理用戶端發來的請求
 */
public class ProcessorHandler implements Runnable {

    private Socket socket;
    private Object service;//服務端釋出的服務

    public ProcessorHandler(Socket socket, Object service) {
        this.socket = socket;
        this.service = service;
    }

    @Override
    public void run() {
        //處理socket請求,服務端接收資料,從輸入流裡讀
        ObjectInputStream objectInputStream=null;
        ObjectOutputStream outputStream=null;
        try {
            objectInputStream=new ObjectInputStream(socket.getInputStream());
            RpcRequest rpcRequest=(RpcRequest) objectInputStream.readObject();
            Object result=invoke(rpcRequest);

            //将服務端執行的結果 通過socket回傳給用戶端
            outputStream=new ObjectOutputStream(socket.getOutputStream());
            outputStream.writeObject(result);
            outputStream.flush();
        } catch (Exception e) {
            e.printStackTrace();
        }finally {
            if(objectInputStream!=null){
                try {
                    objectInputStream.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
            if(outputStream!=null){
                try {
                    outputStream.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    /**
     * 用戶端:發來請求告訴服務端,我要調用你這個服務裡面的xx方法,并将xx方法的詳細資訊傳遞給服務端。
     * 服務端:收到請求及參數後,利用反射去執行了xx方法,并将執行的結果傳回給用戶端。
     * @param rpcRequest
     * @return
     * @throws InvocationTargetException
     * @throws IllegalAccessException
     * @throws NoSuchMethodException
     */
    private Object invoke(RpcRequest rpcRequest) throws InvocationTargetException, IllegalAccessException, NoSuchMethodException {
        Object[] args=rpcRequest.getParameters();
        Class<?>[] types=new Class[args.length];
        for(int i=;i<args.length;i++){
            types[i]=args[i].getClass();
        }
        Method method=service.getClass().getMethod(rpcRequest.getMethodName(),types);
        return method.invoke(service,args);
    }
}
           

4》服務端業務接口及實作類。

public interface TaofutHelloWorld {

    String sayHello(String msg);
}

public class TaofutHelloWorldImpl implements TaofutHelloWorld {

    @Override
    public String sayHello(String msg) {
        return "Hello,World! My name is taofut "+msg;
    }
}
           

5》服務端接收到用戶端的傳輸對象。

/**
 * 傳輸對象
 */
public class RpcRequest implements Serializable{

    private String className;
    private String methodName;
    private Object[] parameters;

    public String getClassName() {
        return className;
    }

    public void setClassName(String className) {
        this.className = className;
    }

    public String getMethodName() {
        return methodName;
    }

    public void setMethodName(String methodName) {
        this.methodName = methodName;
    }

    public Object[] getParameters() {
        return parameters;
    }

    public void setParameters(Object[] parameters) {
        this.parameters = parameters;
    }
}
           

用戶端代碼:

1》用戶端向服務端發起請求。

public class ClientDemo {

    public static void main(String[] args) {
        RpcClientProxy rpcClientProxy=new RpcClientProxy();
        //用戶端使用代理類來發起請求
        TaofutHelloWorld taofutHelloWorld=(TaofutHelloWorld) rpcClientProxy.clientProxy(TaofutHelloWorld.class,"127.0.0.1",);
        System.out.println(taofutHelloWorld.sayHello("我是架構師"));
    }
}
           

2》建立用戶端代理類。

/**
 * 建立一個用戶端代理類
 */
public class RpcClientProxy {

    public <T> T clientProxy(final Class<T> interfaceCls,final String host,final int port){
        return (T)Proxy.newProxyInstance(interfaceCls.getClassLoader(),
                new Class[]{interfaceCls},new RemoteInvocationHandler(host,port));
    }
}
           

3》看看代理類具體的邏輯。

public class RemoteInvocationHandler implements InvocationHandler {

    private String host;
    private int port;

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        RpcRequest rpcRequest=new RpcRequest();
        rpcRequest.setClassName(method.getDeclaringClass().getName());
        rpcRequest.setMethodName(method.getName());
        rpcRequest.setParameters(args);
        //傳輸
        TCPTransport tcpTransport=new TCPTransport(host,port);
        //向服務端發送請求,并且傳輸 rpcRequest
        return tcpTransport.send(rpcRequest);
    }

    public RemoteInvocationHandler(String host, int port) {
        this.host = host;
        this.port = port;
    }
}
           

4》看看 tcpTransport 的具體實作。

public class TCPTransport {
    private String host;
    private int port;

    public TCPTransport(String host, int port) {
        this.host = host;
        this.port = port;
    }

    public Socket newSocket(){
        System.out.println("建立一個新的連接配接");
        Socket socket=null;
        try {
            //與服務端建立連接配接
            socket=new Socket(host,port);
            return socket;
        } catch (IOException e) {
            throw new RuntimeException("連接配接建立失敗");
        }

    }

    /**
     * 與服務端建立連接配接,并且向服務端傳輸 rpcRequest(服務端需要執行的方法詳細資訊)
     * 收到服務端執行完方法的結果
     * @param rpcRequest
     * @return
     */
    public Object send(RpcRequest rpcRequest){
        Socket socket=null;
        try {
            socket=newSocket();
            ObjectOutputStream outputStream=new ObjectOutputStream(socket.getOutputStream());
            outputStream.writeObject(rpcRequest);//rpcRequest序列化傳輸
            outputStream.flush();

            ObjectInputStream inputStream=new ObjectInputStream(socket.getInputStream());
            Object result=inputStream.readObject();
            inputStream.close();
            outputStream.close();
            return result;
        } catch (Exception e) {
            throw new RuntimeException("發起遠端調用異常",e);
        } finally {
            if(socket!=null){
                try {
                    socket.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}
           

5》用戶端向服務端傳輸的對象 rpcRequest

public class RpcRequest implements Serializable{

    private String className;
    private String methodName;
    private Object[] parameters;

    public String getClassName() {
        return className;
    }

    public void setClassName(String className) {
        this.className = className;
    }

    public String getMethodName() {
        return methodName;
    }

    public void setMethodName(String methodName) {
        this.methodName = methodName;
    }

    public Object[] getParameters() {
        return parameters;
    }

    public void setParameters(Object[] parameters) {
        this.parameters = parameters;
    }
}
           

當用戶端執行 taofutHelloWorld.sayHello(“我是架構師”),代理類就會向服務端發起請求并傳遞參數,服務端執行完方法後,将結果傳回給用戶端。用戶端列印出結果到控制台:Hello,World! My name is taofut 我是架構師

用戶端服務端處理過程圖

分布式通訊架構-RMI原理分析

繼續閱讀