1.RPC是什麼?
RPC,遠端過程調用,可以做到像本地調用一樣調用遠端服務,是一種程序間的通信方式,概念想必大家都很清楚,可以換一種思考方式去了解RPC,也就是從本地調用出發,進而去推導RPC調用
1.本地函數調用
本地函數是我們經常碰到的,比如下面示例:
public String sayHello(String name) {
return "hello, " + name;
}
我們隻需要傳入一個參數,調用sayHello方法就可以得到一個輸出,也就是輸入參數——>方法體——>輸出,入參、出參以及方法體都在同一個程序空間中,這就是本地函數調用
2. Socket通信
那有沒有辦法實作不同程序之間通信呢?調用方在程序A,需要調用方法A,但是方法A在程序B中
最容易想到的方式就是使用Socket通信,使用Socket可以完成跨程序調用,我們需要約定一個程序通信協定,來進行傳參,調用函數,出參。寫過Socket應該都知道,Socket是比較原始的方式,我們需要更多的去關注一些細節問題,比如參數和函數需要轉換成位元組流進行網絡傳輸,也就是序列化操作,然後出參時需要反序列化;使用socket進行底層通訊,代碼程式設計也比較容易出錯。
如果一個調用方需要關注這麼多問題,那無疑是個災難,是以有沒有什麼簡單方法,讓我們的調用方不需要關注細節問題,讓調用方像調用本地函數一樣,隻要傳入參數,調用方法,然後坐等傳回結果就可以了呢?
3. RPC架構
RPC架構就是用來解決上面的問題的,它能夠讓調用方像調用本地函數一樣調用遠端服務,底層通訊細節對調用方是透明的,将各種複雜性都給屏蔽掉,給予調用方極緻體驗。
RPC 核心功能
前面說了這麼多,再次總結下一個RPC架構需要重點關注哪幾個點:
- 代理 (動态代理)
- 通訊協定
- 序列化
- 網絡傳輸
一個 RPC 的核心功能主要有 5 個部分組成,分别是:用戶端、用戶端 Stub、網絡傳輸子產品、服務端 Stub、服務端等
PRC調用流程說明
- 服務消費方(client)以本地調用方式調用服務
- client stub 接收到調用後負責将方法、參數等封裝成能夠進行網絡傳輸的消息體
- client stub 将消息進行編碼并發送到服務端
- server stub 收到消息後進行解碼
- server stub根據解碼結果調用本地的服務
- 本地服務執行并将結果傳回給 server stub
- server stub将傳回導入結果進行編碼并發送至消費方
- client stub 接收到消息并進行解碼
- 服務消費方(client)得到結果
2.基于Netty實作dubbo RPC
需求說明
- dubbo 底層使用了 Netty 作為網絡通訊架構,要求用 Netty 實作一個簡單的 RPC 架構
- 模仿dubbo,消費者和提供者約定接口和協定,消費者遠端調用提供者的服務,提供者傳回一個字元串,消費者列印提供者傳回的資料。底層網絡通信使用Netty 4.1.20
公共接口
HelloService
public interface HelloService {
String hello(String msg);
}
HelloServiceImpl
public class HelloServiceImpl implements HelloService {
//當有消費方調用該方法時, 就傳回一個結果
@Override
public String hello(String msg) {
System.out.println("收到用戶端消息=" + msg);
//根據mes 傳回不同的結果
if (msg!= null) {
return "你好用戶端, 我已經收到你的消息 [" + msg+ "]";
} else {
return "你好用戶端, 我已經收到你的消息 ";
}
}
}
服務端
NettyServer
public class NettyServer {
public static void startServer(String hostName, int port) {
startServer0(hostName, port);
}
//編寫一個方法,完成對NettyServer的初始化和啟動
private static void startServer0(String hostname, int port) {
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new StringDecoder());
pipeline.addLast(new StringEncoder());
pipeline.addLast(new NettyServerHandler()); //業務處理器
}
}
);
ChannelFuture channelFuture = serverBootstrap.bind(hostname, port).sync();
System.out.println("服務提供方開始提供服務~~");
channelFuture.channel().closeFuture().sync();
} catch (Exception e) {
e.printStackTrace();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
NettyServerHandler
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//擷取用戶端發送的消息,并調用服務
System.out.println("msg=" + msg);
//用戶端在調用伺服器的api 時,我們需要定義一個協定
//比如我們要求 每次發消息是都必須以某個字元串開頭 "HelloService#hello#你好"
if (msg.toString().startsWith(ClientBootstrap.providerName)) {
String result = new HelloServiceImpl().hello(msg.toString().substring(msg.toString().lastIndexOf("#") + 1));
ctx.writeAndFlush(result);
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
}
ServerBootstrap
//ServerBootstrap 會啟動一個服務提供者,就是 NettyServer
public class ServerBootstrap {
public static void main(String[] args) {
NettyServer.startServer("127.0.0.1", 7000);
}
用戶端
NettyClient
public class NettyClient {
//建立線程池
private static ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
private static NettyClientHandler client;
//編寫方法使用代理模式,擷取一個代理對象
public Object getBean(final Class<?> serviceClass, final String providerName) {
return Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(), new Class<?>[] { serviceClass },
new InvocationHandler() {
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
//{} 部分的代碼,用戶端每調用一次 hello, 就會進入到該代碼
if (client == null) {
initClient();
}
//設定要發給伺服器端的資訊
//providerName 協定頭 args[0] 就是用戶端調用api hello(???), 參數
client.setPara(providerName + args[0]);
//
return executor.submit(client).get();
}
});
}
//初始化用戶端
private static void initClient() {
client = new NettyClientHandler();
//建立EventLoopGroup
NioEventLoopGroup group = new NioEventLoopGroup();
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group).channel(NioSocketChannel.class).option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new StringDecoder());
pipeline.addLast(new StringEncoder());
pipeline.addLast(client);
}
});
try {
bootstrap.connect("127.0.0.1", 7000).sync();
} catch (Exception e) {
e.printStackTrace();
}
}
}
NettyClientHandler
public class NettyClientHandler extends ChannelInboundHandlerAdapter implements Callable {
private ChannelHandlerContext context;//上下文
private String result; //傳回的結果
private String para; //用戶端調用方法時,傳入的參數
//與伺服器的連接配接建立後,就會被調用, 這個方法是第一個被調用(1)
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println(" channelActive 被調用 ");
context = ctx; //因為我們在其它方法會使用到 ctx
}
//收到伺服器的資料後,調用方法 (4)
//
@Override
public synchronized void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println(" channelRead 被調用 ");
result = msg.toString();
notify(); //喚醒等待的線程
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
//被代理對象調用, 發送資料給伺服器,-> wait -> 等待被喚醒(channelRead) -> 傳回結果 (3)-》5
@Override
public synchronized Object call() throws Exception {
System.out.println(" call1 被調用 ");
context.writeAndFlush(para);
//進行wait
wait(); //等待channelRead 方法擷取到伺服器的結果後,喚醒
System.out.println(" call2 被調用 ");
return result; //服務方傳回的結果
}
//(2)
void setPara(String para) {
System.out.println(" setPara ");
this.para = para;
}
}
ClientBootstrap
public class ClientBootstrap {
//這裡定義協定頭
public static final String providerName = "HelloService#hello#";
public static void main(String[] args) throws Exception {
//建立一個消費者
NettyClient customer = new NettyClient();
//建立代理對象
HelloService service = (HelloService) customer.getBean(HelloService.class, providerName);
for (; ; ) {
Thread.sleep(2 * 1000);
//通過代理對象調用服務提供者的方法(服務)
String res = service.hello("你好 dubbo~");
System.out.println("調用的結果 res= " + res);
}
}
}