天天看點

基于Netty的RPC簡易實作

代碼位址如下:

http://www.demodashi.com/demo/13448.html

可以給你提供思路

也可以讓你學到Netty相關的知識

當然,這隻是一種實作方式

需求

看下圖,其實這個項目就是為了做這樣一件事。

有一個公共服務ServerA,它提供了一個名為getUserName的服務。

現在有多個類似ServerB的Web應用伺服器。

當客戶想通過ServerB要請求getUserName服務時,由于ServerB服務中因為沒有UserService的實作類,導緻不能正常提供服務的問題。

基于Netty的RPC簡易實作

預期結果

可以看到,在Client項目中,UserService沒有實作類,但是傳回了正常的結果。

基于Netty的RPC簡易實作

項目結構

整個項目分為三個部分,Server端、Client端以及一個公共jar。

下圖正是整個項目的目錄結構圖。

基于Netty的RPC簡易實作

公共部分

公共部分的存在是因為我将伺服器端和用戶端寫在了一個項目中,為了不讓代碼重複警告,是以提出來的一個公共子產品。主要是幾個實體類和一些序列化工具類。

  • MsgPackDecoder
    • 該類是一個MsgPack編碼器,主要作用将Object對象序列化成byte數組。
  • MsgPackEncoder
    • 該類是一個MsgPack解碼器,主要作用将byte數組反序列化成Object對象。
  • ObjectCodec
    • 該類是繼承了MessageToMessageCodec<ByteBuf, Object>。是自定義序列化類主要有兩個方法,encode和decode
      @Override
          protected void encode(ChannelHandlerContext ctx, Object msg, List<Object> out) {
              // 調用工具類的序列化方法
              byte[] data = ObjectSerializerUtils.serilizer(msg);
              ByteBuf buf = Unpooled.buffer();
              buf.writeBytes(data);
              out.add(buf);
          }
      
          @Override
          protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) {
              // 調用工具類的反序列化方法
              byte[] bytes = new byte[msg.readableBytes()];
              msg.readBytes(bytes);
              Object deSerilizer = ObjectSerializerUtils.deSerilizer(bytes);
              out.add(deSerilizer);
          }                
  • ObjectSerializerUtils
    • 序列化工具類。序列化方法可以自己實作
  • MethodInvokeMeta
    • 重點類。這個類是一個實體類。用于在網絡中傳輸的類。主要有5個字段分别記錄了一個接口的類對象,調用接口的方法名,方法的參數清單(包含參數類型,和參數清單),方法的傳回值類型。
    • 在用戶端中,這個類将調用方所要調用的方法封裝。
    • 在服務端中,這個類主要用于伺服器反射調用方法。
    • 當然,也可以用String來記錄這些元資訊。
  • NullWritable
    • 這個類主要用于在網絡中傳輸null。當傳回值為null時,服務端會傳回NullWritable對象。用戶端接收到NullWritable時進行null處理。
  • User
    • 實體對象。測試用例

用戶端

上面的目錄結構圖也有提到,用戶端中隻有UserService接口,是以用戶端中如果不做處理是不能正常運作的。

用戶端中核心類有以下7個,其中與Netty相關的核心類與服務端一樣有3個

  • ClientChannelHandlerAdapter
  • CustomChannelInitializerClient
  • NettyClient
  • RpcProxyFactoryBean
  • NettyBeanScanner
  • PackageClassUtils
  • WrapMethodUtils

NettyClient

這個類是Netty用戶端的啟動類,這個類中與Netty服務端進行通信

CustomChannelInitializerClient

這個類是用于初始化管道事件的類。主要添加了TCP粘包問題解決方案和自定義編解碼器工具

ClientChannelHandlerAdapter

這個類是Netty用戶端的處理類,主要通過這個類将調用資訊寫給Netty服務端。

NettyBeanScanner

這個類實作了BeanFactoryPostProcessor接口。BeanFactoryPostProcessor是Spring初始化Bean時對外暴露的擴充點。

Spring IoC容器允許BeanFactoryPostProcessor在容器執行個體化任何bean之前讀取bean的定義(配置中繼資料),并可以修改它。同時可以定義多個BeanFactoryPostProcessor,通過設定'order'屬性來确定各個BeanFactoryPostProcessor執行順序。

在postProcessBeanFactory方法中,調用PackageClassUtils.resolver方法,将UserService.class注冊到SpringBean工廠。

/**
     * 注冊Bean到Spring的bean工廠
     */
    @Override
    public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException {
        this.beanFactory = (DefaultListableBeanFactory) beanFactory;
        // 加載遠端服務的接口
        List<String> resolverClass = PackageClassUtils.resolver(basePackage);
        for (String clazz : resolverClass) {
            String simpleName;
            if (clazz.lastIndexOf('.') != -1) {
                simpleName = clazz.substring(clazz.lastIndexOf('.') + 1);
            } else {
                simpleName = clazz;
            }
            BeanDefinitionBuilder gd = BeanDefinitionBuilder.genericBeanDefinition(RpcProxyFactoryBean.class);
            gd.addPropertyValue("interfaceClass", clazz);
            gd.addPropertyReference("nettyClient", clientName);
            this.beanFactory.registerBeanDefinition(simpleName, gd.getRawBeanDefinition());
        }
    }                

RpcProxyFactoryBean

重點類,這個類繼承了AbstractFactoryBean

使用jdk動态代理的方式建立代理對象。

在invoke方法中,

調用WrapMethodUtils工具類中的方法,将代理對象方法封裝成MethodInvokeMeta對象。

然後通過NettyClient傳輸給NettyServer端,進行RPC調用,并将結果傳回。

至此,用戶端的核心類介紹完畢。

服務端

服務端主要的核心類有三個

  • ServerChannelHandlerAdapter
  • RequestDispatcher
  • NettyServer
NettyServer

這個類主要有兩個方法,一個是啟動Netty服務的方法,一個是關閉伺服器的方法。核心代碼如下:

/**
     * 啟動netty服務的方法
     */
    public void start() {
        // 伺服器監聽端口号
        int port = serverConfig.getPort();
        serverBootstrap.group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                // BACKLOG用于構造服務端套接字ServerSocket對象,
                // 辨別當伺服器請求處理線程全滿時,用于臨時存放已完成三次握手的請求的隊列的最大長度。
                // 如果未設定或所設定的值小于1,Java将使用預設值50
                .option(ChannelOption.SO_BACKLOG, 100)
                .handler(new LoggingHandler(LogLevel.INFO));
        try {
            // 設定事件處理
            serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) {
                    ChannelPipeline pipeline = ch.pipeline();
                    // 自定義長度解碼器解決TCP黏包問題
                    // maxFrameLength 最大包位元組大小,超出報異常
                    // lengthFieldOffset 長度字段的偏差
                    // lengthFieldLength 長度字段占的位元組數
                    // lengthAdjustment 添加到長度字段的補償值
                    // initialBytesToStrip 從解碼幀中第一次去除的位元組數
                    pipeline.addLast(new LengthFieldBasedFrameDecoder(65535
                            , 0, 2, 0, 2));
                    // LengthFieldPrepender編碼器,它可以計算目前待發送消息的二進制位元組長度,将該長度添加到ByteBuf的緩沖區頭中
                    pipeline.addLast(new LengthFieldPrepender(2));
                    // 序列化工具
                    pipeline.addLast(new ObjectCodec());
                    pipeline.addLast(channelHandlerAdapter);
                }
            });
            ChannelFuture f = serverBootstrap.bind(port).sync();
            f.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }                
ServerChannelHandlerAdapter

這個類主要重寫了ChannelRead方法,在ChannelRead方法中調用了RequestDispatcher類中的dispatcher方法來處理消息。

RequestDispatcher

這個類的作用為,将ChannelRead方法中讀到的資料(也可以說指令),通過反射來調用,執行對應方法,将執行後的結果寫回通道,供用戶端使用。

/**
     * 分發指令
     *
     * @param channelHandlerContext channelHandlerContext
     * @param invokeMeta            invokeMeta
     */
    public void dispatcher(final ChannelHandlerContext channelHandlerContext, final MethodInvokeMeta invokeMeta) {
        ChannelFuture f = null;
        try {
            // 擷取用戶端準備調用的接口類
            Class<?> interfaceClass = invokeMeta.getInterfaceClass();
            // 擷取準備調用的方法名稱
            String name = invokeMeta.getMethodName();
            // 擷取方法對應的參數清單
            Object[] args = invokeMeta.getArgs();
            // 擷取參數類型
            Class<?>[] parameterTypes = invokeMeta.getParameterTypes();
            // 通過Spring擷取對象
            Object targetObject = app.getBean(interfaceClass);
            // 擷取待調用方法
            Method method = targetObject.getClass().getMethod(name, parameterTypes);
            // 執行方法
            Object obj = method.invoke(targetObject, args);
            if (obj == null) {
                // 如果方法結果為空,将NULL結果寫給用戶端
                f = channelHandlerContext.writeAndFlush(NullWritable.nullWritable());
            } else {
                // 寫給用戶端結果
                f = channelHandlerContext.writeAndFlush(obj);
            }
            // 釋放通道,不是關閉連接配接
            f.addListener(ChannelFutureListener.CLOSE);
        } catch (Exception e) {
            // 出現異常後的處理
            f = channelHandlerContext.writeAndFlush(e.getMessage());
        } finally {
            if (f != null) {
                f.addListener(ChannelFutureListener.CLOSE);
            }
        }
    }                

使用方法

  1. 啟動ServerApplication
  2. 啟動ClientApplication
  3. 打開Chrome浏覽器,輸入:http://localhost:8080/client/get/name 或者 http://localhost:8080/client/get/info 即可看到結果。

如果想整合到現有項目中,請直接留言或者聯系作者,此次并沒有提供內建版本,但如果此篇文章已了解,那麼自己可以手動的去內建到自己的項目中。基于Netty的RPC簡易實作

代碼位址如下:

http://www.demodashi.com/demo/13448.html

注:本文著作權歸作者,由demo大師代發,拒絕轉載,轉載需要作者授權