天天看點

rpc調用 java_RPC調用的簡單實作

RPC調用流程

rpc調用 java_RPC調用的簡單實作

流程描述:

1.服務調用者發送請求(interface#method#args)

2.用戶端使用StringEncoder對請求進行編碼

3.資料寫到服務提供者

4.服務提供者接受請求

5.服務提供者使用StringDecoder對接收到的資料包進行解碼

6.服務提供方調用對應api

7.服務提供方響應方法調用結果

8.服務提供方使用StringEncoder對結果集進行編碼

9.服務提供方發送結果集包到服務調用者

10.服務調用者接受資料包

11.服務調用者使用StringDecoder對資料包進行解碼

12.服務調用者輸出方法調用結果集

代碼流程:

服務端接口:

package com.hx.zbhuang.netty.dubboCall;

/**
 * Service contract of the RPC demo.
 *
 * <p>Both methods take a single {@code String} and return a {@code String}.
 */
public interface HelloService {

    /**
     * Produces a greeting reply for the given message.
     *
     * @param msg the message sent by the caller
     * @return the reply text
     */
    String hello(String msg);

    /**
     * Produces a reply for the given message.
     *
     * @param msg the message sent by the caller
     * @return the reply text
     */
    String say(String msg);
}

服務端接口實作類:

package com.hx.zbhuang.netty.dubboCall;

/**
 * Implementation of {@link HelloService} that answers with fixed greeting texts.
 */
public class HelloServiceImpl implements HelloService {

    /**
     * Returns the greeting; the message is appended in square brackets when it is not null.
     *
     * @param msg the incoming message, may be null
     * @return the greeting text
     */
    @Override
    public String hello(String msg) {
        return msg == null
                ? "hello 豆腐蛋,i accept you msg"
                : "hello 豆腐蛋,i accept you msg:[" + msg + "]";
    }

    /**
     * Returns the greeting followed by {@code ==msg:} and the message.
     *
     * @param msg the incoming message; a null value is rendered as the text "null"
     * @return the reply text
     */
    @Override
    public String say(String msg) {
        final String prefix = "hello 豆腐蛋" + "==msg:";
        return prefix + msg;
    }
}

服務端初始化:

package com.hx.zbhuang.netty.dubboCall;

import io.netty.bootstrap.ServerBootstrap;

import io.netty.channel.ChannelFuture;

import io.netty.channel.ChannelInitializer;

import io.netty.channel.ChannelPipeline;

import io.netty.channel.EventLoopGroup;

import io.netty.channel.nio.NioEventLoopGroup;

import io.netty.channel.socket.SocketChannel;

import io.netty.channel.socket.nio.NioServerSocketChannel;

import io.netty.handler.codec.string.StringDecoder;

import io.netty.handler.codec.string.StringEncoder;

/**
 * Entry point of the RPC demo server: binds port 7766 and serves requests until the
 * server channel is closed.
 */
public class NettyServer {

    /**
     * Starts the server and blocks until the listening channel closes.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        // Created before the try block so the finally clause never dereferences null.
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    // The type argument is required: without it initChannel(SocketChannel)
                    // does not override ChannelInitializer#initChannel.
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            ChannelPipeline pipeline = socketChannel.pipeline();
                            pipeline.addLast(new StringDecoder());
                            pipeline.addLast(new StringEncoder());
                            // Server-side request handler.
                            pipeline.addLast(new NettyServerHandler());
                        }
                    });
            ChannelFuture channelFuture = serverBootstrap.bind(7766).sync();
            // Block the main thread until the server channel is closed.
            channelFuture.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers further up can observe it.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

服務端資料處理handler

package com.hx.zbhuang.netty.dubboCall;

import io.netty.channel.ChannelHandlerContext;

import io.netty.channel.SimpleChannelInboundHandler;

import java.lang.reflect.Method;

/**
 * Server-side handler that decodes a request of the form
 * {@code interfaceName#methodName#argument}, invokes the matching single-String-argument
 * method of {@link HelloService} via reflection, and writes the result back as text.
 *
 * <p>The interface name segment is not inspected; every request is dispatched to
 * {@link HelloService}. The wire format has no separate error channel, so failures are
 * reported to the caller as a reply starting with {@code ERROR:} instead of leaving the
 * caller without any response.
 */
public class NettyServerHandler extends SimpleChannelInboundHandler<Object> {

    // One shared target instance; HelloServiceImpl declares no fields.
    private static final HelloService SERVICE = new HelloServiceImpl();

    /**
     * Handles one decoded request and always writes exactly one reply.
     *
     * @param ctx the channel context used to write the reply
     * @param msg the decoded request text
     * @throws Exception if writing fails
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("msg=" + msg);

        // Limit of 3 keeps any '#' inside the argument intact instead of truncating it.
        String[] parts = String.valueOf(msg).split("#", 3);
        if (parts.length < 3) {
            ctx.writeAndFlush("ERROR: malformed request, expected interface#method#argument");
            return;
        }
        String methodName = parts[1];
        String argument = parts[2];

        for (Method method : HelloService.class.getMethods()) {
            if (!isSingleStringMethod(method, methodName)) {
                continue;
            }
            try {
                Object reply = method.invoke(SERVICE, argument);
                ctx.writeAndFlush(String.valueOf(reply));
            } catch (ReflectiveOperationException e) {
                // Report the underlying failure so the caller is not left waiting.
                Throwable root = e.getCause() != null ? e.getCause() : e;
                ctx.writeAndFlush("ERROR: invocation of " + methodName + " failed: " + root);
            }
            // Stop after the first match so a request produces a single reply.
            return;
        }
        ctx.writeAndFlush("ERROR: no such method: " + methodName);
    }

    /**
     * Logs the failure and closes the channel.
     *
     * @param ctx the channel context
     * @param cause the error raised in the pipeline
     * @throws Exception never thrown by this implementation
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }

    // Returns true when the method has the given name and takes exactly one String parameter.
    private static boolean isSingleStringMethod(Method method, String methodName) {
        return method.getName().equals(methodName)
                && method.getParameterCount() == 1
                && method.getParameterTypes()[0] == String.class;
    }
}

用戶端接口調用初始化

package com.hx.zbhuang.netty.dubboCall;

import io.netty.bootstrap.Bootstrap;

import io.netty.channel.ChannelInitializer;

import io.netty.channel.ChannelOption;

import io.netty.channel.ChannelPipeline;

import io.netty.channel.nio.NioEventLoopGroup;

import io.netty.channel.socket.SocketChannel;

import io.netty.channel.socket.nio.NioSocketChannel;

import io.netty.handler.codec.string.StringDecoder;

import io.netty.handler.codec.string.StringEncoder;

import java.lang.reflect.Proxy;

import java.util.concurrent.ExecutorService;

import java.util.concurrent.Executors;

public class NettyClient {

//線程池管理用戶端請求

private static ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());

private static NettyClientHandler client;

Object getBean(final Class> serviceClass, final String interfaceName){

return Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(),new Class>[]{serviceClass},(proxy,method,args) -> {

if (client ==null) {

initClient();

}

client.setPara(interfaceName+"#"+method.getName()+"#"+args[0]);

Object obj = executor.submit(client).get();

return obj;

});

}

private static void initClient() {

client = new NettyClientHandler();

NioEventLoopGroup group =null;

try {

group = new NioEventLoopGroup();

Bootstrap bootstrap = new Bootstrap();

bootstrap.group(group).channel(NioSocketChannel.class)

.option(ChannelOption.TCP_NODELAY,true)

.handler(new ChannelInitializer() {

@Override

protected void initChannel(SocketChannel socketChannel) throws Exception {

ChannelPipeline pipeline=socketChannel.pipeline();

pipeline.addLast(new StringDecoder());

pipeline.addLast(new StringEncoder());

// 用戶端攔水壩處理

pipeline.addLast(client);

}

});

bootstrap.connect("127.0.0.1",7766).sync();

} catch (InterruptedException e) {

e.printStackTrace();

}

}

}

用戶端資料處理handler

package com.hx.zbhuang.netty.dubboCall;

import io.netty.channel.ChannelHandlerContext;

import io.netty.channel.ChannelInboundHandlerAdapter;

import java.util.concurrent.Callable;

/**
 * Client-side handler that doubles as a {@link Callable}: {@link #call()} writes the
 * prepared request to the channel and blocks until the response arrives.
 *
 * <p>All shared state is guarded by this object's monitor. The handler supports one
 * call in flight at a time.
 */
public class NettyClientHandler extends ChannelInboundHandlerAdapter implements Callable<Object> {

    // Context of the connected channel; null until channelActive has run.
    private ChannelHandlerContext context;

    // Response of the call in flight; null while no response has arrived yet.
    private String result;

    // Request text: interfaceName#methodName#argument.
    private String para;

    // First failure seen on the channel; once set, waiting and later calls fail fast.
    private Throwable failure;

    /**
     * Sends the current request and waits for its response.
     *
     * @return the response text
     * @throws IllegalStateException if the channel failed or was closed
     * @throws InterruptedException if the waiting thread is interrupted
     */
    @Override
    public synchronized Object call() throws Exception {
        // The task may be submitted before channelActive has stored the context
        // (presumably possible right after connect completes — TODO confirm), so wait for it.
        while (context == null && failure == null) {
            wait();
        }
        throwIfFailed();

        // Drop the previous response so it cannot be mistaken for this call's reply.
        result = null;
        context.writeAndFlush(para);

        // Loop guards against spurious wakeups.
        while (result == null && failure == null) {
            wait();
        }
        throwIfFailed();
        return result;
    }

    /**
     * Stores the channel context and wakes a caller waiting for the connection.
     */
    @Override
    public synchronized void channelActive(ChannelHandlerContext ctx) throws Exception {
        context = ctx;
        notifyAll();
    }

    /**
     * Stores the response and wakes the waiting caller.
     */
    @Override
    public synchronized void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        result = msg.toString();
        notifyAll();
    }

    /**
     * Marks the handler as failed when the channel closes, so a waiting caller is released.
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        fail(new IllegalStateException("RPC channel was closed"));
        super.channelInactive(ctx);
    }

    /**
     * Records the error, releases a waiting caller and closes the channel.
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        fail(cause);
        ctx.close();
    }

    /**
     * Sets the request text sent by the next {@link #call()}.
     *
     * @param para request in the form interfaceName#methodName#argument
     */
    void setPara(String para) {
        this.para = para;
    }

    // Remembers the first failure and wakes every waiting thread.
    private synchronized void fail(Throwable cause) {
        if (failure == null) {
            failure = cause;
        }
        notifyAll();
    }

    // Throws if the channel has failed; must be called while holding the monitor.
    private void throwIfFailed() {
        if (failure != null) {
            throw new IllegalStateException("RPC channel failed", failure);
        }
    }
}

模拟接口調用:

package com.hx.zbhuang.netty.dubboCall;

/**
 * Demo entry point that performs one remote call through a {@link NettyClient} proxy
 * and prints the reply.
 */
public class ClientBootstrap {

    // Name of the exposed interface, passed to NettyClient#getBean.
    public static final String interfaceName = "HelloService";

    /**
     * Obtains a {@link HelloService} proxy, calls {@code say} once and prints the result.
     *
     * @param args unused
     * @throws InterruptedException declared by the original signature
     */
    public static void main(String[] args) throws InterruptedException {
        // Build the proxy for the remote service interface.
        final Object proxy = new NettyClient().getBean(HelloService.class, interfaceName);
        final HelloService service = (HelloService) proxy;

        final String reply = service.say("來打王者");
        System.out.println("調用結果" + reply);
    }
}

用戶端遠端調用服務端接口響應:

rpc調用 java_RPC調用的簡單實作

本文位址:https://blog.csdn.net/qq_33554285/article/details/110457528

如您對本文有疑問或者有任何想說的,請點選進行留言回複,萬千網友為您解惑!