原文連結
微服務已經是每個網際網路開發者必須掌握的一項技術。而 RPC 架構,是構成微服務最重要的組成部分之一。趁最近有時間。又看了看 dubbo 的源碼。dubbo 為了做到靈活和解耦,使用了大量的設計模式和 SPI機制,要看懂 dubbo 的代碼也不太容易。
按照《徒手撸架構》系列文章的套路,我還是會極簡的實作一個 RPC 架構。幫助大家了解 RPC 架構的原理。
廣義的來講一個完整的 RPC 包含了很多元件,包括服務發現,服務治理,遠端調用,調用鍊分析,網關等等。我将會慢慢的實作這些功能,這篇文章主要先講解的是 RPC 的基石,遠端調用 的實作。
相信,讀完這篇文章你也一定可以自己實作一個可以提供 RPC 調用的架構。
1. RPC 的調用過程
通過下圖我們來了解一下 RPC 的調用過程,從宏觀上來看看到底一次 RPC 調用經過些什麼過程。
當一次調用開始:
- client 會調用本地動态代理 proxy
- 這個代理會将調用通過協定轉序列化位元組流
- 通過 netty 網絡架構,将位元組流發送到服務端
- 服務端在受到這個位元組流後,會根據協定,反序列化為原始的調用,利用反射原理調用服務方提供的方法
- 如果請求有傳回值,又需要把結果根據協定序列化後,再通過 netty 傳回給調用方
2. 架構概覽和技術選型
看一看架構的元件:
clinet
就是調用方。
servive
是服務的提供者。
protocol
包定義了通信協定。
common
包含了通用的一些邏輯元件。
技術選型項目使用
maven
作為包管理工具,
json
作為序列化協定,使用
spring boot
管理對象的生命周期,
netty
作為
nio
的網路元件。是以要閱讀這篇文章,你需要對
spring boot
和
netty
有基本的了解。
下面就看看每個元件的具體實作:
3. protocol
其實作為 RPC 的協定,隻需要考慮一個問題,就是怎麼把一次本地方法的調用,變成能夠被網絡傳輸的位元組流。
我們需要定義方法的調用和傳回兩個對象實體:
請求:
@Data
public class RpcRequest {
// 調用編号
private String requestId;
// 類名
private String className;
// 方法名
private String methodName;
// 請求參數的資料類型
private Class<?>[] parameterTypes;
// 請求的參數
private Object[] parameters;
}
複制代碼
響應:
@Data
public class RpcResponse {
// 調用編号
private String requestId;
// 抛出的異常
private Throwable throwable;
// 傳回結果
private Object result;
}
複制代碼
确定了需要序列化的對象實體,就要确定序列化的協定,實作兩個方法,序列化和反序列化。
public interface Serialization {
<T> byte[] serialize(T obj);
<T> T deSerialize(byte[] data,Class<T> clz);
}
複制代碼
可選用的序列化的協定很多,比如:
- jdk 的序列化方法。(不推薦,不利于之後的跨語言調用)
- json 可讀性強,但是序列化速度慢,體積大。
- protobuf,kyro,Hessian 等都是優秀的序列化架構,也可按需選擇。
為了簡單和便于調試,我們就選擇 json 作為序列化協定,使用
jackson
作為 json 解析架構。
/**
* @author Zhengxin
*/
public class JsonSerialization implements Serialization {
private ObjectMapper objectMapper;
public JsonSerialization(){
this.objectMapper = new ObjectMapper();
}
@Override
public <T> byte[] serialize(T obj) {
try {
return objectMapper.writeValueAsBytes(obj);
} catch (JsonProcessingException e) {
e.printStackTrace();
}
return null;
}
@Override
public <T> T deSerialize(byte[] data, Class<T> clz) {
try {
return objectMapper.readValue(data,clz);
} catch (IOException e) {
e.printStackTrace();
}
return null;
}
}
複制代碼
因為 netty 支援自定義 coder 。是以隻需要實作
ByteToMessageDecoder
和
MessageToByteEncoder
兩個接口。就解決了序列化的問題:
public class RpcDecoder extends ByteToMessageDecoder {
private Class<?> clz;
private Serialization serialization;
public RpcDecoder(Class<?> clz,Serialization serialization){
this.clz = clz;
this.serialization = serialization;
}
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
if(in.readableBytes() < ){
return;
}
in.markReaderIndex();
int dataLength = in.readInt();
if (in.readableBytes() < dataLength) {
in.resetReaderIndex();
return;
}
byte[] data = new byte[dataLength];
in.readBytes(data);
Object obj = serialization.deSerialize(data, clz);
out.add(obj);
}
}
複制代碼
public class RpcEncoder extends MessageToByteEncoder {
private Class<?> clz;
private Serialization serialization;
public RpcEncoder(Class<?> clz, Serialization serialization){
this.clz = clz;
this.serialization = serialization;
}
@Override
protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) throws Exception {
if(clz != null){
byte[] bytes = serialization.serialize(msg);
out.writeInt(bytes.length);
out.writeBytes(bytes);
}
}
}
複制代碼
至此,protocol 就實作了,我們就可以把方法的調用和結果的響應轉換為一串可以在網絡中傳輸的 byte[] 數組了。
4. server
server 是負責處理用戶端請求的元件。在網際網路高并發的環境下,使用 Nio 非阻塞的方式可以相對輕松的應付高并發的場景。netty 是一個優秀的 Nio 處理架構。Server 就基于 netty 進行開發。關鍵代碼如下:
- netty 是基于 Reacotr 模型的。是以需要初始化兩組線程 boss 和 worker 。boss 負責分發請求,worker 負責執行相應的 handler:
@Bean
public ServerBootstrap serverBootstrap() throws InterruptedException {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup(), workerGroup())
.channel(NioServerSocketChannel.class)
.handler(new LoggingHandler(LogLevel.DEBUG))
.childHandler(serverInitializer);
Map<ChannelOption<?>, Object> tcpChannelOptions = tcpChannelOptions();
Set<ChannelOption<?>> keySet = tcpChannelOptions.keySet();
for (@SuppressWarnings("rawtypes") ChannelOption option : keySet) {
serverBootstrap.option(option, tcpChannelOptions.get(option));
}
return serverBootstrap;
}
複制代碼
- netty 的操作是基于 pipeline 的。是以我們需要把在 protocol 實作的幾個 coder 注冊到 netty 的 pipeline 中。
ChannelPipeline pipeline = ch.pipeline();
// 處理 tcp 請求中粘包的 coder,具體作用可以自行 google
pipeline.addLast(new LengthFieldBasedFrameDecoder(,,));
// protocol 中實作的 序列化和反序列化 coder
pipeline.addLast(new RpcEncoder(RpcResponse.class,new JsonSerialization()));
pipeline.addLast(new RpcDecoder(RpcRequest.class,new JsonSerialization()));
// 具體處理請求的 handler 下文具體解釋
pipeline.addLast(serverHandler);
複制代碼
- 實作具體的 ServerHandler 用于處理真正的調用。
ServerHandler
繼承
SimpleChannelInboundHandler<RpcRequest>
。簡單來說這個
InboundHandler
會在資料被接受時或者對于的 Channel 的狀态發生變化的時候被調用。當這個 handler 讀取資料的時候方法
channelRead0()
會被用,是以我們就重寫這個方法就夠了。
@Override
protected void channelRead0(ChannelHandlerContext ctx, RpcRequest msg) throws Exception {
RpcResponse rpcResponse = new RpcResponse();
rpcResponse.setRequestId(msg.getRequestId());
try{
// 收到請求後開始處理請求
Object handler = handler(msg);
rpcResponse.setResult(handler);
}catch (Throwable throwable){
// 如果抛出異常也将異常存入 response 中
rpcResponse.setThrowable(throwable);
throwable.printStackTrace();
}
// 操作完以後寫入 netty 的上下文中。netty 自己處理傳回值。
ctx.writeAndFlush(rpcResponse);
}
複制代碼
handler(msg) 實際上使用的是 cglib 的 Fastclass 實作的,其實根本原理,還是反射。學好 java 中的反射真的可以為所欲為。
private Object handler(RpcRequest request) throws Throwable {
Class<?> clz = Class.forName(request.getClassName());
Object serviceBean = applicationContext.getBean(clz);
Class<?> serviceClass = serviceBean.getClass();
String methodName = request.getMethodName();
Class<?>[] parameterTypes = request.getParameterTypes();
Object[] parameters = request.getParameters();
// 根本思路還是擷取類名和方法名,利用反射實作調用
FastClass fastClass = FastClass.create(serviceClass);
FastMethod fastMethod = fastClass.getMethod(methodName,parameterTypes);
// 實際調用發生的地方
return fastMethod.invoke(serviceBean,parameters);
}
複制代碼
總體上來看,server 的實作不是很困難。核心的知識點是 netty 的 channel 的使用和 cglib 的反射機制。
5. client
future
其實,對于我來說,client 的實作難度,遠遠大于 server 的實作。netty 是一個異步架構,所有的傳回都是基于 Future 和 Callback 的機制。
是以在閱讀以下文字前強烈推薦,我之前寫的一篇文章 Future 研究。利用經典的 wite 和 notify 機制,實作異步的擷取請求結果。
/**
* @author zhengxin
*/
public class DefaultFuture {
private RpcResponse rpcResponse;
private volatile boolean isSucceed = false;
private final Object object = new Object();
public RpcResponse getResponse(int timeout){
synchronized (object){
while (!isSucceed){
try {
//wait
object.wait(timeout);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
return rpcResponse;
}
}
public void setResponse(RpcResponse response){
if(isSucceed){
return;
}
synchronized (object) {
this.rpcResponse = response;
this.isSucceed = true;
//notiy
object.notify();
}
}
}
複制代碼
複用資源
為了能夠提升 client 的吞吐量,可提供的思路有以下幾種:
- 使用對象池:建立多個 client 以後儲存在對象池中。但是代碼的複雜度和維護 client 的成本會很高。
- 盡可能的複用 netty 中的 channel。 之前你可能注意到,為什麼要在 RpcRequest 和 RpcResponse 中增加一個 ID。因為 netty 中的 channel 是會被多個線程使用的。當一個結果異步的傳回後,你并不知道是哪個線程傳回的。這個時候就可以考慮利用一個 Map,建立一個 ID 和 Future 映射。這樣請求的線程隻要使用對應的 ID 就能擷取,相應的傳回結果。
/**
* @author Zhengxin
*/
public class ClientHandler extends ChannelDuplexHandler {
// 使用 map 維護 id 和 Future 的映射關系,在多線程環境下需要使用線程安全的容器
private final Map<String, DefaultFuture> futureMap = new ConcurrentHashMap<>();
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
if(msg instanceof RpcRequest){
RpcRequest request = (RpcRequest) msg;
// 寫資料的時候,增加映射
futureMap.putIfAbsent(request.getRequestId(),new DefaultFuture());
}
super.write(ctx, msg, promise);
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if(msg instanceof RpcResponse){
RpcResponse response = (RpcResponse) msg;
// 擷取資料的時候 将結果放入 future 中
DefaultFuture defaultFuture = futureMap.get(response.getRequestId());
defaultFuture.setResponse(response);
}
super.channelRead(ctx, msg);
}
public RpcResponse getRpcResponse(String requestId){
try {
// 從 future 中擷取真正的結果。
DefaultFuture defaultFuture = futureMap.get(requestId);
return defaultFuture.getResponse();
}finally {
// 完成後從 map 中移除。
futureMap.remove(requestId);
}
}
}
複制代碼
這裡沒有繼承 server 中的
InboundHandler
而使用了
ChannelDuplexHandler
。顧名思義就是在寫入和讀取資料的時候,都會觸發相應的方法。寫入的時候在 Map 中儲存 ID 和 Future。讀到資料的時候從 Map 中取出 Future 并将結果放入 Future 中。擷取結果的時候需要對應的 ID。
使用
Transporters
對請求進行封裝。
public class Transporters {
public static RpcResponse send(RpcRequest request){
NettyClient nettyClient = new NettyClient("127.0.0.1", );
nettyClient.connect(nettyClient.getInetSocketAddress());
RpcResponse send = nettyClient.send(request);
return send;
}
}
複制代碼
動态代理的實作
動态代理技術最廣為人知的應用,應該就是 Spring 的 Aop,面向切面的程式設計實作,動态的在原有方法Before 或者 After 添加代碼。而 RPC 架構中動态代理的作用就是徹底替換原有方法,直接調用遠端方法。
代理工廠類:
public class ProxyFactory {
@SuppressWarnings("unchecked")
public static <T> T create(Class<T> interfaceClass){
return (T) Proxy.newProxyInstance(
interfaceClass.getClassLoader(),
new Class<?>[]{interfaceClass},
new RpcInvoker<T>(interfaceClass)
);
}
}
複制代碼
當 proxyFactory 生成的類被調用的時候,就會執行 RpcInvoker 方法。
public class RpcInvoker<T> implements InvocationHandler {
private Class<T> clz;
public RpcInvoker(Class<T> clz){
this.clz = clz;
}
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
RpcRequest request = new RpcRequest();
String requestId = UUID.randomUUID().toString();
String className = method.getDeclaringClass().getName();
String methodName = method.getName();
Class<?>[] parameterTypes = method.getParameterTypes();
request.setRequestId(requestId);
request.setClassName(className);
request.setMethodName(methodName);
request.setParameterTypes(parameterTypes);
request.setParameters(args);
return Transporters.send(request).getResult();
}
}
複制代碼
看到這個 invoke 方法,主要三個作用,
- 生成 RequestId。
- 拼裝 RpcRequest。
- 調用 Transports 發送請求,擷取結果。
至此,整個調用鍊完整了。我們終于完成了一次 RPC 調用。
與 Spring 內建
為了使我們的 client 能夠易于使用我們需要考慮,定義一個自定義注解
@RpcInterface
當我們的項目接入 Spring 以後,Spring 掃描到這個注解之後,自動的通過我們的 ProxyFactory 建立代理對象,并存放在 spring 的 applicationContext 中。這樣我們就可以通過
@Autowired
注解直接注入使用了。
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
public @interface RpcInterface {
}
複制代碼
@Configuration
@Slf4j
public class RpcConfig implements ApplicationContextAware,InitializingBean {
private ApplicationContext applicationContext;
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
this.applicationContext = applicationContext;
}
@Override
public void afterPropertiesSet() throws Exception {
Reflections reflections = new Reflections("com.xilidou");
DefaultListableBeanFactory beanFactory = (DefaultListableBeanFactory) applicationContext.getAutowireCapableBeanFactory();
// 擷取 @RpcInterfac 标注的接口
Set<Class<?>> typesAnnotatedWith = reflections.getTypesAnnotatedWith(RpcInterface.class);
for (Class<?> aClass : typesAnnotatedWith) {
// 建立代理對象,并注冊到 spring 上下文。
beanFactory.registerSingleton(aClass.getSimpleName(),ProxyFactory.create(aClass));
}
log.info("afterPropertiesSet is {}",typesAnnotatedWith);
}
}
複制代碼
終于我們最簡單的 RPC 架構就開發完了。下面可以測試一下。
6. Demo
api
@RpcInterface
public interface IHelloService {
String sayHi(String name);
}
複制代碼
server
IHelloSerivce 的實作:
@Service
@Slf4j
public class TestServiceImpl implements IHelloService {
@Override
public String sayHi(String name) {
log.info(name);
return "Hello " + name;
}
}
複制代碼
啟動服務:
@SpringBootApplication
public class Application {
public static void main(String[] args) throws InterruptedException {
ConfigurableApplicationContext context = SpringApplication.run(Application.class);
TcpService tcpService = context.getBean(TcpService.class);
tcpService.start();
}
}
複制代碼
client
@SpringBootApplication()
public class ClientApplication {
public static void main(String[] args) {
ConfigurableApplicationContext context = SpringApplication.run(ClientApplication.class);
IHelloService helloService = context.getBean(IHelloService.class);
System.out.println(helloService.sayHi("doudou"));
}
}
複制代碼
運作以後輸出的結果:
Hello doudou
總結
終于我們實作了一個最簡版的 RPC 遠端調用的子產品。隻是包含最最基礎的遠端調用功能。
如果你對這個項目感興趣,歡迎你與我聯系,為這個架構貢獻代碼。
老規矩 Github 位址:DouPpc
徒手撸架構系列文章位址:
徒手撸架構--實作IoC
徒手撸架構--實作Aop
徒手撸架構--高并發環境下的請求合并
歡迎關注我的微信公衆号: