什麼是RPC
RPC全稱為remote procedure call,即遠端過程調用。
借助RPC可以做到像本地調用一樣調用遠端服務,是一種程序間的通信方式
比如兩台伺服器A和B,A伺服器上部署一個應用,B伺服器上部署一個應用,A伺服器上的應用想調用B伺服器上的應用提供的方法,由于兩個應用不在一個記憶體空間,不能直接調用,是以需要通過網絡來表達調用的語義和傳達調用的資料。
需要注意的是RPC并不是一個具體的技術,而是指整個網絡遠端調用過程
RPC架構
一個完整的RPC架構裡面包含了四個核心的元件,分别是Client,Client Stub,Server以及Server Stub,這個Stub可以了解為存根。
- 用戶端(Client),服務的調用方。
-
用戶端存根(Client
Stub),存放服務端的位址消息,再将用戶端的請求參數打包成網絡消息,然後通過網絡遠 程發送給服務方。
- 服務端(Server),真正的服務提供者。
- 服務端存根(Server Stub),接收用戶端發送過來的消息,将消息解包,并調用本地的方法。 RPC調用過程
(1) 用戶端(client)以本地調用方式(即以接口的方式)調用服務;
(2) 用戶端存根(client stub)接收到調用後,負責将方法、參數等組裝成能夠進行網絡傳輸的消息體(将消息體對
象序列化為二進制);
(3) 用戶端通過sockets将消息發送到服務端;
(4) 服務端存根( server stub)收到消息後進行解碼(将消息對象反序列化);
(5) 服務端存根( server stub)根據解碼結果調用本地的服務;
(6) 本地服務執行并将結果傳回給服務端存根( server stub);
(7) 服務端存根( server stub)将傳回結果打包成消息(将結果消息對象序列化);
(8) 服務端(server)通過sockets将消息發送到用戶端;
(9) 用戶端存根(client stub)接收到結果消息,并進行解碼(将結果消息發序列化);
(10) 用戶端(client)得到最終結果。
RPC的目标是要把2、3、4、7、8、9這些步驟都封裝起來。
注意:無論是何種類型的資料,最終都需要轉換成二進制流在網絡上進行傳輸,資料的發送方需要将對象轉換為二
進制流,而資料的接收方則需要把二進制流再恢複為對象。
在java中RPC架構比較多,常見的有Hessian、gRPC、Thri、HSF (High Speed Service Framework)、Dubbo 等,其實對于RPC架構而言,核心子產品 就是通訊和序列化。
基于Netty自定義RPC
RPC又稱遠端過程調用,我們所知的遠端調用分為兩種,現在在服務間通信的方式也基本以這兩種為主
1.是基于HTTP的restful形式的廣義遠端調用,以spring could的feign和restTemplate為代表,采用的協定是HTTP的7層調用協定,并且協定的參數和響應序列化基本以JSON格式和XML格式為主。
2.是基于TCP的狹義的RPC遠端調用,以阿裡的Dubbo為代表,主要通過netty來實作4層網絡協定,NIO來異步傳輸,序列化也可以是JSON或者hessian2以及java自帶的序列化等,可以配置。
接下來我們主要以第二種的RPC遠端調用來自己實作
需求
案例版本:
server端與client端定義共用的接口,server端需要實作該接口。用戶端通過jdk的proxy動态代理生成對象,在方法調用時執行invoke中的代碼。invoke中使用netty與server端通信,我們在這裡傳輸(類名#方法名)的字元串。server端收到資料後,根據類名#方法名去調用實作類并通過netty傳回資料。
要求完成改造版本:
序列化協定修改為JSON,使用fastjson作為JSON架構,并将RpcRequest實體作為通信載體,服務端需根據用戶端傳遞過來的RpcRequest對象通過反射,動态代理等技術,最終能夠執行目标方法,傳回字元串"success"。
要點提示:
(1)用戶端代理的invoke方法中需封裝RpcRequest對象,将其當做參數進行傳遞。
(2)服務端的UserServiceImpl類上添加@Service注解,在啟動項目時,添加到容器中。
(3)服務端要添加@SpringBootApplication注解,main方法中添加。SpringApplication.run(ServerBootstrap.class, args);,進行啟動掃描(注意項目啟動類位置:掃描路徑)。
(4)服務端在收到參數,可以借助反射及動态代理(如需用到ApplicationContext對象,可以借助實作ApplicationContextAware接口擷取),來調用UserServiceImpl方法,最終向用戶端傳回”success“即可。
(5)既然傳遞的是RpcRequest對象了,那麼用戶端的編碼器與服務端的解碼器需重新設定。
實作
1.分析需求
- 原始版本使用serviceName#methodName#組合作為待調用的類名#方法名,我們需要使用RpcRequest類來封裝請求,封裝待調用的類、方法、參數。
- 使用fastjson作序列化
2.修改用戶端,封裝RpcRequest
//2)封裝RpcRequest,解析providerParam
RpcRequest rpcRequest= new RpcRequest();
rpcRequest.setClassName(providerParam.split("#")[0]);
rpcRequest.setMethodName(providerParam.split("#")[1]);
rpcRequest.setParameters(objects);
rpcRequest.setParameterTypes(method.getParameterTypes());
3.修改用戶端,修改UserClientHandler的 param參數 由String類型改為RpcRequest,并修改用戶端的編碼規則
//4)配置啟動引導對象
bootstrap.group(group)
//設定通道為NIO
.channel(NioSocketChannel.class)
//設定請求協定為TCP
.option(ChannelOption.TCP_NODELAY,true)
//監聽channel 并初始化
.handler(new ChannelInitializer<SocketChannel>() {
protected void initChannel(SocketChannel socketChannel) throws Exception {
//擷取ChannelPipeline
ChannelPipeline pipeline = socketChannel.pipeline();
//設定編碼
//pipeline.addLast(new StringEncoder());
pipeline.addLast( new RpcEncoder(RpcRequest.class, new JSONSerializer()));
pipeline.addLast(new StringDecoder());
//添加自定義事件處理器
pipeline.addLast(userClientHandler);
}
});
3.修改服務端,我們需要添加一個解碼器
package com.lagou.entity;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import io.netty.handler.codec.MessageToByteEncoder;
import io.netty.handler.codec.MessageToMessageDecoder;
import java.nio.charset.Charset;
import java.util.List;
public class RpcDecoder extends ByteToMessageDecoder {
private Class<?> clazz;
private Serializer serializer;
public RpcDecoder(Class<?> clazz, Serializer serializer) {
this.clazz = clazz;
this.serializer = serializer;
}
@Override
protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {
if (clazz != null ) {
int lenght=byteBuf.readableBytes();
if (lenght < 1) return;
//讀取長度
int len = byteBuf.readInt();
//讀取請求bytes
byte[] bytes = new byte[len];
byteBuf.getBytes(4, bytes);
System.out.println(new String(bytes, Charset.forName("UTF-8")));
//使用fastjson轉換
Object o=serializer.deserialize(clazz,bytes);
list.add(o);
byteBuf.clear();
}
}
}
4.修改服務端,添加springboot容器,将 UserServiceHandler、UserServiceImpl都加入到容器中進行管理,UserServiceHandler如果為單例的話會報錯,所有為其設定@Scope(“prototype”)。其次使用ApplicationContext擷取Request中的待調用的對象,進行調用
@Scope("prototype")
@Component
public class UserServiceHandler extends ChannelInboundHandlerAdapter implements ApplicationContextAware {
private ApplicationContext applicationContext;
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
this.applicationContext = applicationContext;
}
//當用戶端讀取資料時,該方法會被調用
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//注意: 用戶端将來發送請求的時候會傳遞一個參數: UserService#sayHello#are you ok
//1.判斷目前的請求是否符合規則
RpcRequest rpcRequest = (com.lagou.entity.RpcRequest) msg;
//擷取對象;
Object e=applicationContext.getBean(rpcRequest.getClassName());
//requestClass.
Class requestClass=e.getClass();
//requestMethod.
Method method = requestClass.getMethod(rpcRequest.getMethodName(),rpcRequest.getParameterTypes());
//調用方法
Object result= method.invoke(e,rpcRequest.getParameters());
System.out.println("直接反射:"+result);
//傳回success
ctx.writeAndFlush("success");
}
}
5.啟動Consumer與provider即可看到結果,其次Netty有必要仔細學習一下,參考文章ByteBuf詳解