天天看點

精盡 Dubbo 源碼分析 —— 序列化(一)之總體實作1. 概述2. 源碼實作

1. 概述

在 《Dubbo 開發指南 —— 序列化擴充.》 ,對序列化定義如下:

将對象轉成位元組流,用于網絡傳輸,以及将位元組流轉為對象,用于在收到位元組流資料後還原成對象。

API 接口,類圖如下:

精盡 Dubbo 源碼分析 —— 序列化(一)之總體實作1. 概述2. 源碼實作

2. 源碼實作

2.1 Serialization
/**
 * Serialization. (SPI, Singleton, ThreadSafe)
 *
 * 序列化接口
 */
@SPI("hessian2")
public interface Serialization {

    /**
     * get content type id
     *
     * 獲得内容類型編号
     *
     * @return content type id
     */
    byte getContentTypeId();

    /**
     * get content type
     *
     * 獲得内容類型名
     *
     * @return content type
     */
    String getContentType();

    /**
     * create serializer
     *
     * 建立 ObjectOutput 對象,序列化輸出到 OutputStream
     *
     * @param url URL
     * @param output 輸出流
     * @return serializer
     * @throws IOException 當發生 IO 異常時
     */
    @Adaptive
    ObjectOutput serialize(URL url, OutputStream output) throws IOException;

    /**
     * create deserializer
     *
     * 建立 ObjectInput 對象,從 InputStream 反序列化
     *
     * @param url URL
     * @param input 輸入流
     * @return deserializer
     * @throws IOException 當發生 IO 異常時
     */
    @Adaptive
    ObjectInput deserialize(URL url, InputStream input) throws IOException;

} 
           
2.2 DataInput

資料輸入接口。方法如下:

boolean readBool() throws IOException;

byte readByte() throws IOException;
short readShort() throws IOException;
int readInt() throws IOException;
long readLong() throws IOException;
float readFloat() throws IOException;
double readDouble() throws IOException;

String readUTF() throws IOException;

byte[] readBytes() throws IOException;

           
2.2.1 ObjectInput

實作 DataInput 接口,對象輸入接口。方法如下:

Object readObject() throws IOException, ClassNotFoundException;
<T> T readObject(Class<T> cls) throws IOException, ClassNotFoundException;
<T> T readObject(Class<T> cls, Type type) throws IOException, ClassNotFoundException;

           

2.3 DataOutput

資料輸出接口。方法如下:

void writeBool(boolean v) throws IOException;
    
void writeByte(byte v) throws IOException;
void writeShort(short v) throws IOException;
void writeInt(int v) throws IOException;
void writeLong(long v) throws IOException;
void writeFloat(float v) throws IOException;
void writeDouble(double v) throws IOException;

void writeUTF(String v) throws IOException;

void writeBytes(byte[] v) throws IOException;
void writeBytes(byte[] v, int off, int len) throws IOException;

// Flush buffer.
void flushBuffer() throws IOException;
           
2.3.1 ObjectOutput

實作 DataOutput 接口,對象輸出接口。方法如下:

void writeObject(Object obj) throws IOException;
           

2.4 Cleanable

清理接口。方法如下:

void cleanup();
           

2.5 Optimizer 相關

2.5.1 SerializationOptimizer

序列化優化器接口。方法如下:

public interface SerializationOptimizer {

    /**
     * @return 需要使用優化的類的集合
     */
    Collection<Class> getSerializableClasses();

}
           

2.5.2 SerializableClassRegistry

序列化優化類的系統資料庫。代碼如下:

public abstract class SerializableClassRegistry {

    private static final Set<Class> registrations = new LinkedHashSet<Class>();

    /**
     * only supposed to be called at startup time
     */
    public static void registerClass(Class clazz) {
        registrations.add(clazz);
    }

    public static Set<Class> getRegisteredClasses() {
        return registrations;
    }

}
           

2.5.3 初始化序列化優化器

在 DubboProtocol#optimizeSerialization() 方法中,初始化序列化優化器。代碼如下:

/**
 * 已初始化的 SerializationOptimizer 實作類名的集合
 */
private final Set<String> optimizers = new ConcurrentHashSet<String>();

private void optimizeSerialization(URL url) throws RpcException {
    // 獲得 `"optimizer"` 配置項
    String className = url.getParameter(Constants.OPTIMIZER_KEY, "");
    if (StringUtils.isEmpty(className) || optimizers.contains(className)) { // 已注冊
        return;
    }

    logger.info("Optimizing the serialization process for Kryo, FST, etc...");

    try {
        // 加載 SerializationOptimizer 實作類
        Class clazz = Thread.currentThread().getContextClassLoader().loadClass(className);
        if (!SerializationOptimizer.class.isAssignableFrom(clazz)) {
            throw new RpcException("The serialization optimizer " + className + " isn't an instance of " + SerializationOptimizer.class.getName());
        }

        // 建立 SerializationOptimizer 對象
        SerializationOptimizer optimizer = (SerializationOptimizer) clazz.newInstance();
        if (optimizer.getSerializableClasses() == null) {
            return;
        }

        // 注冊到 SerializableClassRegistry 中
        for (Class c : optimizer.getSerializableClasses()) {
            SerializableClassRegistry.registerClass(c);
        }

        // 添加到 optimizers 中
        optimizers.add(className);
    } catch (ClassNotFoundException e) {
        throw new RpcException("Cannot find the serialization optimizer class: " + className, e);
    } catch (InstantiationException e) {
        throw new RpcException("Cannot instantiate the serialization optimizer class: " + className, e);
    } catch (IllegalAccessException e) {
        throw new RpcException("Cannot instantiate the serialization optimizer class: " + className, e);
    }
}
           

繼續閱讀