RPC調用流程

流程描述:
1.服務調用者發送請求(interface#method#args)
2.用戶端進行StringEncode編碼
3.資料寫到服務提供者
4.服務提供者接受請求
5.将接收的包進行StringDecode解碼
6.服務提供方調用對應api
7.服務提供方響應方法調用結果
8.服務提供方将結果集進行StringEncode編碼
9.服務提供方發送結果集包到服務調用者
10.服務調用者接受資料包
11.服務調用者将資料包進行StringDecode解碼
12.服務調用者輸出方法調用結果集
代碼流程:
服務端接口:
package com.hx.zbhuang.netty.dubboCall;
public interface HelloService {
String hello(String mes);
String say(String msg);
}
服務端接口實作類:
package com.hx.zbhuang.netty.dubboCall;
public class HelloServiceImpl implements HelloService {
@Override
public String hello(String msg) {
if(msg!=null) {
return "hello 豆腐蛋,i accept you msg:["+msg+"]";
} else {
return "hello 豆腐蛋,i accept you msg";
}
}
@Override
public String say(String msg) {
return "hello 豆腐蛋" + "==msg:"+msg;
}
}
服務端初始化:
package com.hx.zbhuang.netty.dubboCall;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
public class NettyServer {
public static void main(String[] args) {
EventLoopGroup bossGroup = null;
EventLoopGroup workerGroup = null;
try {
workerGroup = new NioEventLoopGroup();
bossGroup = new NioEventLoopGroup(1);
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup,workerGroup).channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();
pipeline.addLast(new StringDecoder());
pipeline.addLast(new StringEncoder());
// 服務端攔水壩處理
pipeline.addLast(new NettyServerHandler());
}
});
ChannelFuture channelFuture = serverBootstrap.bind(7766).sync();
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
服務端資料處理handler
package com.hx.zbhuang.netty.dubboCall;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import java.lang.reflect.Method;
public class NettyServerHandler extends SimpleChannelInboundHandler {
@Override
protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("msg="+msg);
String methodName = msg.toString().split("#")[1];
Method[] methods = HelloService.class.getMethods();
for (Method method:methods) {
if(method.getName().equals(methodName)) {
Object obj = method.invoke(new HelloServiceImpl(),msg.toString().substring(msg.toString().lastIndexOf("#")+1));
ctx.writeAndFlush(obj.toString());
}
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
}
用戶端接口調用初始化
package com.hx.zbhuang.netty.dubboCall;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import java.lang.reflect.Proxy;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class NettyClient {
//線程池管理用戶端請求
private static ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
private static NettyClientHandler client;
Object getBean(final Class> serviceClass, final String interfaceName){
return Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(),new Class>[]{serviceClass},(proxy,method,args) -> {
if (client ==null) {
initClient();
}
client.setPara(interfaceName+"#"+method.getName()+"#"+args[0]);
Object obj = executor.submit(client).get();
return obj;
});
}
private static void initClient() {
client = new NettyClientHandler();
NioEventLoopGroup group =null;
try {
group = new NioEventLoopGroup();
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group).channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY,true)
.handler(new ChannelInitializer() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline=socketChannel.pipeline();
pipeline.addLast(new StringDecoder());
pipeline.addLast(new StringEncoder());
// 用戶端攔水壩處理
pipeline.addLast(client);
}
});
bootstrap.connect("127.0.0.1",7766).sync();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
用戶端資料處理handler
package com.hx.zbhuang.netty.dubboCall;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import java.util.concurrent.Callable;
public class NettyClientHandler extends ChannelInboundHandlerAdapter implements Callable {
// channelHandler上下文
private ChannelHandlerContext context;
// 遠端服務擷取方法調用結果
private String result;
// 接口名#方法名#參數
private String para;
@Override
public synchronized Object call() throws Exception {
context.writeAndFlush(para);
wait();
return result;
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
context = ctx;
}
@Override
public synchronized void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
result = msg.toString();
notify();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
void setPara(String para) {
this.para = para;
}
}
模拟接口調用:
package com.hx.zbhuang.netty.dubboCall;
public class ClientBootstrap {
// 暴露的接口名
public static final String interfaceName = "HelloService";
public static void main(String[] args) throws InterruptedException {
NettyClient customer = new NettyClient();
// 調用遠端服務接口
HelloService service = (HelloService)customer.getBean(HelloService.class, interfaceName);
String msg = service.say("來打王者");
System.out.println("調用結果"+ msg);
}
}
用戶端遠端調用服務端接口響應:
本文位址:https://blog.csdn.net/qq_33554285/article/details/110457528
如您對本文有疑問或者有任何想說的,請點選進行留言回複,萬千網友為您解惑!