天天看點

RPC架構原理簡述:從實作一個簡易RPCFramework說起

摘要:

  本文闡述了RPC架構與遠端調用的産生背景,介紹了RPC的基本概念和使用背景,之後手動實作了簡易的RPC架構并佐以執行個體進行示範,以便讓各位看官對RPC有一個感性、清晰和完整的認識,最後讨論了RPC架構幾個較為重要問題。總之,RPC架構的精髓在于動态代理和反射,通過它們使得遠端調用“本地化”,對使用者透明且友好。

版權聲明:

本文原創作者:書呆子Rico

作者部落格位址:http://blog.csdn.net/justloveyou_/

一. 引子

  上學時我們寫得應用大都比較簡單,基本上都屬于單體應用,服務調用也局限于本地,如下所示:

// 服務接口
public interface HelloService {

    String hello(String name);

    String hi(String msg);
}

// 服務本地實作
public class HelloServiceImpl implements HelloService{
    @Override
    public String hello(String name) {
        return "Hello " + name;
    }

    @Override
    public String hi(String msg) {
        return "Hi, " + msg;
    }
}

// 服務本地調用
public class Main {
    public static void main(String[] args) {
        HelloService helloService = new HelloServiceImpl();
        helloServiceProxy.hello("Panda");
        helloServiceProxy.hi("Panda");
    }/** Output
           hello : Hello rico
           hi : Hi, panda
    **/  
}
           

  我們寫這樣的單體應用來學習、做實驗正常且合理,但是在生産環境中,單體應用在各方面的性能上和可維護性方面就遠遠不能滿足需求了。應用内各項業務互相糾纏、耦合性太大,不利于後期的維護和更新,主要表現在以下兩點上:

  • 可用性低。 所有雞蛋都放在同一個籃子裡,一旦有問題導緻單體應用挂掉,所有業務都不能通路,穩定性要求難以滿足;
  • 不利于各業務團隊進行合作,開發效率低。 單體應用各業務耦合度太高,不同業務團隊開發進度和實作細節不盡相同,難以高效協作。

  

将不同的業務拆分到多個應用中,讓不同的應用分别承擔不同的功能是解決這些問題的必殺技。

将不同業務分拆到不同的應用後,不但可以大幅度提升系統的穩定性還有助于豐富技術選型,進一步保證系統的性能。總的來說,從單體應用到分布式多體應用是系統更新必經之路。

  當一個單體應用演化成多體應用後,遠端調用就粉墨登場了。在一個應用時,互相通信直接通過本地調用就可完成,而變為多體應用時,互相通信就得依賴遠端調用了,這時一個高效穩定的RPC架構就顯得非常必要了。可能有的同學會覺得,沒必要非得用RPC架構啊,簡單的HTTP調用不是也可以實作遠端通信嗎?确實,簡單的HTTP調用确實也可以實作遠端通信,但是它不是那麼的合适,原因有二:

  • RPC遠端調用像本地調用一樣幹淨簡潔,但其他方式對代碼的侵入性就比較強;
  • 一般使用RPC架構實作遠端通信效率比其他方式效率要高一些。

  當我們踏入公司尤其是大型網際網路公司就會發現,公司的系統都由成千上萬大大小小的服務組成,各服務部署在不同的機器上,由不同的團隊負責。這時就會有兩個很關鍵的問題:

  • 要搭建一個新服務,免不了需要依賴已有的服務,而現在已有的服務都在遠端,怎麼調用?
  • 其它團隊想使用我們的新服務,我們的服務該怎麼釋出以便他人調用?

  下文将對RPC架構的基本原理進行介紹,并對這兩個問題展開探讨,同時參考前輩的博文《RPC架構幾行代碼就夠了》手寫一個簡易RPC架構以加深對PRC原理的了解。

二. RPC 架構介紹

  對于多體應用,由于各服務部署在不同機器,服務間的調用免不了網絡通信過程,服務消費方每調用一個服務都要寫一坨網絡通信相關的代碼,不僅複雜而且極易出錯。如果有一種方式能讓我們像調用本地服務一樣調用遠端服務,而讓調用者對網絡通信這些細節透明,那麼将大大解放程式員的雙手,大幅度提高生産力。比如,服務消費方在執行helloService.hi(“Panda”)時,實質上調用的是遠端的服務。這種方式其實就是RPC(Remote Procedure Call Protocol),在各大網際網路公司中被廣泛使用,如阿裡巴巴的HSF、Dubbo(開源)、Facebook的Thrift(開源)、Google GRPC(開源)、Twitter的Finagle(開源)等。

  RPC的主要功能目标是

讓建構分布式計算(應用)更容易,在提供強大的遠端調用能力時不損失本地調用的語義簡潔性。

為實作該目标,RPC架構需提供一種透明調用機制讓使用者不必顯式的區分本地調用和遠端調用。要讓網絡通信細節對使用者透明,我們需要對通信細節進行封裝,下面是一個RPC的經典調用的流程,并且反映了所涉及到的一些通信細節:

     

RPC架構原理簡述:從實作一個簡易RPCFramework說起

 (1). 服務消費方(client)以本地調用方式調用服務;

 (2). client stub接收到調用後負責将方法、參數等組裝成能夠進行網絡傳輸的消息體;

 (3). client stub找到服務位址,并将消息發送到服務端;

 (4). server stub收到消息後進行解碼;

 (5). server stub根據解碼結果

反射調用

本地的服務;

 (6). 本地服務執行并将結果傳回給server stub;

 (7). server stub将傳回結果打包成消息并發送至消費方;

 (8). client stub接收到消息,并進行解碼;

 (9). 服務消費方得到最終結果。

  RPC架構就是要将2~8這些步驟封裝起來,讓使用者對這些細節透明,使得遠端方法調用看起來像調用本地方法一樣。

三. RPC架構簡易實作及其執行個體分析

(1).服務端

  服務端提供用戶端所期待的服務,一般包括三個部分:服務接口,服務實作以及服務的注冊暴露三部分,如下:

  • 服務接口
public interface HelloService {

    String hello(String name);

    String hi(String msg);
}
           
  • 服務實作
public class HelloServiceImpl implements HelloService{
    @Override
    public String hello(String name) {
        return "Hello " + name;
    }

    @Override
    public String hi(String msg) {
        return "Hi, " + msg;
    }
}
           
  • 服務暴露:隻有把服務暴露出來,才能讓用戶端進行調用,這是RPC架構功能之一。
public class RpcProvider {
    public static void main(String[] args) throws Exception {
        HelloService service = new HelloServiceImpl();
        // RPC架構将服務暴露出來,供用戶端消費
        RpcFramework.export(service, );
    }
}
           

(2).用戶端

  用戶端消費服務端所提供的服務,一般包括兩個部分:服務接口和服務引用兩個部分,如下:

  • 服務接口:與服務端共享同一個服務接口
public interface HelloService {

    String hello(String name);

    String hi(String msg);
}
           
  • 服務引用:消費端通過RPC架構進行遠端調用,這也是RPC架構功能之一
public class RpcConsumer {
    public static void main(String[] args) throws Exception {
        // 由RpcFramework生成的HelloService的代理
        HelloService service = RpcFramework.refer(HelloService.class, "127.0.0.1", );
        String hello = service.hello("World");
        System.out.println("用戶端收到遠端調用的結果 : " + hello);
    }
}
           

(3).RPC架構原型實作

  RPC架構主要包括兩大功能:一個用于服務端暴露服務,一個用于用戶端引用服務。

  • 服務端暴露服務
/**
     * 暴露服務
     *
     * @param service 服務實作
     * @param port    服務端口
     * @throws Exception
     */
    public static void export(final Object service, int port) throws Exception {
        if (service == null) {
            throw new IllegalArgumentException("service instance == null");
        }
        if (port <=  || port > ) {
            throw new IllegalArgumentException("Invalid port " + port);
        }
        System.out.println("Export service " + service.getClass().getName() + " on port " + port);
        // 建立Socket服務端
        ServerSocket server = new ServerSocket(port);
        for (; ; ) {
            try {
                // 監聽Socket請求
                final Socket socket = server.accept();
                new Thread(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            try {
                                /* 擷取請求流,Server解析并擷取請求*/
                                // 建構對象輸入流,從源中讀取對象到程式中
                                ObjectInputStream input = new ObjectInputStream(
                                    socket.getInputStream());
                                try {

                                    System.out.println("\nServer解析請求 : ");
                                    String methodName = input.readUTF();
                                    System.out.println("methodName : " + methodName);
                                    // 泛型與數組是不相容的,除了通配符作泛型參數以外
                                    Class<?>[] parameterTypes = (Class<?>[])input.readObject();
                                    System.out.println(
                                        "parameterTypes : " + Arrays.toString(parameterTypes));
                                    Object[] arguments = (Object[])input.readObject();
                                    System.out.println("arguments : " + Arrays.toString(arguments));


                                    /* Server 處理請求,進行響應*/
                                    ObjectOutputStream output = new ObjectOutputStream(
                                        socket.getOutputStream());
                                    try {
                                        // service類型為Object的(可以釋出任何服務),故隻能通過反射調用處理請求
                                        // 反射調用,處理請求
                                        Method method = service.getClass().getMethod(methodName,
                                            parameterTypes);
                                        Object result = method.invoke(service, arguments);
                                        System.out.println("\nServer 處理并生成響應 :");
                                        System.out.println("result : " + result);
                                        output.writeObject(result);
                                    } catch (Throwable t) {
                                        output.writeObject(t);
                                    } finally {
                                        output.close();
                                    }
                                } finally {
                                    input.close();
                                }
                            } finally {
                                socket.close();
                            }
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                    }
                }).start();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
           

  從該RPC架構的簡易實作來看,RPC服務端邏輯是:首先建立ServerSocket負責監聽特定端口并接收客戶連接配接請求,然後使用Java原生的序列化/反序列化機制來解析得到請求,包括所調用方法的名稱、參數清單和實參,最後反射調用服務端對服務接口的具體實作并将得到的結果回傳至用戶端。至此,一次簡單PRC調用的服務端流程執行完畢。

  • 用戶端引用服務
/**
     * 引用服務
     *
     * @param <T>            接口泛型
     * @param interfaceClass 接口類型
     * @param host           伺服器主機名
     * @param port           伺服器端口
     * @return 遠端服務,傳回代理對象
     * @throws Exception
     */
    @SuppressWarnings("unchecked")
    public static <T> T refer(final Class<T> interfaceClass, final String host, final int port)
        throws Exception {
        if (interfaceClass == null) {
            throw new IllegalArgumentException("Interface class == null");
        }
        // JDK 動态代理的限制,隻能實作對接口的代理
        if (!interfaceClass.isInterface()) {
            throw new IllegalArgumentException(
                "The " + interfaceClass.getName() + " must be interface class!");
        }
        if (host == null || host.length() == ) {
            throw new IllegalArgumentException("Host == null!");
        }
        if (port <=  || port > ) {
            throw new IllegalArgumentException("Invalid port " + port);
        }
        System.out.println(
            "Get remote service " + interfaceClass.getName() + " from server " + host + ":" + port);

        // JDK 動态代理
        T proxy = (T)Proxy.newProxyInstance(interfaceClass.getClassLoader(),
            new Class<?>[] {interfaceClass}, new InvocationHandler() {
                // invoke方法本意是對目标方法的增強,在這裡用于發送RPC請求和接收響應
                @Override
                public Object invoke(Object proxy, Method method, Object[] arguments)
                    throws Throwable {
                    // 建立Socket用戶端,并與服務端建立連結
                    Socket socket = new Socket(host, port);
                    try {
                        /* 用戶端像服務端進行請求,并将請求參數寫入流中*/
                        // 将對象寫入到對象輸出流,并将其發送到Socket流中去
                        ObjectOutputStream output = new ObjectOutputStream(
                            socket.getOutputStream());
                        try {
                            // 發送請求
                            System.out.println("\nClient發送請求 : ");
                            output.writeUTF(method.getName());
                            System.out.println("methodName : " + method.getName());
                            output.writeObject(method.getParameterTypes());
                            System.out.println("parameterTypes : " + Arrays.toString(method
                                .getParameterTypes()));
                            output.writeObject(arguments);
                            System.out.println("arguments : " + Arrays.toString(arguments));


                            /* 用戶端讀取并傳回服務端的響應*/
                            ObjectInputStream input = new ObjectInputStream(
                                socket.getInputStream());
                            try {
                                Object result = input.readObject();
                                if (result instanceof Throwable) {
                                    throw (Throwable)result;
                                }
                                System.out.println("\nClient收到響應 : ");
                                System.out.println("result : " + result);
                                return result;
                            } finally {
                                input.close();
                            }
                        } finally {
                            output.close();
                        }
                    } finally {
                        socket.close();
                    }
                }
            });
        return proxy;
    }
           

  從該RPC架構的簡易實作來看,RPC用戶端邏輯是:首先建立Socket用戶端并與服務端建立連結,然後使用Java原生的序列化/反序列化機制将調用請求發送給用戶端,包括所調用方法的名稱、參數清單将服務端的響應傳回給使用者即可。至此,一次簡單PRC調用的用戶端流程執行完畢。特别地,從代碼實作來看,

實作透明的PRC調用的關鍵就是 動态代理,這是RPC架構實作的靈魂所在。
  • RPC原型實作
public class RpcFramework {
    /**
     * 暴露服務
     *
     * @param service 服務實作
     * @param port    服務端口
     * @throws Exception
     */
    public static void export(final Object service, int port) throws Exception {
        if (service == null) {
            throw new IllegalArgumentException("service instance == null");
        }
        if (port <=  || port > ) {
            throw new IllegalArgumentException("Invalid port " + port);
        }
        System.out.println("Export service " + service.getClass().getName() + " on port " + port);
        // 建立Socket服務端
        ServerSocket server = new ServerSocket(port);
        for (; ; ) {
            try {
                // 監聽Socket請求
                final Socket socket = server.accept();
                new Thread(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            try {
                                /* 擷取請求流,Server解析并擷取請求*/
                                // 建構對象輸入流,從源中讀取對象到程式中
                                ObjectInputStream input = new ObjectInputStream(
                                    socket.getInputStream());
                                try {

                                    System.out.println("\nServer解析請求 : ");
                                    String methodName = input.readUTF();
                                    System.out.println("methodName : " + methodName);
                                    // 泛型與數組是不相容的,除了通配符作泛型參數以外
                                    Class<?>[] parameterTypes = (Class<?>[])input.readObject();
                                    System.out.println(
                                        "parameterTypes : " + Arrays.toString(parameterTypes));
                                    Object[] arguments = (Object[])input.readObject();
                                    System.out.println("arguments : " + Arrays.toString(arguments));


                                    /* Server 處理請求,進行響應*/
                                    ObjectOutputStream output = new ObjectOutputStream(
                                        socket.getOutputStream());
                                    try {
                                        // service類型為Object的(可以釋出任何服務),故隻能通過反射調用處理請求
                                        // 反射調用,處理請求
                                        Method method = service.getClass().getMethod(methodName,
                                            parameterTypes);
                                        Object result = method.invoke(service, arguments);
                                        System.out.println("\nServer 處理并生成響應 :");
                                        System.out.println("result : " + result);
                                        output.writeObject(result);
                                    } catch (Throwable t) {
                                        output.writeObject(t);
                                    } finally {
                                        output.close();
                                    }
                                } finally {
                                    input.close();
                                }
                            } finally {
                                socket.close();
                            }
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                    }
                }).start();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }


    /**
     * 引用服務
     *
     * @param <T>            接口泛型
     * @param interfaceClass 接口類型
     * @param host           伺服器主機名
     * @param port           伺服器端口
     * @return 遠端服務,傳回代理對象
     * @throws Exception
     */
    @SuppressWarnings("unchecked")
    public static <T> T refer(final Class<T> interfaceClass, final String host, final int port)
        throws Exception {
        if (interfaceClass == null) {
            throw new IllegalArgumentException("Interface class == null");
        }
        // JDK 動态代理的限制,隻能實作對接口的代理
        if (!interfaceClass.isInterface()) {
            throw new IllegalArgumentException(
                "The " + interfaceClass.getName() + " must be interface class!");
        }
        if (host == null || host.length() == ) {
            throw new IllegalArgumentException("Host == null!");
        }
        if (port <=  || port > ) {
            throw new IllegalArgumentException("Invalid port " + port);
        }
        System.out.println(
            "Get remote service " + interfaceClass.getName() + " from server " + host + ":" + port);

        // JDK 動态代理
        T proxy = (T)Proxy.newProxyInstance(interfaceClass.getClassLoader(),
            new Class<?>[] {interfaceClass}, new InvocationHandler() {
                // invoke方法本意是對目标方法的增強,在這裡用于發送RPC請求和接收響應
                @Override
                public Object invoke(Object proxy, Method method, Object[] arguments)
                    throws Throwable {
                    // 建立Socket用戶端,并與服務端建立連結
                    Socket socket = new Socket(host, port);
                    try {
                        /* 用戶端像服務端進行請求,并将請求參數寫入流中*/
                        // 将對象寫入到對象輸出流,并将其發送到Socket流中去
                        ObjectOutputStream output = new ObjectOutputStream(
                            socket.getOutputStream());
                        try {
                            // 發送請求
                            System.out.println("\nClient發送請求 : ");
                            output.writeUTF(method.getName());
                            System.out.println("methodName : " + method.getName());
                            output.writeObject(method.getParameterTypes());
                            System.out.println("parameterTypes : " + Arrays.toString(method
                                .getParameterTypes()));
                            output.writeObject(arguments);
                            System.out.println("arguments : " + Arrays.toString(arguments));


                            /* 用戶端讀取并傳回服務端的響應*/
                            ObjectInputStream input = new ObjectInputStream(
                                socket.getInputStream());
                            try {
                                Object result = input.readObject();
                                if (result instanceof Throwable) {
                                    throw (Throwable)result;
                                }
                                System.out.println("\nClient收到響應 : ");
                                System.out.println("result : " + result);
                                return result;
                            } finally {
                                input.close();
                            }
                        } finally {
                            output.close();
                        }
                    } finally {
                        socket.close();
                    }
                }
            });
        return proxy;
    }
}
           

  以上是簡易RPC架構實作的簡易完整代碼。

四. 關于RPC架構的若幹問題說明

(1).RPC架構如何做到透明化遠端服務調用?

  如何封裝通信細節才能讓使用者像以本地調用方式調用遠端服務呢?就Java而言,

動态代理恰是解決之道。

Java動态代理有JDK動态代理和CGLIB動态代理兩種方式。盡管位元組碼生成方式實作的代理更為強大和高效,但代碼維護不易,是以RPC架構的大部分實作還是選擇JDK動态代理的方式。在上面的例子中,RPCFramework實作中的invoke方法封裝了與遠端服務通信的細節,消費方首先從RPCFramework獲得服務提供方的接口,當執行helloService.hi(“Panda”)方法時就會調用invoke方法。

(2).如何釋出自己的服務?

  如何讓别人使用我們的服務呢?難道就像我們上面的代碼一樣直接寫死服務的IP以及端口就可以了嗎?事實上,在實際生産實作中,使用人肉告知的方式是不現實的,因為實際生産中服務機器上/下線太頻繁了。如果你發現一台機器提供服務不夠,要再添加一台,這個時候就要告訴調用者我現在有兩個IP了,你們要輪詢調用來實作負載均衡;調用者咬咬牙改了,結果某天一台機器挂了,調用者發現服務有一半不可用,他又隻能手動修改代碼來删除挂掉那台機器的ip。這必然是相當痛苦的!

  有沒有一種方法能實作自動告知,即機器的上線/下線對調用方透明,調用者不再需要寫死服務提供方位址?當然可以,生産中的RPC架構都采用的是自動告知的方式,比如,阿裡内部使用的RPC架構HSF是通過ConfigServer來完成這項任務的。此外,Zookeeper也被廣泛用于實作服務自動注冊與發現功能。不管具體采用何種技術,他們大都采用的都是

釋出/訂閱

模式。

(3).序列化與反序列化

  我們知道,Java對象是無法直接在網絡中進行傳輸的。那麼,我們的RPC請求如何發給服務端,用戶端又如何接收來自服務端的響應呢?答案是,在傳輸Java對象時,首先對其進行序列化,然後在相應的終端進行反序列化還原對象以便進行處理。事實上,序列化/反序列化技術也有很多種,比如Java的原生序列化方式、JSON、阿裡的Hessian和ProtoBuff序列化等,它們在效率上存在差異,但又有各自的特點。

  除上面提到的三個問題外,生産中使用的RPC架構要考慮的東西還有很多,在此就不作探讨了。本文的目的就是為了讓各位看官對RPC架構有一個感性的、較為深入的了解,如果達到了這一目的,筆者的目的基本就算達到了。

五. 總結

  本文闡述了遠端調用的産生背景,然後介紹了RPC的基本概念和要解決的問題,之後手動實作了簡易得RPC架構并佐以執行個體進行示範,使看官們對RPC有一個感性完整的認識,最後讨論了RPC架構的幾個重要問題。總之,RPC架構的精髓在于動态代理和反射,通過它們使得遠端調用“本地化”,對使用者透明且友好。

六. 說明與緻謝

  本文主要參考了梁飛前輩的RPC架構幾行代碼就夠了和Hosee的RPC原理及RPC執行個體分析,這兩篇博文都是學習RPC的好文章,感謝它們的無私分享。

  若各位看官想進一步了解Java動态代理,請移步筆者的深入了解代理模式:靜态代理與JDK動态代理一文。

引用

RPC原理及RPC執行個體分析

RPC架構幾行代碼就夠了