RPC架構初體驗之入門
項目位址:https://github.com/shirukai/learn-demo-rpc.git
RPC全稱Remote Procedure Call,顧名思義,遠端過程調用的意思。關于RPC的介紹,可以參考一下簡書上《如何給老婆解釋什麼是RPC》這篇文章,很有趣。RPC這個概念,我第一次接觸是在《Spark核心設計的藝術》這本書裡。後來在看微服務的時候,也提及到了幾款RPC架構,比如Thrift、Dubbo、gRPC。是以決定認真的學習一下RPC以及這幾種架構。下面将會在本篇文章裡入門RPC,動手實作一個簡單的RPC,再基于Netty實作一個RPC,最後簡單介紹一下幾款常見的RPC架構,以及它們的優缺點。後面将會以系列的形式分别介紹這幾款常見的RPC架構的使用。
1 動手實作一個簡單的RPC
為了深入了解RPC,這裡動手實作了一個簡單的RPC,服務之前通過簡單的socket進行通訊。
1.1 項目描述
在learn-demo-rpc項目下有一個simple-rpc的子產品,該子產品實作了一個簡單的RPC,其中包括四個子子產品
simple-rpc/
├── simple-rpc-api
├── simple-rpc-consumer
├── simple-rpc-core
└── simple-rpc-provider
- simple-rpc-api: 該子產品提供服務接口
- simple-rpc-core: RPC核心實作
- simple-rpc-consumer: RPC消費者服務
- simple-rpc-provider: RPC提供者服務
simple-rpc-provider 子產品實作simple-rpc-api定義的相關接口,并通過simple-rpc-core子產品建立提供者服務。
simple-rpc-consumer通過simple-rpc-core子產品建立消費者服務,并通過simple-rpc-api子產品的接口進行RPC。
項目示範:
啟動simple-rpc-provider子產品裡的DemoServiceProvider
啟動simple-rpc-consumer裡的DemoServiceConsumer
1.2 simple-rpc-core子產品
該子產品為核心子產品,分别提供了服務者、消費者服務建立。如下所示,主要包括四個功能。request包定義RPC請求資料類型,response包定義RPC響應資料類型,server提供RPC的provider服務,client提供RPC的consumer服務。
1.2.1 request
在該包下建立RpcRequest類,用來定義請求資料類型實體,該實體主要包含,遠端調用的方法名稱、參數清單、參數類型清單。
package learn.demo.rpc.simple.core.request;
import java.io.Serializable;
import java.util.Arrays;
/**
* Created by shirukai on 2019-06-21 15:02
* RPC 請求
*/
public class RpcRequest implements Serializable {
private static final long serialVersionUID = 4932007273709224551L;
/**
* 方法名稱
*/
private String methodName;
/**
* 參數清單
*/
private Object[] parameters;
/**
* 參數類型
*/
private Class<?>[] parameterTypes;
/**
* 省略get、set方法。
*/
}
1.2.2 response
在該包下建立RpcResponse類,用來定義RPC響應的資料類型,其中包含響應狀态status,用來描述請求是否執行成功,它有兩個狀态succeed和failed。響應資訊message主要存放異常響應時的錯誤資訊。響應資料data,遠端調用方法的傳回值。
public class RpcResponse implements Serializable {
public static String SUCCEED = "succeed";
public static String FAILED = "failed";
private static final long serialVersionUID = 6595683424889346485L;
/**
* 響應狀态
*/
private String status = "succeed";
/**
* 響應資訊,如異常資訊
*/
private String message;
/**
* 響應資料,傳回值
*/
private Object data;
/**
* 省略get、set方法
*/
}
1.2.3 server
在該包下建立RpcProvider類,用來定義建立Provider服務的方法。原理很簡單,根據指定的端口建立ServerSocket,監聽用戶端發送資料。接收到用戶端發送資料後,反序列化成Request,擷取其中的方法名和參數類型及參數清單,根據傳入的接口class和執行個體,通過反射機制,調用該方法,拿到執行結果後封裝成RpcResponse傳回給用戶端。具體實作如下:
package learn.demo.rpc.simple.core.server;
import learn.demo.rpc.simple.core.request.RpcRequest;
import learn.demo.rpc.simple.core.response.RpcResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.lang.reflect.Method;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
/**
* Created by shirukai on 2019-06-21 16:26
* RPC provider
*/
public class RpcProvider<T> {
private static final Logger log = LoggerFactory.getLogger(RpcProvider.class);
private T ref;
private Class<?> interfaceClass;
public void setRef(T ref) {
this.ref = ref;
}
public RpcProvider<T> setInterfaceClass(Class<?> interfaceClass) {
this.interfaceClass = interfaceClass;
return this;
}
public void export(int port) {
try {
log.info("The RPC Server is starting, address:{}, bind:{}", InetAddress.getLocalHost().getHostAddress(), port);
ServerSocket listener = new ServerSocket(port);
while (true) {
Socket socket = listener.accept();
// 接收資料并進行反序列化
ObjectInputStream objectInputStream = new ObjectInputStream(socket.getInputStream());
// 擷取請求對象
Object object = objectInputStream.readObject();
if (object instanceof RpcRequest) {
RpcRequest request = (RpcRequest) object;
log.info("Received request:{}", request);
// 處理請求
RpcResponse response = handleRequest(request);
// 将結果傳回給用戶端
log.info("Send response to client.{}", response);
ObjectOutputStream objectOutputStream = new ObjectOutputStream(socket.getOutputStream());
objectOutputStream.writeObject(response);
}
socket.close();
}
} catch (Exception e) {
e.printStackTrace();
}
}
private RpcResponse handleRequest(RpcRequest request) {
RpcResponse response = new RpcResponse();
try {
log.info("The server is handling request.");
Method method = interfaceClass.getMethod(request.getMethodName(), request.getParameterTypes());
Object data = method.invoke(ref, request.getParameters());
response.setData(data);
} catch (Exception e) {
response.setStatus(RpcResponse.FAILED).setMessage(e.getMessage());
}
return response;
}
}
1.2.4 client
用戶端的實作比較有趣,包含如下内容
client/
├── RpcClient.java
├── RpcConsumer.java
└── proxy
└── RpcInvocationHandler.java
原理也不複雜,通過上面的結構可以看出,我們clien裡包含了一個proxy包,該包主要實作的是一個動态代理。用戶端實作API接口的動态代理,生成接口執行個體,表面上調用的接口方法,通過代理後,經過RpcClient進行的遠端調用,也就是我們的定義的RPC,拿到結果後再傳回。
1.2.4.1 RpcClient
RpcClient主要是與遠端ServerSocket進行通訊的,建立根據IP和端口建立Socket,将RpcRequest進行序列化之後,發送給遠端服務。
package learn.demo.rpc.simple.core.client;
import learn.demo.rpc.simple.core.request.RpcRequest;
import learn.demo.rpc.simple.core.response.RpcResponse;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.net.Socket;
/**
* Created by shirukai on 2019-06-21 15:42
* Rpc用戶端
*/
public class RpcClient {
/**
* 服務位址
*/
private String address;
/**
* 服務端口
*/
private int port;
public RpcResponse send(RpcRequest rpcRequest) throws Exception {
Socket socket = new Socket(address, port);
//請求序列化
ObjectOutputStream objectOutputStream = new ObjectOutputStream(socket.getOutputStream());
//将請求發給服務提供方
objectOutputStream.writeObject(rpcRequest);
// 将響應體反序列化
ObjectInputStream objectInputStream = new ObjectInputStream(socket.getInputStream());
Object response = objectInputStream.readObject();
if (response instanceof RpcResponse) {
return (RpcResponse) response;
}
throw new RuntimeException("Return error");
}
public String getAddress() {
return address;
}
public void setAddress(String address) {
this.address = address;
}
public int getPort() {
return port;
}
public void setPort(int port) {
this.port = port;
}
}
1.2.4.2 RpcInvocationHandler
在proxy下建立RpcInvocationHandler,調用處理器,繼承InvocationHandler接口并實作其invoke方法。實作代理邏輯。如下所示:
package learn.demo.rpc.simple.core.client.proxy;
import learn.demo.rpc.simple.core.client.RpcClient;
import learn.demo.rpc.simple.core.request.RpcRequest;
import learn.demo.rpc.simple.core.response.RpcResponse;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
/**
* Created by shirukai on 2019-06-21 15:43
* RPC 代理處理器
*/
public class RpcInvocationHandler implements InvocationHandler {
private RpcClient client;
public RpcInvocationHandler(RpcClient client) {
this.client = client;
}
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
// 建構請求對象
RpcRequest rpcRequest = new RpcRequest();
rpcRequest.setMethodName(method.getName())
.setParameterTypes(method.getParameterTypes())
.setParameters(args);
// 使用用戶端發送請求
RpcResponse response = client.send(rpcRequest);
// 響應成功傳回結果
if (RpcResponse.SUCCEED.equals(response.getStatus())) {
return response.getData();
}
throw new RuntimeException(response.getMessage());
}
}
1.2.4.3 RpcConsumer
該類通過執行個體化RpcClient以及建立代理執行個體來建構生産者
package learn.demo.rpc.simple.core.client;
import learn.demo.rpc.simple.core.client.proxy.RpcInvocationHandler;
import java.lang.reflect.Proxy;
/**
* Created by shirukai on 2019-06-21 16:11
* 生産者建構器
*/
public class RpcConsumer {
private String address;
private int port;
private Class<?> interfaceClass;
public RpcConsumer setAddress(String address) {
this.address = address;
return this;
}
public RpcConsumer setPort(int port) {
this.port = port;
return this;
}
public RpcConsumer setInterface(Class<?> interfaceClass) {
this.interfaceClass = interfaceClass;
return this;
}
public <T> T get() {
RpcClient client = new RpcClient();
client.setAddress(address);
client.setPort(port);
// 執行個體化RPC代理處理器
RpcInvocationHandler handler = new RpcInvocationHandler(client);
return (T) Proxy.newProxyInstance(interfaceClass.getClassLoader(), new Class[]{interfaceClass}, handler);
}
}
1.3 simple-rpc-api子產品
上文也提到,該子產品隻是提供一個公用的接口API,沒有特殊方法。如下提供一個名為DemoService的接口,定義如下接口:
package learn.demo.rpc.simple.api;
/**
* Created by shirukai on 2019-06-21 10:54
* DemoService 接口
*/
public interface DemoService {
String sayHello(String name);
String sayGoodbye(String name);
}
1.4 simple-rpc-provider子產品
上面我們已經實作了核心子產品core以及接口api,這裡我們調用這兩個子產品進行提供者服務的建立。分為接口實作,和服務建立兩部分。
1.4.1 API接口實作
引入我們建立的simple-rpc-api子產品
<dependency>
<groupId>learn.demo</groupId>
<artifactId>simple-rpc-api</artifactId>
<version>1.0</version>
<scope>compile</scope>
</dependency>
建立DemoServiceImpl實作DemoService接口
package learn.demo.rpc.simple.provider;
import learn.demo.rpc.simple.api.DemoService;
/**
* Created by shirukai on 2019-06-21 10:55
* 接口實作類
*/
public class DemoServiceImpl implements DemoService {
@Override
public String sayHello(String name) {
return "This is simple RPC service.\nHello " + name;
}
@Override
public String sayGoodbye(String name) {
return "This is simple RPC service.\nGoodbye " + name;
}
}
1.4.2 RPC Provider服務建立
使用simple-rpc-core子產品建立RpcProvider執行個體,然後啟動服務。這裡向外暴露端口9090。
package learn.demo.rpc.simple.provider;
import learn.demo.rpc.simple.api.DemoService;
import learn.demo.rpc.simple.core.server.RpcProvider;
/**
* Created by shirukai on 2019-06-21 10:56
* 服務提供者
*/
public class DemoServiceProvider {
public static void main(String[] args) {
DemoServiceImpl demoService = new DemoServiceImpl();
RpcProvider<DemoService> provider = new RpcProvider<>();
provider.setInterfaceClass(DemoService.class)
.setRef(demoService);
provider.export(9090);
}
}
1.5 simple-rpc-consumer子產品
通過simple-rpc-cor子產品建立RpcConsumer執行個體,設定Provider位址和端口,然後擷取接口執行個體,調用相關方法。
package learn.demo.rpc.simple.consumer;
import learn.demo.rpc.simple.api.DemoService;
import learn.demo.rpc.simple.core.client.RpcConsumer;
/**
* Created by shirukai on 2019-06-21 11:29
* 消費者
*/
public class DemoServiceConsumer {
public static void main(String[] args) {
RpcConsumer consumer = new RpcConsumer();
consumer.setAddress("127.0.0.1");
consumer.setPort(9090);
consumer.setInterface(DemoService.class);
DemoService service = consumer.get();
System.out.println(service.sayGoodbye("hahah"));
}
}
2 基于ZooKeeper注冊中心的RPC實作
上面我們介紹了通過直連的方式,實作了一個簡單RPC,我們也可以通過注冊中心的形式去實作RPC,這涉及到了服務注冊和服務發現。這裡使用ZooKeeper作為注冊中心,也簡單的進行了RPC的實作,其中有些地方沒有進行詳細實作,比如服務的負載均衡。該部分的代碼在learn-demo-rpc下的zk-registry-rpc子產品下,目錄結構如simple-rpc相同,如下所示:
zk-registry-rpc/
├── zk-registry-rpc-api
├── zk-registry-rpc-consumer
├── zk-registry-rpc-core
└── zk-registry-rpc-provider
内容改動不大,主要在core的實作上,加入了注冊中心,進行服務發現和服務注冊。在Rpc的請求上,也加入了ID字段,用來表示需要調用哪個服務。下面将對幾處修改的地方進行講解。
2.1 zk-registry-rpc-core子產品
2.1.1 request
上面提到對RpcRequest進行簡單修改,加入Id字段,用來描述調用的是那個接口下的服務,是以此id是使用接口名生成的。
/**
* 請求ID,接口類名
*/
private String id;
2.1.2 registry
這裡主要是對注冊中心的實作,其中包括ProviderInfo實體類,用來描述提供者資訊,如id、address、port。另外包括RpcZKRegistryService注冊中心服務。
2.1.2.1 ProviderInfo
package learn.demo.rpc.zk.core.registry;
import com.alibaba.fastjson.JSON;
/**
* Created by shirukai on 2019-06-25 16:34
* Provider資訊
*/
public class ProviderInfo {
/**
* 提供者ID
*/
private String id;
/**
* 提供者位址
*/
private String address;
/**
* 提供者端口
*/
private int port;
public String getId() {
return id;
}
public ProviderInfo setId(String id) {
this.id = id;
return this;
}
public String getAddress() {
return address;
}
public ProviderInfo setAddress(String address) {
this.address = address;
return this;
}
public int getPort() {
return port;
}
public ProviderInfo setPort(int port) {
this.port = port;
return this;
}
public String toJSONString() {
return JSON.toJSONString(this);
}
@Override
public String toString() {
return "ProviderInfo{" +
"id='" + id + '\'' +
", address='" + address + '\'' +
", port=" + port +
'}';
}
}
2.1.2.2 RpcZKRegistryService
注冊中心的實作,主要包括三個功能:服務注冊、服務發現、服務監聽。Provider通過調用注冊中的服務注冊,将自己的資訊注冊到ZK中,Consumer通過調用注冊中心的服務發現,查找自己想要請求的服務清單,并通過服務監聽,更新服務清單。具體實作如下所示:
package learn.demo.rpc.zk.core.registry;
import com.alibaba.fastjson.JSON;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.retry.RetryNTimes;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
/**
* Created by shirukai on 2019-06-25 16:42
* Rpc注冊服務
*/
public class RpcZKRegistryService {
private static final Logger log = LoggerFactory.getLogger(RpcZKRegistryService.class);
private static final String NAMESPACE = "zk-rpc";
private static final String RPC_PROVIDER_NODE = "/provider";
private final Map<String, ProviderInfo> remoteProviders = new HashMap<>();
private CuratorFramework zkClient;
public RpcZKRegistryService(String zkConnectString) {
RetryPolicy retryPolicy = new RetryNTimes(3, 5000);
// 擷取用戶端
this.zkClient = CuratorFrameworkFactory.builder()
.connectString(zkConnectString)
.sessionTimeoutMs(10000)
.retryPolicy(retryPolicy)
.namespace(NAMESPACE)
.build();
this.zkClient.start();
}
/**
* 注冊服務
*
* @param providerInfo 提供者資訊
*/
public void register(ProviderInfo providerInfo) {
String nodePath = RPC_PROVIDER_NODE + "/" + providerInfo.getId();
try {
// 判斷節點存不存在,不存在則建立,存在則報異常
Stat stat = zkClient.checkExists().forPath(nodePath);
if (stat == null) {
// 建立臨時節點
zkClient.create()
.creatingParentsIfNeeded()
.withMode(CreateMode.EPHEMERAL_SEQUENTIAL)
.withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
.forPath(nodePath, providerInfo.toJSONString().getBytes());
} else {
log.error("The provider already exists.{}", providerInfo.toJSONString());
}
} catch (Exception e) {
log.error("Registration provider failed.{}", e.getMessage());
}
}
/**
* 訂閱服務
*
* @param id 提供者ID,接口名字
*/
public void subscribe(String id) {
try {
// 擷取所有的Provider
List<String> providerIds = zkClient.getChildren().forPath(RPC_PROVIDER_NODE);
for (String providerId : providerIds) {
// 如果與訂閱服務相同,則擷取節點資訊
if (providerId.contains(id)) {
String nodePath = RPC_PROVIDER_NODE + "/" + providerId;
byte[] data = zkClient.getData().forPath(nodePath);
ProviderInfo info = JSON.parseObject(data, ProviderInfo.class);
this.remoteProviders.put(providerId, info);
}
}
// 添加監聽事件
addProviderWatch(id);
} catch (Exception e) {
log.error("Subscription provider failed.");
}
}
/**
* 添加監聽事件
*
* @param id 提供者ID,接口名字
*/
private void addProviderWatch(String id) throws Exception {
// 建立子節點緩存
final PathChildrenCache childrenCache = new PathChildrenCache(this.zkClient, RPC_PROVIDER_NODE, true);
childrenCache.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE);
// 添加子節點監聽事件
childrenCache.getListenable().addListener((client, event) -> {
String nodePath = event.getData().getPath();
// 如果監聽節點為訂閱的ProviderID
if (nodePath.contains(id)) {
if (event.getType().equals(PathChildrenCacheEvent.Type.CHILD_REMOVED)) {
// 節點移除
this.remoteProviders.remove(nodePath);
} else if (event.getType().equals(PathChildrenCacheEvent.Type.CHILD_ADDED)) {
byte[] data = event.getData().getData();
ProviderInfo info = JSON.parseObject(data, ProviderInfo.class);
// 節點添加
this.remoteProviders.put(nodePath, info);
}
}
});
}
public Map<String, ProviderInfo> getRemoteProviders() {
return remoteProviders;
}
}
2.1.3 server
server的改動不大,主要是加入了設定zk連接配接的方法,以及将自己的資訊通過注冊中心注冊到zk的邏輯。
// 設定zk連接配接
public RpcProvider<T> setZKConnectString(String zkConnectString) {
this.registryService = new RpcZKRegistryService(zkConnectString);
return this;
}
// 生成服務資訊
ProviderInfo providerInfo = new ProviderInfo();
providerInfo.setAddress(InetAddress.getLocalHost().getHostAddress())
.setPort(port)
.setId(interfaceClass.getName());
// 建立服務
ServerSocket listener = new ServerSocket(port);
// 服務建立完成後将資訊注冊到zk
registryService.register(providerInfo);
2.1.4 client
用戶端主要修改了RpcConsumer,添加了服務發現,和模拟負載均衡的兩個方法。
/**
* 擷取所有Providers
*
* @return list
*/
private List<ProviderInfo> lookupProviders() {
// 訂閱服務
registryService.subscribe(interfaceClass.getName());
// 擷取所有Provider
Map<String, ProviderInfo> providers = registryService.getRemoteProviders();
return new ArrayList<>(providers.values());
}
/**
* 模拟負載均衡
*
* @param providers provider 清單
* @return ProviderInfo
*/
private static ProviderInfo chooseTarget(List<ProviderInfo> providers) {
if (providers == null || providers.isEmpty()) {
throw new RuntimeException("providers has not exits!");
}
return providers.get(0);
}
然後再建立代理執行個體之前調用服務發現和負載均衡方法
public <T> T get() {
List<ProviderInfo> providers = lookupProviders();
ProviderInfo provider = chooseTarget(providers);
RpcClient client = new RpcClient();
client.setAddress(provider.getAddress());
client.setPort(provider.getPort());
// 執行個體化RPC代理處理器
RpcInvocationHandler handler = new RpcInvocationHandler(client);
return (T) Proxy.newProxyInstance(interfaceClass.getClassLoader(), new Class[]{interfaceClass}, handler);
}
3 常見RPC架構
上面通過動手實作一個簡單的RPC,大體對RPC的工作流程有了一定的了解,當然我們寫的隻是一個簡單的RPC,Demo級别的,隻能玩玩不能用于生産。如果想提高性能,可以考慮使用NIO Socket進行通信,也可以基于Netty進行RPC通信,也可以通過利用一下已有的RPC架構,這裡就簡單對比一下幾款常見的RPC架構。
PRC對比 | Dubbo | Motan | Thrift | Grpc |
---|---|---|---|---|
開發語言 | java | java | 跨語言 | 跨語言 |
服務治理 | ✓ | ✓ | ✗ | ✗ |
多種序列化 | ✓ | ✓ | 隻支援thrift | 隻支援protobuf |
多種注冊中心 | ✓ | ✓ | ✗ | ✗ |
管理中心 | ✓ | ✓ | ✗ | ✗ |
跨語言通訊 | ✗ | ✗ | ✓ | ✓ |
整體性能 | 3 | 4 | 5 | 3 |
等有時間整理一下Dubbo以及Thrift的簡單使用。