天天看点

手把手实现RPC框架--简易版Dubbo构造(八)Kryo序列化

本节commit地址:f16b1f2

       上一节中,主要实现了Netty传输,以及基于Jackson的序列化器,但通过使用也发现存在一个问题,Json进行反序列化时,如果某个类的属性声明是Object类型,就会造成反序列化出错,通常会把Object属性直接反序列化成String类型,此时就需要其他参数辅助反序列化。同时,JSON序列化器是基于字符串(JSON串)的,占用空间较大且速度较慢。

       因此本节利用Kryo来实现序列化,Kryo是一个快速高效的Java对象序列化框架,主要特点是高性能、高效和易用。最重要的两个特点,一是基于字节的序列化,对空间利用率较高,在网络传输时体积更小;二是序列化时记录属性对象的类型信息,这样在反序列化时就不会出现之前的问题了。Kryo基本了解请戳:https://www.cnblogs.com/benwu/articles/4826268.html

添加Kryo依赖

<dependency>
      <groupId>com.esotericsoftware</groupId>
      <artifactId>kryo</artifactId>
      <version>4.0.2</version>
</dependency>
           

在SeralizerCode枚举类中添加Kryo序列化器的编号:

public enum SerializerCode {
    KRYO(0),
    JSON(1);
    private final int code;
}
           

通用序列化接口相应调整,Kryo对应为0:

static CommonSerializer getByCode(int code){
        switch (code){
            case 0:
                return new KryoSerializer();
            case 1:
                return new JsonSerializer();
            default:
                return null;
        }
}
           

Kryo序列化器实现

Kryo中的Output和Input可能存在线程安全问题(Json中的ObjectMapper是线程安全的),这里采用ThreadLocal【戳:ThreadLocal基本理解】处理线程安全问题(另一种方式是定义kryo线程池),一个线程对应一个Kryo。在序列化时,先创建一个 Output 对象(Kryo框架中定义的),然后调用writeObject( )方法将对象写入Output中,最后调用Output对象的toByte( )方法即可获得对象的字节数组。反序列化则是从Input对象中直接readObject,这里只需要传入对象的类型,而不需要具体传入每一个属性的类型信息了。

public class KryoSerializer implements CommonSerializer{

    private static final Logger logger = LoggerFactory.getLogger(KryoSerializer.class);

    //使用ThreadLocal初始化Kryo,因为Kryo中的output和input是线程不安全的
    private static final ThreadLocal<Kryo> kryoThreadLocal = ThreadLocal.withInitial(() -> {
        Kryo kryo = new Kryo();
        //注册类
        kryo.register(RpcResponse.class);
        kryo.register(RpcRequest.class);
        //循环引用检测,默认为true
        kryo.setReferences(true);
        //不强制要求注册类,默认为false,若设置为true则要求涉及到的所有类都要注册,包括jdk中的比如Object
        kryo.setRegistrationRequired(false);
        return kryo;
    });

    @Override
    public byte[] serialize(Object obj) {
        try(ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
            Output output = new Output(byteArrayOutputStream)){
            Kryo kryo = kryoThreadLocal.get();
            kryo.writeObject(output, obj);
            kryoThreadLocal.remove();
            return output.toBytes();
        }catch (Exception e){
            logger.error("序列化时有错误发生:" + e);
            throw new SerializeException("序列化时有错误发生");
        }
    }

    @Override
    public Object deserialize(byte[] bytes, Class<?> clazz) {
        try(ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(bytes);
            Input input = new Input(byteArrayInputStream)){
            Kryo kryo = kryoThreadLocal.get();
            Object o = kryo.readObject(input, clazz);
            kryoThreadLocal.remove();
            return o;
        }catch (Exception e){
            logger.error("反序列化时有错误发生:" + e);
            throw new SerializeException("反序列化时有错误发生");
        }
    }

    @Override
    public int getCode() {
        return SerializerCode.valueOf("KRYO").getCode();
    }
}
           

把NettyServer和NettyClient责任链中的CommonEncoder传入的参数改成KryoSerializer,然后启动服务端和客户端进行测试,会得到和之前一样的结果。

pipeline.addLast(new CommonEncoder(new KryoSerializer()));
           

本节over……