天天看點

RPC架構初體驗之入門RPC架構初體驗之入門1 動手實作一個簡單的RPC2 基于ZooKeeper注冊中心的RPC實作3 常見RPC架構

RPC架構初體驗之入門

項目位址:https://github.com/shirukai/learn-demo-rpc.git

RPC全稱Remote Procedure Call,顧名思義,遠端過程調用的意思。關于RPC的介紹,可以參考一下簡書上《如何給老婆解釋什麼是RPC》這篇文章,很有趣。RPC這個概念,我第一次接觸是在《Spark核心設計的藝術》這本書裡。後來在看微服務的時候,也提及到了幾款RPC架構,比如Thrift、Dubbo、gRPC。是以決定認真的學習一下RPC以及這幾種架構。下面将會在本篇文章裡入門RPC,動手實作一個簡單的RPC,再基于Netty實作一個RPC,最後簡單介紹一下幾款常見的RPC架構,以及它們的優缺點。後面将會以系列的形式分别介紹這幾款常見的RPC架構的使用。

1 動手實作一個簡單的RPC

為了深入了解RPC,這裡動手實作了一個簡單的RPC,服務之前通過簡單的socket進行通訊。

1.1 項目描述

在learn-demo-rpc項目下有一個simple-rpc的子產品,該子產品實作了一個簡單的RPC,其中包括四個子子產品

simple-rpc/
├── simple-rpc-api
├── simple-rpc-consumer
├── simple-rpc-core
└── simple-rpc-provider
           
  • simple-rpc-api: 該子產品提供服務接口
  • simple-rpc-core: RPC核心實作
  • simple-rpc-consumer: RPC消費者服務
  • simple-rpc-provider: RPC提供者服務

simple-rpc-provider 子產品實作simple-rpc-api定義的相關接口,并通過simple-rpc-core子產品建立提供者服務。

simple-rpc-consumer通過simple-rpc-core子產品建立消費者服務,并通過simple-rpc-api子產品的接口進行RPC。

項目示範:

啟動simple-rpc-provider子產品裡的DemoServiceProvider

RPC架構初體驗之入門RPC架構初體驗之入門1 動手實作一個簡單的RPC2 基于ZooKeeper注冊中心的RPC實作3 常見RPC架構

啟動simple-rpc-consumer裡的DemoServiceConsumer

RPC架構初體驗之入門RPC架構初體驗之入門1 動手實作一個簡單的RPC2 基于ZooKeeper注冊中心的RPC實作3 常見RPC架構

1.2 simple-rpc-core子產品

該子產品為核心子產品,分别提供了服務者、消費者服務建立。如下所示,主要包括四個功能。request包定義RPC請求資料類型,response包定義RPC響應資料類型,server提供RPC的provider服務,client提供RPC的consumer服務。

RPC架構初體驗之入門RPC架構初體驗之入門1 動手實作一個簡單的RPC2 基于ZooKeeper注冊中心的RPC實作3 常見RPC架構

1.2.1 request

在該包下建立RpcRequest類,用來定義請求資料類型實體,該實體主要包含,遠端調用的方法名稱、參數清單、參數類型清單。

package learn.demo.rpc.simple.core.request;

import java.io.Serializable;
import java.util.Arrays;

/**
 * Created by shirukai on 2019-06-21 15:02
 * RPC 請求
 */
public class RpcRequest implements Serializable {
    private static final long serialVersionUID = 4932007273709224551L;
    /**
     * 方法名稱
     */
    private String methodName;

    /**
     * 參數清單
     */
    private Object[] parameters;

    /**
     * 參數類型
     */
    private Class<?>[] parameterTypes;
    /**
     * 省略get、set方法。
     */
}
           

1.2.2 response

在該包下建立RpcResponse類,用來定義RPC響應的資料類型,其中包含響應狀态status,用來描述請求是否執行成功,它有兩個狀态succeed和failed。響應資訊message主要存放異常響應時的錯誤資訊。響應資料data,遠端調用方法的傳回值。

public class RpcResponse implements Serializable {
    public static String SUCCEED = "succeed";
    public static String FAILED = "failed";
    private static final long serialVersionUID = 6595683424889346485L;

    /**
     * 響應狀态
     */
    private String status = "succeed";
    /**
     * 響應資訊,如異常資訊
     */
    private String message;

    /**
     * 響應資料,傳回值
     */
    private Object data;
    /**
     * 省略get、set方法
     */
}
           

1.2.3 server

在該包下建立RpcProvider類,用來定義建立Provider服務的方法。原理很簡單,根據指定的端口建立ServerSocket,監聽用戶端發送資料。接收到用戶端發送資料後,反序列化成Request,擷取其中的方法名和參數類型及參數清單,根據傳入的接口class和執行個體,通過反射機制,調用該方法,拿到執行結果後封裝成RpcResponse傳回給用戶端。具體實作如下:

package learn.demo.rpc.simple.core.server;

import learn.demo.rpc.simple.core.request.RpcRequest;
import learn.demo.rpc.simple.core.response.RpcResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.lang.reflect.Method;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;

/**
 * Created by shirukai on 2019-06-21 16:26
 * RPC provider
 */
public class RpcProvider<T> {
    private static final Logger log = LoggerFactory.getLogger(RpcProvider.class);
    private T ref;

    private Class<?> interfaceClass;

    public void setRef(T ref) {
        this.ref = ref;
    }

    public RpcProvider<T> setInterfaceClass(Class<?> interfaceClass) {
        this.interfaceClass = interfaceClass;
        return this;
    }

    public void export(int port) {
        try {
            log.info("The RPC Server is starting, address:{}, bind:{}", InetAddress.getLocalHost().getHostAddress(), port);
            ServerSocket listener = new ServerSocket(port);
            while (true) {
                Socket socket = listener.accept();
                // 接收資料并進行反序列化
                ObjectInputStream objectInputStream = new ObjectInputStream(socket.getInputStream());

                // 擷取請求對象
                Object object = objectInputStream.readObject();

                if (object instanceof RpcRequest) {
                    RpcRequest request = (RpcRequest) object;
                    log.info("Received request:{}", request);
                    // 處理請求
                    RpcResponse response = handleRequest(request);
                    // 将結果傳回給用戶端
                    log.info("Send response to client.{}", response);
                    ObjectOutputStream objectOutputStream = new ObjectOutputStream(socket.getOutputStream());
                    objectOutputStream.writeObject(response);
                }
                socket.close();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }

    }

    private RpcResponse handleRequest(RpcRequest request) {
        RpcResponse response = new RpcResponse();
        try {
            log.info("The server is handling request.");
            Method method = interfaceClass.getMethod(request.getMethodName(), request.getParameterTypes());
            Object data = method.invoke(ref, request.getParameters());
            response.setData(data);
        } catch (Exception e) {
            response.setStatus(RpcResponse.FAILED).setMessage(e.getMessage());
        }
        return response;
    }
}

           

1.2.4 client

用戶端的實作比較有趣,包含如下内容

client/
├── RpcClient.java
├── RpcConsumer.java
└── proxy
    └── RpcInvocationHandler.java
           

原理也不複雜,通過上面的結構可以看出,我們clien裡包含了一個proxy包,該包主要實作的是一個動态代理。用戶端實作API接口的動态代理,生成接口執行個體,表面上調用的接口方法,通過代理後,經過RpcClient進行的遠端調用,也就是我們的定義的RPC,拿到結果後再傳回。

1.2.4.1 RpcClient

RpcClient主要是與遠端ServerSocket進行通訊的,建立根據IP和端口建立Socket,将RpcRequest進行序列化之後,發送給遠端服務。

package learn.demo.rpc.simple.core.client;

import learn.demo.rpc.simple.core.request.RpcRequest;
import learn.demo.rpc.simple.core.response.RpcResponse;

import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.net.Socket;

/**
 * Created by shirukai on 2019-06-21 15:42
 * Rpc用戶端
 */
public class RpcClient {
    /**
     * 服務位址
     */
    private String address;

    /**
     * 服務端口
     */
    private int port;


    public RpcResponse send(RpcRequest rpcRequest) throws Exception {

        Socket socket = new Socket(address, port);

        //請求序列化
        ObjectOutputStream objectOutputStream = new ObjectOutputStream(socket.getOutputStream());

        //将請求發給服務提供方
        objectOutputStream.writeObject(rpcRequest);

        // 将響應體反序列化
        ObjectInputStream objectInputStream = new ObjectInputStream(socket.getInputStream());

        Object response = objectInputStream.readObject();
        if (response instanceof RpcResponse) {
            return (RpcResponse) response;
        }
        throw new RuntimeException("Return error");
    }

    public String getAddress() {
        return address;
    }

    public void setAddress(String address) {
        this.address = address;
    }

    public int getPort() {
        return port;
    }

    public void setPort(int port) {
        this.port = port;
    }
}
           

1.2.4.2 RpcInvocationHandler

在proxy下建立RpcInvocationHandler,調用處理器,繼承InvocationHandler接口并實作其invoke方法。實作代理邏輯。如下所示:

package learn.demo.rpc.simple.core.client.proxy;

import learn.demo.rpc.simple.core.client.RpcClient;
import learn.demo.rpc.simple.core.request.RpcRequest;
import learn.demo.rpc.simple.core.response.RpcResponse;

import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;

/**
 * Created by shirukai on 2019-06-21 15:43
 * RPC 代理處理器
 */
public class RpcInvocationHandler implements InvocationHandler {
    private RpcClient client;

    public RpcInvocationHandler(RpcClient client) {
        this.client = client;
    }

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {

        // 建構請求對象
        RpcRequest rpcRequest = new RpcRequest();
        rpcRequest.setMethodName(method.getName())
                .setParameterTypes(method.getParameterTypes())
                .setParameters(args);
        // 使用用戶端發送請求
        RpcResponse response = client.send(rpcRequest);

        // 響應成功傳回結果
        if (RpcResponse.SUCCEED.equals(response.getStatus())) {
            return response.getData();
        }
        throw new RuntimeException(response.getMessage());
    }
}

           

1.2.4.3 RpcConsumer

該類通過執行個體化RpcClient以及建立代理執行個體來建構生産者

package learn.demo.rpc.simple.core.client;

import learn.demo.rpc.simple.core.client.proxy.RpcInvocationHandler;

import java.lang.reflect.Proxy;

/**
 * Created by shirukai on 2019-06-21 16:11
 * 生産者建構器
 */
public class RpcConsumer {
    private String address;
    private int port;

    private Class<?> interfaceClass;

    public RpcConsumer setAddress(String address) {
        this.address = address;
        return this;
    }

    public RpcConsumer setPort(int port) {
        this.port = port;
        return this;
    }

    public RpcConsumer setInterface(Class<?> interfaceClass) {
        this.interfaceClass = interfaceClass;
        return this;
    }

    public <T> T get() {
        RpcClient client = new RpcClient();
        client.setAddress(address);
        client.setPort(port);
        // 執行個體化RPC代理處理器
        RpcInvocationHandler handler = new RpcInvocationHandler(client);
        return (T) Proxy.newProxyInstance(interfaceClass.getClassLoader(), new Class[]{interfaceClass}, handler);
    }
}
           

1.3 simple-rpc-api子產品

上文也提到,該子產品隻是提供一個公用的接口API,沒有特殊方法。如下提供一個名為DemoService的接口,定義如下接口:

package learn.demo.rpc.simple.api;

/**
 * Created by shirukai on 2019-06-21 10:54
 * DemoService 接口
 */
public interface DemoService {
    String sayHello(String name);

    String sayGoodbye(String name);
}
           

1.4 simple-rpc-provider子產品

上面我們已經實作了核心子產品core以及接口api,這裡我們調用這兩個子產品進行提供者服務的建立。分為接口實作,和服務建立兩部分。

1.4.1 API接口實作

引入我們建立的simple-rpc-api子產品

<dependency>
            <groupId>learn.demo</groupId>
            <artifactId>simple-rpc-api</artifactId>
            <version>1.0</version>
            <scope>compile</scope>
        </dependency>
           

建立DemoServiceImpl實作DemoService接口

package learn.demo.rpc.simple.provider;

import learn.demo.rpc.simple.api.DemoService;

/**
 * Created by shirukai on 2019-06-21 10:55
 * 接口實作類
 */
public class DemoServiceImpl implements DemoService {
    @Override
    public String sayHello(String name) {
        return "This is simple RPC service.\nHello " + name;
    }

    @Override
    public String sayGoodbye(String name) {
        return "This is simple RPC service.\nGoodbye " + name;
    }
}

           

1.4.2 RPC Provider服務建立

使用simple-rpc-core子產品建立RpcProvider執行個體,然後啟動服務。這裡向外暴露端口9090。

package learn.demo.rpc.simple.provider;


import learn.demo.rpc.simple.api.DemoService;
import learn.demo.rpc.simple.core.server.RpcProvider;

/**
 * Created by shirukai on 2019-06-21 10:56
 * 服務提供者
 */
public class DemoServiceProvider {

    public static void main(String[] args) {
        DemoServiceImpl demoService = new DemoServiceImpl();

        RpcProvider<DemoService> provider = new RpcProvider<>();
        provider.setInterfaceClass(DemoService.class)
                .setRef(demoService);

        provider.export(9090);
    }
}

           

1.5 simple-rpc-consumer子產品

通過simple-rpc-cor子產品建立RpcConsumer執行個體,設定Provider位址和端口,然後擷取接口執行個體,調用相關方法。

package learn.demo.rpc.simple.consumer;

import learn.demo.rpc.simple.api.DemoService;
import learn.demo.rpc.simple.core.client.RpcConsumer;

/**
 * Created by shirukai on 2019-06-21 11:29
 * 消費者
 */
public class DemoServiceConsumer {
    public static void main(String[] args) {
        RpcConsumer consumer = new RpcConsumer();
        consumer.setAddress("127.0.0.1");
        consumer.setPort(9090);
        consumer.setInterface(DemoService.class);

        DemoService service = consumer.get();

        System.out.println(service.sayGoodbye("hahah"));
    }
}
           

2 基于ZooKeeper注冊中心的RPC實作

上面我們介紹了通過直連的方式,實作了一個簡單RPC,我們也可以通過注冊中心的形式去實作RPC,這涉及到了服務注冊和服務發現。這裡使用ZooKeeper作為注冊中心,也簡單的進行了RPC的實作,其中有些地方沒有進行詳細實作,比如服務的負載均衡。該部分的代碼在learn-demo-rpc下的zk-registry-rpc子產品下,目錄結構如simple-rpc相同,如下所示:

zk-registry-rpc/
├── zk-registry-rpc-api
├── zk-registry-rpc-consumer
├── zk-registry-rpc-core
└── zk-registry-rpc-provider
           

内容改動不大,主要在core的實作上,加入了注冊中心,進行服務發現和服務注冊。在Rpc的請求上,也加入了ID字段,用來表示需要調用哪個服務。下面将對幾處修改的地方進行講解。

2.1 zk-registry-rpc-core子產品

2.1.1 request

上面提到對RpcRequest進行簡單修改,加入Id字段,用來描述調用的是那個接口下的服務,是以此id是使用接口名生成的。

/**
     * 請求ID,接口類名
     */
    private String id;
           

2.1.2 registry

這裡主要是對注冊中心的實作,其中包括ProviderInfo實體類,用來描述提供者資訊,如id、address、port。另外包括RpcZKRegistryService注冊中心服務。

2.1.2.1 ProviderInfo

package learn.demo.rpc.zk.core.registry;

import com.alibaba.fastjson.JSON;

/**
 * Created by shirukai on 2019-06-25 16:34
 * Provider資訊
 */
public class ProviderInfo {
    /**
     * 提供者ID
     */
    private String id;
    /**
     * 提供者位址
     */
    private String address;
    /**
     * 提供者端口
     */
    private int port;

    public String getId() {
        return id;
    }

    public ProviderInfo setId(String id) {
        this.id = id;
        return this;
    }

    public String getAddress() {
        return address;
    }

    public ProviderInfo setAddress(String address) {
        this.address = address;
        return this;
    }

    public int getPort() {
        return port;
    }

    public ProviderInfo setPort(int port) {
        this.port = port;
        return this;
    }

    public String toJSONString() {
        return JSON.toJSONString(this);
    }

    @Override
    public String toString() {
        return "ProviderInfo{" +
                "id='" + id + '\'' +
                ", address='" + address + '\'' +
                ", port=" + port +
                '}';
    }
}

           

2.1.2.2 RpcZKRegistryService

注冊中心的實作,主要包括三個功能:服務注冊、服務發現、服務監聽。Provider通過調用注冊中的服務注冊,将自己的資訊注冊到ZK中,Consumer通過調用注冊中心的服務發現,查找自己想要請求的服務清單,并通過服務監聽,更新服務清單。具體實作如下所示:

package learn.demo.rpc.zk.core.registry;

import com.alibaba.fastjson.JSON;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.retry.RetryNTimes;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/**
 * Created by shirukai on 2019-06-25 16:42
 * Rpc注冊服務
 */
public class RpcZKRegistryService {
    private static final Logger log = LoggerFactory.getLogger(RpcZKRegistryService.class);
    private static final String NAMESPACE = "zk-rpc";
    private static final String RPC_PROVIDER_NODE = "/provider";


    private final Map<String, ProviderInfo> remoteProviders = new HashMap<>();
    private CuratorFramework zkClient;

    public RpcZKRegistryService(String zkConnectString) {
        RetryPolicy retryPolicy = new RetryNTimes(3, 5000);
        // 擷取用戶端
        this.zkClient = CuratorFrameworkFactory.builder()
                .connectString(zkConnectString)
                .sessionTimeoutMs(10000)
                .retryPolicy(retryPolicy)
                .namespace(NAMESPACE)
                .build();
        this.zkClient.start();
    }

    /**
     * 注冊服務
     *
     * @param providerInfo 提供者資訊
     */
    public void register(ProviderInfo providerInfo) {
        String nodePath = RPC_PROVIDER_NODE + "/" + providerInfo.getId();
        try {
            // 判斷節點存不存在,不存在則建立,存在則報異常
            Stat stat = zkClient.checkExists().forPath(nodePath);
            if (stat == null) {
                // 建立臨時節點
                zkClient.create()
                        .creatingParentsIfNeeded()
                        .withMode(CreateMode.EPHEMERAL_SEQUENTIAL)
                        .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
                        .forPath(nodePath, providerInfo.toJSONString().getBytes());
            } else {
                log.error("The provider already exists.{}", providerInfo.toJSONString());
            }
        } catch (Exception e) {
            log.error("Registration provider failed.{}", e.getMessage());
        }
    }


    /**
     * 訂閱服務
     *
     * @param id 提供者ID,接口名字
     */
    public void subscribe(String id) {
        try {
            // 擷取所有的Provider
            List<String> providerIds = zkClient.getChildren().forPath(RPC_PROVIDER_NODE);
            for (String providerId : providerIds) {
                // 如果與訂閱服務相同,則擷取節點資訊
                if (providerId.contains(id)) {
                    String nodePath = RPC_PROVIDER_NODE + "/" + providerId;
                    byte[] data = zkClient.getData().forPath(nodePath);
                    ProviderInfo info = JSON.parseObject(data, ProviderInfo.class);
                    this.remoteProviders.put(providerId, info);
                }
            }

            // 添加監聽事件
            addProviderWatch(id);
        } catch (Exception e) {
            log.error("Subscription provider failed.");
        }

    }

    /**
     * 添加監聽事件
     *
     * @param id 提供者ID,接口名字
     */
    private void addProviderWatch(String id) throws Exception {
        // 建立子節點緩存
        final PathChildrenCache childrenCache = new PathChildrenCache(this.zkClient, RPC_PROVIDER_NODE, true);
        childrenCache.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE);

        // 添加子節點監聽事件
        childrenCache.getListenable().addListener((client, event) -> {
            String nodePath = event.getData().getPath();
            // 如果監聽節點為訂閱的ProviderID
            if (nodePath.contains(id)) {

                if (event.getType().equals(PathChildrenCacheEvent.Type.CHILD_REMOVED)) {
                    // 節點移除
                    this.remoteProviders.remove(nodePath);

                } else if (event.getType().equals(PathChildrenCacheEvent.Type.CHILD_ADDED)) {
                    byte[] data = event.getData().getData();
                    ProviderInfo info = JSON.parseObject(data, ProviderInfo.class);
                    // 節點添加
                    this.remoteProviders.put(nodePath, info);
                }
            }
        });
    }

    public Map<String, ProviderInfo> getRemoteProviders() {
        return remoteProviders;
    }
}

           

2.1.3 server

server的改動不大,主要是加入了設定zk連接配接的方法,以及将自己的資訊通過注冊中心注冊到zk的邏輯。

// 設定zk連接配接    
public RpcProvider<T> setZKConnectString(String zkConnectString) {
        this.registryService = new RpcZKRegistryService(zkConnectString);
        return this;
    }
           
// 生成服務資訊
ProviderInfo providerInfo = new ProviderInfo();
providerInfo.setAddress(InetAddress.getLocalHost().getHostAddress())
        .setPort(port)
        .setId(interfaceClass.getName());
// 建立服務
ServerSocket listener = new ServerSocket(port);

// 服務建立完成後将資訊注冊到zk
registryService.register(providerInfo);
           

2.1.4 client

用戶端主要修改了RpcConsumer,添加了服務發現,和模拟負載均衡的兩個方法。

/**
     * 擷取所有Providers
     *
     * @return list
     */
    private List<ProviderInfo> lookupProviders() {
        // 訂閱服務
        registryService.subscribe(interfaceClass.getName());
        // 擷取所有Provider
        Map<String, ProviderInfo> providers = registryService.getRemoteProviders();
        return new ArrayList<>(providers.values());
    }

    /**
     * 模拟負載均衡
     *
     * @param providers provider 清單
     * @return ProviderInfo
     */
    private static ProviderInfo chooseTarget(List<ProviderInfo> providers) {
        if (providers == null || providers.isEmpty()) {
            throw new RuntimeException("providers has not exits!");
        }
        return providers.get(0);
    }
           

然後再建立代理執行個體之前調用服務發現和負載均衡方法

public <T> T get() {
        List<ProviderInfo> providers = lookupProviders();

        ProviderInfo provider = chooseTarget(providers);

        RpcClient client = new RpcClient();
        client.setAddress(provider.getAddress());
        client.setPort(provider.getPort());
        // 執行個體化RPC代理處理器
        RpcInvocationHandler handler = new RpcInvocationHandler(client);
        return (T) Proxy.newProxyInstance(interfaceClass.getClassLoader(), new Class[]{interfaceClass}, handler);
    }
           

3 常見RPC架構

上面通過動手實作一個簡單的RPC,大體對RPC的工作流程有了一定的了解,當然我們寫的隻是一個簡單的RPC,Demo級别的,隻能玩玩不能用于生産。如果想提高性能,可以考慮使用NIO Socket進行通信,也可以基于Netty進行RPC通信,也可以通過利用一下已有的RPC架構,這裡就簡單對比一下幾款常見的RPC架構。

PRC對比 Dubbo Motan Thrift Grpc
開發語言 java java 跨語言 跨語言
服務治理
多種序列化 隻支援thrift 隻支援protobuf
多種注冊中心
管理中心
跨語言通訊
整體性能 3 4 5 3

等有時間整理一下Dubbo以及Thrift的簡單使用。