天天看点

基于Netty的RPC实现

前言

这个demo,主要是展示了RPC与Netty在一起所产生的一种奇妙的化学反应,与传统的,阻塞的通信不同,基于Netty的PRC框架,可以实现,两个服务之前,异步的方法调用。

其实这篇文章的关键词已经给出来了,即:两个服务之前,异步的,方法调用。

一、RPC是什么

如果不清楚RPC是什么,推荐阅读我的博客《PRC原理分析:从一个简单的DEMO开始》

https://blog.csdn.net/m13797378901/article/details/86538744

当然,我这里也再一次说明一下RPC是什么:

RPC,全称为Remote Procedure Call,即远程过程调用,它是一个计算机通信协议。它允许像调用本地服务一样调用远程服务。它可以有不同的实现方式。如RMI(远程方法调用)、Hessian、Http invoker等。另外,RPC是与语言无关的。

以上是来自原网上其他博客的说明,这里我补充一点:RPC是与语言无关的,他是一种理念,思想。

二、Netty是什么

Netty是一个高性能、异步事件驱动的NIO框架,它提供了对TCP、UDP和文件传输的支持,作为一个异步NIO框架,Netty的所有IO操作都是异步非阻塞的,通过Future-Listener机制,用户可以方便的主动获取或者通过通知机制获得IO操作结果。作为当前最流行的NIO框架,Netty在互联网领域、大数据分布式计算领域、游戏行业、通信行业等获得了广泛的应用,一些业界著名的开源组件也基于Netty的NIO框架构建。

Netty是一种先进的NIO解决方案,他完善了java中NIO并且对其进行了优化,使之可以更高效的更方便的进行使用。

那么NIO是什么呢?

臭不要脸的博主推荐阅读自己的博客《与NIO的第一次亲密接触》 =_=

https://blog.csdn.net/m13797378901/article/details/88977996

三、源码分析

废话不多少,我们直接开干吧。

1、项目结构

服务端

基于Netty的RPC实现

客户端

基于Netty的RPC实现

公共契约公共类

基于Netty的RPC实现

上方图1为服务端架构,图2位客户端架构,图3为公共类以及契约接口

(1)服务端包结构说明
  • api:存放契约实现
  • netty:网络通信,以及核心业务实现
  • utils:工具包,Static的数据集合
(2)客户端包结构说明
  • interfaces:存放一个回调接口INettyCallBack
  • netty:网络通信,以及核心业务实现
(3)公共包以及契约
  • api: 客户端与服务器共同遵守的契约,即客户端负责调用契约,服务器负责实现契约
  • model:公共实体
  • utils:公共的工具包,如json相关,字符串相关的处理

2、服务端代码分析

(1)Starter 类

该类是启动类,负责声明端口,注册中心数据注入,以及netty的启动。

* @author xuyuanpeng
 * @version 1.0
 * @date 2019-04-09 21:31
 */
public class Starter {
    private static Logger logger = Logger.getLogger("Starter");

    public static void main(String [] agres) throws Exception {
    	//EchoServerImpl类,实现了IEchoServer 接口
        IEchoServer echoServer=new EchoServerImpl(8088);
        //将契约注册到注册中心,契约储存的Key:契约,Value:契约实现类。
        echoServer.register(IUserService.class, UserServiceImpl.class);
        //启动
        echoServer.start();
    }
}
           
(2)IEchoServer 接口
* @author xuyuanpeng
 * @version 1.0
 * @date 2019-04-09 20:23
 */
public interface IEchoServer {
    //停止服务
    public void stop();

    //开始服务
    public void start() throws Exception;

    //注册服务,注册服务,就是讲接口,以及对应的实现,放到了一个Map中
    public void register(Class serviceInterface, Class impl);

    //判断当前服务是否在运行
    public boolean isRunning();

    //获取使用的端口
    public int getPort();
}
           
(3)EchoServerImpl 核心实现类

值得注意的是, StaticData.serviceRegistry是逻辑上的注册中心,但本质上是一个HashMap。

在start方法中,将EchoServerHandler注入到ChannelPipeline中。

在Netty里,Channel是通讯的载体,而ChannelHandler负责Channel中的逻辑处理。那么ChannelPipeline是什么呢?我觉得可以理解为ChannelHandler的容器:一个Channel包含一个ChannelPipeline,所有ChannelHandler都会注册到ChannelPipeline中,并按顺序组织起来。

在Netty中,ChannelEvent是数据或者状态的载体,例如传输的数据对应MessageEvent,状态的改变对应ChannelStateEvent。当对Channel进行操作时,会产生一个ChannelEvent,并发送到ChannelPipeline。ChannelPipeline会选择一个ChannelHandler进行处理。这个ChannelHandler处理之后,可能会产生新的ChannelEvent,并流转到下一个ChannelHandler。

/**
 * @author xuyuanpeng
 * @version 1.0
 * @date 2019-03-06 14:47
 */
public class EchoServerImpl implements IEchoServer{
    private final Integer port;
    private static Boolean isRunning = false;
    private static Logger logger = Logger.getLogger("EchoServerImpl");

    public EchoServerImpl(int port){
        this.port=port;
    }

    @Override
    public void start() throws Exception {
        logger.info("start");
        NioEventLoopGroup group=new NioEventLoopGroup();
        try {
            ServerBootstrap bootstrap=new ServerBootstrap();
            bootstrap.group(group)
                    //指定使用 NIO 的传输 Channel
                    .channel(NioServerSocketChannel.class)
                    //.设置 socket 地址使用所选的端口
                    .localAddress(new InetSocketAddress(port))
                    //添加 EchoServerHandler 到 Channel 的 ChannelPipeline
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            socketChannel.pipeline().addLast(new EchoServerHandler());
                        }
                    });
            //绑定的服务器;sync 等待服务器关闭
            ChannelFuture future=bootstrap.bind().sync();
            //关闭 channel 和 块,直到它被关闭
            future.channel().closeFuture().sync();
        }finally {
            //关机的 EventLoopGroup,释放所有资源。
            group.shutdownGracefully().sync();
        }
    }

    @Override
    public void register(Class serviceInterface, Class impl) {
        StaticData.serviceRegistry.put(serviceInterface.getName(), impl);
    }
}

           
(4)EchoServerHandler 具体业务实现

channelRead方法:接受到客户端消息后,会进入此方法。

接受到客户端消息后,将字节转为字符串,并且,对字符串进行json解析,得到实体模型,其中包含了类,方法,参数类型,参数值。

再使用java中的invoke,可以直接调用方法,并且获取返回值。

获取返回值后,转为ByteBuf类型,再写入流中,返回到客户端。

/**
 * @author xuyuanpeng
 * @version 1.0
 * @date 2019-03-06 14:28
 */
//@Sharable 标识这类的实例之间可以在 channel 里面共享
@ChannelHandler.Sharable
public class EchoServerHandler extends ChannelInboundHandlerAdapter {
    private static Logger logger = Logger.getLogger("EchoServerHandler");

    @Override
    public void channelRead(ChannelHandlerContext context, Object msg){
        logger.info("channelRead");
        ByteBuf in= (ByteBuf) msg;
        //将所接收的数据返回给发送者。注意,这里还没有冲刷数据
        /**
         * 解析数据,处理数据,调用invoke
         * 将invoke数据返回
         */
        String resultStr="";
        try {
            String read= in.toString(CharsetUtil.UTF_8);
            RPCNetty entity = (RPCNetty) JsonMapper.fromJsonString(read,RPCNetty.class);
            Class clz= StaticData.serviceRegistry.get(entity.getClzName());
            Method method=clz.getMethod(entity.getMethodName(),entity.getParamTypes());
            
            Object resultObj=method.invoke(clz.newInstance(),entity.getArguments());
            resultStr=JsonMapper.toJsonString(resultObj);
        }catch (Exception e){
            logger.info("Exception>"+e.getMessage());
        }

        ByteBuf resultIn= Unpooled.buffer();
        resultIn.writeBytes(resultStr.getBytes());
        //必须写入ByteBuf字节流
        context.write(resultIn);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext context) throws Exception{
        //冲刷所有待审消息到远程节点。关闭通道后,操作完成
        logger.info("channelReadComplete");

        context.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();            
        ctx.close();
    }
}
           

3、客户端代码分析

(1)Starter 启动类

声明端口,通过代理声明IUserService ,代理调用接口的方法。

由于是异步通信,这里采用回调的方式来接收数据。

* @author xuyuanpeng
 * @version 1.0
 * @date 2019-04-09 21:31
 */
public class Starter {
    private static java.util.logging.Logger logger = Logger.getLogger("Starter");

    public static void main(String [] agres){
        final String host = "127.0.0.1";
        final int port = 8088;
        InetSocketAddress inetSocketAddress=new InetSocketAddress(host, port);

        IUserService userService=RPCFactory.getRemoteProxyObj(IUserService.class,inetSocketAddress,
                new INettyCallBack() {
                    @Override
                    public Object calllBack(Object obj) {
                        logger.info("obj>"+ JsonMapper.toJsonString(obj));
                        return null;
                    }
                });
        userService.getUser("1");
    }
}
           
(2)IUserService 契约
//契约接口
public interface IUserService {
    String getUser(String var1);
}

//契约实现
@Service
public class UserServiceImpl implements IUserService {
    @Override
    public String getUser(String id) {
        return "I am Service, Hi Client "+id +" , nice to meet you :)";
    }
}
           
(3)RPCFactory 代理实现类

传递网络连接需要的host以及port。

将相关的对象,来封装实体对象传递到Netty处理类中,进行进一步的处理

private static java.util.logging.Logger logger = Logger.getLogger("EchoClientHandler");

    public static <T> T getRemoteProxyObj(final Class<?> serviceInterface, final InetSocketAddress addr, final INettyCallBack nettyCallBack) {
        // 1.将本地的接口调用转换成JDK的动态代理,在动态代理中实现接口的远程调用
        return (T) Proxy.newProxyInstance(serviceInterface.getClassLoader(),
                new Class<?>[]{serviceInterface},
                new InvocationHandler() {
                    @Override
                    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
                        logger.info("invoke");
                        EchoClient echoClient=new EchoClient(addr);
                        RPCNetty rpcNetty=new RPCNetty();
                        rpcNetty.setClzName(serviceInterface.getName());
                        rpcNetty.setMethodName(method.getName());
                        rpcNetty.setParamTypes(method.getParameterTypes());
                        rpcNetty.setArguments(args);
                        echoClient.setRpcNetty(rpcNetty);
                        echoClient.setNettyCallBack(nettyCallBack);
                        echoClient.start();
                        return null;
                    }
                });
    }
}
           
(4)EchoClient 中

与服务端中的start方法类似,

都是绑定对应的Channel,声明远程端口,在回调方法 中,声明Handler对象,将相关对象,设置到此对象中,最后,将Handler绑定到Channel的管道上。

* @author xuyuanpeng
 * @version 1.0
 * @date 2019-03-06 17:52
 */
public class EchoClient {
    private  String host;
    private  int port;
    private InetSocketAddress inetSocketAddress;
    private INettyCallBack nettyCallBack;
    private RPCNetty rpcNetty;
    private static java.util.logging.Logger logger = java.util.logging.Logger.getLogger("EchoClient");

    public void start() throws Exception{
        EventLoopGroup group=new NioEventLoopGroup();
        try {
            Bootstrap b=new Bootstrap();
            b.group(group).channel(NioSocketChannel.class)
                    .remoteAddress(inetSocketAddress)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            EchoClientHandler echoClientHandler=new EchoClientHandler();
                            echoClientHandler.setRpcNetty(rpcNetty);
                            echoClientHandler.setNettyCallBack(nettyCallBack);;

                            socketChannel.pipeline().addLast(echoClientHandler);
                        }
                    });

            ChannelFuture future=b.connect().sync();
            future.channel().closeFuture().sync();
        }finally {
            group.shutdownGracefully().sync();
        }
    }
    ......
}
           
(5)EchoClientHandler 具体业务处理方法

当Handler被处理,激活以后,会触发channelActive方法,在此方法中,可以将由代理方法中传输的对象,传递到服务器中,具体传输方式,同于服务端接受数据,即,将实体模型转为Json字符串,再将字符串转为字节流写入到服务器中。

服务器如果接受到数据,会对其中的参数,即,接口类,方法,方法参数,方法参数类型,进行处理,即对其进行代理,找到服务器中具体的实现类,实现此方法。

当服务器处理完毕,得到了返回值,再将值转为json字符串字节,传递到客户端。

channelRead0方法,就是客户端接受数据之后的回调。

由于是异步进行处理,所以我们再其中同样也进行溢出接口调用,在其他位置,只要实现此接口,就可以获取到服务器传输的数据。

/**
 * @author xuyuanpeng
 * @version 1.0
 * @date 2019-03-06 17:40
 */
@ChannelHandler.Sharable
public class EchoClientHandler extends SimpleChannelInboundHandler<ByteBuf> {
    private static java.util.logging.Logger logger = Logger.getLogger("EchoClientHandler");

    private RPCNetty rpcNetty;
    private INettyCallBack nettyCallBack;
	......
    /**
     * 请求数据
     */
    @Override
    public void channelActive(ChannelHandlerContext context){
        logger.info("client channelActive >>"+ JsonMapper.toJsonString(rpcNetty));
        context.writeAndFlush(Unpooled.copiedBuffer(JsonMapper.toJsonString(rpcNetty),
                CharsetUtil.UTF_8));
    }

    /**
     * 服务器返回数据
     */
    @Override
    protected void channelRead0(ChannelHandlerContext context, ByteBuf byteBuf) throws Exception {
        logger.info("client channelRead0 >>"+byteBuf.toString(CharsetUtil.UTF_8));
        nettyCallBack.calllBack(byteBuf.toString(CharsetUtil.UTF_8));
    }


    @Override
    public void exceptionCaught(ChannelHandlerContext ctx,
                                Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}
           

3、公共服务

(1)RPCNetty 通信实体
* @author xuyuanpeng
 * @version 1.0
 * @date 2019-04-08 18:57
 */
public class RPCNetty implements Serializable {
    private String clzName;
    private Class clz;
    private String methodName;
    private Class<?>[] paramTypes;
    private Object[] arguments;
    }
           

4、运行结果

(1)启动服务端

四月 10, 2019 5:57:54 下午 com.xyp.iodemo.nettyrpcserver.netty.EchoServerImpl start

信息: start

(2)启动客户端

四月 10, 2019 6:24:15 下午 com.xyp.iodemo.nettyrpcclient.netty.RPCFactory$1 invoke

信息: invoke

四月 10, 2019 6:24:15 下午 com.xyp.iodemo.nettyrpcclient.netty.EchoClientHandler channelActive

信息: client channelActive >>{“clzName”:“com.xyp.iodemo.nettyrpccommon.api.IUserService”,“methodName”:“getUser”,“paramTypes”:[“java.lang.String”],“arguments”:[“1”]}

四月 10, 2019 6:24:16 下午 com.xyp.iodemo.nettyrpcclient.netty.EchoClientHandler channelRead0

信息: client channelRead0 >>“I am Service, Hi Client 1 , nice to meet you ?”

四月 10, 2019 6:24:16 下午 com.xyp.iodemo.nettyrpcclient.Starter$1 calllBack

信息: obj>"“I am Service, Hi Client 1 , nice to meet you ?”"

(3)服务器响应

四月 10, 2019 5:57:54 下午 com.xyp.iodemo.nettyrpcserver.netty.EchoServerImpl start

信息: start

四月 10, 2019 6:24:15 下午 com.xyp.iodemo.nettyrpcserver.netty.EchoServerHandler channelRead

信息: channelRead

四月 10, 2019 6:24:15 下午 com.xyp.iodemo.nettyrpcserver.netty.EchoServerHandler channelRead

信息: read>>>>{“clzName”:“com.xyp.iodemo.nettyrpccommon.api.IUserService”,“methodName”:“getUser”,“paramTypes”:[“java.lang.String”],“arguments”:[“1”]}

四月 10, 2019 6:24:16 下午 com.xyp.iodemo.nettyrpcserver.netty.EchoServerHandler channelRead

信息: resultStr>>>>“I am Service, Hi Client 1 , nice to meet you ?”

四月 10, 2019 6:24:16 下午 com.xyp.iodemo.nettyrpcserver.netty.EchoServerHandler channelRead

信息: context.write>>>>“I am Service, Hi Client 1 , nice to meet you ?”

四月 10, 2019 6:24:16 下午 com.xyp.iodemo.nettyrpcserver.netty.EchoServerHandler channelReadComplete

信息: channelReadComplete

四、总结

项目的git地址:https://gitee.com/xyp_YF/SNetttRpc.git

说明

Netty稳定且高效,易用,是进行NIO通信的不错之选。

我们采用了Netty的通信方式,来处理RPC的底层的数据通信,这样稳定且高效。

参考

《Netty权威指南》书籍

《与NIO的第一次亲密接触》 https://blog.csdn.net/m13797378901/article/details/86538744