天天看點

基于Netty的RPC實作

前言

這個demo,主要是展示了RPC與Netty在一起所産生的一種奇妙的化學反應,與傳統的,阻塞的通信不同,基于Netty的PRC架構,可以實作,兩個服務之前,異步的方法調用。

其實這篇文章的關鍵詞已經給出來了,即:兩個服務之前,異步的,方法調用。

一、RPC是什麼

如果不清楚RPC是什麼,推薦閱讀我的部落格《PRC原理分析:從一個簡單的DEMO開始》

https://blog.csdn.net/m13797378901/article/details/86538744

當然,我這裡也再一次說明一下RPC是什麼:

RPC,全稱為Remote Procedure Call,即遠端過程調用,它是一個計算機通信協定。它允許像調用本地服務一樣調用遠端服務。它可以有不同的實作方式。如RMI(遠端方法調用)、Hessian、Http invoker等。另外,RPC是與語言無關的。

以上是來自原網上其他部落格的說明,這裡我補充一點:RPC是與語言無關的,他是一種理念,思想。

二、Netty是什麼

Netty是一個高性能、異步事件驅動的NIO架構,它提供了對TCP、UDP和檔案傳輸的支援,作為一個異步NIO架構,Netty的所有IO操作都是異步非阻塞的,通過Future-Listener機制,使用者可以友善的主動擷取或者通過通知機制獲得IO操作結果。作為目前最流行的NIO架構,Netty在網際網路領域、大資料分布式計算領域、遊戲行業、通信行業等獲得了廣泛的應用,一些業界著名的開源元件也基于Netty的NIO架構建構。

Netty是一種先進的NIO解決方案,他完善了java中NIO并且對其進行了優化,使之可以更高效的更友善的進行使用。

那麼NIO是什麼呢?

臭不要臉的部落客推薦閱讀自己的部落格《與NIO的第一次親密接觸》 =_=

https://blog.csdn.net/m13797378901/article/details/88977996

三、源碼分析

廢話不多少,我們直接開幹吧。

1、項目結構

服務端

基于Netty的RPC實作

用戶端

基于Netty的RPC實作

公共契約公共類

基于Netty的RPC實作

上方圖1為服務端架構,圖2位用戶端架構,圖3為公共類以及契約接口

(1)服務端包結構說明
  • api:存放契約實作
  • netty:網絡通信,以及核心業務實作
  • utils:工具包,Static的資料集合
(2)用戶端包結構說明
  • interfaces:存放一個回調接口INettyCallBack
  • netty:網絡通信,以及核心業務實作
(3)公共包以及契約
  • api: 用戶端與伺服器共同遵守的契約,即用戶端負責調用契約,伺服器負責實作契約
  • model:公共實體
  • utils:公共的工具包,如json相關,字元串相關的處理

2、服務端代碼分析

(1)Starter 類

該類是啟動類,負責聲明端口,注冊中心資料注入,以及netty的啟動。

* @author xuyuanpeng
 * @version 1.0
 * @date 2019-04-09 21:31
 */
public class Starter {
    private static Logger logger = Logger.getLogger("Starter");

    public static void main(String [] agres) throws Exception {
    	//EchoServerImpl類,實作了IEchoServer 接口
        IEchoServer echoServer=new EchoServerImpl(8088);
        //将契約注冊到注冊中心,契約儲存的Key:契約,Value:契約實作類。
        echoServer.register(IUserService.class, UserServiceImpl.class);
        //啟動
        echoServer.start();
    }
}
           
(2)IEchoServer 接口
* @author xuyuanpeng
 * @version 1.0
 * @date 2019-04-09 20:23
 */
public interface IEchoServer {
    //停止服務
    public void stop();

    //開始服務
    public void start() throws Exception;

    //注冊服務,注冊服務,就是講接口,以及對應的實作,放到了一個Map中
    public void register(Class serviceInterface, Class impl);

    //判斷目前服務是否在運作
    public boolean isRunning();

    //擷取使用的端口
    public int getPort();
}
           
(3)EchoServerImpl 核心實作類

值得注意的是, StaticData.serviceRegistry是邏輯上的注冊中心,但本質上是一個HashMap。

在start方法中,将EchoServerHandler注入到ChannelPipeline中。

在Netty裡,Channel是通訊的載體,而ChannelHandler負責Channel中的邏輯處理。那麼ChannelPipeline是什麼呢?我覺得可以了解為ChannelHandler的容器:一個Channel包含一個ChannelPipeline,所有ChannelHandler都會注冊到ChannelPipeline中,并按順序組織起來。

在Netty中,ChannelEvent是資料或者狀态的載體,例如傳輸的資料對應MessageEvent,狀态的改變對應ChannelStateEvent。當對Channel進行操作時,會産生一個ChannelEvent,并發送到ChannelPipeline。ChannelPipeline會選擇一個ChannelHandler進行處理。這個ChannelHandler處理之後,可能會産生新的ChannelEvent,并流轉到下一個ChannelHandler。

/**
 * @author xuyuanpeng
 * @version 1.0
 * @date 2019-03-06 14:47
 */
public class EchoServerImpl implements IEchoServer{
    private final Integer port;
    private static Boolean isRunning = false;
    private static Logger logger = Logger.getLogger("EchoServerImpl");

    public EchoServerImpl(int port){
        this.port=port;
    }

    @Override
    public void start() throws Exception {
        logger.info("start");
        NioEventLoopGroup group=new NioEventLoopGroup();
        try {
            ServerBootstrap bootstrap=new ServerBootstrap();
            bootstrap.group(group)
                    //指定使用 NIO 的傳輸 Channel
                    .channel(NioServerSocketChannel.class)
                    //.設定 socket 位址使用所選的端口
                    .localAddress(new InetSocketAddress(port))
                    //添加 EchoServerHandler 到 Channel 的 ChannelPipeline
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            socketChannel.pipeline().addLast(new EchoServerHandler());
                        }
                    });
            //綁定的伺服器;sync 等待伺服器關閉
            ChannelFuture future=bootstrap.bind().sync();
            //關閉 channel 和 塊,直到它被關閉
            future.channel().closeFuture().sync();
        }finally {
            //關機的 EventLoopGroup,釋放所有資源。
            group.shutdownGracefully().sync();
        }
    }

    @Override
    public void register(Class serviceInterface, Class impl) {
        StaticData.serviceRegistry.put(serviceInterface.getName(), impl);
    }
}

           
(4)EchoServerHandler 具體業務實作

channelRead方法:接受到用戶端消息後,會進入此方法。

接受到用戶端消息後,将位元組轉為字元串,并且,對字元串進行json解析,得到實體模型,其中包含了類,方法,參數類型,參數值。

再使用java中的invoke,可以直接調用方法,并且擷取傳回值。

擷取傳回值後,轉為ByteBuf類型,再寫入流中,傳回到用戶端。

/**
 * @author xuyuanpeng
 * @version 1.0
 * @date 2019-03-06 14:28
 */
//@Sharable 辨別這類的執行個體之間可以在 channel 裡面共享
@ChannelHandler.Sharable
public class EchoServerHandler extends ChannelInboundHandlerAdapter {
    private static Logger logger = Logger.getLogger("EchoServerHandler");

    @Override
    public void channelRead(ChannelHandlerContext context, Object msg){
        logger.info("channelRead");
        ByteBuf in= (ByteBuf) msg;
        //将所接收的資料傳回給發送者。注意,這裡還沒有沖刷資料
        /**
         * 解析資料,處理資料,調用invoke
         * 将invoke資料傳回
         */
        String resultStr="";
        try {
            String read= in.toString(CharsetUtil.UTF_8);
            RPCNetty entity = (RPCNetty) JsonMapper.fromJsonString(read,RPCNetty.class);
            Class clz= StaticData.serviceRegistry.get(entity.getClzName());
            Method method=clz.getMethod(entity.getMethodName(),entity.getParamTypes());
            
            Object resultObj=method.invoke(clz.newInstance(),entity.getArguments());
            resultStr=JsonMapper.toJsonString(resultObj);
        }catch (Exception e){
            logger.info("Exception>"+e.getMessage());
        }

        ByteBuf resultIn= Unpooled.buffer();
        resultIn.writeBytes(resultStr.getBytes());
        //必須寫入ByteBuf位元組流
        context.write(resultIn);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext context) throws Exception{
        //沖刷所有待審消息到遠端節點。關閉通道後,操作完成
        logger.info("channelReadComplete");

        context.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();            
        ctx.close();
    }
}
           

3、用戶端代碼分析

(1)Starter 啟動類

聲明端口,通過代理聲明IUserService ,代理調用接口的方法。

由于是異步通信,這裡采用回調的方式來接收資料。

* @author xuyuanpeng
 * @version 1.0
 * @date 2019-04-09 21:31
 */
public class Starter {
    private static java.util.logging.Logger logger = Logger.getLogger("Starter");

    public static void main(String [] agres){
        final String host = "127.0.0.1";
        final int port = 8088;
        InetSocketAddress inetSocketAddress=new InetSocketAddress(host, port);

        IUserService userService=RPCFactory.getRemoteProxyObj(IUserService.class,inetSocketAddress,
                new INettyCallBack() {
                    @Override
                    public Object calllBack(Object obj) {
                        logger.info("obj>"+ JsonMapper.toJsonString(obj));
                        return null;
                    }
                });
        userService.getUser("1");
    }
}
           
(2)IUserService 契約
//契約接口
public interface IUserService {
    String getUser(String var1);
}

//契約實作
@Service
public class UserServiceImpl implements IUserService {
    @Override
    public String getUser(String id) {
        return "I am Service, Hi Client "+id +" , nice to meet you :)";
    }
}
           
(3)RPCFactory 代理實作類

傳遞網絡連接配接需要的host以及port。

将相關的對象,來封裝實體對象傳遞到Netty處理類中,進行進一步的處理

private static java.util.logging.Logger logger = Logger.getLogger("EchoClientHandler");

    public static <T> T getRemoteProxyObj(final Class<?> serviceInterface, final InetSocketAddress addr, final INettyCallBack nettyCallBack) {
        // 1.将本地的接口調用轉換成JDK的動态代理,在動态代理中實作接口的遠端調用
        return (T) Proxy.newProxyInstance(serviceInterface.getClassLoader(),
                new Class<?>[]{serviceInterface},
                new InvocationHandler() {
                    @Override
                    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
                        logger.info("invoke");
                        EchoClient echoClient=new EchoClient(addr);
                        RPCNetty rpcNetty=new RPCNetty();
                        rpcNetty.setClzName(serviceInterface.getName());
                        rpcNetty.setMethodName(method.getName());
                        rpcNetty.setParamTypes(method.getParameterTypes());
                        rpcNetty.setArguments(args);
                        echoClient.setRpcNetty(rpcNetty);
                        echoClient.setNettyCallBack(nettyCallBack);
                        echoClient.start();
                        return null;
                    }
                });
    }
}
           
(4)EchoClient 中

與服務端中的start方法類似,

都是綁定對應的Channel,聲明遠端端口,在回調方法 中,聲明Handler對象,将相關對象,設定到此對象中,最後,将Handler綁定到Channel的管道上。

* @author xuyuanpeng
 * @version 1.0
 * @date 2019-03-06 17:52
 */
public class EchoClient {
    private  String host;
    private  int port;
    private InetSocketAddress inetSocketAddress;
    private INettyCallBack nettyCallBack;
    private RPCNetty rpcNetty;
    private static java.util.logging.Logger logger = java.util.logging.Logger.getLogger("EchoClient");

    public void start() throws Exception{
        EventLoopGroup group=new NioEventLoopGroup();
        try {
            Bootstrap b=new Bootstrap();
            b.group(group).channel(NioSocketChannel.class)
                    .remoteAddress(inetSocketAddress)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            EchoClientHandler echoClientHandler=new EchoClientHandler();
                            echoClientHandler.setRpcNetty(rpcNetty);
                            echoClientHandler.setNettyCallBack(nettyCallBack);;

                            socketChannel.pipeline().addLast(echoClientHandler);
                        }
                    });

            ChannelFuture future=b.connect().sync();
            future.channel().closeFuture().sync();
        }finally {
            group.shutdownGracefully().sync();
        }
    }
    ......
}
           
(5)EchoClientHandler 具體業務處理方法

當Handler被處理,激活以後,會觸發channelActive方法,在此方法中,可以将由代理方法中傳輸的對象,傳遞到伺服器中,具體傳輸方式,同于服務端接受資料,即,将實體模型轉為Json字元串,再将字元串轉為位元組流寫入到伺服器中。

伺服器如果接受到資料,會對其中的參數,即,接口類,方法,方法參數,方法參數類型,進行處理,即對其進行代理,找到伺服器中具體的實作類,實作此方法。

當伺服器處理完畢,得到了傳回值,再将值轉為json字元串位元組,傳遞到用戶端。

channelRead0方法,就是用戶端接受資料之後的回調。

由于是異步進行處理,是以我們再其中同樣也進行溢出接口調用,在其他位置,隻要實作此接口,就可以擷取到伺服器傳輸的資料。

/**
 * @author xuyuanpeng
 * @version 1.0
 * @date 2019-03-06 17:40
 */
@ChannelHandler.Sharable
public class EchoClientHandler extends SimpleChannelInboundHandler<ByteBuf> {
    private static java.util.logging.Logger logger = Logger.getLogger("EchoClientHandler");

    private RPCNetty rpcNetty;
    private INettyCallBack nettyCallBack;
	......
    /**
     * 請求資料
     */
    @Override
    public void channelActive(ChannelHandlerContext context){
        logger.info("client channelActive >>"+ JsonMapper.toJsonString(rpcNetty));
        context.writeAndFlush(Unpooled.copiedBuffer(JsonMapper.toJsonString(rpcNetty),
                CharsetUtil.UTF_8));
    }

    /**
     * 伺服器傳回資料
     */
    @Override
    protected void channelRead0(ChannelHandlerContext context, ByteBuf byteBuf) throws Exception {
        logger.info("client channelRead0 >>"+byteBuf.toString(CharsetUtil.UTF_8));
        nettyCallBack.calllBack(byteBuf.toString(CharsetUtil.UTF_8));
    }


    @Override
    public void exceptionCaught(ChannelHandlerContext ctx,
                                Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}
           

3、公共服務

(1)RPCNetty 通信實體
* @author xuyuanpeng
 * @version 1.0
 * @date 2019-04-08 18:57
 */
public class RPCNetty implements Serializable {
    private String clzName;
    private Class clz;
    private String methodName;
    private Class<?>[] paramTypes;
    private Object[] arguments;
    }
           

4、運作結果

(1)啟動服務端

四月 10, 2019 5:57:54 下午 com.xyp.iodemo.nettyrpcserver.netty.EchoServerImpl start

資訊: start

(2)啟動用戶端

四月 10, 2019 6:24:15 下午 com.xyp.iodemo.nettyrpcclient.netty.RPCFactory$1 invoke

資訊: invoke

四月 10, 2019 6:24:15 下午 com.xyp.iodemo.nettyrpcclient.netty.EchoClientHandler channelActive

資訊: client channelActive >>{“clzName”:“com.xyp.iodemo.nettyrpccommon.api.IUserService”,“methodName”:“getUser”,“paramTypes”:[“java.lang.String”],“arguments”:[“1”]}

四月 10, 2019 6:24:16 下午 com.xyp.iodemo.nettyrpcclient.netty.EchoClientHandler channelRead0

資訊: client channelRead0 >>“I am Service, Hi Client 1 , nice to meet you ?”

四月 10, 2019 6:24:16 下午 com.xyp.iodemo.nettyrpcclient.Starter$1 calllBack

資訊: obj>"“I am Service, Hi Client 1 , nice to meet you ?”"

(3)伺服器響應

四月 10, 2019 5:57:54 下午 com.xyp.iodemo.nettyrpcserver.netty.EchoServerImpl start

資訊: start

四月 10, 2019 6:24:15 下午 com.xyp.iodemo.nettyrpcserver.netty.EchoServerHandler channelRead

資訊: channelRead

四月 10, 2019 6:24:15 下午 com.xyp.iodemo.nettyrpcserver.netty.EchoServerHandler channelRead

資訊: read>>>>{“clzName”:“com.xyp.iodemo.nettyrpccommon.api.IUserService”,“methodName”:“getUser”,“paramTypes”:[“java.lang.String”],“arguments”:[“1”]}

四月 10, 2019 6:24:16 下午 com.xyp.iodemo.nettyrpcserver.netty.EchoServerHandler channelRead

資訊: resultStr>>>>“I am Service, Hi Client 1 , nice to meet you ?”

四月 10, 2019 6:24:16 下午 com.xyp.iodemo.nettyrpcserver.netty.EchoServerHandler channelRead

資訊: context.write>>>>“I am Service, Hi Client 1 , nice to meet you ?”

四月 10, 2019 6:24:16 下午 com.xyp.iodemo.nettyrpcserver.netty.EchoServerHandler channelReadComplete

資訊: channelReadComplete

四、總結

項目的git位址:https://gitee.com/xyp_YF/SNetttRpc.git

說明

Netty穩定且高效,易用,是進行NIO通信的不錯之選。

我們采用了Netty的通信方式,來處理RPC的底層的資料通信,這樣穩定且高效。

參考

《Netty權威指南》書籍

《與NIO的第一次親密接觸》 https://blog.csdn.net/m13797378901/article/details/86538744