天天看點

rpc學習及netty自定義rpc

什麼是RPC

RPC全稱為remote procedure call,即遠端過程調用。

借助RPC可以做到像本地調用一樣調用遠端服務,是一種程序間的通信方式

比如兩台伺服器A和B,A伺服器上部署一個應用,B伺服器上部署一個應用,A伺服器上的應用想調用B伺服器上的應用提供的方法,由于兩個應用不在一個記憶體空間,不能直接調用,是以需要通過網絡來表達調用的語義和傳達調用的資料。

需要注意的是RPC并不是一個具體的技術,而是指整個網絡遠端調用過程

RPC架構

一個完整的RPC架構裡面包含了四個核心的元件,分别是Client,Client Stub,Server以及Server Stub,這個Stub可以了解為存根。

  • 用戶端(Client),服務的調用方。
  • 用戶端存根(Client

    Stub),存放服務端的位址消息,再将用戶端的請求參數打包成網絡消息,然後通過網絡遠 程發送給服務方。

  • 服務端(Server),真正的服務提供者。
  • 服務端存根(Server Stub),接收用戶端發送過來的消息,将消息解包,并調用本地的方法。
    rpc學習及netty自定義rpc
    RPC調用過程
    rpc學習及netty自定義rpc

    (1) 用戶端(client)以本地調用方式(即以接口的方式)調用服務;

    (2) 用戶端存根(client stub)接收到調用後,負責将方法、參數等組裝成能夠進行網絡傳輸的消息體(将消息體對

    象序列化為二進制);

    (3) 用戶端通過sockets将消息發送到服務端;

    (4) 服務端存根( server stub)收到消息後進行解碼(将消息對象反序列化);

    (5) 服務端存根( server stub)根據解碼結果調用本地的服務;

    (6) 本地服務執行并将結果傳回給服務端存根( server stub);

    (7) 服務端存根( server stub)将傳回結果打包成消息(将結果消息對象序列化);

    (8) 服務端(server)通過sockets将消息發送到用戶端;

    (9) 用戶端存根(client stub)接收到結果消息,并進行解碼(将結果消息發序列化);

    (10) 用戶端(client)得到最終結果。

    RPC的目标是要把2、3、4、7、8、9這些步驟都封裝起來。

    注意:無論是何種類型的資料,最終都需要轉換成二進制流在網絡上進行傳輸,資料的發送方需要将對象轉換為二

    進制流,而資料的接收方則需要把二進制流再恢複為對象。

    在java中RPC架構比較多,常見的有Hessian、gRPC、Thri、HSF (High Speed Service Framework)、Dubbo 等,其實對于RPC架構而言,核心子產品 就是通訊和序列化。

基于Netty自定義RPC

RPC又稱遠端過程調用,我們所知的遠端調用分為兩種,現在在服務間通信的方式也基本以這兩種為主

1.是基于HTTP的restful形式的廣義遠端調用,以spring could的feign和restTemplate為代表,采用的協定是HTTP的7層調用協定,并且協定的參數和響應序列化基本以JSON格式和XML格式為主。

2.是基于TCP的狹義的RPC遠端調用,以阿裡的Dubbo為代表,主要通過netty來實作4層網絡協定,NIO來異步傳輸,序列化也可以是JSON或者hessian2以及java自帶的序列化等,可以配置。

接下來我們主要以第二種的RPC遠端調用來自己實作

需求

案例版本:

server端與client端定義共用的接口,server端需要實作該接口。用戶端通過jdk的proxy動态代理生成對象,在方法調用時執行invoke中的代碼。invoke中使用netty與server端通信,我們在這裡傳輸(類名#方法名)的字元串。server端收到資料後,根據類名#方法名去調用實作類并通過netty傳回資料。

要求完成改造版本:

序列化協定修改為JSON,使用fastjson作為JSON架構,并将RpcRequest實體作為通信載體,服務端需根據用戶端傳遞過來的RpcRequest對象通過反射,動态代理等技術,最終能夠執行目标方法,傳回字元串"success"。

要點提示:

(1)用戶端代理的invoke方法中需封裝RpcRequest對象,将其當做參數進行傳遞。

(2)服務端的UserServiceImpl類上添加@Service注解,在啟動項目時,添加到容器中。

(3)服務端要添加@SpringBootApplication注解,main方法中添加。SpringApplication.run(ServerBootstrap.class, args);,進行啟動掃描(注意項目啟動類位置:掃描路徑)。

(4)服務端在收到參數,可以借助反射及動态代理(如需用到ApplicationContext對象,可以借助實作ApplicationContextAware接口擷取),來調用UserServiceImpl方法,最終向用戶端傳回”success“即可。

(5)既然傳遞的是RpcRequest對象了,那麼用戶端的編碼器與服務端的解碼器需重新設定。

實作

1.分析需求

  • 原始版本使用serviceName#methodName#組合作為待調用的類名#方法名,我們需要使用RpcRequest類來封裝請求,封裝待調用的類、方法、參數。
  • 使用fastjson作序列化

2.修改用戶端,封裝RpcRequest

//2)封裝RpcRequest,解析providerParam
     RpcRequest rpcRequest= new RpcRequest();
     rpcRequest.setClassName(providerParam.split("#")[0]);
     rpcRequest.setMethodName(providerParam.split("#")[1]);
     rpcRequest.setParameters(objects);
     rpcRequest.setParameterTypes(method.getParameterTypes());
           

3.修改用戶端,修改UserClientHandler的 param參數 由String類型改為RpcRequest,并修改用戶端的編碼規則

//4)配置啟動引導對象
        bootstrap.group(group)
                //設定通道為NIO
                .channel(NioSocketChannel.class)
                //設定請求協定為TCP
                .option(ChannelOption.TCP_NODELAY,true)
                //監聽channel 并初始化
                .handler(new ChannelInitializer<SocketChannel>() {
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        //擷取ChannelPipeline
                        ChannelPipeline pipeline = socketChannel.pipeline();
                        //設定編碼
                        //pipeline.addLast(new StringEncoder());
                        pipeline.addLast( new RpcEncoder(RpcRequest.class, new JSONSerializer()));
                        pipeline.addLast(new StringDecoder());
                        //添加自定義事件處理器
                        pipeline.addLast(userClientHandler);
                    }
                });
           

3.修改服務端,我們需要添加一個解碼器

package com.lagou.entity;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import io.netty.handler.codec.MessageToByteEncoder;
import io.netty.handler.codec.MessageToMessageDecoder;

import java.nio.charset.Charset;
import java.util.List;

public class RpcDecoder extends ByteToMessageDecoder {

    private Class<?> clazz;

    private Serializer serializer;



    public RpcDecoder(Class<?> clazz, Serializer serializer) {

        this.clazz = clazz;

        this.serializer = serializer;

    }




    @Override
    protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {
        if (clazz != null ) {
            int lenght=byteBuf.readableBytes();
            if (lenght < 1) return;
            //讀取長度
            int len = byteBuf.readInt();

            //讀取請求bytes
            byte[] bytes = new byte[len];
            byteBuf.getBytes(4, bytes);

            System.out.println(new String(bytes, Charset.forName("UTF-8")));
            //使用fastjson轉換
            Object o=serializer.deserialize(clazz,bytes);
            list.add(o);
            byteBuf.clear();
        }
    }
}



           

4.修改服務端,添加springboot容器,将 UserServiceHandler、UserServiceImpl都加入到容器中進行管理,UserServiceHandler如果為單例的話會報錯,所有為其設定@Scope(“prototype”)。其次使用ApplicationContext擷取Request中的待調用的對象,進行調用

@Scope("prototype")
@Component
public class UserServiceHandler extends ChannelInboundHandlerAdapter implements ApplicationContextAware {


    private ApplicationContext applicationContext;


    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.applicationContext = applicationContext;
    }

    //當用戶端讀取資料時,該方法會被調用
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {

        //注意:  用戶端将來發送請求的時候會傳遞一個參數:  UserService#sayHello#are you ok
         //1.判斷目前的請求是否符合規則
        RpcRequest rpcRequest = (com.lagou.entity.RpcRequest) msg;
        //擷取對象;
        Object e=applicationContext.getBean(rpcRequest.getClassName());
        //requestClass.
        Class requestClass=e.getClass();
        //requestMethod.
        Method method = requestClass.getMethod(rpcRequest.getMethodName(),rpcRequest.getParameterTypes());

        //調用方法
        Object result= method.invoke(e,rpcRequest.getParameters());
        System.out.println("直接反射:"+result);
        //傳回success
        ctx.writeAndFlush("success");
    }
}
           

5.啟動Consumer與provider即可看到結果,其次Netty有必要仔細學習一下,參考文章ByteBuf詳解

繼續閱讀