天天看點

手把手實作RPC架構--簡易版Dubbo構造(八)Kryo序列化

本節commit位址:f16b1f2

       上一節中,主要實作了Netty傳輸,以及基于Jackson的序列化器,但通過使用也發現存在一個問題,Json進行反序列化時,如果某個類的屬性聲明是Object類型,就會造成反序列化出錯,通常會把Object屬性直接反序列化成String類型,此時就需要其他參數輔助反序列化。同時,JSON序列化器是基于字元串(JSON串)的,占用空間較大且速度較慢。

       是以本節利用Kryo來實作序列化,Kryo是一個快速高效的Java對象序列化架構,主要特點是高性能、高效和易用。最重要的兩個特點,一是基于位元組的序列化,對空間使用率較高,在網絡傳輸時體積更小;二是序列化時記錄屬性對象的類型資訊,這樣在反序列化時就不會出現之前的問題了。Kryo基本了解請戳:https://www.cnblogs.com/benwu/articles/4826268.html

添加Kryo依賴

<dependency>
      <groupId>com.esotericsoftware</groupId>
      <artifactId>kryo</artifactId>
      <version>4.0.2</version>
</dependency>
           

在SeralizerCode枚舉類中添加Kryo序列化器的編号:

public enum SerializerCode {
    KRYO(0),
    JSON(1);
    private final int code;
}
           

通用序列化接口相應調整,Kryo對應為0:

static CommonSerializer getByCode(int code){
        switch (code){
            case 0:
                return new KryoSerializer();
            case 1:
                return new JsonSerializer();
            default:
                return null;
        }
}
           

Kryo序列化器實作

Kryo中的Output和Input可能存線上程安全問題(Json中的ObjectMapper是線程安全的),這裡采用ThreadLocal【戳:ThreadLocal基本了解】處理線程安全問題(另一種方式是定義kryo線程池),一個線程對應一個Kryo。在序列化時,先建立一個 Output 對象(Kryo架構中定義的),然後調用writeObject( )方法将對象寫入Output中,最後調用Output對象的toByte( )方法即可獲得對象的位元組數組。反序列化則是從Input對象中直接readObject,這裡隻需要傳入對象的類型,而不需要具體傳入每一個屬性的類型資訊了。

public class KryoSerializer implements CommonSerializer{

    private static final Logger logger = LoggerFactory.getLogger(KryoSerializer.class);

    //使用ThreadLocal初始化Kryo,因為Kryo中的output和input是線程不安全的
    private static final ThreadLocal<Kryo> kryoThreadLocal = ThreadLocal.withInitial(() -> {
        Kryo kryo = new Kryo();
        //注冊類
        kryo.register(RpcResponse.class);
        kryo.register(RpcRequest.class);
        //循環引用檢測,預設為true
        kryo.setReferences(true);
        //不強制要求注冊類,預設為false,若設定為true則要求涉及到的所有類都要注冊,包括jdk中的比如Object
        kryo.setRegistrationRequired(false);
        return kryo;
    });

    @Override
    public byte[] serialize(Object obj) {
        try(ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
            Output output = new Output(byteArrayOutputStream)){
            Kryo kryo = kryoThreadLocal.get();
            kryo.writeObject(output, obj);
            kryoThreadLocal.remove();
            return output.toBytes();
        }catch (Exception e){
            logger.error("序列化時有錯誤發生:" + e);
            throw new SerializeException("序列化時有錯誤發生");
        }
    }

    @Override
    public Object deserialize(byte[] bytes, Class<?> clazz) {
        try(ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(bytes);
            Input input = new Input(byteArrayInputStream)){
            Kryo kryo = kryoThreadLocal.get();
            Object o = kryo.readObject(input, clazz);
            kryoThreadLocal.remove();
            return o;
        }catch (Exception e){
            logger.error("反序列化時有錯誤發生:" + e);
            throw new SerializeException("反序列化時有錯誤發生");
        }
    }

    @Override
    public int getCode() {
        return SerializerCode.valueOf("KRYO").getCode();
    }
}
           

把NettyServer和NettyClient責任鍊中的CommonEncoder傳入的參數改成KryoSerializer,然後啟動服務端和用戶端進行測試,會得到和之前一樣的結果。

pipeline.addLast(new CommonEncoder(new KryoSerializer()));
           

本節over……